123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265 |
- """
- @author: luojunhui
- """
- import time
- import traceback
- from typing import List, Set, Dict, Tuple
- from tqdm import tqdm
- from pymysql.cursors import DictCursor
- from applications import WeixinSpider, longArticlesMySQL, log, bot, aiditApi
- from applications.const import WeixinVideoCrawlerConst
- from applications.functions import Functions
- const = WeixinVideoCrawlerConst()
- function = Functions()
- def get_inner_account_gh_id() -> Set[str]:
- """
- 获取内部账号名称
- :return:
- """
- accounts = aiditApi.get_publish_account_from_aigc()
- gh_id_list = [i['ghId'] for i in accounts]
- return set(gh_id_list)
- class WeixinAccountCrawler(object):
- """
- 账号抓取
- """
- def __init__(self):
- self.db_client = longArticlesMySQL()
- self.spider = WeixinSpider()
- self.crawler_account_count = 0
- def get_crawler_articles(self) -> List[Dict]:
- """
- 获取已经抓取到的文章,判断其是否有链接账号,若有则继续抓账号
- :return:
- """
- sql = f"""
- SELECT id, article_url
- FROM publish_single_video_source
- WHERE source_account = {const.NEED_SCAN_SOURCE_ACCOUNT}
- and bad_status = {const.TITLE_DEFAULT_STATUS}
- and platform = 'gzh' limit 1000;
- """
- article_url_list = self.db_client.select(sql, cursor_type=DictCursor)
- return article_url_list
- def update_crawler_article_status(self, article_id_tuple: Tuple[int, ...]) -> int:
- """
- :param article_id_tuple:
- :return:
- """
- sql = """
- UPDATE publish_single_video_source
- SET source_account = %s
- WHERE id IN %s;
- """
- affected_rows = self.db_client.update(sql, (const.DO_NOT_NEED_SOURCE_ACCOUNT, article_id_tuple))
- return affected_rows
- def get_seed_titles(self, run_date) -> List[str]:
- """
- :return:
- """
- publish_timestamp_threshold = int(run_date.timestamp()) - const.STAT_PERIOD
- sql = f"""
- SELECT distinct title
- FROM datastat_sort_strategy
- WHERE read_rate > {const.READ_AVG_MULTIPLE} and view_count > {const.MIN_READ_COUNT} and publish_timestamp > {publish_timestamp_threshold}
- ORDER BY read_rate DESC;
- """
- title_list = self.db_client.select(sql, cursor_type=DictCursor)
- title_list = [i['title'] for i in title_list]
- return title_list
- def is_original(self, article_url: str) -> bool:
- """
- 判断视频是否是原创
- :return:
- """
- response = self.spider.get_article_text(article_url)
- data = response['data']['data']
- return data['is_original']
- def insert_account(self, gh_id: str, account_name: str) -> int:
- """
- 插入账号
- :param account_name:
- :param gh_id:
- :return:
- """
- init_date = time.strftime("%Y-%m-%d", time.localtime())
- sql = """
- INSERT IGNORE INTO weixin_account_for_videos
- (gh_id, account_name, account_init_date)
- VALUES
- (%s, %s, %s);
- """
- insert_rows = self.db_client.update(sql, (gh_id, account_name, init_date))
- return insert_rows
- def process_search_result(self, response: Dict, inner_account_set: Set[str]):
- """
- 处理搜索结果
- :param response:
- :param inner_account_set:
- :return:
- """
- if response['code'] != const.REQUEST_SUCCESS:
- return
- article_list = response['data']['data']
- if article_list:
- for article in article_list:
- try:
- # 先判断账号是否内部账号
- article_url = article['url']
- account_detail = self.spider.get_account_by_url(article_url)
- account_detail = account_detail['data']['data']
- account_name = account_detail['account_name']
- gh_id = account_detail['wx_gh']
- if gh_id in inner_account_set:
- continue
- # 判断搜索结果是否原创
- if self.is_original(article_url):
- continue
- # 判断是否有视频链接
- try:
- video_url = function.get_video_url(article_url)
- except Exception as e:
- continue
- if not video_url:
- continue
- # 将账号抓取进来
- insert_rows = self.insert_account(gh_id=gh_id, account_name=account_name)
- if insert_rows:
- log(
- task="account_crawler_v1",
- function="process_search_result",
- message="insert account success",
- data={
- "gh_id": gh_id,
- "account_name": account_name
- }
- )
- self.crawler_account_count += 1
- except Exception as e:
- log(
- task="account_crawler_v1",
- function="process_search_result",
- message="insert account error",
- data={
- "error": str(e),
- "traceback": traceback.format_exc(),
- "data": article
- }
- )
- def search_title_in_weixin(self, title: str, inner_account_set: Set[str]) -> None:
- """
- 调用搜索接口,在微信搜索
- :param inner_account_set:
- :param title:
- :return:
- """
- for page_index in tqdm(range(1, const.MAX_SEARCH_PAGE_NUM + 1), desc='searching: {}'.format(title)):
- try:
- response = self.spider.search_articles(title, page=str(page_index))
- self.process_search_result(response, inner_account_set)
- time.sleep(const.SLEEP_SECONDS)
- except Exception as e:
- log(
- task="account_crawler_v1",
- function="search_title_in_weixin",
- message="search title error",
- data={
- "error": str(e),
- "traceback": traceback.format_exc(),
- "title": title
- }
- )
- def run(self, run_date) -> None:
- """
- 入口函数
- :return:
- """
- # get seed titles
- title_list = self.get_seed_titles(run_date)
- # get inner accounts set
- inner_account_gh_id_set = get_inner_account_gh_id()
- start_time = time.time()
- for title in tqdm(title_list, desc="search each title"):
- self.search_title_in_weixin(title, inner_account_gh_id_set)
- # 通知
- bot(
- title="微信账号抓取V1完成",
- detail={
- "总更新账号数量": self.crawler_account_count,
- "总耗时": time.time() - start_time,
- "种子标题数量": len(title_list)
- },
- mention=False
- )
- def run_v2(self) -> None:
- """
- 入口函数
- :return:
- """
- # get article list
- crawler_article_list = self.get_crawler_articles()
- article_id_list = []
- insert_account_count = 0
- for crawler_article_obj in tqdm(crawler_article_list, desc="crawler article list"):
- try:
- article_id = crawler_article_obj['id']
- # 记录处理过的id
- article_id_list.append(int(article_id))
- article_url = crawler_article_obj['article_url']
- # 判断文章是否原创
- if self.is_original(article_url):
- continue
- try:
- source_account_info = function.get_source_account(article_url)
- except Exception as e:
- continue
- if not source_account_info:
- continue
- if source_account_info:
- account_name = source_account_info['name']
- gh_id = source_account_info['gh_id']
- affected_rows = self.insert_account(gh_id=gh_id, account_name=account_name)
- insert_account_count += affected_rows
- else:
- continue
- except Exception as e:
- print(e)
- print(traceback.format_exc())
- if article_id_list:
- article_id_tuple = tuple(article_id_list)
- affected_rows = self.update_crawler_article_status(article_id_tuple)
- bot(
- title="微信账号抓取V2完成",
- detail={
- "扫描文章数量": len(crawler_article_list),
- "新增账号数量": insert_account_count
- },
- mention=False
- )
|