"""
@author: luojunhui

Crawler for videos published by Toutiao accounts.
"""

from __future__ import annotations

import time
import traceback

from pymysql.cursors import DictCursor
from tqdm import tqdm

from applications import log
from applications.const import ToutiaoVideoCrawlerConst
from applications.db import DatabaseConnector
from applications.pipeline import scrape_video_entities_process
from applications.utils import Item
from applications.utils import str_to_md5
from applications.utils import insert_into_single_video_source_table
from cold_start.crawler.toutiao import get_toutiao_account_video_list
from config import apolloConfig, long_articles_config

const = ToutiaoVideoCrawlerConst()
config = apolloConfig()
cookie = config.getConfigValue("toutiao_blogger_cookie")


class CrawlerToutiaoAccountVideos:
    """
    toutiao blogger crawler

    Reads the accounts to crawl from ``video_meta_accounts``, pages through
    each account's video list, passes every video to
    ``scrape_video_entities_process`` and finally advances the account's
    ``max_cursor``. ``deal`` is the entry point.
    """

    def __init__(self):
        self.db_client = DatabaseConnector(db_config=long_articles_config)
        self.db_client.connect()

    def get_account_list(self):
        """
        get account list

        Returns the rows of ``video_meta_accounts`` whose platform is
        'toutiao' and whose status equals
        ``const.TOUTIAO_ACCOUNT_GOOD_STATUS``. Rows are fetched with
        ``DictCursor``; each row carries ``account_id`` and ``max_cursor``.
        """
        sql = f"""
            select account_id, max_cursor
            from video_meta_accounts
            where platform = 'toutiao' and status = {const.TOUTIAO_ACCOUNT_GOOD_STATUS};
        """
        return self.db_client.fetch(query=sql, cursor_type=DictCursor)

    def crawler_each_account_video_list(
        self, account_id: str, max_cursor: int | None, max_behot_time: int = 0
    ) -> None:
        """
        Page through one account's video list and crawl every video.

        account_id: toutiao account id
        max_cursor: crawler latest cursor for each account; a falsy value
            (None or 0) falls back to ``const.DEFAULT_CURSOR``
        max_behot_time: max behot time from toutiao, use to switch to next page

        Paging stops when the request returns nothing, the response message
        is not "success", a page is empty, the first video of a page is older
        than ``max_cursor``, or the response reports no further pages.
        A failure on a single video is logged and does not stop the page.
        """
        has_more = True
        current_cursor = max_behot_time
        max_cursor = max_cursor or const.DEFAULT_CURSOR

        while has_more:
            response = get_toutiao_account_video_list(
                account_id=account_id, cookie=cookie, max_behot_time=current_cursor
            )
            if not response:
                break

            if response["message"] != "success":
                log(
                    task="crawler_toutiao_account_videos",
                    function="crawler_toutiao_account_videos",
                    message="get response from toutiao failed",
                    data={"account_id": account_id, "response": response},
                )
                break

            video_list = response["data"]
            has_more = response["has_more"]
            if not video_list:
                break

            # Only the first video of the page is compared with the cursor.
            # NOTE(review): this presumes pages are ordered newest-first - confirm.
            max_timestamp_in_this_group = video_list[0]["publish_time"]
            if max_timestamp_in_this_group < max_cursor:
                break

            # do crawler each video
            crawler_video_list_bar = tqdm(video_list, desc="crawler videos")
            for video in crawler_video_list_bar:
                try:
                    crawler_video_list_bar.set_postfix({"video_id": video["id"]})
                    self.crawler_each_video(video)
                except Exception as e:
                    # Best effort: record the failure and move on to the next video.
                    log(
                        task="crawler_toutiao_account_videos",
                        function="crawler_each_account_video_list",
                        message="crawler each video failed",
                        data={
                            "account_id": account_id,
                            "video_info": video,
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                        },
                    )

            if not has_more:
                break

            # The next-page cursor is read only once another request is certain,
            # so a last page without a "next" entry cannot abort the crawl.
            current_cursor = response["next"]["max_behot_time"]
            time.sleep(const.SLEEP_SECOND)

    def crawler_each_video(self, video_data) -> None:
        """
        crawler each video data

        Builds an ``Item`` from ``video_data``, validates it with
        ``check(source="video")`` and hands it to
        ``scrape_video_entities_process``. When that call returns a truthy
        item, it is written with ``insert_into_single_video_source_table``.

        Errors raised while reading ``video_data`` or by ``check`` propagate
        to the caller; errors raised by the processing/insert step are logged
        here and swallowed.
        """
        video_item = Item()
        video_id = video_data["group_id"]
        title = video_data["title"]
        media = video_data["video"]
        url = media["download_addr"]["url_list"][0]

        # add info into item
        video_item.add("content_trace_id", f"video{str_to_md5(str(video_id))}")
        # NOTE(review): the raw group id (not a hash) is stored under
        # "url_unique_md5" - confirm this is intended.
        video_item.add("url_unique_md5", video_id)
        video_item.add("article_title", title)
        video_item.add("out_account_id", video_data["user"]["user_id"])
        video_item.add("out_account_name", video_data["source"])
        video_item.add("publish_timestamp", video_data["publish_time"])
        video_item.add("platform", const.PLATFORM)
        video_item.add("read_cnt", video_data.get("read_count", 0))
        video_item.add("article_url", url)
        video_item.add("source_account", const.NO_SOURCE_ACCOUNT_STATUS)
        video_item.add("crawler_timestamp", int(time.time()))

        # check item before insert
        video_item.check(source="video")
        try:
            item_with_oss_path = scrape_video_entities_process(
                video_item=video_item.item, db_client=self.db_client
            )
            if item_with_oss_path:
                insert_into_single_video_source_table(
                    self.db_client, item_with_oss_path
                )
        except Exception as e:
            log(
                task="crawler_toutiao_account_videos",
                function="crawler_toutiao_account_videos",
                message="etl failed",
                data={
                    "video_item": video_item.item,
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                },
            )

    def update_account_max_cursor(self, account_id: str) -> None:
        """
        update account max cursor

        Looks up the largest ``publish_timestamp`` stored in
        ``publish_single_video_source`` for this account and platform and,
        when one exists, writes it to ``video_meta_accounts.max_cursor``.
        Nothing is updated when the lookup yields no row or a falsy maximum.
        """
        # NOTE(review): account_id is interpolated into the statement; switch to
        # bound parameters if ``fetch`` supports them.
        select_sql = f"""
            select max(publish_timestamp) as max_cursor
            from publish_single_video_source
            where out_account_id = '{account_id}' and platform = '{const.PLATFORM}';
        """
        response_mysql = self.db_client.fetch(query=select_sql)
        if not response_mysql:
            # No row came back, so there is no cursor to advance.
            return

        max_publish_timestamp = response_mysql[0][0]
        if not max_publish_timestamp:
            return

        update_sql = """
            update video_meta_accounts
            set max_cursor = %s
            where account_id = %s and platform = %s;
        """
        self.db_client.save(
            query=update_sql,
            params=(max_publish_timestamp, account_id, const.PLATFORM),
        )

    def deal(self) -> None:
        """
        class entrance

        Crawls every account returned by ``get_account_list`` and then
        advances that account's cursor. An exception raised for one account
        is logged and the loop continues with the next account.
        """
        account_list = self.get_account_list()
        account_list_bar = tqdm(account_list, desc="crawler toutiao accounts")
        for account in account_list_bar:
            account_id = account["account_id"]
            max_cursor = account["max_cursor"]
            try:
                # crawl each account
                account_list_bar.set_postfix({"account_id": account_id})
                self.crawler_each_account_video_list(
                    account_id=account_id, max_cursor=max_cursor
                )
                self.update_account_max_cursor(account_id)
            except Exception as e:
                # add log and bot
                log(
                    task="crawler_toutiao_account_videos",
                    function="deal",
                    message=account_id,
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )