- from __future__ import annotations
- import asyncio
- import json
- import time
- import aiohttp
- import traceback
- from datetime import datetime
- from typing import List
- from tqdm import tqdm
- from applications.api import feishu_robot
- from applications.crawler.toutiao import get_toutiao_account_info_list
- from applications.pipeline import CrawlerPipeline
- from applications.utils import async_proxy
- class CrawlerToutiaoConst:
- # platform
- PLATFORM = "toutiao"
- # account status
- TOUTIAO_ACCOUNT_GOOD_STATUS = 1
- TOUTIAO_ACCOUNT_BAD_STATUS = 0
- # earliest cursor, 2021-01-01 00:00:00 (UTC+8)
- DEFAULT_CURSOR = 1609430400
- # no source account
- NO_SOURCE_ACCOUNT_STATUS = 0
- # title length min
- MIN_TITLE_LENGTH = 10
- # max video length(second)
- MAX_VIDEO_LENGTH = 600
- # sleep second
- SLEEP_SECOND = 3
- RECOMMEND_TIMES = 10
- class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
- def __init__(self, pool, log_client, trace_id):
- super().__init__(pool, log_client)
- self.trace_id = trace_id
- async def get_request_params(self, category):
- """
- get request params
- """
- query = f"""
- select request_method, request_url, request_headers, post_data
- from toutiao_request_params
- where category = %s and expire_flag = %s
- order by id desc limit 1;
- """
- response = await self.pool.async_fetch(query=query, params=(category, 0))
- if not response:
- now = datetime.now()
- if 10 < now.hour < 21:
- await feishu_robot.bot(
- title="今日头条推荐流,cookie 过期",
- detail={"info": "cookie expired"},
- )
- return None
- else:
- return response[0]
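- # NOTE: a sketch of the row this method is expected to return, inferred from the
- # columns selected above and from how crawler_recommend_articles consumes it
- # (all concrete values below are illustrative assumptions):
- # {
- #     "request_method": "GET",
- #     "request_url": "<toutiao recommend feed url>",
- #     "request_headers": '{"cookie": "..."}',  # stored as a JSON string, parsed with json.loads
- #     "post_data": None,  # selected but not used in this module
- # }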
- async def get_account_list(self, media_type: str) -> List[dict]:
- """get toutiao account list"""
- match media_type:
- case "video":
- table = "video_meta_accounts"
- case "article":
- table = "article_meta_accounts"
- case _:
- return []
- # fetch query
- query = f"""
- select account_id, max_cursor
- from {table}
- where platform = 'toutiao' and status = {self.TOUTIAO_ACCOUNT_GOOD_STATUS};
- """
- response = await self.pool.async_fetch(query)
- await self.log_client.log(
- contents={
- "trace_id": self.trace_id,
- "task": "crawler_toutiao",
- "function": "get_account_list",
- "message": f"get toutiao account list, media_type: {media_type}",
- "status": "success",
- "data": response
- }
- )
- if not response:
- await feishu_robot.bot(
- title=f"抓取头条账号内容任务: 任务模态:{media_type} 异常",
- detail={"platform": self.PLATFORM, "error": "获取账号异常"},
- )
- return []
- else:
- return response
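- # NOTE (assumption): each row returned above carries exactly the two selected columns,
- # e.g. {"account_id": "<toutiao user id>", "max_cursor": 1700000000}; crawler_task
- # reads only these two keys.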
- async def crawler_each_account_info_list(
- self,
- account_id: str,
- media_type: str,
- max_cursor: int | None,
- max_behot_time: int = 0,
- ):
- """
- account_id: toutiao account id
- max_cursor: crawler latest cursor for each account
- max_behot_time: max behot time from toutiao, use to switch to next page
- """
- has_more = True
- current_cursor = max_behot_time
- max_cursor = max_cursor or self.DEFAULT_CURSOR
- cookie = await self.get_config_value(
- key="toutiao_blogger_cookie", output_type="string"
- )
- while has_more:
- response = await get_toutiao_account_info_list(
- account_id=account_id,
- cookie=cookie,
- media_type=media_type,
- max_behot_time=current_cursor,
- )
- if not response:
- break
- if response["message"] != "success":
- break
- info_list = response["data"]
- has_more = response["has_more"]
- current_cursor = response["next"]["max_behot_time"]
- if not info_list:
- break
- # stop paging once the newest item on this page is older than the stored cursor
- max_timestamp_in_this_group = info_list[0]["publish_time"]
- if max_timestamp_in_this_group < max_cursor:
- break
- # do crawler
- match media_type:
- case "video":
- bar_description = "crawler videos"
- case "article":
- bar_description = "crawler articles"
- case _:
- raise Exception(f"unknown media type: {media_type}")
- crawler_info_list_bar = tqdm(info_list, desc=bar_description)
- for info in crawler_info_list_bar:
- try:
- crawler_info_list_bar.set_postfix({"id": info["id"]})
- match media_type:
- case "video":
- await self.crawler_each_video(info)
- case "article":
- await self.crawler_each_article(
- method="account", article_raw_data=info
- )
- case _:
- raise Exception(f"unknown media type: {media_type}")
- except Exception as e:
- raise Exception(f"crawler each info failed: {e}")
- if has_more:
- # avoid blocking the event loop between pages
- await asyncio.sleep(self.SLEEP_SECOND)
- else:
- break
- async def crawler_each_article(self, method, article_raw_data, category=None):
- """
- crawler each article
- """
- # extract fields shared by both modes
- base_item = {
- "platform": self.PLATFORM,
- "mode": method,
- "out_account_id": article_raw_data["user_info"]["user_id"],
- "title": article_raw_data["title"],
- "read_cnt": article_raw_data["read_count"],
- "like_cnt": article_raw_data["like_count"],
- "publish_time": article_raw_data["publish_time"],
- "crawler_time": int(time.time()),
- }
- match method:
- case "account":
- new_article_item = {
- **base_item,
- "category": "toutiao_account_association",
- "link": f"https://www.toutiao.com/article/{article_raw_data['group_id']}",
- "description": article_raw_data["abstract"],
- "unique_index": article_raw_data["group_id"],
- }
- case "recommend":
- new_article_item = {
- **base_item,
- "category": category,
- "title": article_raw_data["title"],
- "link": f"https://www.toutiao.com/article/{article_raw_data['item_id']}",
- "description": article_raw_data["Abstract"],
- "unique_index": article_raw_data["item_id"],
- }
- case _:
- raise Exception(f"unknown method: {method}")
- await self.log_client.log(
- contents={
- "task": "crawler_toutiao",
- "function": "crawler_each_article",
- "trace_id": self.trace_id,
- "message": "抓取文章成功",
- "status": "success",
- "data": new_article_item,
- }
- )
- await self.save_item_to_database(media_type="article", item=new_article_item)
- async def crawler_each_video(self, video_raw_data):
- pass
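- # NOTE: crawler_each_video is intentionally a stub. Judging from how
- # update_account_max_cursor below reads publish_single_video_source, a real
- # implementation would presumably build an item with at least platform,
- # out_account_id and publish_timestamp and persist it via
- # save_item_to_database(media_type="video", item=...); the exact video fields
- # are not defined in this module, so that shape is only an assumption.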
- async def update_account_max_cursor(self, media_type: str, account_id: str) -> None:
- """
- update account max cursor
- """
- match media_type:
- case "video":
- query = f"""
- select max(publish_timestamp) as max_cursor
- from publish_single_video_source
- where out_account_id = %s and platform = %s;
- """
- table = "video_meta_accounts"
- case "article":
- query = f"""
- select max(publish_time) as max_cursor
- from crawler_meta_article
- where out_account_id = %s and platform = %s;
- """
- table = "article_meta_accounts"
- case _:
- raise Exception(f"unknown media type: {media_type}")
- response = await self.pool.async_fetch(
- query, params=(account_id, self.PLATFORM)
- )
- max_publish_timestamp = response[0]["max_cursor"]
- if max_publish_timestamp:
- query = f"""
- update {table}
- set max_cursor = %s
- where account_id = %s and platform = %s;
- """
- await self.pool.async_save(
- query, (max_publish_timestamp, account_id, self.PLATFORM)
- )
- async def crawler_task(self, media_type: str) -> None:
- """
- class entrance
- """
- account_list = await self.get_account_list(media_type=media_type)
- account_list_bar = tqdm(account_list, desc="crawler toutiao accounts")
- for account in account_list_bar:
- account_id = account["account_id"]
- max_cursor = account["max_cursor"]
- try:
- account_list_bar.set_postfix({"account_id": account_id})
- await self.crawler_each_account_info_list(
- account_id=account_id, media_type=media_type, max_cursor=max_cursor
- )
- await self.update_account_max_cursor(
- media_type=media_type, account_id=account_id
- )
- await self.log_client.log(
- contents={
- "trace_id": self.trace_id,
- "task": "crawler_toutiao_account_info",
- "function": "crawler_task",
- "message": f"crawler account: {account_id} successfully, media type: {media_type}",
- "status": "success"
- }
- )
- except Exception as e:
- await self.log_client.log(
- contents={
- "trace_id": self.trace_id,
- "task": "crawler_toutiao_account_info",
- "function": "crawler_task",
- "message": f"crawler_account: {account_id} fail",
- "status": "fail",
- "data": {
- "media_type": media_type,
- "error": str(e),
- "traceback": traceback.format_exc(),
- },
- }
- )
- async def crawler_recommend_articles(self, category: str) -> None:
- request_params = await self.get_request_params(category=category)
- if not request_params:
- return
- for crawler_time in range(self.RECOMMEND_TIMES):
- try:
- # fetch the proxy once so the url and the credentials come from the same proxy record
- proxy = async_proxy()
- proxy_auth = aiohttp.BasicAuth(proxy["username"], proxy["password"])
- async with aiohttp.ClientSession() as session:
- async with session.request(
- method=request_params["request_method"],
- url=request_params["request_url"],
- headers=json.loads(request_params["request_headers"]),
- proxy=proxy["url"],
- proxy_auth=proxy_auth,
- ) as response:
- response.raise_for_status()
- response_json = await response.json()
- await self.log_client.log(
- contents={
- "task": "crawler_toutiao",
- "function": "crawler_recommend_articles",
- "message": f"crawler {category} articles, crawler time: {crawler_time + 1}",
- "trace_id": self.trace_id,
- "status": "success",
- "data": response_json,
- }
- )
- except Exception as e:
- await self.log_client.log(
- contents={
- "task": "crawler_toutiao",
- "function": "crawler_recommend_articles",
- "message": f"crawler {category} articles, crawler time: {crawler_time + 1}",
- "status": "fail",
- "trace_id": self.trace_id,
- "data": {"error": str(e), "traceback": traceback.format_exc(),},
- }
- )
- continue
- article_list = response_json["data"]
- for article in article_list:
- if not article.get("article_url"):
- continue
- if article.get("has_video"):
- print("this is a video rather than an article")
- continue
- try:
- await self.crawler_each_article(
- method="recommend",
- article_raw_data=article,
- category=category,
- )
- except Exception as e:
- print(f"crawler_recommend_articles error: {e}")
- async def crawl_toutiao_recommend_task(self, category_list: List[str]) -> None:
- if not category_list:
- category_list = ["finance", "tech", "history", "entertainment"]
- for category in category_list:
- await self.crawler_recommend_articles(category=category)
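- # Usage sketch (not part of the module): how this crawler might be driven from an
- # entry-point script. `create_pool`, `LogClient` and `generate_trace_id` are
- # placeholder names for the async DB pool, log client and trace id that
- # CrawlerPipeline expects; the real factories live elsewhere in the project.
- #
- #     import asyncio
- #
- #     async def main():
- #         pool = await create_pool()  # assumed async DB pool factory
- #         log_client = LogClient()  # assumed log client
- #         crawler = CrawlerToutiao(pool, log_client, trace_id=generate_trace_id())
- #         await crawler.crawler_task(media_type="article")
- #         # an empty list falls back to the default categories defined above
- #         await crawler.crawl_toutiao_recommend_task(category_list=[])
- #
- #     asyncio.run(main())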