"""
@author: luojunhui
WeChat account association: search WeChat for articles related to
well-performing seed articles and record the accounts that published them.
"""
- import datetime
- import json
- import traceback
- from typing import List, Set, Dict
- from tqdm import tqdm
- from pymysql.cursors import DictCursor
- from applications import aiditApi
- from applications import bot
- from applications import log
- from applications import longArticlesMySQL
- from applications import WeixinSpider
- from applications.const import AccountAssociationTaskConst
- from applications.functions import Functions
- from config import apolloConfig
# Module-level shared instances used by the code below.
# Task constants; the seed-article query reads STAT_PERIOD, READ_AVG_MULTIPLE,
# MIN_READ_COUNT and SEED_TITLE_LIMIT from it.
const = AccountAssociationTaskConst()
# NOTE(review): `function` is not referenced in the visible part of this
# file - confirm it is used elsewhere before removing it.
function = Functions()
# Configuration client; values are read through config.getConfigValue(...).
config = apolloConfig()
# Shared empty dict intended as a read-only default (e.g. for chained .get()
# lookups); it must never be mutated because it is shared by every caller.
empty_dict = {}
def get_inner_account_gh_id() -> Set[str]:
    """
    Collect the gh_id of every publish account returned by
    ``aiditApi.get_publish_account_from_aigc()``.

    Callers use the result to recognise (and skip) internal accounts.

    :return: set of the ``ghId`` value of each returned account record
    """
    return {
        account['ghId']
        for account in aiditApi.get_publish_account_from_aigc()
    }
class AccountAssociationCrawler(object):
    """
    Account crawler driven by "association" with seed articles.

    For each seed article it searches WeChat by title / summary / keywords,
    resolves every hit to the account that published it, filters that account
    and stores the remaining ones in ``long_articles_accounts``.
    Counters on the instance accumulate statistics for the final report.
    """

    def __init__(self):
        self.db_client = longArticlesMySQL()
        self.spider = WeixinSpider()
        # JSON-decoded `account_name_filter` config value; iterated as a
        # collection of substrings (presumably a JSON list - confirm).
        self.account_name_filter = json.loads(config.getConfigValue('account_name_filter'))
        # Statistics reported at the end of run_account_association.
        self.crawler_account_count = 0          # accounts newly inserted
        self.total_crawler_count = 0            # account lookups that returned
        self.inner_account_count = 0            # skipped: internal account
        self.account_name_filter_count = 0      # skipped: name filter hit
        self.already_crawler_account_count = 0  # skipped: insert raised
        self.official_accounts = 0              # skipped: judged official

    def is_bad_account(self, account_name: str) -> bool:
        """
        Decide whether an account must be rejected because of its name.

        :param account_name: display name of the account
        :return: True when the name is empty/None or contains any entry of
            ``self.account_name_filter`` as a substring, otherwise False
        """
        # An empty (or missing) name can never be validated, so reject it.
        if not account_name:
            return True
        return any(key in account_name for key in self.account_name_filter)

    def is_account_official(self, gh_id: str) -> bool:
        """
        Decide whether an account is treated as an official account.

        The account's message list is fetched and the send date of every
        message whose ``AppMsg.BaseInfo.Type`` is 9 is collected. The account
        is considered official when at least two such messages share the same
        calendar date (local time).

        :param gh_id: gh_id of the account to inspect
        :return: True when a send date occurs more than once, otherwise False
        :raises KeyError, IndexError, TypeError: when the spider response does
            not have the expected structure (callers log and skip the account)
        """
        response = self.spider.update_msg_list(ghId=gh_id, index=None)
        article_list = response['data']['data']
        send_dates = []
        for item in article_list:
            base_info = item.get("AppMsg", {}).get("BaseInfo", {})
            # Type 9: the original comment describes these as group-sent
            # headline messages; only their send_time is considered.
            if base_info.get("Type") == 9:
                send_time = item['AppMsg']['DetailInfo'][0]['send_time']
                send_dates.append(
                    datetime.datetime.fromtimestamp(send_time).strftime('%Y-%m-%d')
                )
        # A duplicate date shrinks the set, i.e. several sends on one day.
        return len(set(send_dates)) != len(send_dates)

    def get_seed_titles(self, run_date: datetime.datetime) -> List[Dict]:
        """
        Load the seed articles used to search for related accounts.

        Selects articles whose ``publish_timestamp`` is later than
        ``run_date`` minus ``const.STAT_PERIOD`` and whose read rate / view
        count exceed the configured thresholds, ordered by read rate.

        :param run_date: reference time; its Unix timestamp (seconds) is the
            upper end of the look-back window
        :return: rows as dicts with keys account_name, title, kimi_summary
            and kimi_keys
        """
        publish_timestamp_threshold = int(run_date.timestamp()) - const.STAT_PERIOD
        # All interpolated values are numeric/project constants, not user input.
        sql = f"""
            SELECT DISTINCT t1.account_name, t1.title, t2.kimi_summary, t2.kimi_keys
            FROM datastat_sort_strategy t1
            JOIN long_articles_text t2
            ON t1.source_id = t2.content_id
            WHERE t1.read_rate > {const.READ_AVG_MULTIPLE}
            AND t1.view_count > {const.MIN_READ_COUNT}
            AND publish_timestamp > {publish_timestamp_threshold}
            ORDER BY read_rate DESC
            LIMIT {const.SEED_TITLE_LIMIT};
        """
        return self.db_client.select(sql, cursor_type=DictCursor)

    def search_account_in_weixin(self, article_obj: Dict) -> Dict:
        """
        Search WeChat articles using the seed article's title, summary and
        keywords.

        :param article_obj: seed row with keys ``title``, ``kimi_summary`` and
            ``kimi_keys`` (a JSON string, or empty/None)
        :return: dict with keys ``title``, ``summary`` and ``kimi_keys``; each
            value is the raw search response, or ``{}`` when the corresponding
            search text is missing
        """
        ori_title = article_obj['title']
        summary = article_obj['kimi_summary']
        kimi_keys = json.loads(article_obj['kimi_keys']) if article_obj['kimi_keys'] else None
        response_1 = self.spider.search_articles(title=ori_title)
        response_2 = self.spider.search_articles(title=summary) if summary else {}
        # Keywords are joined into a single query string.
        response_3 = self.spider.search_articles(title=", ".join(kimi_keys)) if kimi_keys else {}
        return {
            "title": response_1,
            "summary": response_2,
            "kimi_keys": response_3
        }

    def insert_account_into_database(self, account_name: str, gh_id: str, category: str, biz_date: str) -> int:
        """
        Insert one account row into ``long_articles_accounts``.

        :param account_name: display name of the account
        :param gh_id: gh_id of the account
        :param category: stored as ``account_category``
        :param biz_date: stored as ``init_date``
        :return: value returned by ``db_client.update`` (affected rows)
        """
        # Plain string (no interpolation): values are bound as parameters.
        insert_sql = """
            INSERT INTO long_articles_accounts
            (gh_id, account_source, account_name, account_category, init_date)
            values
            (%s, %s, %s, %s, %s)
        """
        affected_rows = self.db_client.update(
            sql=insert_sql,
            params=(gh_id, "weixin", account_name, category, biz_date)
        )
        return affected_rows

    def _save_account_from_article(self, article: Dict, inner_account_gh_id_set: Set, biz_date: str) -> None:
        """
        Resolve one search hit to its account, filter it and store it,
        updating the statistics counters along the way.

        Any exception other than an insert failure propagates to the caller.
        """
        account_info = self.spider.get_account_by_url(article['url'])
        self.total_crawler_count += 1
        account = account_info['data']['data']
        account_name = account['account_name']
        gh_id = account['wx_gh']

        # Skip internal accounts.
        if gh_id in inner_account_gh_id_set:
            self.inner_account_count += 1
            return

        # Skip accounts whose name matches the bad/dangerous name filter.
        if self.is_bad_account(account_name):
            self.account_name_filter_count += 1
            return

        # Skip accounts judged to be official accounts.
        if self.is_account_official(gh_id):
            self.official_accounts += 1
            return

        try:
            self.insert_account_into_database(
                account_name=account_name,
                gh_id=gh_id,
                category="account_association",
                biz_date=biz_date
            )
        except Exception as e:
            # Every insert failure is counted as "already crawled"
            # (presumably a duplicate-key error - TODO confirm).
            self.already_crawler_account_count += 1
            print(e)
            return

        self.crawler_account_count += 1

    def save_account_into_db(self, search_response: Dict, inner_account_gh_id_set: Set, biz_date: str) -> None:
        """
        Store the accounts behind every article of every search result.

        A malformed search result or a failing article is logged and skipped,
        so it does not prevent the remaining results from being processed.

        :param search_response: mapping of search source name to raw search
            response, as produced by ``search_account_in_weixin``
        :param inner_account_gh_id_set: gh_ids of internal accounts to skip
        :param biz_date: date string stored as ``init_date``
        :return: None
        """
        for source, result in search_response.items():
            if not result:
                continue

            try:
                search_article_list = result['data']['data']
            except (KeyError, TypeError) as e:
                # Unexpected response shape: skip this source only.
                log(
                    task="account_association",
                    function="save_account_into_db",
                    data={
                        "biz_date": biz_date,
                        "search_source": source,
                        "trace_back": traceback.format_exc(),
                        "error": f"{e}"
                    }
                )
                continue

            if not search_article_list:
                continue

            for article in tqdm(search_article_list):
                try:
                    self._save_account_from_article(
                        article=article,
                        inner_account_gh_id_set=inner_account_gh_id_set,
                        biz_date=biz_date
                    )
                except Exception as e:
                    log(
                        task="account_association",
                        function="save_account_into_db",
                        data={
                            "biz_date": biz_date,
                            "article": article,
                            "trace_back": traceback.format_exc(),
                            "error": f"{e}"
                        }
                    )

    def run_account_association(self, biz_date: datetime.datetime):
        """
        Run the whole account-association task and report the statistics.

        :param biz_date: reference time for seed selection; formatted as
            ``%Y-%m-%d`` it is also stored as ``init_date`` of new accounts
        :return: None
        """
        inner_account_gh_id_set = get_inner_account_gh_id()
        seed_articles = self.get_seed_titles(biz_date)
        biz_date_str = biz_date.strftime("%Y-%m-%d")
        for article in tqdm(seed_articles):
            try:
                # Search WeChat with the seed article.
                search_response = self.search_account_in_weixin(article)
                # Store the accounts found.
                self.save_account_into_db(
                    search_response=search_response,
                    inner_account_gh_id_set=inner_account_gh_id_set,
                    biz_date=biz_date_str
                )
            except Exception as e:
                # One failing seed must not stop the remaining seeds.
                log(
                    task="account_association",
                    function="run_account_association",
                    data={
                        "biz_date": biz_date_str,
                        "article": article,
                        "trace_back": traceback.format_exc(),
                        "error": f"{e}"
                    }
                )
        # Report the accumulated counters.
        bot(
            title="账号联想-账号抓取完成",
            detail={
                "总共联想到的账号数": self.total_crawler_count,
                "内部账号过滤": self.inner_account_count,
                "账号名称过滤": self.account_name_filter_count,
                "官方账号过滤": self.official_accounts,
                "已经抓取账号": self.already_crawler_account_count,
                "新增账号": self.crawler_account_count
            }
        )
|