from __future__ import annotations

import time
import traceback

from pymysql.cursors import DictCursor
from tqdm import tqdm

from applications import log
from applications.db import DatabaseConnector
from applications.pipeline import scrape_video_entities_process
from applications.utils import Item
from applications.utils import str_to_md5
from applications.utils import insert_into_single_video_source_table
from coldStartTasks.crawler.sohu import get_hot_point_videos
from coldStartTasks.crawler.sohu import get_recommendation_video_list
from coldStartTasks.crawler.sohu import get_user_homepage_videos
from config import long_articles_config


class CrawlerSohuVideos:
    def __init__(self):
        self.db_client = DatabaseConnector(long_articles_config)
        self.db_client.connect()
        self.platform = "sohu"

    def crawler_each_video(self, video_data):
        """
        crawl a single video entry and persist it
        """
        video_item = Item()
        unique_id = f"{self.platform}-{video_data['id']}"

        # add info into item
        video_item.add("content_trace_id", "video{}".format(str_to_md5(unique_id)))
        # stores the raw sohu id; it is used later to rebuild the article URL
        video_item.add("url_unique_md5", video_data["id"])
        video_item.add("article_title", video_data["title"])
        video_item.add("out_account_id", video_data["authorId"])
        video_item.add("out_account_name", video_data["authorName"])
        # postTime is in milliseconds; convert to a second-level timestamp
        video_item.add("publish_timestamp", video_data["postTime"] // 1000)
        video_item.add("platform", self.platform)
        video_item.add("article_url", video_data["videoUrl"])
        video_item.add("source_account", 0)
        video_item.add("crawler_timestamp", int(time.time()))

        # check item before insert
        video_item.check(source="video")
        try:
            item_with_oss_path = scrape_video_entities_process(
                video_item=video_item.item, db_client=self.db_client
            )
            if item_with_oss_path:
                insert_into_single_video_source_table(
                    db_client=self.db_client, video_item=item_with_oss_path
                )
        except Exception as e:
            detail = {
                "video_item": video_item.item,
                "error": str(e),
                "traceback": traceback.format_exc(),
            }
            print(detail)


class CrawlerSohuHotVideos(CrawlerSohuVideos):
    def deal(self):
        """
        crawl sohu hot-point videos, run once a day
        """
        hot_point_video_response = get_hot_point_videos()
        hot_point_video_list = hot_point_video_response["data"][
            "tpl-card-feed-pc-data"
        ]["list"]
        for video in tqdm(hot_point_video_list, desc="crawler sohu hot videos"):
            try:
                self.crawler_each_video(video)
            except Exception as e:
                log(
                    task="crawler_sohu_videos",
                    function="crawler_sohu_hot_videos",
                    message="crawler_sohu_hot_videos failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "video": video,
                    },
                )


class CrawlerSohuRecommendVideos(CrawlerSohuVideos):
    def fetch_seed_videos(self) -> list[dict]:
        """
        get seed videos from the database
        """
        fetch_query = """
            select id, out_account_id, url_unique_md5, article_title, score
            from publish_single_video_source
            where platform = 'sohu' and source_account = 0
                and score > 0.6 and audit_status = 1 and bad_status = 0;
        """
        seed_videos = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
        return seed_videos

    def get_each_video_recommendation(self, seed_video: dict) -> None:
        """
        fetch the recommendation feed for a single seed video
        """
        author_id = seed_video["out_account_id"]
        article_id = seed_video["url_unique_md5"]
        outside_url = f"https://www.sohu.com/a/{article_id}_{author_id}"
        for page in range(1, 8):
            try:
                response = get_recommendation_video_list(
                    outside_url, author_id, article_id, page
                )
                if response:
                    video_list = response["data"]["recommendVideoFeed"]["list"]
                    for video in tqdm(video_list, desc=f"page: {page}"):
                        self.crawler_each_video(video)
            except Exception as e:
                print(e)
                print(traceback.format_exc())

    def update_seed_video_status(self, task_id: int) -> int:
        """
        mark a seed video as processed so it is not picked up again
        """
        update_query = """
            update publish_single_video_source
            set source_account = %s
            where id = %s and source_account = %s;
        """
        return self.db_client.save(query=update_query, params=(1, task_id, 0))

    def deal(self):
        task_list = self.fetch_seed_videos()
        for task in tqdm(task_list):
            try:
                self.get_each_video_recommendation(task)
                self.update_seed_video_status(task_id=task["id"])
            except Exception as e:
                log(
                    task="crawler_sohu_videos",
                    function="crawler_sohu_recommend_videos",
                    message="crawler_sohu_recommend_videos failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "video": task,
                    },
                )


class CrawlerSohuUserPageVideos(CrawlerSohuVideos):
    def get_author_list(self):
        """
        get author list from database
        """
        # TODO: fetch the author list from the database instead of hard-coding it
        return [121644888]

    def process_each_page(self, response: dict):
        """
        process a single page of a user's homepage feed
        """
        video_list = response["data"]["FeedSlideloadAuthor_2_0_pc_1655965929143_data2"][
            "list"
        ]
        for video in tqdm(video_list, desc="crawler sohu user page videos"):
            try:
                self.crawler_each_video(video)
            except Exception as e:
                log(
                    task="crawler_sohu_videos",
                    function="process_each_page",
                    message="crawler_sohu_user_videos failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "video": video,
                    },
                )

    def get_each_user_videos(self, author_id: int):
        """
        crawl a single user's homepage videos
        """
        # only the first page is crawled for now
        for page in range(1, 2):
            try:
                response = get_user_homepage_videos(author_id, page)
                self.process_each_page(response)
            except Exception as e:
                log(
                    task="crawler_sohu_videos",
                    function="get_each_user_videos",
                    message="crawler_sohu_user_videos failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "author_id": author_id,
                        "page": page,
                    },
                )

    def deal(self):
        author_list = self.get_author_list()
        for author_id in tqdm(author_list, desc="crawler sohu user videos"):
            try:
                self.get_each_user_videos(author_id)
            except Exception as e:
                log(
                    task="crawler_sohu_videos",
                    function="crawler_sohu_user_videos",
                    message="crawler_sohu_user_videos failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "author_id": author_id,
                    },
                )
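

# Usage sketch (an assumption, not part of the original module): the original
# code does not show how these crawlers are scheduled, so the entry point below
# is illustrative only. Each `deal()` method is self-contained, so the three
# crawlers can be run independently or in any order.
if __name__ == "__main__":
    # crawl the daily hot-point feed first, then expand coverage through the
    # recommendation feeds of high-scoring seed videos, then sweep the
    # configured author homepages
    CrawlerSohuHotVideos().deal()
    CrawlerSohuRecommendVideos().deal()
    CrawlerSohuUserPageVideos().deal()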