123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- """
- @author: luojunhui
- """
- from __future__ import annotations
- import time
- import traceback
- from pymysql.cursors import DictCursor
- from tqdm import tqdm
- from applications import log
- from applications.const import ToutiaoVideoCrawlerConst
- from applications.db import DatabaseConnector
- from applications.pipeline import scrape_video_entities_process
- from applications.utils import Item
- from applications.utils import str_to_md5
- from applications.utils import insert_into_single_video_source_table
- from coldStartTasks.crawler.toutiao import get_toutiao_account_video_list
- from config import apolloConfig, long_articles_config
# Remote configuration client; the Toutiao request cookie is read from it once
# at import time and reused for every request made by this module.
config = apolloConfig()
cookie = config.getConfigValue("toutiao_blogger_cookie")

# Shared constants for this crawler (status codes, default cursor, sleep
# interval, platform name) as referenced throughout the module.
const = ToutiaoVideoCrawlerConst()
class CrawlerToutiaoAccountVideos:
    """
    Crawl the videos of tracked Toutiao accounts and store them.

    For every account returned by :meth:`get_account_list` the crawler pages
    through the account's video feed, hands each video to the ETL pipeline and
    finally advances the account's ``max_cursor`` in ``video_meta_accounts``.
    """

    def __init__(self):
        """Create the database client from ``long_articles_config`` and connect it."""
        self.db_client = DatabaseConnector(db_config=long_articles_config)
        self.db_client.connect()

    def get_account_list(self):
        """
        Fetch the Toutiao accounts that should be crawled.

        Returns:
            The rows returned by the database client (fetched with
            ``DictCursor``), each carrying ``account_id`` and ``max_cursor``.
        """
        sql = f"""
            select account_id, max_cursor
            from video_meta_accounts
            where platform = 'toutiao' and status = {const.TOUTIAO_ACCOUNT_GOOD_STATUS};
        """
        return self.db_client.fetch(query=sql, cursor_type=DictCursor)

    @staticmethod
    def _newest_publish_time(video_list: list[dict]) -> int:
        """Return the largest ``publish_time`` found in a non-empty page of videos."""
        # Do not rely on the page being sorted newest-first: an older item at
        # the head of the list would otherwise stop the crawl too early.
        # Entries without a publish time count as 0.
        return max(video.get("publish_time") or 0 for video in video_list)

    @staticmethod
    def _next_page_cursor(response: dict) -> int | None:
        """Return ``response["next"]["max_behot_time"]`` or ``None`` when absent."""
        next_info = response.get("next") or {}
        return next_info.get("max_behot_time")

    def crawler_each_account_video_list(
        self, account_id: str, max_cursor: int | None, max_behot_time: int = 0
    ) -> None:
        """
        Page through one account's video feed and crawl every video found.

        Paging stops when the request returns nothing or reports a failure,
        when a page is empty, when the newest video of a page is older than
        ``max_cursor``, or when the feed reports no further pages.

        Args:
            account_id: Toutiao account id.
            max_cursor: Newest publish timestamp already crawled for this
                account; ``const.DEFAULT_CURSOR`` is used when it is falsy.
            max_behot_time: Paging cursor to start from.
        """
        has_more = True
        current_cursor = max_behot_time
        max_cursor = max_cursor or const.DEFAULT_CURSOR

        while has_more:
            response = get_toutiao_account_video_list(
                account_id=account_id, cookie=cookie, max_behot_time=current_cursor
            )
            if not response:
                break

            if response["message"] != "success":
                log(
                    task="crawler_toutiao_account_videos",
                    function="crawler_toutiao_account_videos",
                    message="get response from toutiao failed",
                    data={"account_id": account_id, "response": response},
                )
                break

            video_list = response["data"]
            has_more = response["has_more"]
            if not video_list:
                break

            # Everything on this page is older than what was already crawled.
            if self._newest_publish_time(video_list) < max_cursor:
                break

            # do crawler each video
            crawler_video_list_bar = tqdm(video_list, desc="crawler videos")
            for video in crawler_video_list_bar:
                try:
                    # The postfix is cosmetic; a missing "id" key must not
                    # prevent the video from being crawled.
                    crawler_video_list_bar.set_postfix(
                        {"video_id": video.get("id", video.get("group_id"))}
                    )
                    self.crawler_each_video(video)
                except Exception as e:
                    log(
                        task="crawler_toutiao_account_videos",
                        function="crawler_each_account_video_list",
                        message="crawler each video failed",
                        data={
                            "account_id": account_id,
                            "video_info": video,
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                        },
                    )

            if not has_more:
                break

            # The cursor is only needed when another page is requested, so it
            # is read after the current page has been processed.
            next_cursor = self._next_page_cursor(response)
            if next_cursor is None:
                # Re-requesting with the old cursor would fetch the same page
                # again, so stop paging instead.
                log(
                    task="crawler_toutiao_account_videos",
                    function="crawler_each_account_video_list",
                    message="has_more is set but next page cursor is missing",
                    data={"account_id": account_id},
                )
                break

            current_cursor = next_cursor
            time.sleep(const.SLEEP_SECOND)

    def crawler_each_video(self, video_data):
        """
        Build a video item from one feed entry and run it through the ETL.

        Args:
            video_data: One entry of the feed's ``data`` list. The keys
                ``group_id``, ``title``, ``video``, ``user``, ``source`` and
                ``publish_time`` are required; ``read_count`` is optional.

        Raises:
            KeyError / IndexError: When a required field is missing from
                ``video_data``. Errors raised by the ETL step itself are
                logged and not propagated.
        """
        video_item = Item()
        video_id = video_data["group_id"]
        title = video_data["title"]
        media = video_data["video"]
        url = media["download_addr"]["url_list"][0]

        # add info into item
        video_item.add("content_trace_id", f"video{str_to_md5(str(video_id))}")
        video_item.add("url_unique_md5", video_id)
        video_item.add("article_title", title)
        video_item.add("out_account_id", video_data["user"]["user_id"])
        video_item.add("out_account_name", video_data["source"])
        video_item.add("publish_timestamp", video_data["publish_time"])
        video_item.add("platform", const.PLATFORM)
        video_item.add("read_cnt", video_data.get("read_count", 0))
        video_item.add("article_url", url)
        video_item.add("source_account", const.NO_SOURCE_ACCOUNT_STATUS)
        video_item.add("crawler_timestamp", int(time.time()))

        # check item before insert
        video_item.check(source="video")

        try:
            item_with_oss_path = scrape_video_entities_process(
                video_item=video_item.item, db_client=self.db_client
            )
            # A falsy result means there is nothing to insert for this video.
            if item_with_oss_path:
                insert_into_single_video_source_table(
                    self.db_client, item_with_oss_path
                )
        except Exception as e:
            log(
                task="crawler_toutiao_account_videos",
                function="crawler_toutiao_account_videos",
                message="etl failed",
                data={
                    "video_item": video_item.item,
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                },
            )

    def update_account_max_cursor(self, account_id: str) -> None:
        """
        Store the newest crawled publish timestamp as the account's cursor.

        Reads ``max(publish_timestamp)`` for the account from
        ``publish_single_video_source`` and, when a value exists, writes it to
        ``video_meta_accounts.max_cursor``.

        Args:
            account_id: Toutiao account id.
        """
        # NOTE(review): the values are interpolated into the statement; the
        # account id originates from video_meta_accounts. Switch to bound
        # parameters if the database client's fetch() supports them.
        select_sql = f"""
            select max(publish_timestamp) as max_cursor
            from publish_single_video_source
            where out_account_id = '{account_id}' and platform = '{const.PLATFORM}';
        """
        response_mysql = self.db_client.fetch(query=select_sql)
        if not response_mysql:
            # Nothing came back from the query; leave the cursor untouched.
            return

        max_publish_timestamp = response_mysql[0][0]
        if max_publish_timestamp:
            update_sql = """
                update video_meta_accounts
                set max_cursor = %s
                where account_id = %s and platform = %s;
            """
            self.db_client.save(
                query=update_sql,
                params=(max_publish_timestamp, account_id, const.PLATFORM),
            )

    def deal(self) -> None:
        """
        Class entrance: crawl every tracked account and advance its cursor.

        A failure while handling one account is logged and does not stop the
        remaining accounts from being processed.
        """
        account_list = self.get_account_list()
        account_list_bar = tqdm(account_list, desc="crawler toutiao accounts")
        for account in account_list_bar:
            account_id = account["account_id"]
            max_cursor = account["max_cursor"]
            try:
                # crawl each account
                account_list_bar.set_postfix({"account_id": account_id})
                self.crawler_each_account_video_list(
                    account_id=account_id, max_cursor=max_cursor
                )
                self.update_account_max_cursor(account_id)
            except Exception as e:
                # add log and bot
                log(
                    task="crawler_toutiao_account_videos",
                    function="deal",
                    message=account_id,
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )
|