weixin_account_association_crawler.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. """
  2. @author: luojunhui
  3. 微信账号联想
  4. """
  5. import datetime
  6. import json
  7. import traceback
  8. from typing import List, Set, Dict
  9. from tqdm import tqdm
  10. from pymysql.cursors import DictCursor
  11. from applications import aiditApi
  12. from applications import bot
  13. from applications import log
  14. from applications import longArticlesMySQL
  15. from applications import WeixinSpider
  16. from applications.const import AccountAssociationTaskConst
  17. from applications.functions import Functions
  18. from config import apolloConfig
# Module-level singletons shared by the crawler:
# task tuning constants, generic helper functions, and the Apollo config client.
const = AccountAssociationTaskConst()
function = Functions()
config = apolloConfig()
  22. def get_inner_account_gh_id() -> Set[str]:
  23. """
  24. 获取内部账号名称
  25. :return:
  26. """
  27. accounts = aiditApi.get_publish_account_from_aigc()
  28. gh_id_list = [i['ghId'] for i in accounts]
  29. return set(gh_id_list)
  30. class AccountAssociationCrawler(object):
  31. """
  32. 账号抓取
  33. """
  34. def __init__(self):
  35. self.db_client = longArticlesMySQL()
  36. self.spider = WeixinSpider()
  37. self.account_name_filter = json.loads(config.getConfigValue('account_name_filter'))
  38. self.crawler_account_count = 0
  39. self.total_crawler_count = 0
  40. self.inner_account_count = 0
  41. self.account_name_filter_count = 0
  42. self.already_crawler_account_count = 0
  43. def is_bad_account(self, account_name: str) -> bool:
  44. """
  45. 判断账号是否为bad account
  46. :param account_name:
  47. :return:
  48. """
  49. if account_name == "":
  50. return True
  51. for key in self.account_name_filter:
  52. if key in account_name:
  53. return True
  54. return False
  55. def get_seed_titles(self, run_date: datetime) -> List[Dict]:
  56. """
  57. :return:
  58. """
  59. publish_timestamp_threshold = int(run_date.timestamp()) - const.STAT_PERIOD
  60. sql = f"""
  61. SELECT DISTINCT t1.account_name, t1.title, t2.kimi_summary, t2.kimi_keys
  62. FROM datastat_sort_strategy t1
  63. JOIN long_articles_text t2
  64. ON t1.source_id = t2.content_id
  65. WHERE t1.read_rate > {const.READ_AVG_MULTIPLE}
  66. AND t1.view_count > {const.MIN_READ_COUNT}
  67. AND publish_timestamp > {publish_timestamp_threshold}
  68. ORDER BY read_rate DESC
  69. LIMIT {const.SEED_TITLE_LIMIT};
  70. """
  71. article_obj_list = self.db_client.select(sql, cursor_type=DictCursor)
  72. return article_obj_list
  73. def search_account_in_weixin(self, article_obj: Dict) -> Dict:
  74. """
  75. 通过文章信息使用搜索接口搜索账号
  76. :param article_obj:
  77. :return:
  78. """
  79. ori_title = article_obj['title']
  80. summary = article_obj['kimi_summary']
  81. kimi_keys = json.loads(article_obj['kimi_keys']) if article_obj['kimi_keys'] else None
  82. response_1 = self.spider.search_articles(title=ori_title)
  83. response_2 = self.spider.search_articles(title=summary) if summary else {}
  84. response_3 = self.spider.search_articles(title=", ".join(kimi_keys)) if kimi_keys else {}
  85. response = {
  86. "title": response_1,
  87. "summary": response_2,
  88. "kimi_keys": response_3
  89. }
  90. return response
  91. def insert_account_into_database(self, account_name: str, gh_id: str, category: str, biz_date: str) -> int:
  92. """
  93. :param biz_date:
  94. :param category:
  95. :param account_name:
  96. :param gh_id:
  97. :return:
  98. """
  99. insert_sql = f"""
  100. INSERT INTO long_articles_accounts
  101. (gh_id, account_source, account_name, account_category, init_date)
  102. values
  103. (%s, %s, %s, %s, %s)
  104. """
  105. affected_rows = self.db_client.update(
  106. sql=insert_sql,
  107. params=(gh_id, "weixin", account_name, category, biz_date)
  108. )
  109. return affected_rows
  110. def save_account_into_db(self, search_response: Dict, inner_account_gh_id_set: Set, biz_date: str) -> None:
  111. """
  112. 保存账号信息
  113. :param biz_date:
  114. :param search_response:
  115. :param inner_account_gh_id_set:
  116. :return:
  117. """
  118. for key in search_response:
  119. value = search_response[key]
  120. if value:
  121. search_article_list = value['data']['data']
  122. for article in tqdm(search_article_list):
  123. article_url = article['url']
  124. try:
  125. account_info = self.spider.get_account_by_url(article_url)
  126. self.total_crawler_count += 1
  127. account_name = account_info['data']['data']['account_name']
  128. gh_id = account_info['data']['data']['wx_gh']
  129. # 过滤内部账号
  130. if gh_id in inner_account_gh_id_set:
  131. self.inner_account_count += 1
  132. continue
  133. # 通过账号名称过滤一些bad_account or dangerous account
  134. if self.is_bad_account(account_name):
  135. self.account_name_filter_count += 1
  136. continue
  137. try:
  138. self.insert_account_into_database(
  139. account_name=account_name,
  140. gh_id=gh_id,
  141. category="account_association",
  142. biz_date=biz_date
  143. )
  144. except Exception as e:
  145. self.already_crawler_account_count += 1
  146. print(e)
  147. continue
  148. self.crawler_account_count += 1
  149. except Exception as e:
  150. log(
  151. task="account_association",
  152. function="save_account_into_db",
  153. data={
  154. "biz_date": biz_date,
  155. "article": article,
  156. "trace_back": traceback.format_exc(),
  157. "error": f"{e}"
  158. }
  159. )
  160. continue
  161. else:
  162. continue
  163. def run_account_association(self, biz_date: datetime):
  164. """
  165. 执行账号联想
  166. :param biz_date:
  167. :return:
  168. """
  169. inner_account_gh_id_set = get_inner_account_gh_id()
  170. seed_articles = self.get_seed_titles(biz_date)
  171. for article in tqdm(seed_articles):
  172. try:
  173. # search from weixin
  174. search_response = self.search_account_in_weixin(article)
  175. # save
  176. self.save_account_into_db(
  177. search_response=search_response,
  178. inner_account_gh_id_set=inner_account_gh_id_set,
  179. biz_date=biz_date.strftime("%Y-%m-%d")
  180. )
  181. except Exception as e:
  182. log(
  183. task="account_association",
  184. function="run_account_association",
  185. data={
  186. "biz_date": biz_date,
  187. "article": article,
  188. "trace_back": traceback.format_exc(),
  189. "error": f"{e}"
  190. }
  191. )
  192. bot(
  193. title="账号联想-账号抓取完成",
  194. detail={
  195. "总共联想到的账号数": self.total_crawler_count,
  196. "内部账号过滤": self.inner_account_count,
  197. "账号名称过滤": self.account_name_filter_count,
  198. "已经抓取账号": self.already_crawler_account_count,
  199. "新增账号": self.crawler_account_count
  200. }
  201. )