123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- from __future__ import annotations
- import json
- import time
- import traceback
- from typing import List, Dict
- from tqdm import tqdm
- from applications.api import feishu_robot
- from applications.crawler.toutiao import get_toutiao_account_info_list
- from applications.pipeline import CrawlerPipeline
class CrawlerToutiaoConst:
    """Constants shared by the Toutiao crawler tasks."""

    # Platform identifier stored alongside crawled rows and account records.
    PLATFORM = "toutiao"

    # Account status flags.
    TOUTIAO_ACCOUNT_GOOD_STATUS = 1
    TOUTIAO_ACCOUNT_BAD_STATUS = 0

    # Earliest cursor used when an account has no stored cursor yet.
    # 1609430400 is 2021-01-01 00:00:00 in UTC+8 (2020-12-31 16:00:00 UTC).
    DEFAULT_CURSOR = 1609430400

    # Status value meaning "no source account".
    NO_SOURCE_ACCOUNT_STATUS = 0

    # Minimum title length.
    MIN_TITLE_LENGTH = 10

    # Maximum video length, in seconds.
    MAX_VIDEO_LENGTH = 600

    # Pause between consecutive page requests, in seconds.
    SLEEP_SECOND = 3
- class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
- def __init__(self, pool, log_client):
- super().__init__(pool, log_client)
- async def get_account_list(self, media_type: str) -> List[dict]:
- """get toutiao account list"""
- match media_type:
- case "video":
- table = "video_meta_accounts"
- case "article":
- table = "article_meta_accounts"
- case _:
- return []
- # fetch query
- query = f"""
- select account_id, max_cursor
- from {table}
- where platform = 'toutiao' and status = {self.TOUTIAO_ACCOUNT_GOOD_STATUS};
- """
- response = await self.pool.async_fetch(query)
- if not response:
- await feishu_robot.bot(
- title=f"抓取头条账号内容任务: 任务模态:{media_type} 异常",
- detail={"platform": self.PLATFORM, "error": "获取账号异常"},
- )
- return []
- else:
- return response
- async def crawler_each_account_info_list(
- self,
- account_id: str,
- media_type: str,
- max_cursor: int | None,
- max_behot_time: int = 0,
- ):
- """
- account_id: toutiao account id
- max_cursor: crawler latest cursor for each account
- max_behot_time: max behot time from toutiao, use to switch to next page
- """
- has_more = True
- current_cursor = max_behot_time
- max_cursor = max_cursor or self.DEFAULT_CURSOR
- cookie = await self.get_config_value(
- key="toutiao_blogger_cookie", output_type="string"
- )
- while has_more:
- print(account_id, max_cursor)
- response = await get_toutiao_account_info_list(
- account_id=account_id,
- cookie=cookie,
- media_type=media_type,
- max_behot_time=current_cursor,
- )
- print(response)
- if not response:
- break
- if response["message"] != "success":
- break
- info_list = response["data"]
- has_more = response["has_more"]
- current_cursor = response["next"]["max_behot_time"]
- if not info_list:
- break
- max_timestamp_in_this_group = info_list[0]["publish_time"]
- if max_timestamp_in_this_group < max_cursor:
- break
- # do crawler
- match media_type:
- case "video":
- bar_description = "crawler videos"
- case "article":
- bar_description = "crawler articles"
- case _:
- raise Exception(f"unknown media type: {media_type}")
- crawler_info_list_bar = tqdm(info_list, desc=bar_description)
- print(json.dumps(info_list, ensure_ascii=False, indent=4))
- for info in crawler_info_list_bar:
- try:
- crawler_info_list_bar.set_postfix({"id": info["id"]})
- match media_type:
- case "video":
- await self.crawler_each_video(info)
- case "article":
- await self.crawler_each_article(info)
- case _:
- raise Exception(f"unknown media type: {media_type}")
- except Exception as e:
- raise Exception(f"crawler each info failed: {e}")
- if has_more:
- time.sleep(self.SLEEP_SECOND)
- else:
- break
- async def crawler_each_article(self, article_raw_data):
- """
- crawler each article
- """
- new_article_item = {
- "platform": self.PLATFORM,
- "mode": "account",
- "category": "toutiao_account_association",
- "out_account_id": article_raw_data["user_info"]["user_id"],
- "title": article_raw_data["title"],
- "link": f"https://www.toutiao.com/article/{article_raw_data['group_id']}",
- "read_cnt": article_raw_data["read_count"],
- "like_cnt": article_raw_data["like_count"],
- "description": article_raw_data["abstract"],
- "publish_time": article_raw_data["publish_time"],
- "unique_index": article_raw_data["group_id"],
- }
- await self.save_item_to_database(media_type="article", item=new_article_item)
- async def crawler_each_video(self, video_raw_data):
- pass
- async def update_account_max_cursor(self, media_type: str, account_id: str) -> None:
- """
- update account max cursor
- """
- match media_type:
- case "video":
- query = f"""
- select max(publish_timestamp) as max_cursor
- from publish_single_video_source
- where out_account_id = %s and platform = %s;
- """
- table = "video_meta_accounts"
- case "article":
- query = f"""
- select max(publish_time) as max_cursor
- from crawler_meta_article
- where out_account_id = %s and platform = %s;
- """
- table = "article_meta_accounts"
- case _:
- raise Exception(f"unknown media type: {media_type}")
- response = await self.pool.async_fetch(
- query, params=(account_id, self.PLATFORM)
- )
- max_publish_timestamp = response[0]["max_cursor"]
- if max_publish_timestamp:
- query = f"""
- update {table}
- set max_cursor = %s
- where account_id = %s and platform = %s;
- """
- await self.pool.async_save(
- query, (max_publish_timestamp, account_id, self.PLATFORM)
- )
- async def crawler_task(self, media_type: str) -> None:
- """
- class entrance
- """
- account_list = await self.get_account_list(media_type=media_type)
- account_list_bar = tqdm(account_list, desc="crawler toutiao accounts")
- for account in account_list_bar:
- account_id = account["account_id"]
- max_cursor = account["max_cursor"]
- try:
- account_list_bar.set_postfix({"account_id": account_id})
- await self.crawler_each_account_info_list(
- account_id=account_id, media_type=media_type, max_cursor=max_cursor
- )
- await self.update_account_max_cursor(
- media_type=media_type, account_id=account_id
- )
- except Exception as e:
- await self.log_client.log(
- contents={
- "task": "crawler_toutiao_account_info",
- "function": "crawler_task",
- "message": account_id,
- "data": {
- "media_type": media_type,
- "error": str(e),
- "traceback": traceback.format_exc(),
- },
- }
- )
|