from __future__ import annotations

import time
import traceback

from pymysql.cursors import DictCursor
from tqdm import tqdm

from applications.db import DatabaseConnector
from applications.pipeline import scrape_video_entities_process
from applications.utils import Item
from applications.utils import str_to_md5
from applications.utils import insert_into_single_video_source_table
from config import long_articles_config
from coldStartTasks.crawler.sohu import get_recommendation_video_list


class CrawlerSohuRecommendVideos:
    """
    Crawl Sohu's related-video recommendations for high-score seed videos.

    For each seed video already stored in ``publish_single_video_source``,
    fetch up to 7 pages of recommended videos, push each one through the
    scraping pipeline, and finally mark the seed video as processed.
    """

    def __init__(self):
        self.db_client = DatabaseConnector(long_articles_config)
        self.db_client.connect()
        self.platform = 'sohu'

    def fetch_seed_videos(self) -> list[dict]:
        """
        Get unprocessed Sohu seed videos from the database.

        Seeds are rows with score > 0.6 that passed audit (audit_status = 1),
        are not flagged bad (bad_status = 0), and have not been used as a
        recommendation source yet (source_account = 0).

        :return: list of rows with id, out_account_id, url_unique_md5,
                 article_title and score
        """
        # no interpolation needed -> plain string, not an f-string
        fetch_query = """
            select id, out_account_id, url_unique_md5, article_title, score
            from publish_single_video_source
            where platform = 'sohu'
                and source_account = 0
                and score > 0.6
                and audit_status = 1
                and bad_status = 0;
        """
        return self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)

    def crawler_each_video(self, video_data):
        """
        Build an Item from one recommended video and persist it.

        Errors are caught and printed so that one bad video does not abort
        the rest of the page (best-effort crawling).

        :param video_data: raw video dict from the Sohu recommendation API
                           (assumes keys id, title, authorId, authorName,
                           postTime, videoUrl — verify against the API)
        """
        video_item = Item()
        unique_id = f"{self.platform}-{video_data['id']}"

        # add info into item
        video_item.add("content_trace_id", "video{}".format(str_to_md5(unique_id)))
        # NOTE(review): the raw Sohu id is stored under "url_unique_md5"
        # without hashing — looks intentional (used as a unique key), confirm.
        video_item.add("url_unique_md5", video_data["id"])
        video_item.add("article_title", video_data["title"])
        video_item.add("out_account_id", video_data['authorId'])
        video_item.add("out_account_name", video_data["authorName"])
        # postTime is in milliseconds; store whole seconds, consistent with
        # the integer crawler_timestamp below
        video_item.add("publish_timestamp", int(video_data["postTime"] / 1000))
        video_item.add("platform", self.platform)
        video_item.add("article_url", video_data["videoUrl"])
        # 0 = not yet used as a recommendation source
        video_item.add("source_account", 0)
        video_item.add("crawler_timestamp", int(time.time()))

        # check item before insert
        video_item.check(source="video")
        try:
            item_with_oss_path = scrape_video_entities_process(
                video_item=video_item.item,
                db_client=self.db_client
            )
            if item_with_oss_path:
                insert_into_single_video_source_table(
                    db_client=self.db_client,
                    video_item=item_with_oss_path
                )
        except Exception as e:
            detail = {
                "video_item": video_item.item,
                "error": str(e),
                "traceback": traceback.format_exc(),
            }
            print(detail)

    def get_each_video_recommendation(self, seed_video: dict) -> None:
        """
        Crawl recommendation pages 1-7 for a single seed video.

        :param seed_video: row from fetch_seed_videos (needs out_account_id
                           and url_unique_md5)
        """
        author_id = seed_video["out_account_id"]
        article_id = seed_video["url_unique_md5"]
        outside_url = f"https://www.sohu.com/a/{article_id}_{author_id}"
        # iterate the range directly — no need to materialize a list
        for page in range(1, 8):
            try:
                response = get_recommendation_video_list(
                    outside_url, author_id, article_id, page
                )
                # guard clause: skip empty responses
                if not response:
                    continue
                video_list = response['data']['recommendVideoFeed']['list']
                for video in tqdm(video_list, desc=f"page: {page}"):
                    self.crawler_each_video(video)
            except Exception as e:
                # best-effort: one bad page must not abort the remaining pages
                print(e)
                print(traceback.format_exc())

    def update_seed_video_status(self, task_id: int) -> int:
        """
        Mark a seed video as processed (source_account 0 -> 1).

        :param task_id: primary key of the seed row
        :return: number of affected rows (0 if already processed)
        """
        # no interpolation needed -> plain string, not an f-string
        update_query = """
            update publish_single_video_source
            set source_account = %s
            where id = %s and source_account = %s;
        """
        return self.db_client.save(
            query=update_query,
            params=(1, task_id, 0)
        )

    def deal(self):
        """
        Entry point: crawl recommendations for every seed video, then mark
        each seed as processed. Failures are printed and the loop continues.
        """
        task_list = self.fetch_seed_videos()
        for task in tqdm(task_list):
            try:
                self.get_each_video_recommendation(task)
                self.update_seed_video_status(task_id=task["id"])
            except Exception as e:
                # print traceback too, consistent with the other handlers
                print(e)
                print(traceback.format_exc())