""" @author: luojunhui """ from __future__ import annotations import json import datetime import traceback from pymysql.cursors import DictCursor from tqdm import tqdm from applications import log from applications.db import DatabaseConnector from applications.pipeline import scrape_account_entities_process from applications.utils import Item from applications.utils import insert_into_candidate_account_pool_table from coldStartTasks.crawler.baidu import haokan_search_videos from coldStartTasks.crawler.baidu.baidu_spider import baidu_account_video_crawler from coldStartTasks.crawler.channels import search_in_wechat_channel from coldStartTasks.crawler.channels import get_channel_account_videos from coldStartTasks.crawler.toutiao import get_associated_recommendation from coldStartTasks.crawler.toutiao import get_toutiao_account_video_list from coldStartTasks.crawler.wechat import get_article_detail from coldStartTasks.crawler.wechat import get_article_list_from_account from coldStartTasks.crawler.wechat import get_source_account_from_article from config import apolloConfig, long_articles_config config = apolloConfig() recommend_cookie = config.getConfigValue("toutiao_detail_recommend_cookie") blogger_cookie = config.getConfigValue("toutiao_blogger_cookie") class CrawlerAccounts: def __init__(self): self.db_client = DatabaseConnector(db_config=long_articles_config) self.db_client.connect() def get_seed_keys(self) -> list[str]: """ get search keys from database """ fetch_query = f""" select association_title from `article_pool_promotion_source` where association_status = 2 order by association_update_timestamp desc limit 100; """ fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor) title_list = [] for item in fetch_response: try: title_list += json.loads(item['association_title']) except Exception as e: print(e) continue return list(set(title_list)) def insert_video_into_recommend_table(self, item: dict) -> None: # whether account exists final_item = scrape_account_entities_process(item, self.db_client) if not final_item: return else: # save to db insert_into_candidate_account_pool_table( db_client=self.db_client, account_item=final_item ) def update_account_status( self, account_id_tuple: tuple, ori_status: int, new_status: int ) -> int: update_query = f""" update video_association set status = %s where id in %s and status = %s; """ affected_rows = self.db_client.save( query=update_query, params=(new_status, account_id_tuple, ori_status) ) return affected_rows class ChannelsAccountCrawler(CrawlerAccounts): """ crawler channel accounts strategy: 1. try to get seed titles from database 2. try to get hot_points from web 2. 
        3. use the search api to get accounts
    """

    def process_channels_account(
        self, account_name: str, account_id: str, title_list_str: str
    ):
        """
        process video item and save to database
        """
        account_item = Item()
        account_item.add("account_name", account_name)
        account_item.add("account_id", account_id)
        account_item.add("title_list", title_list_str)
        account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d"))
        account_item.add("platform", "sph")
        # check item
        account_item.check(source="candidate_account")
        # save to db
        self.insert_video_into_recommend_table(account_item.item)

    def process_search_response(self, video: dict):
        """
        search for the account by the name attached to the video,
        then crawl the first page of videos under that account
        """
        account_name = video["items"][0]["source"]["title"]
        # search account detail
        search_account_response = search_in_wechat_channel(
            search_key=account_name, search_type=2
        )
        account_detail = search_account_response["data"]["data"][0]["items"][0]
        account_id = account_detail["jumpInfo"]["userName"]

        # fetch the first page of the account's video list
        search_video_response = get_channel_account_videos(account_id)
        search_response_data = search_video_response["data"]
        if isinstance(search_response_data, dict):
            video_list = search_response_data["object"]
        elif isinstance(search_response_data, list):
            video_list = search_response_data
        else:
            raise RuntimeError("search video response type error")

        title_list = [i["objectDesc"]["description"] for i in video_list]
        title_list_str = json.dumps(title_list, ensure_ascii=False)
        self.process_channels_account(account_name, account_id, title_list_str)

    def search_video_in_channels(self, title: str) -> None:
        """
        search videos in WeChat Channels by title
        """
        search_response = search_in_wechat_channel(search_key=title, search_type=1)
        video_list = search_response["data"]["data"][0]["subBoxes"]
        for video in tqdm(video_list, desc="crawler each video"):
            try:
                self.process_search_response(video)
            except Exception as e:
                log(
                    task="crawler_channels_account_videos",
                    function="process_search_response",
                    message="search by title failed",
                    data={
                        "video": video,
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )

    def deal(self):
        seed_title_list = self.get_seed_keys()
        for seed_title in tqdm(seed_title_list, desc="crawler each title"):
            try:
                self.search_video_in_channels(title=seed_title)
            except Exception as e:
                log(
                    task="crawler_channels_account_videos",
                    function="search_video_in_channels",
                    message="search video in channels failed",
                    data={
                        "title": seed_title,
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )


class ToutiaoAccountCrawler(CrawlerAccounts):

    def get_seed_videos(self):
        fetch_query = """
            select out_account_name, article_title, url_unique_md5
            from publish_single_video_source
            where platform = 'toutiao'
              and video_pool_audit_status = 1
              and bad_status = 0
            order by score desc
            limit 100;
        """
        seed_video_list = self.db_client.fetch(
            query=fetch_query, cursor_type=DictCursor
        )
        return seed_video_list

    def get_level_up_videos(self):
        fetch_query = """
            select out_account_name, article_title, url_unique_md5
            from publish_single_video_source
            where platform = 'toutiao'
              and (
                article_title in (
                    select distinct(title)
                    from article_pool_promotion_source
                    where status = 1 and deleted = 0
                )
                or flow_pool_level < 4
              );
        """
        uplevel_video_list = self.db_client.fetch(
            query=fetch_query, cursor_type=DictCursor
        )
        return uplevel_video_list

    def process_toutiao_account(self, video):
        # process video item and save to database
        account_item = Item()
        user_info = video["user_info"]
        account_item.add("account_name", user_info["name"])
account_item.add("account_id", user_info["user_id"]) account_item.add("platform", "toutiao") account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d")) # fetch account video first page video list fetch_response = get_toutiao_account_video_list( account_id=user_info["user_id"], cookie=blogger_cookie ) video_list = fetch_response["data"] title_list = [i["title"] for i in video_list] title_list_str = json.dumps(title_list, ensure_ascii=False) account_item.add("title_list", title_list_str) # check item account_item.check(source="candidate_account") # insert into database self.insert_video_into_recommend_table(account_item.item) def get_recommend_video_list(self, seed_video: dict): # get recommend videos for each video seed_video_id = seed_video["url_unique_md5"] recommend_response = get_associated_recommendation( seed_video_id, recommend_cookie ) recommend_video_list = recommend_response["data"] for video in tqdm(recommend_video_list): try: self.process_toutiao_account(video) except Exception as e: log( task="toutiao account crawler", function="process_toutiao_video", message="get recommend video failed", data={ "video": video, "error": str(e), "traceback": traceback.format_exc(), }, ) def get_category_recommend_list(self): """ 品类推荐流几乎无视频,暂时不做 """ return NotImplementedError() def deal(self): # start # seed_video_list = self.get_seed_videos() seed_video_list = self.get_level_up_videos() for seed_video in tqdm(seed_video_list, desc="get each video recommendation"): try: self.get_recommend_video_list(seed_video) except Exception as e: log( task="toutiao_recommendation_crawler", function="save_each_recommendation", message="save recommendation failed", data={ "error": str(e), "traceback": traceback.format_exc(), "seed_video": seed_video, }, ) class HaoKanAccountCrawler(CrawlerAccounts): def process_haokan_video(self, video: dict) -> None: """ process_haokan_video """ account_item = Item() account_item.add("account_name", video["author"]) account_item.add("account_id", video["author_id"]) account_item.add("platform", "hksp") account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d")) # fetch account video first page video list fetch_response = baidu_account_video_crawler(account_id=video["author_id"]) video_list = fetch_response["results"] title_list = [i["content"]["title"] for i in video_list] title_list_str = json.dumps(title_list, ensure_ascii=False) account_item.add("title_list", title_list_str) # check item account_item.check(source="candidate_account") # insert into database self.insert_video_into_recommend_table(account_item.item) def search_videos_in_haokan_video(self, title: str) -> None: """ search_ """ search_response = haokan_search_videos(title) video_list = search_response["data"]["list"] for video in tqdm(video_list, desc="search videos"): try: self.process_haokan_video(video) except Exception as e: log( task="haokan_search_crawler", function="process_haokan_video", message="process haokan video failed", data={ "video": video, "error": str(e), "traceback": traceback.format_exc(), }, ) def deal(self): seed_title_list = self.get_seed_keys() for seed_title in tqdm(seed_title_list, desc="crawler each title"): try: self.search_videos_in_haokan_video(seed_title) except Exception as e: log( task="haokan_search_crawler", function="search_videos_in_haokan_video", message="search videos in haokan video failed", data={ "title": seed_title, "error": str(e), "traceback": traceback.format_exc(), }, ) class GzhAccountCrawler(CrawlerAccounts): def 

    def get_task_list(self):
        fetch_query = """
            select id, article_url
            from publish_single_video_source
            where source_account = 1 and platform = 'gzh'
            limit 10;
        """
        task_list = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
        return task_list

    def process_official_account(self, account_name, account_id):
        """
        process an official account and save it to the database
        """
        account_item = Item()
        account_item.add("account_name", account_name)
        account_item.add("account_id", account_id)
        account_item.add("platform", "gzh")
        account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d"))

        # fetch the first page of the account's article list
        fetch_response = get_article_list_from_account(account_id=account_id, index=None)
        msg_list = fetch_response["data"]["data"]
        title_list = []
        for msg in msg_list:
            # collect at most ~10 titles
            if len(title_list) > 10:
                break
            sub_title_list = [i["Title"] for i in msg["AppMsg"]["DetailInfo"]]
            title_list += sub_title_list
        title_list_str = json.dumps(title_list, ensure_ascii=False)
        account_item.add("title_list", title_list_str)

        # check item
        account_item.check(source="candidate_account")

        # insert into database
        self.insert_video_into_recommend_table(account_item.item)

    def extract_account_from_article_link(self, article_link):
        """
        try to get account info from an article link
        """
        # skip original articles; only reposted articles expose a source account
        article_detail = get_article_detail(article_link)
        is_original = article_detail["data"]["data"]["is_original"]
        if is_original:
            return

        # extract source account
        source_account = get_source_account_from_article(article_link)
        if not source_account:
            return
        else:
            account_name = source_account["name"]
            gh_id = source_account["gh_id"]
            self.process_official_account(account_name, gh_id)

    def update_crawler_article_status(self, article_id_tuple: tuple):
        """
        update crawler article status
        """
        update_query = """
            update publish_single_video_source
            set source_account = %s
            where id in %s;
        """
        affected_rows = self.db_client.save(
            query=update_query, params=(0, article_id_tuple)
        )
        return affected_rows

    def deal(self):
        task_list = self.get_task_list()
        task_id_list = []
        for crawler_article_obj in tqdm(task_list, desc="crawler article list"):
            article_url = crawler_article_obj["article_url"]
            article_id = crawler_article_obj["id"]
            task_id_list.append(int(article_id))
            try:
                self.extract_account_from_article_link(article_url)
            except Exception as e:
                log(
                    task="gzh_account_crawler",
                    function="extract_account_from_article_link",
                    message="extract account from article link failed",
                    data={
                        "article_url": article_url,
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )

        if task_id_list:
            article_id_tuple = tuple(task_id_list)
            affected_rows = self.update_crawler_article_status(article_id_tuple)
            print(affected_rows)
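

# The module defines four crawler classes but no runner. The block below is a
# minimal usage sketch, assuming each crawler's `deal()` method (defined above)
# is the intended entry point; running the four crawlers sequentially here is an
# assumption for illustration, not part of the original module.
if __name__ == "__main__":
    for crawler_class in (
        ChannelsAccountCrawler,
        ToutiaoAccountCrawler,
        HaoKanAccountCrawler,
        GzhAccountCrawler,
    ):
        # each deal() catches and logs its own per-item errors
        crawler = crawler_class()
        crawler.deal()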