weixin_account_association_crawler.py

  1. """
  2. @author: luojunhui
  3. 微信账号联想
  4. """
  5. import datetime
  6. import json
  7. import traceback
  8. from typing import List, Set, Dict
  9. from tqdm import tqdm
  10. from pymysql.cursors import DictCursor
  11. from applications import aiditApi
  12. from applications import bot
  13. from applications import log
  14. from applications import longArticlesMySQL
  15. from applications import WeixinSpider
  16. from applications.const import AccountAssociationTaskConst
  17. from applications.functions import Functions
  18. from config import apolloConfig
  19. const = AccountAssociationTaskConst()
  20. function = Functions()
  21. config = apolloConfig()
  22. empty_dict = {}


def get_inner_account_gh_id() -> Set[str]:
    """
    Fetch the gh_id set of internal accounts
    :return: set of gh_id strings
    """
    accounts = aiditApi.get_publish_account_from_aigc()
    gh_id_list = [i['ghId'] for i in accounts]
    return set(gh_id_list)


class AccountAssociationCrawler(object):
    """
    Account association crawler
    """

    def __init__(self):
        self.db_client = longArticlesMySQL()
        self.spider = WeixinSpider()
        # block list loaded from Apollo: a JSON array of name substrings to filter out
        self.account_name_filter = json.loads(config.getConfigValue('account_name_filter'))
        # counters reported in the final bot notification
        self.crawler_account_count = 0
        self.total_crawler_count = 0
        self.inner_account_count = 0
        self.account_name_filter_count = 0
        self.already_crawler_account_count = 0
        self.official_accounts = 0

    def is_bad_account(self, account_name: str) -> bool:
        """
        Check whether the account name is empty or hits the name block list
        :param account_name:
        :return:
        """
        if account_name == "":
            return True
        for key in self.account_name_filter:
            if key in account_name:
                return True
        return False

    def is_account_official(self, gh_id: str) -> bool:
        """
        Check whether the account is an official account
        :param gh_id:
        :return: True or False
        """
        response = self.spider.update_msg_list(ghId=gh_id, index=None)
        article_list = response['data']['data']
        published_articles_send_date = []
        for item in article_list:
            if item.get("AppMsg", empty_dict).get("BaseInfo", empty_dict).get("Type") == 9:
                # collect the send_time of each headline mass-send
                send_time = item['AppMsg']['DetailInfo'][0]['send_time']
                send_date = datetime.datetime.fromtimestamp(send_time).strftime('%Y-%m-%d')
                published_articles_send_date.append(send_date)
        # heuristic: a send date that appears more than once marks the account as official
        return len(set(published_articles_send_date)) != len(published_articles_send_date)

    def get_seed_titles(self, run_date: datetime.datetime) -> List[Dict]:
        """
        Fetch high-performing seed articles published within the last STAT_PERIOD
        :param run_date:
        :return:
        """
        publish_timestamp_threshold = int(run_date.timestamp()) - const.STAT_PERIOD
        sql = f"""
            SELECT DISTINCT t1.account_name, t1.title, t2.kimi_summary, t2.kimi_keys
            FROM datastat_sort_strategy t1
            JOIN long_articles_text t2
                ON t1.source_id = t2.content_id
            WHERE t1.read_rate > {const.READ_AVG_MULTIPLE}
                AND t1.view_count > {const.MIN_READ_COUNT}
                AND publish_timestamp > {publish_timestamp_threshold}
            ORDER BY read_rate DESC
            LIMIT {const.SEED_TITLE_LIMIT};
        """
        article_obj_list = self.db_client.select(sql, cursor_type=DictCursor)
        return article_obj_list

    def search_account_in_weixin(self, article_obj: Dict) -> Dict:
        """
        Search accounts through the article search API, using the original title,
        the kimi summary and the kimi keywords as three separate queries
        :param article_obj:
        :return:
        """
        ori_title = article_obj['title']
        summary = article_obj['kimi_summary']
        kimi_keys = json.loads(article_obj['kimi_keys']) if article_obj['kimi_keys'] else None
        response_1 = self.spider.search_articles(title=ori_title)
        response_2 = self.spider.search_articles(title=summary) if summary else {}
        response_3 = self.spider.search_articles(title=", ".join(kimi_keys)) if kimi_keys else {}
        response = {
            "title": response_1,
            "summary": response_2,
            "kimi_keys": response_3
        }
        return response

    def insert_account_into_database(self, account_name: str, gh_id: str, category: str, biz_date: str) -> int:
        """
        :param account_name:
        :param gh_id:
        :param category:
        :param biz_date:
        :return:
        """
        insert_sql = """
            INSERT INTO long_articles_accounts
                (gh_id, account_source, account_name, account_category, init_date)
            VALUES
                (%s, %s, %s, %s, %s)
        """
        affected_rows = self.db_client.update(
            sql=insert_sql,
            params=(gh_id, "weixin", account_name, category, biz_date)
        )
        return affected_rows

    def save_account_into_db(self, search_response: Dict, inner_account_gh_id_set: Set, biz_date: str) -> None:
        """
        Resolve the searched articles to accounts, filter them and save the rest
        :param search_response:
        :param inner_account_gh_id_set:
        :param biz_date:
        :return:
        """
        for key in search_response:
            value = search_response[key]
            if not value:
                continue
            search_article_list = value['data']['data']
            for article in tqdm(search_article_list):
                article_url = article['url']
                try:
                    account_info = self.spider.get_account_by_url(article_url)
                    self.total_crawler_count += 1
                    account_name = account_info['data']['data']['account_name']
                    gh_id = account_info['data']['data']['wx_gh']
                    # skip internal accounts
                    if gh_id in inner_account_gh_id_set:
                        self.inner_account_count += 1
                        continue
                    # skip bad or dangerous accounts by name
                    if self.is_bad_account(account_name):
                        self.account_name_filter_count += 1
                        continue
                    # skip official accounts
                    if self.is_account_official(gh_id):
                        self.official_accounts += 1
                        continue
                    try:
                        self.insert_account_into_database(
                            account_name=account_name,
                            gh_id=gh_id,
                            category="account_association",
                            biz_date=biz_date
                        )
                    except Exception as e:
                        # insert failures are counted as already-crawled accounts
                        self.already_crawler_account_count += 1
                        print(e)
                        continue
                    self.crawler_account_count += 1
                except Exception as e:
                    log(
                        task="account_association",
                        function="save_account_into_db",
                        data={
                            "biz_date": biz_date,
                            "article": article,
                            "trace_back": traceback.format_exc(),
                            "error": f"{e}"
                        }
                    )
                    continue

    def run_account_association(self, biz_date: datetime.datetime):
        """
        Run the account association task
        :param biz_date:
        :return:
        """
        inner_account_gh_id_set = get_inner_account_gh_id()
        seed_articles = self.get_seed_titles(biz_date)
        for article in tqdm(seed_articles):
            try:
                # search from weixin
                search_response = self.search_account_in_weixin(article)
                # save the associated accounts
                self.save_account_into_db(
                    search_response=search_response,
                    inner_account_gh_id_set=inner_account_gh_id_set,
                    biz_date=biz_date.strftime("%Y-%m-%d")
                )
            except Exception as e:
                log(
                    task="account_association",
                    function="run_account_association",
                    data={
                        "biz_date": biz_date.strftime("%Y-%m-%d"),
                        "article": article,
                        "trace_back": traceback.format_exc(),
                        "error": f"{e}"
                    }
                )
        bot(
            title="account association - account crawling finished",
            detail={
                "total associated accounts": self.total_crawler_count,
                "filtered internal accounts": self.inner_account_count,
                "filtered by account name": self.account_name_filter_count,
                "filtered official accounts": self.official_accounts,
                "already crawled accounts": self.already_crawler_account_count,
                "new accounts": self.crawler_account_count
            }
        )
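

# A minimal entry-point sketch, not part of the original file: it assumes the
# task runs once per day with the current date as biz_date; hook it up to your
# own scheduler as needed.
if __name__ == "__main__":
    crawler = AccountAssociationCrawler()
    crawler.run_account_association(biz_date=datetime.datetime.today())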