from __future__ import annotations

import time
import traceback

from pymysql.cursors import DictCursor
from tqdm import tqdm

from applications import log
from applications.const import SohuVideoCrawlerConst
from applications.db import DatabaseConnector
from applications.pipeline import scrape_video_entities_process
from applications.utils import Item
from applications.utils import str_to_md5
from applications.utils import insert_into_single_video_source_table
from cold_start.crawler.sohu import get_video_detail
from cold_start.crawler.sohu import get_hot_point_videos
from cold_start.crawler.sohu import get_recommendation_video_list
from cold_start.crawler.sohu import get_user_homepage_videos
from config import long_articles_config

const = SohuVideoCrawlerConst()


class CrawlerSohuVideos:
    """Base crawler: owns the database connection and stores single videos."""

    def __init__(self):
        self.db_client = DatabaseConnector(long_articles_config)
        self.db_client.connect()

    def crawler_each_video(self, video_data: dict) -> None:
        """
        Build a video item from one raw video dict and store it.

        ``video_data`` must provide the keys ``id``, ``title``, ``authorId``,
        ``authorName``, ``postTime`` and ``videoUrl``.

        Errors raised while building or checking the item propagate to the
        caller; errors raised by the scrape/insert step are logged and
        swallowed so one bad video does not stop the surrounding loop.
        """
        video_item = Item()
        unique_id = f"{const.PLATFORM}-{video_data['id']}"

        # add info into item
        video_item.add("content_trace_id", f"video{str_to_md5(unique_id)}")
        video_item.add("url_unique_md5", video_data["id"])
        video_item.add("article_title", video_data["title"])
        video_item.add("out_account_id", video_data["authorId"])
        video_item.add("out_account_name", video_data["authorName"])
        # NOTE(review): presumably postTime is in milliseconds -- confirm for
        # every caller (the hot-video path feeds `publish_timestamp` here).
        video_item.add("publish_timestamp", video_data["postTime"] / 1000)
        video_item.add("platform", const.PLATFORM)
        video_item.add("article_url", video_data["videoUrl"])
        video_item.add("source_account", const.GET_RECOMMEND_INIT_STATUS)
        video_item.add("crawler_timestamp", int(time.time()))

        # check item before insert
        video_item.check(source="video")

        try:
            item_with_oss_path = scrape_video_entities_process(
                video_item=video_item.item, db_client=self.db_client
            )
            # a falsy result means there is nothing to insert
            if item_with_oss_path:
                insert_into_single_video_source_table(
                    db_client=self.db_client, video_item=item_with_oss_path
                )
        except Exception as e:
            detail = {
                "video_item": video_item.item,
                "error": str(e),
                "traceback": traceback.format_exc(),
            }
            log(
                task="crawler_sohu_videos",
                function="crawler_each_video",
                message="crawler_sohu_videos failed",
                status="failed",
                data=detail,
            )


class CrawlerSohuHotVideos(CrawlerSohuVideos):
    """Crawl the videos returned by the hot-point feed."""

    def process_hot_video_obj(self, video_obj: dict) -> None:
        """
        Reshape one hot-feed entry into the dict ``crawler_each_video`` expects.

        The feed entry supplies ``id``, ``title`` and ``url``; author, publish
        time and video url are taken from the video detail response.
        """
        article_url = f"https://www.sohu.com{video_obj['url']}"
        video_detail_response = get_video_detail(article_url=article_url)
        item_obj = {
            "id": video_obj["id"],
            "title": video_obj["title"],
            "authorId": video_detail_response["account_id"],
            "authorName": video_detail_response["account_name"],
            "postTime": video_detail_response["publish_timestamp"],
            "videoUrl": video_detail_response["video_url"],
        }
        self.crawler_each_video(item_obj)

    def deal(self) -> None:
        """
        Crawl every video in the hot-point feed.

        A failure on one video is logged and the loop continues; a failure
        while fetching or unpacking the feed itself is not caught here.
        """
        hot_point_video_response = get_hot_point_videos()
        hot_point_video_list = hot_point_video_response["data"][
            "tpl-card-feed-pc-data"
        ]["list"]
        for video in tqdm(hot_point_video_list, desc="crawler sohu hot videos"):
            try:
                self.process_hot_video_obj(video)
            except Exception as e:
                log(
                    task="crawler_sohu_videos",
                    function="crawler_sohu_hot_videos",
                    message="crawler_sohu_hot_videos failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "video": video,
                    },
                )


class CrawlerSohuRecommendVideos(CrawlerSohuVideos):
    """Crawl the recommendation feed of each stored seed video."""

    def fetch_seed_videos(self) -> list[dict]:
        """
        Fetch seed videos from ``publish_single_video_source``.

        Rows are filtered on platform, source_account, score, audit_status
        and bad_status using values from ``const``.
        """
        # Every interpolated value is a module-level constant, not user input.
        fetch_query = f"""
            select id, out_account_id, url_unique_md5, article_title, score
            from publish_single_video_source
            where platform = '{const.PLATFORM}'
                and source_account = {const.GET_RECOMMEND_INIT_STATUS}
                and score > {const.GET_RECOMMEND_THRESHOLD_SCORE}
                and audit_status = {const.AUDIT_SUCCESS_STATUS}
                and bad_status = {const.VIDEO_NOT_BAD_STATUS};
        """
        seed_videos = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
        return seed_videos

    def get_each_video_recommendation(self, seed_video: dict) -> None:
        """
        Crawl the recommended videos of one seed video, page by page.

        Pages come from ``const.PAGE_LIST``. A failure on one page is logged
        and the remaining pages are still attempted, so this method does not
        raise for page-level errors.
        """
        author_id = seed_video["out_account_id"]
        article_id = seed_video["url_unique_md5"]
        outside_url = f"https://www.sohu.com/a/{article_id}_{author_id}"
        for page in const.PAGE_LIST:
            try:
                response = get_recommendation_video_list(
                    outside_url, author_id, article_id, page
                )
                if response:
                    video_list = response["data"]["recommendVideoFeed"]["list"]
                    for video in tqdm(video_list, desc=f"page: {page}"):
                        self.crawler_each_video(video)
            except Exception as e:
                log(
                    task="crawler_sohu_videos",
                    function="get_each_video_recommendation",
                    message="get_each_video_recommendation failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "page": page,
                    },
                )

    def update_seed_video_status(self, task_id: int) -> int:
        """
        Move one seed row from the init status to the success status.

        The ``source_account = init`` condition in the where clause means a
        row that has already left the init status is not touched. Returns
        whatever ``db_client.save`` returns.
        """
        update_query = """
            update publish_single_video_source
            set source_account = %s
            where id = %s and source_account = %s;
        """
        return self.db_client.save(
            query=update_query,
            params=(
                const.GET_RECOMMEND_SUCCESS_STATUS,
                task_id,
                const.GET_RECOMMEND_INIT_STATUS,
            ),
        )

    def deal(self) -> None:
        """
        Crawl recommendations for every seed video and mark each as processed.

        NOTE(review): page-level failures are swallowed inside
        ``get_each_video_recommendation``, so a seed is marked as processed
        even if every page failed -- confirm this is intended.
        """
        task_list = self.fetch_seed_videos()
        for task in tqdm(task_list):
            try:
                self.get_each_video_recommendation(task)
                self.update_seed_video_status(task_id=task["id"])
            except Exception as e:
                # label the log with this crawler, not the hot-video crawler
                log(
                    task="crawler_sohu_videos",
                    function="crawler_sohu_recommend_videos",
                    message="crawler_sohu_recommend_videos failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "video": task,
                    },
                )


class CrawlerSohuUserPageVideos(CrawlerSohuVideos):
    """
    Crawl videos from user homepages.

    Articles make up a large share of user homepages and the account system
    has not been built yet, so this release does not crawl them; revisit
    later if needed.
    """

    def get_author_list(self) -> list:
        """
        Return the authors to crawl.

        Currently always an empty list, so ``deal`` does no work.
        """
        return []

    def process_each_page(self, response: dict) -> None:
        """
        Crawl every video listed in one homepage response.

        A failure on one video is logged and the loop continues.
        """
        video_list = response["data"]["FeedSlideloadAuthor_2_0_pc_1655965929143_data2"][
            "list"
        ]
        for video in tqdm(video_list, desc="crawler sohu user page videos"):
            try:
                self.crawler_each_video(video)
            except Exception as e:
                log(
                    task="crawler_sohu_videos",
                    function="process_each_page",
                    message="crawler_sohu_user_videos failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "video": video,
                    },
                )

    def get_each_user_videos(self, author_id: int) -> None:
        """
        Crawl the homepage videos of one author.

        Only page 1 is requested. A failure on a page is logged and does not
        propagate.
        """
        for page in range(1, 2):
            try:
                response = get_user_homepage_videos(author_id, page)
                self.process_each_page(response)
            except Exception as e:
                log(
                    task="crawler_sohu_videos",
                    function="get_each_user_videos",
                    message="crawler_sohu_user_videos failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "author_id": author_id,
                        "page": page,
                    },
                )

    def deal(self) -> None:
        """Crawl homepage videos for every author from ``get_author_list``."""
        author_list = self.get_author_list()
        for author_id in tqdm(author_list, desc="crawler sohu user videos"):
            try:
                self.get_each_user_videos(author_id)
            except Exception as e:
                # label the log with this crawler, not the hot-video crawler
                log(
                    task="crawler_sohu_videos",
                    function="crawler_sohu_user_videos",
                    message="crawler_sohu_user_videos failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "author_id": author_id,
                    },
                )