123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- """
- @author: luojunhui
- """
- from __future__ import annotations
- import json
- import time
- import datetime
- import traceback
- from pymysql.cursors import DictCursor
- from tqdm import tqdm
- from applications import log
- from applications.db import DatabaseConnector
- from applications.pipeline import scrape_account_entities_process
- from applications.utils import Item
- from applications.utils import insert_into_associated_recommendation_table
- from coldStartTasks.crawler.toutiao import get_associated_recommendation
- from config import apolloConfig, long_articles_config
- config = apolloConfig()
- cookie = config.getConfigValue("toutiao_detail_recommend_cookie")
class CrawlerAccounts:
    """Base class for account crawlers: owns a connected database client."""

    def __init__(self):
        """Create a connector from ``long_articles_config`` and connect it."""
        client = DatabaseConnector(db_config=long_articles_config)
        client.connect()
        # Subclasses reach the database through this attribute.
        self.db_client = client
class ChannelAccountCrawler(CrawlerAccounts):
    """
    Crawl channel accounts.

    Strategy:
        1. try to get search keys and titles from database
        2. try to get hot_points from web
        3. use search api to get accounts

    Only the database lookup (step 1) is implemented in this class as shown.
    """

    def get_seed_keys(self):
        """
        Fetch the seed search keys from the database.

        Returns the result of ``db_client.fetch`` for a query that reads at
        most 100 rows from ``datastat_sort_strategy``.
        """
        return self.db_client.fetch("select * from datastat_sort_strategy limit 100;")
class ToutiaoAccountCrawler(CrawlerAccounts):
    """
    Crawl Toutiao accounts via the videos recommended alongside seed videos.

    Flow: load seed videos from the database, request the associated
    recommendations for each seed, and store every recommended video (with
    its account) in the associated-recommendation table.
    """

    # Platform name written to each stored item and used in the log task name.
    PLATFORM = "toutiao"

    def get_seed_videos(self):
        """
        Fetch up to 100 seed videos, ordered by score descending.

        Only ``out_account_name``, ``article_title`` and ``url_unique_md5``
        are selected; the query is run with ``DictCursor``.
        """
        fetch_query = """
            select out_account_name, article_title, url_unique_md5
            from publish_single_video_source
            where platform = 'toutiao' and video_pool_audit_status = 1 and bad_status = 0
            order by score desc limit 100;
        """
        seed_video_list = self.db_client.fetch(
            query=fetch_query, cursor_type=DictCursor
        )
        return seed_video_list

    def process_each_video(self, video, seed_account_name, seed_title):
        """
        Build an item from one recommended video and save it to the database.

        :param video: recommended video payload; must provide ``user_info``
            (with ``name`` and ``user_id``), ``id``, ``title``, ``read_count``
            and ``video_duration``.
        :param seed_account_name: account name of the seed video.
        :param seed_title: title of the seed video.
        :raises KeyError: if ``video`` lacks one of the fields listed above.
        """
        video_item = Item()
        user_info = video["user_info"]
        video_item.add("account_name", user_info["name"])
        video_item.add("account_id", user_info["user_id"])
        video_item.add("platform", self.PLATFORM)
        video_item.add("recommend_video_id", video["id"])
        video_item.add("title", video["title"])
        video_item.add("read_cnt", video["read_count"])
        video_item.add("duration", video["video_duration"])
        video_item.add("seed_account", seed_account_name)
        video_item.add("seed_title", seed_title)
        video_item.add("recommend_date", datetime.datetime.today().strftime("%Y-%m-%d"))

        # check item
        video_item.check(source="association")

        # The pipeline returns a falsy value when the item should not be
        # stored (presumably the account already exists -- confirm against
        # scrape_account_entities_process).
        final_item = scrape_account_entities_process(video_item.item, self.db_client)
        if final_item:
            insert_into_associated_recommendation_table(
                db_client=self.db_client,
                associated_recommendation_item=final_item,
            )

    def get_recommend_video_list(self, seed_video: dict):
        """
        Fetch the recommendations of one seed video and process each of them.

        :param seed_video: row with ``url_unique_md5``, ``out_account_name``
            and ``article_title``.

        Any exception raised while fetching or processing propagates to the
        caller, so the remaining recommendations of this seed are skipped.
        """
        seed_video_id = seed_video["url_unique_md5"]
        seed_account_name = seed_video["out_account_name"]
        seed_title = seed_video["article_title"]
        recommend_response = get_associated_recommendation(seed_video_id, cookie)
        recommend_video_list = recommend_response["data"]
        for video in tqdm(recommend_video_list):
            self.process_each_video(video, seed_account_name, seed_title)

    def deal(self):
        """
        Entry point: crawl recommendations for every seed video.

        A failure on one seed video is logged and the loop moves on to the
        next seed.
        """
        seed_video_list = self.get_seed_videos()
        for seed_video in tqdm(seed_video_list, desc="get each video recommendation"):
            try:
                self.get_recommend_video_list(seed_video)
            except Exception as e:
                # Seed rows carry no "platform" column (see get_seed_videos),
                # so the task name uses the fixed platform instead of
                # seed_video["platform"], which would raise KeyError here.
                log(
                    task="{}_recommendation_crawler".format(self.PLATFORM),
                    function="save_each_recommendation",
                    message="save recommendation failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "seed_video": seed_video,
                    },
                )
|