""" @author: luojunhui """ from __future__ import annotations import json import datetime import traceback from pymysql.cursors import DictCursor from tqdm import tqdm from applications import log from applications.db import DatabaseConnector from applications.pipeline import scrape_account_entities_process from applications.utils import Item from applications.utils import insert_into_candidate_account_pool_table from coldStartTasks.crawler.baidu import haokan_search_videos from coldStartTasks.crawler.baidu.baidu_spider import baidu_account_video_crawler from coldStartTasks.crawler.toutiao import get_associated_recommendation from coldStartTasks.crawler.toutiao import get_toutiao_account_video_list from coldStartTasks.crawler.channels import search_in_wechat_channel from coldStartTasks.crawler.channels import get_channel_account_videos from config import apolloConfig, long_articles_config config = apolloConfig() recommend_cookie = config.getConfigValue("toutiao_detail_recommend_cookie") blogger_cookie = config.getConfigValue("toutiao_blogger_cookie") class CrawlerAccounts: def __init__(self): self.db_client = DatabaseConnector(db_config=long_articles_config) self.db_client.connect() def get_seed_keys(self)->list[dict]: """ get search keys from database """ fetch_query = "select title from article_pool_promotion_source where status = 1 and deleted = 0 order by level limit 100;" result = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor) return result def insert_video_into_recommend_table(self, item: dict) -> None: # whether account exists final_item = scrape_account_entities_process(item, self.db_client) if not final_item: return else: # save to db insert_into_candidate_account_pool_table( db_client=self.db_client, account_item=final_item ) def update_account_status(self, account_id_tuple: tuple, ori_status: int, new_status: int) -> int: update_query = f""" update video_association set status = %s where id in %s and status = %s; """ affected_rows = self.db_client.save(query=update_query, params=(new_status, account_id_tuple, ori_status)) return affected_rows class ChannelsAccountCrawler(CrawlerAccounts): """ crawler channel accounts strategy: 1. try to get seed titles from database 2. try to get hot_points from web 2. use search api to get accounts """ def process_channels_account(self, account_name: str, account_id: str, title_list_str: str): """ process video item and save to database """ account_item = Item() account_item.add("account_name", account_name) account_item.add("account_id", account_id) account_item.add("title_list", title_list_str) account_item.add( "crawler_date", datetime.datetime.today().strftime("%Y-%m-%d") ) account_item.add("platform", "sph") # check item account_item.check(source="candidate_account") # save to db self.insert_video_into_recommend_table(account_item.item) def process_search_response(self, video: dict): """ 通过搜索视频的账号名称去搜索账号,并且抓取该账号下的第一页视频 """ account_name = video["items"][0]["source"]["title"] # search account detail search_account_response = search_in_wechat_channel( search_key=account_name, search_type=2 ) account_detail = search_account_response["data"]["data"][0]["items"][0] account_id = account_detail["jumpInfo"]["userName"] # fetch account video list for the first page search_video_response = get_channel_account_videos(account_id) video_list = search_video_response["data"]["object"] title_list = [i['objectDesc']['description'] for i in video_list] title_list_str = json.dumps(title_list, ensure_ascii=False) self.process_channels_account(account_name, account_id, title_list_str) def search_video_in_channels(self, title: str) -> None: """ search """ search_response = search_in_wechat_channel(search_key=title, search_type=1) video_list = search_response["data"]["data"][0]["subBoxes"] for video in tqdm(video_list, desc="crawler each video"): try: self.process_search_response(video) except Exception as e: log( task="crawler_channels_account_videos", function="process_search_response", message="search by title failed", data={ "video": video, "error": str(e), "traceback": traceback.format_exc() } ) def deal(self): seed_title_list = self.get_seed_keys() for item in tqdm(seed_title_list, desc="crawler each title"): try: self.search_video_in_channels(title=item["title"]) except Exception as e: log( task="crawler_channels_account_videos", function="search_video_in_channels", message="search video in channels failed", data={ "title": item["title"], "error": str(e), "traceback": traceback.format_exc() } ) class ToutiaoAccountCrawler(CrawlerAccounts): def get_seed_videos(self): fetch_query = f""" select out_account_name, article_title, url_unique_md5 from publish_single_video_source where platform = 'toutiao' and video_pool_audit_status = 1 and bad_status = 0 order by score desc limit 100; """ seed_video_list = self.db_client.fetch( query=fetch_query, cursor_type=DictCursor ) return seed_video_list def process_toutiao_account(self, video): # process video item and save to database account_item = Item() user_info = video["user_info"] account_item.add("account_name", user_info["name"]) account_item.add("account_id", user_info["user_id"]) account_item.add("platform", "toutiao") account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d")) # fetch account video first page video list fetch_response = get_toutiao_account_video_list(account_id=user_info["user_id"], cookie=blogger_cookie) video_list = fetch_response["data"] title_list = [i["title"] for i in video_list] title_list_str = json.dumps(title_list, ensure_ascii=False) account_item.add("title_list", title_list_str) # check item account_item.check(source="candidate_account") # insert into database self.insert_video_into_recommend_table(account_item.item) def get_recommend_video_list(self, seed_video: dict): # get recommend videos for each video seed_video_id = seed_video["url_unique_md5"] recommend_response = get_associated_recommendation(seed_video_id, recommend_cookie) recommend_video_list = recommend_response["data"] for video in tqdm(recommend_video_list): try: self.process_toutiao_account(video) except Exception as e: log( task="toutiao account crawler", function="process_toutiao_video", message="get recommend video failed", data={ "video": video, "error": str(e), "traceback": traceback.format_exc() } ) def get_category_recommend_list(self): """ 品类推荐流几乎无视频,暂时不做 """ return NotImplementedError() def deal(self): # start seed_video_list = self.get_seed_videos() for seed_video in tqdm(seed_video_list, desc="get each video recommendation"): try: self.get_recommend_video_list(seed_video) except Exception as e: log( task="toutiao_recommendation_crawler", function="save_each_recommendation", message="save recommendation failed", data={ "error": str(e), "traceback": traceback.format_exc(), "seed_video": seed_video, }, ) class HaoKanAccountCrawler(CrawlerAccounts): def process_haokan_video(self, video: dict) -> None: """ process_haokan_video """ account_item = Item() account_item.add("account_name", video['author']) account_item.add("account_id", video['author_id']) account_item.add("platform", "hksp") account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d")) # fetch account video first page video list fetch_response = baidu_account_video_crawler(account_id=video['author_id']) video_list = fetch_response["results"] title_list = [i["content"]["title"] for i in video_list] title_list_str = json.dumps(title_list, ensure_ascii=False) account_item.add("title_list", title_list_str) # check item account_item.check(source="candidate_account") # insert into database self.insert_video_into_recommend_table(account_item.item) def search_videos_in_haokan_video(self, title: str) -> None: """ search_ """ search_response = haokan_search_videos(title) video_list = search_response["data"]["list"] for video in tqdm(video_list, desc="search videos"): try: self.process_haokan_video(video) except Exception as e: log( task="haokan_search_crawler", function="process_haokan_video", message="process haokan video failed", data={ "video": video, "error": str(e), "traceback": traceback.format_exc() } ) def deal(self): seed_title_list = self.get_seed_keys() for seed_title in tqdm(seed_title_list, desc="crawler each title"): try: self.search_videos_in_haokan_video(seed_title["title"]) except Exception as e: log( task="haokan_search_crawler", function="search_videos_in_haokan_video", message="search videos in haokan video failed", data={ "title": seed_title["title"], "error": str(e), "traceback": traceback.format_exc() } )