""" @author: luojunhui """ from __future__ import annotations import datetime import traceback import numpy as np from pymysql.cursors import DictCursor from tqdm import tqdm from applications import log from applications.api import similarity_between_title_list from applications.db import DatabaseConnector from applications.pipeline import scrape_account_entities_process from applications.utils import Item from applications.utils import insert_into_associated_recommendation_table from coldStartTasks.crawler.baidu import haokan_search_videos from coldStartTasks.crawler.toutiao import get_associated_recommendation from coldStartTasks.crawler.channels import search_in_wechat_channel from coldStartTasks.crawler.channels import get_channel_account_videos from config import apolloConfig, long_articles_config config = apolloConfig() cookie = config.getConfigValue("toutiao_detail_recommend_cookie") class CrawlerAccounts: def __init__(self): self.db_client = DatabaseConnector(db_config=long_articles_config) self.db_client.connect() def get_seed_keys(self)->list[dict]: """ get search keys from database """ fetch_query = "select title from article_pool_promotion_source where status = 1 and deleted = 0 order by level limit 100;" result = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor) return result def insert_video_into_recommend_table(self, item: dict) -> None: # whether account exists final_item = scrape_account_entities_process(item, self.db_client) if not final_item: return else: # save to db insert_into_associated_recommendation_table( db_client=self.db_client, associated_recommendation_item=final_item ) def save_similarity_score_to_table(self, association_list: list[dict]) -> int: """ calculate similarity between seed_title_list and association_title_list """ association_id_list = [i["id"] for i in association_list] association_title_list = [i["title"] for i in association_list] seed_title_list = [i["seed_title"] for i in association_list] similarity_score_list = similarity_between_title_list( seed_title_list, association_title_list ) similarity_score_array = np.array(similarity_score_list) # get main diagonal score score_list = np.diag(similarity_score_array) batch_update_query = """ update video_association set score = case id {} end where id in %s and score is null; """ case_statement = [] params = [] for index, score in enumerate(score_list): association_id = association_id_list[index] case_statement.append(f"when %s then %s") params.extend([association_id, score]) params.append(tuple(association_id_list)) case_statements = "\n".join(case_statement) formatted_sql = batch_update_query.format(case_statements) affected_rows = self.db_client.save(formatted_sql, params) return affected_rows def get_video_list_without_score(self): fetch_query = f""" select id, title, seed_title from video_association where score is null; """ fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor) return fetch_response def get_video_list_with_score(self, platform: str): """ find video from video association """ fetch_query = f""" select id, account_name, recommend_video_id, title, read_cnt, duration, seed_account, seed_title from video_association where score > %s and platform = %s and status = %s order by account_name; """ fetch_response = self.db_client.fetch(query=fetch_query, params=(0.5, platform, 0)) return fetch_response def update_video_status(self, video_id_tuple: tuple, ori_status: int, new_status: int) -> int: update_query = f""" update video_association set status = %s where id in %s and status = %s; """ affected_rows = self.db_client.save(query=update_query, params=(new_status, video_id_tuple, ori_status)) return affected_rows class ChannelsAccountCrawler(CrawlerAccounts): """ crawler channel accounts strategy: 1. try to get seed titles from database 2. try to get hot_points from web 2. use search api to get accounts """ def process_channels_video(self, video: dict, seed_title: str, account_name: str, account_id: str): """ process video item and save to database """ video_item = Item() video_item.add("account_name", account_name) video_item.add("account_id", account_id) video_item.add("recommend_video_id", video["id"]) video_item.add("title", video["objectDesc"]["description"]) video_item.add("duration", video["objectDesc"]["media"][0]["VideoPlayLen"]) video_item.add("seed_account", "SearchWithOutAccount") video_item.add("seed_title", seed_title) video_item.add( "recommend_date", datetime.datetime.today().strftime("%Y-%m-%d") ) video_item.add("platform", "sph") # check item video_item.check(source="association") # save to db self.insert_video_into_recommend_table(video_item.item) def process_search_response(self, video: dict, seed_title: str): """ 通过搜索视频的账号名称去搜索账号,并且抓取该账号下的第一页视频 """ account_name = video["items"][0]["source"]["title"] # search account detail search_account_response = search_in_wechat_channel( search_key=account_name, search_type=2 ) account_detail = search_account_response["data"]["data"][0]["items"][0] account_id = account_detail["jumpInfo"]["userName"] # fetch account video list search_video_response = get_channel_account_videos(account_id) video_list = search_video_response["data"]["object"] # process and insert each video for video in video_list: try: self.process_channels_video(video, seed_title, account_name, account_id) except Exception as e: log( task="crawler_channels_account_videos", function="process_channels_video", message="process video failed", data={ "video": video, "error": str(e), "traceback": traceback.format_exc() } ) def search_video_in_channels(self, title: str) -> None: """ search """ search_response = search_in_wechat_channel(search_key=title, search_type=1) video_list = search_response["data"]["data"][0]["subBoxes"] for video in tqdm(video_list, desc="crawler each video"): try: self.process_search_response(video, seed_title=title) except Exception as e: log( task="channels account crawler", function="process_search_response", message="search by title failed", data={ "video": video, "error": str(e), "traceback": traceback.format_exc() } ) def deal(self): seed_title_list = self.get_seed_keys() for item in tqdm(seed_title_list, desc="crawler each title"): try: self.search_video_in_channels(title=item["title"]) except Exception as e: log( task="channels account crawler", function="search_video_in_channels", message="search video in channels failed", data={ "title": item["title"], "error": str(e), "traceback": traceback.format_exc() } ) # cal similarity score video_list = self.get_video_list_without_score() affected_rows = self.save_similarity_score_to_table(video_list) print(affected_rows) class ToutiaoAccountCrawler(CrawlerAccounts): def get_seed_videos(self): fetch_query = f""" select out_account_name, article_title, url_unique_md5 from publish_single_video_source where platform = 'toutiao' and video_pool_audit_status = 1 and bad_status = 0 order by score desc limit 100; """ seed_video_list = self.db_client.fetch( query=fetch_query, cursor_type=DictCursor ) return seed_video_list def process_toutiao_video(self, video, seed_account_name, seed_title): # process video item and save to database video_item = Item() user_info = video["user_info"] video_item.add("account_name", user_info["name"]) video_item.add("account_id", user_info["user_id"]) video_item.add("platform", "toutiao") video_item.add("recommend_video_id", video["id"]) video_item.add("title", video["title"]) video_item.add("read_cnt", video.get("read_count")) video_item.add("duration", video["video_duration"]) video_item.add("seed_account", seed_account_name) video_item.add("seed_title", seed_title) video_item.add("recommend_date", datetime.datetime.today().strftime("%Y-%m-%d")) # check item video_item.check(source="association") # insert into database self.insert_video_into_recommend_table(video_item.item) def get_recommend_video_list(self, seed_video: dict): # get recommend videos for each video seed_video_id = seed_video["url_unique_md5"] seed_account_name = seed_video["out_account_name"] seed_title = seed_video["article_title"] recommend_response = get_associated_recommendation(seed_video_id, cookie) recommend_video_list = recommend_response["data"] for video in tqdm(recommend_video_list): try: self.process_toutiao_video(video, seed_account_name, seed_title) except Exception as e: log( task="toutiao account crawler", function="process_toutiao_video", message="get recommend video failed", data={ "video": video, "error": str(e), "traceback": traceback.format_exc() } ) def get_category_recommend_list(self): """ 品类推荐流几乎无视频,暂时不做 """ return NotImplementedError() def deal(self): # start seed_video_list = self.get_seed_videos() for seed_video in tqdm(seed_video_list, desc="get each video recommendation"): try: self.get_recommend_video_list(seed_video) except Exception as e: log( task="toutiao_recommendation_crawler", function="save_each_recommendation", message="save recommendation failed", data={ "error": str(e), "traceback": traceback.format_exc(), "seed_video": seed_video, }, ) # cal similarity score video_list = self.get_video_list_without_score() affected_rows = self.save_similarity_score_to_table(video_list) print(affected_rows) class HaoKanAccountCrawler(CrawlerAccounts): def process_haokan_video(self, video: dict, seed_title: str) -> None: """ process_haokan_video """ video_item = Item() video_item.add("account_name", video['author']) video_item.add("account_id", video['author_id']) video_item.add("platform", "hksp") video_item.add("recommend_video_id", video['vid']) video_item.add("title", video['title']) read_num_string = video['read_num'].replace("次播放", "") if "万" in read_num_string: read_num_string = read_num_string.replace("万", "") read_num = int(float(read_num_string) * 10000) else: read_num = int(read_num_string) video_item.add("read_cnt", int(read_num)) duration_string = video['duration'] duration_list = duration_string.split(":") if len(duration_list) > 2: # video too long return duration = int(duration_list[0]) * 60 + int(duration_list[1]) video_item.add("duration", duration) video_item.add("seed_account", "SearchWithOutAccount") video_item.add("seed_title", seed_title) video_item.add("recommend_date", datetime.datetime.today().strftime("%Y-%m-%d")) # check item video_item.check(source="association") # insert into database self.insert_video_into_recommend_table(video_item.item) def search_videos_in_haokan_video(self, title: str) -> None: """ search_ """ search_response = haokan_search_videos(title) video_list = search_response["data"]["list"] for video in tqdm(video_list, desc="search videos"): try: self.process_haokan_video(video, seed_title=title) except Exception as e: log( task="haokan_search_crawler", function="process_haokan_video", message="process haokan video failed", data={ "video": video, "error": str(e), "traceback": traceback.format_exc() } ) def deal(self): seed_title_list = self.get_seed_keys() for seed_title in tqdm(seed_title_list, desc="crawler each title"): try: self.search_videos_in_haokan_video(seed_title["title"]) except Exception as e: log( task="haokan_search_crawler", function="search_videos_in_haokan_video", message="search videos in haokan video failed", data={ "title": seed_title["title"], "error": str(e), "traceback": traceback.format_exc() } ) video_list = self.get_video_list_without_score() affected_rows = self.save_similarity_score_to_table(video_list) print(affected_rows)