"""
@author: luojunhui

Crawl the videos published by tracked Toutiao accounts and hand each video to
the shared video ETL pipeline.
"""

from __future__ import annotations

import time
import traceback

from pymysql.cursors import DictCursor
from tqdm import tqdm

from applications import log
from applications.const import ToutiaoVideoCrawlerConst
from applications.db import DatabaseConnector
from applications.pipeline import scrape_video_entities_process
from applications.utils import Item
from applications.utils import str_to_md5
from applications.utils import insert_into_single_video_source_table
from coldStartTasks.crawler.toutiao import get_toutiao_account_video_list
from config import apolloConfig, long_articles_config

const = ToutiaoVideoCrawlerConst()
config = apolloConfig()
cookie = config.getConfigValue("toutiao_blogger_cookie")


class CrawlerToutiaoAccountVideos:
    """
    toutiao blogger crawler

    Reads the accounts to crawl from ``video_meta_accounts``, pages through
    each account's video list, pushes every video through
    ``scrape_video_entities_process`` and finally stores the newest publish
    timestamp back on the account row as its ``max_cursor``.
    """

    def __init__(self):
        """Create the database client and open its connection."""
        self.db_client = DatabaseConnector(db_config=long_articles_config)
        self.db_client.connect()

    def get_account_list(self):
        """
        get account list

        :return: rows of ``account_id`` / ``max_cursor`` for toutiao accounts
            whose status equals ``const.TOUTIAO_ACCOUNT_GOOD_STATUS``, fetched
            with a ``DictCursor``.
        """
        sql = f"""
            select account_id, max_cursor
            from video_meta_accounts
            where platform = 'toutiao' and status = {const.TOUTIAO_ACCOUNT_GOOD_STATUS};
        """
        account_list = self.db_client.fetch(query=sql, cursor_type=DictCursor)
        return account_list

    def crawler_each_account_video_list(
        self, account_id: str, max_cursor: int | None, max_behot_time: int = 0
    ):
        """
        Page through one account's video list and crawl every video found.

        account_id: toutiao account id
        max_cursor: crawler latest cursor for each account; falls back to
            ``const.DEFAULT_CURSOR`` when empty
        max_behot_time: max behot time from toutiao, use to switch to next page

        Paging stops when the request returns nothing, the response message is
        not ``"success"``, a page is empty, the first video of a page was
        published before ``max_cursor``, the API reports no more pages, or the
        paging cursor stops advancing. A failure on a single video is logged
        and does not stop the remaining videos.
        """
        has_more = True
        current_cursor = max_behot_time
        max_cursor = max_cursor or const.DEFAULT_CURSOR

        while has_more:
            response = get_toutiao_account_video_list(
                account_id=account_id, cookie=cookie, max_behot_time=current_cursor
            )
            if not response:
                break

            if response["message"] != "success":
                log(
                    task="crawler_toutiao_account_videos",
                    function="crawler_each_account_video_list",
                    message="get response from toutiao failed",
                    data={"account_id": account_id, "response": response},
                )
                break

            video_list = response["data"]
            has_more = response["has_more"]
            next_cursor = response["next"]["max_behot_time"]

            if not video_list:
                break

            # Only the first entry of the page is compared with the stored
            # cursor; presumably pages arrive newest-first -- TODO confirm.
            max_timestamp_in_this_group = video_list[0]["publish_time"]
            if max_timestamp_in_this_group < max_cursor:
                break

            # do crawler each video
            crawler_video_list_bar = tqdm(video_list, desc="crawler videos")
            for video in crawler_video_list_bar:
                try:
                    # The postfix is only a progress label, so a missing "id"
                    # must not prevent the video from being crawled.
                    crawler_video_list_bar.set_postfix({"video_id": video.get("id")})
                    self.crawler_each_video(video)

                except Exception as e:
                    log(
                        task="crawler_toutiao_account_videos",
                        function="crawler_each_account_video_list",
                        message="crawler each video failed",
                        data={
                            "account_id": account_id,
                            "video_info": video,
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                        },
                    )

            if not has_more:
                break

            if next_cursor == current_cursor:
                # Requesting the same cursor again would return the same page
                # and loop forever, so stop here instead.
                log(
                    task="crawler_toutiao_account_videos",
                    function="crawler_each_account_video_list",
                    message="pagination cursor did not advance, stop crawling",
                    data={"account_id": account_id, "cursor": current_cursor},
                )
                break

            current_cursor = next_cursor
            # Pause between page requests.
            time.sleep(const.SLEEP_SECOND)

    def crawler_each_video(self, video_data):
        """
        crawler each video data

        Builds an ``Item`` from the raw video dict, validates it with
        ``check(source="video")``, runs it through
        ``scrape_video_entities_process`` and, when that returns a truthy
        item, inserts it via ``insert_into_single_video_source_table``.

        Missing required keys in ``video_data`` and validation errors
        propagate to the caller; errors raised by the ETL / insert step are
        logged here and swallowed.
        """
        video_item = Item()
        video_id = video_data["group_id"]
        title = video_data["title"]
        media = video_data["video"]
        # The first address in the download list is used as the video url.
        url = media["download_addr"]["url_list"][0]

        # add info into item
        video_item.add("content_trace_id", "video{}".format(str_to_md5(str(video_id))))
        video_item.add("url_unique_md5", video_id)
        video_item.add("article_title", title)
        video_item.add("out_account_id", video_data["user"]["user_id"])
        video_item.add("out_account_name", video_data["source"])
        video_item.add("publish_timestamp", video_data["publish_time"])
        video_item.add("platform", const.PLATFORM)
        video_item.add("read_cnt", video_data.get("read_count", 0))
        video_item.add("article_url", url)
        video_item.add("source_account", const.NO_SOURCE_ACCOUNT_STATUS)
        video_item.add("crawler_timestamp", int(time.time()))

        # check item before insert
        video_item.check(source="video")
        try:
            item_with_oss_path = scrape_video_entities_process(
                video_item=video_item.item, db_client=self.db_client
            )
            if item_with_oss_path:
                insert_into_single_video_source_table(
                    self.db_client, item_with_oss_path
                )
        except Exception as e:
            log(
                task="crawler_toutiao_account_videos",
                function="crawler_each_video",
                message="etl failed",
                data={
                    "video_item": video_item.item,
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                },
            )

    def update_account_max_cursor(self, account_id: str) -> None:
        """
        update account max cursor

        Reads the largest ``publish_timestamp`` stored for the account in
        ``publish_single_video_source`` and, when it is truthy, writes it to
        ``video_meta_accounts.max_cursor`` for that account and platform.
        """
        # NOTE(review): account_id and the platform are interpolated straight
        # into the SQL text; switch to bound parameters if
        # DatabaseConnector.fetch supports them.
        select_sql = f"""
            select max(publish_timestamp) as max_cursor
            from publish_single_video_source
            where out_account_id = '{account_id}' and platform = '{const.PLATFORM}';
        """
        response_mysql = self.db_client.fetch(query=select_sql)
        max_publish_timestamp = response_mysql[0][0]

        if max_publish_timestamp:
            update_sql = """
                update video_meta_accounts
                set max_cursor = %s
                where account_id = %s and platform = %s;
            """
            self.db_client.save(
                query=update_sql,
                params=(max_publish_timestamp, account_id, const.PLATFORM),
            )

    def deal(self) -> None:
        """
        class entrance

        Crawls every account returned by ``get_account_list`` and then
        refreshes its cursor. An exception raised for one account is logged
        and the loop moves on to the next account.
        """
        account_list = self.get_account_list()
        account_list_bar = tqdm(account_list, desc="crawler toutiao accounts")
        for account in account_list_bar:
            account_id = account["account_id"]
            max_cursor = account["max_cursor"]
            try:
                # crawl each account
                account_list_bar.set_postfix({"account_id": account_id})
                self.crawler_each_account_video_list(
                    account_id=account_id, max_cursor=max_cursor
                )
                self.update_account_max_cursor(account_id)

            except Exception as e:
                # add log and bot
                log(
                    task="crawler_toutiao_account_videos",
                    function="deal",
                    message=account_id,
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )