from __future__ import annotations

import asyncio
import json
import time
import traceback
from datetime import datetime
from typing import List, Dict

import aiohttp
from tqdm import tqdm

from applications.api import feishu_robot
from applications.crawler.toutiao import get_toutiao_account_info_list
from applications.crawler.toutiao import search_in_toutiao
from applications.crawler.toutiao import get_toutiao_detail
from applications.pipeline import CrawlerPipeline
from applications.utils import async_proxy, get_top_article_title_list


class CrawlerToutiaoConst:
    # platform
    PLATFORM = "toutiao"

    # account status
    TOUTIAO_ACCOUNT_GOOD_STATUS = 1
    TOUTIAO_ACCOUNT_BAD_STATUS = 0

    # earliest cursor, 2021-01-01 00:00:00
    DEFAULT_CURSOR = 1609430400

    # no source account
    NO_SOURCE_ACCOUNT_STATUS = 0

    # minimum title length
    MIN_TITLE_LENGTH = 10

    # max video length (seconds)
    MAX_VIDEO_LENGTH = 600

    # seconds to sleep between pages
    SLEEP_SECOND = 3

    # how many times to pull the recommend feed per category
    RECOMMEND_TIMES = 10

    # article media type
    ARTICLE_TYPE = 1


class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client)
        self.trace_id = trace_id

    async def get_request_params(self, category):
        """
        get request params
        """
        query = """
            select request_method, request_url, request_headers, post_data
            from toutiao_request_params
            where category = %s and expire_flag = %s
            order by id desc limit 1;
        """
        response = await self.pool.async_fetch(query=query, params=(category, 0))
        if not response:
            now = datetime.now()
            # only send the Feishu alert between 10:00 and 21:00
            if 10 < now.hour < 21:
                await feishu_robot.bot(
                    title="今日头条推荐流,cookie 过期",
                    detail={"info": "cookie expired"},
                )
            return None
        else:
            return response[0]

    async def get_account_list(self, media_type: str) -> List[dict]:
        """get toutiao account list"""
        match media_type:
            case "video":
                table = "video_meta_accounts"
            case "article":
                table = "article_meta_accounts"
            case _:
                return []

        # fetch accounts that are still in good status
        query = f"""
            select account_id, max_cursor
            from {table}
            where platform = 'toutiao' and status = {self.TOUTIAO_ACCOUNT_GOOD_STATUS};
        """
        response = await self.pool.async_fetch(query)
        await self.log_client.log(
            contents={
                "trace_id": self.trace_id,
                "task": "crawler_toutiao",
                "function": "get_account_list",
                "message": f"get toutiao account list, media_type: {media_type}",
                "status": "success",
                "data": response,
            }
        )
        if not response:
            await feishu_robot.bot(
                title=f"抓取头条账号内容任务: 任务模态:{media_type} 异常",
                detail={"platform": self.PLATFORM, "error": "获取账号异常"},
            )
            return []
        else:
            return response

    async def crawler_each_account_info_list(
        self,
        account_id: str,
        media_type: str,
        max_cursor: int | None,
        max_behot_time: int = 0,
    ):
        """
        account_id: toutiao account id
        max_cursor: latest crawled cursor for this account
        max_behot_time: max behot time returned by toutiao, used to switch to the next page
        """
        has_more = True
        current_cursor = max_behot_time
        max_cursor = max_cursor or self.DEFAULT_CURSOR
        cookie = await self.get_config_value(
            key="toutiao_blogger_cookie", output_type="string"
        )
        while has_more:
            response = await get_toutiao_account_info_list(
                account_id=account_id,
                cookie=cookie,
                media_type=media_type,
                max_behot_time=current_cursor,
            )
            if not response:
                break

            if response["message"] != "success":
                break

            info_list = response["data"]
            has_more = response["has_more"]
            current_cursor = response["next"]["max_behot_time"]

            if not info_list:
                break

            # stop paging once the whole page is older than the stored cursor
            max_timestamp_in_this_group = info_list[0]["publish_time"]
            if max_timestamp_in_this_group < max_cursor:
                break

            # crawl every item on this page
            match media_type:
                case "video":
                    bar_description = "crawler videos"
                case "article":
                    bar_description = "crawler articles"
                case _:
                    raise Exception(f"unknown media type: {media_type}")

            crawler_info_list_bar = tqdm(info_list, desc=bar_description)
            for info in crawler_info_list_bar:
                try:
                    crawler_info_list_bar.set_postfix({"id": info["id"]})
                    match media_type:
                        case "video":
                            await self.crawler_each_video(info)
                        case "article":
                            await self.crawler_each_article(
                                method="account", article_raw_data=info
                            )
                        case _:
                            raise Exception(f"unknown media type: {media_type}")
                except Exception as e:
                    raise Exception(f"crawler each info failed: {e}")

            if has_more:
                await asyncio.sleep(self.SLEEP_SECOND)
            else:
                break

    async def crawler_each_account(self, account_name, account_id, media_type, cookie):
        """
        get toutiao account info
        """
        new_account_item = {
            "account_name": account_name,
            "account_id": account_id,
            "platform": self.PLATFORM,
            "crawler_date": datetime.now().strftime("%Y-%m-%d"),
            "media_type": media_type,
        }
        # get title_list
        response = await get_toutiao_account_info_list(
            account_id=account_id, cookie=cookie, media_type="article"
        )
        if not response:
            return

        article_raw_data = response["data"]
        title_list = [i["title"] for i in article_raw_data]
        new_account_item["title_list"] = json.dumps(title_list, ensure_ascii=False)
        await self.save_item_to_database(
            media_type="account", item=new_account_item, trace_id=self.trace_id
        )

    async def crawler_each_article(self, method, article_raw_data, category=None):
        """
        crawler each article
        """
        # fields shared by both crawl modes
        base_item = {
            "platform": self.PLATFORM,
            "mode": method,
            "out_account_id": article_raw_data["user_info"]["user_id"],
            "title": article_raw_data["title"],
            "read_cnt": article_raw_data["read_count"],
            "like_cnt": article_raw_data["like_count"],
            "publish_time": article_raw_data["publish_time"],
            "crawler_time": int(time.time()),
        }
        match method:
            case "account":
                new_article_item = {
                    **base_item,
                    "category": "toutiao_account_association",
                    "link": f"https://www.toutiao.com/article/{article_raw_data['group_id']}",
                    "description": article_raw_data["abstract"],
                    "unique_index": article_raw_data["group_id"],
                }
            case "recommend":
                new_article_item = {
                    **base_item,
                    "category": category,
                    "title": article_raw_data["title"],
                    "link": f"https://www.toutiao.com/article/{article_raw_data['item_id']}",
                    "description": article_raw_data["Abstract"],
                    "unique_index": article_raw_data["item_id"],
                }
            case _:
                raise Exception(f"unknown method: {method}")

        await self.save_item_to_database(
            media_type="article", item=new_article_item, trace_id=self.trace_id
        )

    async def crawler_each_video(self, video_raw_data):
        # video crawling is not implemented yet
        pass

    async def update_account_max_cursor(self, media_type: str, account_id: str) -> None:
        """
        update account max cursor
        """
        match media_type:
            case "video":
                query = """
                    select max(publish_timestamp) as max_cursor
                    from publish_single_video_source
                    where out_account_id = %s and platform = %s;
                """
                table = "video_meta_accounts"
            case "article":
                query = """
                    select max(publish_time) as max_cursor
                    from crawler_meta_article
                    where out_account_id = %s and platform = %s;
                """
                table = "article_meta_accounts"
            case _:
                raise Exception(f"unknown media type: {media_type}")

        response = await self.pool.async_fetch(
            query, params=(account_id, self.PLATFORM)
        )
        max_publish_timestamp = response[0]["max_cursor"]
        if max_publish_timestamp:
            query = f"""
                update {table}
                set max_cursor = %s
                where account_id = %s and platform = %s;
            """
            await self.pool.async_save(
                query, (max_publish_timestamp, account_id, self.PLATFORM)
            )

    # fetch articles / videos from each account's homepage
    async def crawler_task(self, media_type: str) -> None:
        """
        class entrance
        """
        account_list = await self.get_account_list(media_type=media_type)
        account_list_bar = tqdm(account_list, desc="crawler toutiao accounts")
        for account in account_list_bar:
            account_id = account["account_id"]
            max_cursor = account["max_cursor"]
            try:
                account_list_bar.set_postfix({"account_id": account_id})
                await self.crawler_each_account_info_list(
                    account_id=account_id, media_type=media_type, max_cursor=max_cursor
                )
                await self.update_account_max_cursor(
                    media_type=media_type, account_id=account_id
                )
                await self.log_client.log(
                    contents={
                        "trace_id": self.trace_id,
                        "task": "crawler_toutiao_account_info",
                        "function": "crawler_task",
                        "message": f"crawler account: {account_id} successfully, media type: {media_type}",
                        "status": "success",
                    }
                )
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "trace_id": self.trace_id,
                        "task": "crawler_toutiao_account_info",
                        "function": "crawler_task",
                        "message": f"crawler_account: {account_id} fail",
                        "status": "fail",
                        "data": {
                            "media_type": media_type,
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                        },
                    }
                )

    async def crawler_recommend_articles(self, category: str) -> None:
        """crawl the recommend feed for a single category"""
        cookie = await self.get_request_params(category=category)
        if not cookie:
            return

        for crawler_time in range(self.RECOMMEND_TIMES):
            try:
                # fetch the proxy once so url and credentials stay consistent
                proxy_info = async_proxy()
                proxy_url = proxy_info["url"]
                proxy_auth = aiohttp.BasicAuth(
                    proxy_info["username"], proxy_info["password"]
                )
                async with aiohttp.ClientSession() as session:
                    async with session.request(
                        method=cookie["request_method"],
                        url=cookie["request_url"],
                        headers=json.loads(cookie["request_headers"]),
                        proxy=proxy_url,
                        proxy_auth=proxy_auth,
                    ) as response:
                        response.raise_for_status()
                        response_json = await response.json()
                await self.log_client.log(
                    contents={
                        "task": "crawler_toutiao",
                        "function": "crawler_recommend_articles",
                        "message": f"crawler {category} articles, crawler time: {crawler_time + 1}",
                        "trace_id": self.trace_id,
                        "status": "success",
                        "data": response_json,
                    }
                )
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "task": "crawler_toutiao",
                        "function": "crawler_recommend_articles",
                        "message": f"crawler {category} articles, crawler time: {crawler_time + 1}",
                        "status": "fail",
                        "trace_id": self.trace_id,
                        "data": {
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                        },
                    }
                )
                continue

            if not response_json:
                continue

            article_list = response_json["data"]
            for article in article_list:
                if article.get("article_url"):
                    video_flag = article.get("has_video")
                    if not video_flag:
                        try:
                            await self.crawler_each_article(
                                method="recommend",
                                article_raw_data=article,
                                category=category,
                            )
                        except Exception as e:
                            print(f"crawler_recommend_articles error: {e}")
                    else:
                        print("this is a video rather than an article")
                        continue
                else:
                    continue

    # crawl the recommend feed
    async def crawl_toutiao_recommend_task(self, category_list: List[str]) -> None:
        if not category_list:
            category_list = ["finance", "tech", "history", "entertainment"]

        for category in category_list:
            await self.crawler_recommend_articles(category=category)

    # discover candidate accounts through toutiao search
    async def search_candidate_accounts(self):
        top_title_list = await get_top_article_title_list(pool=self.pool)
        cookie = await self.get_config_value(
            key="toutiao_blogger_cookie", output_type="string"
        )
        for article in top_title_list:
            title = article["title"]
            try:
                search_response = await search_in_toutiao(keyword=title)
                if not search_response:
                    continue

                article_list = search_response["data"]["data"]
                for search_article in article_list:
                    try:
                        article_url = search_article["article_url"]
                        account_name = search_article["source"]
                        if not (
                            article_url
                            and account_name
                            and "toutiao.com" in article_url
                        ):
                            continue

                        article_detail = await get_toutiao_detail(article_url)
                        if not article_detail:
                            continue

                        account_id = (
                            article_detail.get("data", {})
                            .get("data", {})
                            .get("channel_account_id")
                        )
                        if account_id:
                            await self.crawler_each_account(
                                account_name, account_id, self.ARTICLE_TYPE, cookie
                            )

                        await asyncio.sleep(1)
                    except Exception as e:
                        await self.log_client.log(
                            contents={
                                "task": "crawler_toutiao",
                                "function": "search_candidate_accounts",
                                "trace_id": self.trace_id,
                                "message": "crawler_account fail",
                                "status": "fail",
                                "data": {
                                    "error": str(e),
                                    "traceback": traceback.format_exc(),
                                    "article_info": article,
                                },
                            }
                        )

                await asyncio.sleep(5)
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "task": "crawler_toutiao",
                        "function": "search_candidate_accounts",
                        "trace_id": self.trace_id,
                        "message": "search_in_toutiao failed",
                        "status": "fail",
                        "data": {
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                        },
                    }
                )
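

# Minimal usage sketch (illustrative only, not part of the crawler itself).
# It assumes the caller already holds a database pool and a log client that
# are compatible with CrawlerPipeline; `pool` and `log_client` below are
# placeholders supplied by the surrounding application, and the trace id
# format is arbitrary.
async def run_toutiao_crawler_demo(pool, log_client) -> None:
    """hypothetical entry point showing how CrawlerToutiao might be driven"""
    crawler = CrawlerToutiao(pool, log_client, trace_id=f"toutiao-demo-{int(time.time())}")
    # crawl homepage articles for all configured accounts
    await crawler.crawler_task(media_type="article")
    # crawl the recommend feed; an empty list falls back to the default categories
    await crawler.crawl_toutiao_recommend_task(category_list=[])
    # discover new candidate accounts from top article titles
    await crawler.search_candidate_accounts()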