""" @author: luojunhui 微信账号联想 """ import datetime import json import traceback from typing import List, Set, Dict from tqdm import tqdm from pymysql.cursors import DictCursor from applications import aiditApi from applications import bot from applications import log from applications import longArticlesMySQL from applications import WeixinSpider from applications.const import AccountAssociationTaskConst from applications.functions import Functions from config import apolloConfig const = AccountAssociationTaskConst() function = Functions() config = apolloConfig() empty_dict = {} def get_inner_account_gh_id() -> Set[str]: """ 获取内部账号名称 :return: """ accounts = aiditApi.get_publish_account_from_aigc() gh_id_list = [i['ghId'] for i in accounts] return set(gh_id_list) class AccountAssociationCrawler(object): """ 账号抓取 """ def __init__(self): self.db_client = longArticlesMySQL() self.spider = WeixinSpider() self.account_name_filter = json.loads(config.getConfigValue('account_name_filter')) self.crawler_account_count = 0 self.total_crawler_count = 0 self.inner_account_count = 0 self.account_name_filter_count = 0 self.already_crawler_account_count = 0 self.official_accounts = 0 def is_bad_account(self, account_name: str) -> bool: """ 判断账号是否为bad account :param account_name: :return: """ if account_name == "": return True for key in self.account_name_filter: if key in account_name: return True return False def is_account_official(self, gh_id: str) -> bool: """ 判断账号是否为官方账号 :param gh_id: :return: True or False """ response = self.spider.update_msg_list(ghId=gh_id, index=None) article_list = response['data']['data'] published_articles_send_date = [] for item in article_list: if item.get("AppMsg", empty_dict).get("BaseInfo", empty_dict).get("Type") == 9: # 获取群发头条的send_time send_time = item['AppMsg']['DetailInfo'][0]['send_time'] send_date = datetime.datetime.fromtimestamp(send_time).strftime('%Y-%m-%d') published_articles_send_date.append(send_date) published_articles_send_date_set = set(published_articles_send_date) if len(published_articles_send_date_set) == len(published_articles_send_date): return False else: return True def get_seed_titles(self, run_date: datetime) -> List[Dict]: """ :return: """ publish_timestamp_threshold = int(run_date.timestamp()) - const.STAT_PERIOD sql = f""" SELECT DISTINCT t1.account_name, t1.title, t2.kimi_summary, t2.kimi_keys FROM datastat_sort_strategy t1 JOIN long_articles_text t2 ON t1.source_id = t2.content_id WHERE t1.read_rate > {const.READ_AVG_MULTIPLE} AND t1.view_count > {const.MIN_READ_COUNT} AND publish_timestamp > {publish_timestamp_threshold} ORDER BY read_rate DESC LIMIT {const.SEED_TITLE_LIMIT}; """ article_obj_list = self.db_client.select(sql, cursor_type=DictCursor) return article_obj_list def search_account_in_weixin(self, article_obj: Dict) -> Dict: """ 通过文章信息使用搜索接口搜索账号 :param article_obj: :return: """ ori_title = article_obj['title'] summary = article_obj['kimi_summary'] kimi_keys = json.loads(article_obj['kimi_keys']) if article_obj['kimi_keys'] else None response_1 = self.spider.search_articles(title=ori_title) response_2 = self.spider.search_articles(title=summary) if summary else {} response_3 = self.spider.search_articles(title=", ".join(kimi_keys)) if kimi_keys else {} response = { "title": response_1, "summary": response_2, "kimi_keys": response_3 } return response def insert_account_into_database(self, account_name: str, gh_id: str, category: str, biz_date: str) -> int: """ :param biz_date: :param category: :param account_name: :param gh_id: :return: """ insert_sql = f""" INSERT INTO long_articles_accounts (gh_id, account_source, account_name, account_category, init_date) values (%s, %s, %s, %s, %s) """ affected_rows = self.db_client.update( sql=insert_sql, params=(gh_id, "weixin", account_name, category, biz_date) ) return affected_rows def save_account_into_db(self, search_response: Dict, inner_account_gh_id_set: Set, biz_date: str) -> None: """ 保存账号信息 :param biz_date: :param search_response: :param inner_account_gh_id_set: :return: """ for key in search_response: value = search_response[key] if value: search_article_list = value['data']['data'] for article in tqdm(search_article_list): article_url = article['url'] try: account_info = self.spider.get_account_by_url(article_url) self.total_crawler_count += 1 account_name = account_info['data']['data']['account_name'] gh_id = account_info['data']['data']['wx_gh'] # 过滤内部账号 if gh_id in inner_account_gh_id_set: self.inner_account_count += 1 continue # 通过账号名称过滤一些bad_account or dangerous account if self.is_bad_account(account_name): self.account_name_filter_count += 1 continue # 判断账号是否为官方账号 if self.is_account_official(gh_id): self.official_accounts += 1 continue try: self.insert_account_into_database( account_name=account_name, gh_id=gh_id, category="account_association", biz_date=biz_date ) except Exception as e: self.already_crawler_account_count += 1 print(e) continue self.crawler_account_count += 1 except Exception as e: log( task="account_association", function="save_account_into_db", data={ "biz_date": biz_date, "article": article, "trace_back": traceback.format_exc(), "error": f"{e}" } ) continue else: continue def run_account_association(self, biz_date: datetime): """ 执行账号联想 :param biz_date: :return: """ inner_account_gh_id_set = get_inner_account_gh_id() seed_articles = self.get_seed_titles(biz_date) for article in tqdm(seed_articles): try: # search from weixin search_response = self.search_account_in_weixin(article) # save self.save_account_into_db( search_response=search_response, inner_account_gh_id_set=inner_account_gh_id_set, biz_date=biz_date.strftime("%Y-%m-%d") ) except Exception as e: log( task="account_association", function="run_account_association", data={ "biz_date": biz_date.strftime("%Y-%m-%d"), "article": article, "trace_back": traceback.format_exc(), "error": f"{e}" } ) bot( title="账号联想-账号抓取完成", detail={ "总共联想到的账号数": self.total_crawler_count, "内部账号过滤": self.inner_account_count, "账号名称过滤": self.account_name_filter_count, "官方账号过滤": self.official_accounts, "已经抓取账号": self.already_crawler_account_count, "新增账号": self.crawler_account_count } )