""" @author: luojunhui """ from __future__ import annotations import json import time import datetime import traceback import numpy as np from pymysql.cursors import DictCursor from tqdm import tqdm from applications import log from applications.api import similarity_between_title_list from applications.db import DatabaseConnector from applications.pipeline import scrape_account_entities_process from applications.utils import Item from applications.utils import insert_into_associated_recommendation_table from coldStartTasks.crawler.toutiao import get_associated_recommendation from coldStartTasks.crawler.channels import search_in_wechat_channel from coldStartTasks.crawler.channels import get_channel_account_videos from config import apolloConfig, long_articles_config config = apolloConfig() cookie = config.getConfigValue("toutiao_detail_recommend_cookie") class CrawlerAccounts: def __init__(self): self.db_client = DatabaseConnector(db_config=long_articles_config) self.db_client.connect() def insert_video_into_recommend_table(self, item): # whether account exists final_item = scrape_account_entities_process(item, self.db_client) if not final_item: return else: # save to db insert_into_associated_recommendation_table(db_client=self.db_client, associated_recommendation_item=final_item) def save_similarity_score_to_table( self, association_list:list[dict] ) -> int: """ calculate similarity between seed_title_list and association_title_list """ association_id_list = [i['id'] for i in association_list] association_title_list = [i['title'] for i in association_list] seed_title_list = [i['seed_title'] for i in association_list] similarity_score_list = similarity_between_title_list(seed_title_list, association_title_list) similarity_score_array = np.array(similarity_score_list) # get main diagonal score score_list = np.diag(similarity_score_array) batch_update_query = """ update video_association set score = case id {} end where id in %s and score is null; """ case_statement = [] params = [] for index, score in enumerate(score_list): association_id = association_id_list[index] case_statement.append(f"when %s then %s") params.extend([association_id, score]) params.append(tuple(association_id_list)) case_statements = "\n".join(case_statement) formatted_sql = batch_update_query.format(case_statements) affected_rows = self.db_client.save(formatted_sql, params) return affected_rows def get_video_list_without_score(self): fetch_query = f""" select id, title, seed_title from video_association where score is null; """ fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor) return fetch_response class ChannelAccountCrawler(CrawlerAccounts): """ crawler channel accounts strategy: 1. try to get seed titles from database 2. try to get hot_points from web 2. use search api to get accounts """ def get_seed_keys(self): """ get search keys from database """ fetch_query = "select title from article_pool_promotion_source where status = 1 and deleted = 0 order by level limit 100;" result = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor) return result def process_each_video(self, video: dict, seed_title: str): """ process video item and save to database """ account_name = video['items'][0]['source']['title'] search_account_response = search_in_wechat_channel(search_key=account_name, search_type=2) account_detail = search_account_response['data']['data'][0]['items'][0] account_id = account_detail['jumpInfo']['userName'] search_video_response = get_channel_account_videos(account_id) video_list = search_video_response['data']['object'] for video in video_list[:5]: video_item = Item() video_item.add("account_name", account_name) video_item.add("account_id", account_id) video_item.add("recommend_video_id", video['id']) video_item.add("title", video['objectDesc']['description']) video_item.add("duration", video['objectDesc']['media'][0]['VideoPlayLen']) video_item.add("seed_account", "SearchWithOutAccount") video_item.add("seed_title", seed_title) video_item.add("recommend_date", datetime.datetime.today().strftime("%Y-%m-%d")) video_item.add("platform", "sph") # check item video_item.check(source="association") # save to db self.insert_video_into_recommend_table(video_item.item) def search_by_title_from_database(self, title: str) -> None: """ search """ search_response = search_in_wechat_channel(search_key=title, search_type=1) print(search_response) video_list = search_response['data']['data'][0]['subBoxes'] for video in tqdm(video_list, desc='crawler each video'): try: self.process_each_video(video, seed_title=title) except Exception as e: print(e) def search_by_title_from_hotpoint(self, title: str) -> None: return def deal(self): seed_title_list = self.get_seed_keys() for item in tqdm(seed_title_list, desc='crawler each title'): try: self.search_by_title_from_database(title=item['title']) except Exception as e: print(e) # cal similarity score video_list = self.get_video_list_without_score() affected_rows = self.save_similarity_score_to_table(video_list) print(affected_rows) class ToutiaoAccountCrawler(CrawlerAccounts): def get_seed_videos(self): fetch_query = f""" select out_account_name, article_title, url_unique_md5 from publish_single_video_source where platform = 'toutiao' and video_pool_audit_status = 1 and bad_status = 0 order by score desc limit 100; """ seed_video_list = self.db_client.fetch( query=fetch_query, cursor_type=DictCursor ) return seed_video_list def process_each_video(self, video, seed_account_name, seed_title): # process video item and save to database video_item = Item() user_info = video["user_info"] video_item.add("account_name", user_info["name"]) video_item.add("account_id", user_info["user_id"]) video_item.add("platform", "toutiao") video_item.add("recommend_video_id", video["id"]) video_item.add("title", video["title"]) video_item.add("read_cnt", video.get("read_count")) video_item.add("duration", video["video_duration"]) video_item.add("seed_account", seed_account_name) video_item.add("seed_title", seed_title) video_item.add("recommend_date", datetime.datetime.today().strftime("%Y-%m-%d")) # check item video_item.check(source="association") # insert into database self.insert_video_into_recommend_table(video_item.item) def get_recommend_video_list(self, seed_video: dict): # get recommend videos for each video seed_video_id = seed_video["url_unique_md5"] seed_account_name = seed_video["out_account_name"] seed_title = seed_video["article_title"] recommend_response = get_associated_recommendation(seed_video_id, cookie) recommend_video_list = recommend_response["data"] for video in tqdm(recommend_video_list): try: self.process_each_video(video, seed_account_name, seed_title) except Exception as e: print(e) def deal(self): # start seed_video_list = self.get_seed_videos() for seed_video in tqdm(seed_video_list, desc="get each video recommendation"): try: self.get_recommend_video_list(seed_video) except Exception as e: log( task="toutiao_recommendation_crawler", function="save_each_recommendation", message="save recommendation failed", data={ "error": str(e), "traceback": traceback.format_exc(), "seed_video": seed_video, }, ) # cal similarity score video_list = self.get_video_list_without_score() affected_rows = self.save_similarity_score_to_table(video_list) print(affected_rows)