- """
- @author: luojunhui
- """
- from __future__ import annotations
- import json
- import datetime
- import traceback
- from pymysql.cursors import DictCursor
- from tqdm import tqdm
- from applications import log
- from applications.db import DatabaseConnector
- from applications.pipeline import scrape_account_entities_process
- from applications.utils import Item
- from applications.utils import insert_into_candidate_account_pool_table
- from coldStartTasks.crawler.baidu import haokan_search_videos
- from coldStartTasks.crawler.baidu.baidu_spider import baidu_account_video_crawler
- from coldStartTasks.crawler.channels import search_in_wechat_channel
- from coldStartTasks.crawler.channels import get_channel_account_videos
- from coldStartTasks.crawler.toutiao import get_associated_recommendation
- from coldStartTasks.crawler.toutiao import get_toutiao_account_video_list
- from coldStartTasks.crawler.wechat import get_article_detail
- from coldStartTasks.crawler.wechat import get_article_list_from_account
- from coldStartTasks.crawler.wechat import get_source_account_from_article
- from config import apolloConfig, long_articles_config
- config = apolloConfig()
- recommend_cookie = config.getConfigValue("toutiao_detail_recommend_cookie")
- blogger_cookie = config.getConfigValue("toutiao_blogger_cookie")


class CrawlerAccounts:

    def __init__(self):
        self.db_client = DatabaseConnector(db_config=long_articles_config)
        self.db_client.connect()

    def get_seed_keys(self) -> list[str]:
        """
        get search keys from database
        """
        fetch_query = """
            select association_title
            from `article_pool_promotion_source`
            where association_status = 2
            order by association_update_timestamp desc
            limit 100;
        """
        fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
        title_list = []
        for item in fetch_response:
            try:
                title_list += json.loads(item["association_title"])
            except Exception as e:
                print(e)
                continue
        return list(set(title_list))

    def insert_video_into_recommend_table(self, item: dict) -> None:
        # skip accounts that already exist in the candidate pool
        final_item = scrape_account_entities_process(item, self.db_client)
        if not final_item:
            return
        # save the candidate account to the database
        insert_into_candidate_account_pool_table(
            db_client=self.db_client, account_item=final_item
        )

    def update_account_status(
        self, account_id_tuple: tuple, ori_status: int, new_status: int
    ) -> int:
        update_query = """
            update video_association
            set status = %s
            where id in %s and status = %s;
        """
        affected_rows = self.db_client.save(
            query=update_query, params=(new_status, account_id_tuple, ori_status)
        )
        return affected_rows


class ChannelsAccountCrawler(CrawlerAccounts):
    """
    crawl WeChat Channels accounts
    strategy:
        1. try to get seed titles from the database
        2. try to get hot points from the web
        3. use the search api to get accounts
    """

    def process_channels_account(
        self, account_name: str, account_id: str, title_list_str: str
    ):
        """
        build an account item and save it to the database
        """
        account_item = Item()
        account_item.add("account_name", account_name)
        account_item.add("account_id", account_id)
        account_item.add("title_list", title_list_str)
        account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d"))
        account_item.add("platform", "sph")
        # check item
        account_item.check(source="candidate_account")
        # save to db
        self.insert_video_into_recommend_table(account_item.item)

    def process_search_response(self, video: dict):
        """
        search the account by the author name of a matched video,
        then crawl the first page of videos under that account
        """
        account_name = video["items"][0]["source"]["title"]
        # search account detail
        search_account_response = search_in_wechat_channel(
            search_key=account_name, search_type=2
        )
        account_detail = search_account_response["data"]["data"][0]["items"][0]
        account_id = account_detail["jumpInfo"]["userName"]
        # fetch the first page of videos for this account
        search_video_response = get_channel_account_videos(account_id)
        search_response_data = search_video_response["data"]
        if isinstance(search_response_data, dict):
            video_list = search_response_data["object"]
        elif isinstance(search_response_data, list):
            video_list = search_response_data
        else:
            raise RuntimeError("search video response type error")
        title_list = [i["objectDesc"]["description"] for i in video_list]
        title_list_str = json.dumps(title_list, ensure_ascii=False)
        self.process_channels_account(account_name, account_id, title_list_str)

    def search_video_in_channels(self, title: str) -> None:
        """
        search the given title in wechat channels and process each matched video
        """
        search_response = search_in_wechat_channel(search_key=title, search_type=1)
        video_list = search_response["data"]["data"][0]["subBoxes"]
        for video in tqdm(video_list, desc="crawler each video"):
            try:
                self.process_search_response(video)
            except Exception as e:
                log(
                    task="crawler_channels_account_videos",
                    function="process_search_response",
                    message="search by title failed",
                    data={
                        "video": video,
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )

    def deal(self):
        seed_title_list = self.get_seed_keys()
        for seed_title in tqdm(seed_title_list, desc="crawler each title"):
            try:
                self.search_video_in_channels(title=seed_title)
            except Exception as e:
                log(
                    task="crawler_channels_account_videos",
                    function="search_video_in_channels",
                    message="search video in channels failed",
                    data={
                        "title": seed_title,
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )


class ToutiaoAccountCrawler(CrawlerAccounts):

    def get_seed_videos(self):
        fetch_query = """
            select out_account_name, article_title, url_unique_md5
            from publish_single_video_source
            where platform = 'toutiao' and video_pool_audit_status = 1 and bad_status = 0
            order by score desc limit 100;
        """
        seed_video_list = self.db_client.fetch(
            query=fetch_query, cursor_type=DictCursor
        )
        return seed_video_list

    def get_level_up_videos(self):
        fetch_query = """
            select out_account_name, article_title, url_unique_md5
            from publish_single_video_source
            where platform = 'toutiao' and (
                article_title in (
                    select distinct(title) from article_pool_promotion_source where status = 1 and deleted = 0
                )
                or flow_pool_level < 4
            );
        """
        uplevel_video_list = self.db_client.fetch(
            query=fetch_query, cursor_type=DictCursor
        )
        return uplevel_video_list

    def process_toutiao_account(self, video):
        # build an account item from the recommended video and save it to the database
        account_item = Item()
        user_info = video["user_info"]
        account_item.add("account_name", user_info["name"])
        account_item.add("account_id", user_info["user_id"])
        account_item.add("platform", "toutiao")
        account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d"))
        # fetch the first page of the account's video list
        fetch_response = get_toutiao_account_video_list(
            account_id=user_info["user_id"], cookie=blogger_cookie
        )
        video_list = fetch_response["data"]
        title_list = [i["title"] for i in video_list]
        title_list_str = json.dumps(title_list, ensure_ascii=False)
        account_item.add("title_list", title_list_str)
        # check item
        account_item.check(source="candidate_account")
        # insert into database
        self.insert_video_into_recommend_table(account_item.item)

    def get_recommend_video_list(self, seed_video: dict):
        # fetch the recommendation feed associated with the seed video
        seed_video_id = seed_video["url_unique_md5"]
        recommend_response = get_associated_recommendation(
            seed_video_id, recommend_cookie
        )
        recommend_video_list = recommend_response["data"]
        for video in tqdm(recommend_video_list):
            try:
                self.process_toutiao_account(video)
            except Exception as e:
                log(
                    task="toutiao account crawler",
                    function="process_toutiao_video",
                    message="get recommend video failed",
                    data={
                        "video": video,
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )

    def get_category_recommend_list(self):
        """
        the category recommendation feed contains almost no videos, so this is not implemented yet
        """
        raise NotImplementedError()

    def deal(self):
        # use the level-up videos as seeds (the scored seed videos are currently disabled)
        # seed_video_list = self.get_seed_videos()
        seed_video_list = self.get_level_up_videos()
        for seed_video in tqdm(seed_video_list, desc="get each video recommendation"):
            try:
                self.get_recommend_video_list(seed_video)
            except Exception as e:
                log(
                    task="toutiao_recommendation_crawler",
                    function="save_each_recommendation",
                    message="save recommendation failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "seed_video": seed_video,
                    },
                )


class HaoKanAccountCrawler(CrawlerAccounts):

    def process_haokan_video(self, video: dict) -> None:
        """
        build an account item from a haokan search result and save it to the database
        """
        account_item = Item()
        account_item.add("account_name", video["author"])
        account_item.add("account_id", video["author_id"])
        account_item.add("platform", "hksp")
        account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d"))
        # fetch the first page of the account's video list
        fetch_response = baidu_account_video_crawler(account_id=video["author_id"])
        video_list = fetch_response["results"]
        title_list = [i["content"]["title"] for i in video_list]
        title_list_str = json.dumps(title_list, ensure_ascii=False)
        account_item.add("title_list", title_list_str)
        # check item
        account_item.check(source="candidate_account")
        # insert into database
        self.insert_video_into_recommend_table(account_item.item)

    def search_videos_in_haokan_video(self, title: str) -> None:
        """
        search the given title on haokan video and process each result
        """
        search_response = haokan_search_videos(title)
        video_list = search_response["data"]["list"]
        for video in tqdm(video_list, desc="search videos"):
            try:
                self.process_haokan_video(video)
            except Exception as e:
                log(
                    task="haokan_search_crawler",
                    function="process_haokan_video",
                    message="process haokan video failed",
                    data={
                        "video": video,
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )

    def deal(self):
        seed_title_list = self.get_seed_keys()
        for seed_title in tqdm(seed_title_list, desc="crawler each title"):
            try:
                self.search_videos_in_haokan_video(seed_title)
            except Exception as e:
                log(
                    task="haokan_search_crawler",
                    function="search_videos_in_haokan_video",
                    message="search videos in haokan video failed",
                    data={
                        "title": seed_title,
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )


class GzhAccountCrawler(CrawlerAccounts):

    def get_task_list(self):
        fetch_query = """
            select id, article_url
            from publish_single_video_source
            where source_account = 1 and platform = 'gzh' limit 10;
        """
        task_list = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
        return task_list

    def process_official_account(self, account_name, account_id):
        """
        build an account item for an official account and save it to the database
        """
        account_item = Item()
        account_item.add("account_name", account_name)
        account_item.add("account_id", account_id)
        account_item.add("platform", "gzh")
        account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d"))
        # fetch the first page of articles published by the account
        fetch_response = get_article_list_from_account(account_id=account_id, index=None)
        msg_list = fetch_response["data"]["data"]
        title_list = []
        for msg in msg_list:
            # stop collecting once roughly the first 10 titles are gathered
            if len(title_list) > 10:
                break
            sub_title_list = [i["Title"] for i in msg["AppMsg"]["DetailInfo"]]
            title_list += sub_title_list
        title_list_str = json.dumps(title_list, ensure_ascii=False)
        account_item.add("title_list", title_list_str)
        # check item
        account_item.check(source="candidate_account")
        # insert into database
        self.insert_video_into_recommend_table(account_item.item)

    def extract_account_from_article_link(self, article_link):
        """
        try to get account info from article link
        """
        # skip original articles: only reposted articles carry an upstream source account
        article_detail = get_article_detail(article_link)
        is_original = article_detail["data"]["data"]["is_original"]
        if is_original:
            return
        # extract source account
        source_account = get_source_account_from_article(article_link)
        if not source_account:
            return
        account_name = source_account["name"]
        gh_id = source_account["gh_id"]
        self.process_official_account(account_name, gh_id)

    def update_crawler_article_status(self, article_id_tuple: tuple):
        """
        update crawler article status
        """
        update_query = """
            update publish_single_video_source
            set source_account = %s
            where id in %s;
        """
        affected_rows = self.db_client.save(
            query=update_query, params=(0, article_id_tuple)
        )
        return affected_rows

    def deal(self):
        task_list = self.get_task_list()
        task_id_list = []
        for crawler_article_obj in tqdm(task_list, desc="crawler article list"):
            article_url = crawler_article_obj["article_url"]
            article_id = crawler_article_obj["id"]
            task_id_list.append(int(article_id))
            try:
                self.extract_account_from_article_link(article_url)
            except Exception as e:
                log(
                    task="gzh_account_crawler",
                    function="extract_account_from_article_link",
                    message="extract account from article link failed",
                    data={
                        "article_url": article_url,
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )
        if task_id_list:
            article_id_tuple = tuple(task_id_list)
            affected_rows = self.update_crawler_article_status(article_id_tuple)
            print(affected_rows)
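

# Minimal usage sketch (assumption): the original module defines no entry point,
# so how these crawlers are actually scheduled is unknown. The block below only
# illustrates how the four crawler classes above could be driven in sequence;
# each instance opens its own database connection in __init__.
if __name__ == "__main__":
    for crawler_class in (
        ChannelsAccountCrawler,
        ToutiaoAccountCrawler,
        HaoKanAccountCrawler,
        GzhAccountCrawler,
    ):
        crawler = crawler_class()
        crawler.deal()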