""" @author: luojunhui """ from __future__ import annotations import json import time import datetime import traceback from pymysql.cursors import DictCursor from tqdm import tqdm from applications import log from applications.db import DatabaseConnector from applications.pipeline import scrape_account_entities_process from applications.utils import Item from applications.utils import insert_into_associated_recommendation_table from coldStartTasks.crawler.toutiao import get_associated_recommendation from config import apolloConfig, long_articles_config config = apolloConfig() cookie = config.getConfigValue("toutiao_detail_recommend_cookie") class CrawlerAccounts: def __init__(self): self.db_client = DatabaseConnector(db_config=long_articles_config) self.db_client.connect() class ChannelAccountCrawler(CrawlerAccounts): """ crawler channel accounts strategy: 1. try to get search keys and titles from database 2. try to get hot_points from web 2. use search api to get accounts """ def get_seed_keys(self): """ get search keys from database """ sql = "select * from datastat_sort_strategy limit 100;" result = self.db_client.fetch(sql) return result class ToutiaoAccountCrawler(CrawlerAccounts): def get_seed_videos(self): fetch_query = f""" select out_account_name, article_title, url_unique_md5 from publish_single_video_source where platform = 'toutiao' and video_pool_audit_status = 1 and bad_status = 0 order by score desc limit 100; """ seed_video_list = self.db_client.fetch( query=fetch_query, cursor_type=DictCursor ) return seed_video_list def process_each_video(self, video, seed_account_name, seed_title): # process video item and save to database video_item = Item() user_info = video["user_info"] video_item.add("account_name", user_info["name"]) video_item.add("account_id", user_info["user_id"]) video_item.add("platform", "toutiao") video_item.add("recommend_video_id", video["id"]) video_item.add("title", video["title"]) video_item.add("read_cnt", video["read_count"]) video_item.add("duration", video["video_duration"]) video_item.add("seed_account", seed_account_name) video_item.add("seed_title", seed_title) video_item.add("recommend_date", datetime.datetime.today().strftime("%Y-%m-%d")) # check item video_item.check(source="association") # whether account exists final_item = scrape_account_entities_process(video_item.item, self.db_client) if not final_item: return else: # save to db insert_into_associated_recommendation_table(db_client=self.db_client, associated_recommendation_item=final_item) def get_recommend_video_list(self, seed_video: dict): # get recommend videos for each video seed_video_id = seed_video["url_unique_md5"] seed_account_name = seed_video["out_account_name"] seed_title = seed_video["article_title"] recommend_response = get_associated_recommendation(seed_video_id, cookie) recommend_video_list = recommend_response["data"] for video in tqdm(recommend_video_list): self.process_each_video(video, seed_account_name, seed_title) def deal(self): # start seed_video_list = self.get_seed_videos() for seed_video in tqdm(seed_video_list, desc="get each video recommendation"): try: self.get_recommend_video_list(seed_video) except Exception as e: log( task="{}_recommendation_crawler".format(seed_video["platform"]), function="save_each_recommendation", message="save recommendation failed", data={ "error": str(e), "traceback": traceback.format_exc(), "seed_video": seed_video, }, )