|| 
							- """
 
- @author: luojunhui
 
- 微信账号联想
 
- """
 
- import datetime
 
- import json
 
- import traceback
 
- from typing import List, Set, Dict
 
- from tqdm import tqdm
 
- from pymysql.cursors import DictCursor
 
- from applications import aiditApi
 
- from applications import bot
 
- from applications import log
 
- from applications import longArticlesMySQL
 
- from applications import WeixinSpider
 
- from applications.const import AccountAssociationTaskConst
 
- from applications.functions import Functions
 
- from config import apolloConfig
 
- const = AccountAssociationTaskConst()
 
- function = Functions()
 
- config = apolloConfig()
 
- empty_dict = {}
 
- def get_inner_account_gh_id() -> Set[str]:
 
-     """
 
-     获取内部账号名称
 
-     :return:
 
-     """
 
-     accounts = aiditApi.get_publish_account_from_aigc()
 
-     gh_id_list = [i['ghId'] for i in accounts]
 
-     return set(gh_id_list)
 
- class AccountAssociationCrawler(object):
 
-     """
 
-     账号抓取
 
-     """
 
-     def __init__(self):
 
-         self.db_client = longArticlesMySQL()
 
-         self.spider = WeixinSpider()
 
-         self.account_name_filter = json.loads(config.getConfigValue('account_name_filter'))
 
-         self.crawler_account_count = 0
 
-         self.total_crawler_count = 0
 
-         self.inner_account_count = 0
 
-         self.account_name_filter_count = 0
 
-         self.already_crawler_account_count = 0
 
-         self.official_accounts = 0
 
-     def is_bad_account(self, account_name: str) -> bool:
 
-         """
 
-         判断账号是否为bad account
 
-         :param account_name:
 
-         :return:
 
-         """
 
-         if account_name == "":
 
-             return True
 
-         for key in self.account_name_filter:
 
-             if key in account_name:
 
-                 return True
 
-         return False
 
-     def is_account_official(self, gh_id: str) -> bool:
 
-         """
 
-         判断账号是否为官方账号
 
-         :param gh_id:
 
-         :return: True or False
 
-         """
 
-         response = self.spider.update_msg_list(ghId=gh_id, index=None)
 
-         article_list = response['data']['data']
 
-         published_articles_send_date = []
 
-         for item in article_list:
 
-             if item.get("AppMsg", empty_dict).get("BaseInfo", empty_dict).get("Type") == 9:
 
-                 # 获取群发头条的send_time
 
-                 send_time = item['AppMsg']['DetailInfo'][0]['send_time']
 
-                 send_date = datetime.datetime.fromtimestamp(send_time).strftime('%Y-%m-%d')
 
-                 published_articles_send_date.append(send_date)
 
-         published_articles_send_date_set = set(published_articles_send_date)
 
-         if len(published_articles_send_date_set) == len(published_articles_send_date):
 
-             return False
 
-         else:
 
-             return True
 
-  
 
-     def get_seed_titles(self, run_date: datetime) -> List[Dict]:
 
-         """
 
-         :return:
 
-         """
 
-         publish_timestamp_threshold = int(run_date.timestamp()) - const.STAT_PERIOD
 
-         sql = f"""
 
-             SELECT DISTINCT t1.account_name, t1.title, t2.kimi_summary, t2.kimi_keys
 
-             FROM datastat_sort_strategy t1
 
-             JOIN long_articles_text t2
 
-             ON t1.source_id = t2.content_id
 
-             WHERE t1.read_rate > {const.READ_AVG_MULTIPLE} 
 
-                 AND t1.view_count > {const.MIN_READ_COUNT} 
 
-                 AND publish_timestamp > {publish_timestamp_threshold}
 
-             ORDER BY read_rate DESC
 
-             LIMIT {const.SEED_TITLE_LIMIT};
 
-         """
 
-         article_obj_list = self.db_client.select(sql, cursor_type=DictCursor)
 
-         return article_obj_list
 
-     def search_account_in_weixin(self, article_obj: Dict) -> Dict:
 
-         """
 
-         通过文章信息使用搜索接口搜索账号
 
-         :param article_obj:
 
-         :return:
 
-         """
 
-         ori_title = article_obj['title']
 
-         summary = article_obj['kimi_summary']
 
-         kimi_keys = json.loads(article_obj['kimi_keys']) if article_obj['kimi_keys'] else None
 
-         response_1 = self.spider.search_articles(title=ori_title)
 
-         response_2 = self.spider.search_articles(title=summary) if summary else {}
 
-         response_3 = self.spider.search_articles(title=", ".join(kimi_keys)) if kimi_keys else {}
 
-         response = {
 
-             "title": response_1,
 
-             "summary": response_2,
 
-             "kimi_keys": response_3
 
-         }
 
-         return response
 
-     def insert_account_into_database(self, account_name: str, gh_id: str, category: str, biz_date: str) -> int:
 
-         """
 
-         :param biz_date:
 
-         :param category:
 
-         :param account_name:
 
-         :param gh_id:
 
-         :return:
 
-         """
 
-         insert_sql = f"""
 
-             INSERT INTO long_articles_accounts
 
-             (gh_id, account_source, account_name, account_category, init_date)
 
-             values 
 
-             (%s, %s, %s, %s, %s)
 
-         """
 
-         affected_rows = self.db_client.update(
 
-             sql=insert_sql,
 
-             params=(gh_id, "weixin", account_name, category, biz_date)
 
-         )
 
-         return affected_rows
 
-     def save_account_into_db(self, search_response: Dict, inner_account_gh_id_set: Set, biz_date: str) -> None:
 
-         """
 
-         保存账号信息
 
-         :param biz_date:
 
-         :param search_response:
 
-         :param inner_account_gh_id_set:
 
-         :return:
 
-         """
 
-         for key in search_response:
 
-             value = search_response[key]
 
-             if value:
 
-                 search_article_list = value['data']['data']
 
-                 for article in tqdm(search_article_list):
 
-                     article_url = article['url']
 
-                     try:
 
-                         account_info = self.spider.get_account_by_url(article_url)
 
-                         self.total_crawler_count += 1
 
-                         account_name = account_info['data']['data']['account_name']
 
-                         gh_id = account_info['data']['data']['wx_gh']
 
-                         # 过滤内部账号
 
-                         if gh_id in inner_account_gh_id_set:
 
-                             self.inner_account_count += 1
 
-                             continue
 
-                         # 通过账号名称过滤一些bad_account or dangerous account
 
-                         if self.is_bad_account(account_name):
 
-                             self.account_name_filter_count += 1
 
-                             continue
 
-                         # 判断账号是否为官方账号
 
-                         if self.is_account_official(gh_id):
 
-                             self.official_accounts += 1
 
-                             continue
 
-                         try:
 
-                             self.insert_account_into_database(
 
-                                 account_name=account_name,
 
-                                 gh_id=gh_id,
 
-                                 category="account_association",
 
-                                 biz_date=biz_date
 
-                             )
 
-                         except Exception as e:
 
-                             self.already_crawler_account_count += 1
 
-                             print(e)
 
-                             continue
 
-                         self.crawler_account_count += 1
 
-                     except Exception as e:
 
-                         log(
 
-                             task="account_association",
 
-                             function="save_account_into_db",
 
-                             data={
 
-                                 "biz_date": biz_date,
 
-                                 "article": article,
 
-                                 "trace_back": traceback.format_exc(),
 
-                                 "error": f"{e}"
 
-                             }
 
-                         )
 
-                         continue
 
-             else:
 
-                 continue
 
-     def run_account_association(self, biz_date: datetime):
 
-         """
 
-         执行账号联想
 
-         :param biz_date:
 
-         :return:
 
-         """
 
-         inner_account_gh_id_set = get_inner_account_gh_id()
 
-         seed_articles = self.get_seed_titles(biz_date)
 
-         for article in tqdm(seed_articles):
 
-             try:
 
-                 # search from weixin
 
-                 search_response = self.search_account_in_weixin(article)
 
-                 # save
 
-                 self.save_account_into_db(
 
-                     search_response=search_response,
 
-                     inner_account_gh_id_set=inner_account_gh_id_set,
 
-                     biz_date=biz_date.strftime("%Y-%m-%d")
 
-                 )
 
-             except Exception as e:
 
-                 log(
 
-                     task="account_association",
 
-                     function="run_account_association",
 
-                     data={
 
-                         "biz_date": biz_date.strftime("%Y-%m-%d"),
 
-                         "article": article,
 
-                         "trace_back": traceback.format_exc(),
 
-                         "error": f"{e}"
 
-                     }
 
-                 )
 
-         bot(
 
-             title="账号联想-账号抓取完成",
 
-             detail={
 
-                 "总共联想到的账号数": self.total_crawler_count,
 
-                 "内部账号过滤": self.inner_account_count,
 
-                 "账号名称过滤": self.account_name_filter_count,
 
-                 "官方账号过滤": self.official_accounts,
 
-                 "已经抓取账号": self.already_crawler_account_count,
 
-                 "新增账号": self.crawler_account_count
 
-             }
 
-         )
 
 
  |