# weixin_account_association_crawler.py
  1. """
  2. @author: luojunhui
  3. 微信账号联想
  4. """
  5. import datetime
  6. import json
  7. import traceback
  8. from typing import List, Set, Dict
  9. from tqdm import tqdm
  10. from pymysql.cursors import DictCursor
  11. from applications import WeixinSpider, longArticlesMySQL, log, bot, aiditApi
  12. from applications.const import WeixinVideoCrawlerConst
  13. from applications.functions import Functions
  14. from config import apolloConfig
# Module-level singletons shared by the crawler below.
const = WeixinVideoCrawlerConst()  # thresholds / constants for the video crawler
function = Functions()  # NOTE(review): appears unused in this file chunk -- confirm before removing
config = apolloConfig()  # Apollo configuration client (supplies 'account_name_filter')
  18. def get_inner_account_gh_id() -> Set[str]:
  19. """
  20. 获取内部账号名称
  21. :return:
  22. """
  23. accounts = aiditApi.get_publish_account_from_aigc()
  24. gh_id_list = [i['ghId'] for i in accounts]
  25. return set(gh_id_list)
  26. class AccountAssociationCrawler(object):
  27. """
  28. 账号抓取
  29. """
  30. def __init__(self):
  31. self.db_client = longArticlesMySQL()
  32. self.spider = WeixinSpider()
  33. self.account_name_filter = json.loads(config.getConfigValue('account_name_filter'))
  34. self.crawler_account_count = 0
  35. self.total_crawler_count = 0
  36. self.inner_account_count = 0
  37. self.account_name_filter_count = 0
  38. self.already_crawler_account_count = 0
  39. def is_bad_account(self, account_name: str) -> bool:
  40. """
  41. 判断账号是否为bad account
  42. :param account_name:
  43. :return:
  44. """
  45. if account_name == "":
  46. return True
  47. for key in self.account_name_filter:
  48. if key in account_name:
  49. return True
  50. return False
  51. def get_seed_titles(self, run_date: datetime) -> List[Dict]:
  52. """
  53. :return:
  54. """
  55. publish_timestamp_threshold = int(run_date.timestamp()) - const.STAT_PERIOD
  56. sql = f"""
  57. SELECT DISTINCT t1.account_name, t1.title, t2.kimi_summary, t2.kimi_keys
  58. FROM datastat_sort_strategy t1
  59. JOIN long_articles_text t2
  60. ON t1.source_id = t2.content_id
  61. WHERE t1.read_rate > {const.READ_AVG_MULTIPLE}
  62. AND t1.view_count > {const.MIN_READ_COUNT}
  63. AND publish_timestamp > {publish_timestamp_threshold}
  64. ORDER BY read_rate DESC
  65. LIMIT 100;
  66. """
  67. article_obj_list = self.db_client.select(sql, cursor_type=DictCursor)
  68. return article_obj_list
  69. def search_account_in_weixin(self, article_obj: Dict) -> Dict:
  70. """
  71. 通过文章信息使用搜索接口搜索账号
  72. :param article_obj:
  73. :return:
  74. """
  75. ori_title = article_obj['title']
  76. summary = article_obj['kimi_summary']
  77. kimi_keys = json.loads(article_obj['kimi_keys']) if article_obj['kimi_keys'] else None
  78. response_1 = self.spider.search_articles(title=ori_title)
  79. response_2 = self.spider.search_articles(title=summary) if summary else {}
  80. response_3 = self.spider.search_articles(title=", ".join(kimi_keys)) if kimi_keys else {}
  81. response = {
  82. "title": response_1,
  83. "summary": response_2,
  84. "kimi_keys": response_3
  85. }
  86. return response
  87. def insert_account_into_database(self, account_name: str, gh_id: str, category: str, biz_date: str) -> int:
  88. """
  89. :param biz_date:
  90. :param category:
  91. :param account_name:
  92. :param gh_id:
  93. :return:
  94. """
  95. insert_sql = f"""
  96. INSERT INTO long_articles_accounts
  97. (gh_id, account_source, account_name, account_category, init_date)
  98. values
  99. (%s, %s, %s, %s, %s)
  100. """
  101. affected_rows = self.db_client.update(
  102. sql=insert_sql,
  103. params=(gh_id, "weixin", account_name, category, biz_date)
  104. )
  105. return affected_rows
  106. def save_account_into_db(self, search_response: Dict, inner_account_gh_id_set: Set, biz_date: str) -> None:
  107. """
  108. 保存账号信息
  109. :param biz_date:
  110. :param search_response:
  111. :param inner_account_gh_id_set:
  112. :return:
  113. """
  114. for key in search_response:
  115. value = search_response[key]
  116. if value:
  117. search_article_list = value['data']['data']
  118. for article in tqdm(search_article_list):
  119. article_url = article['url']
  120. try:
  121. account_info = self.spider.get_account_by_url(article_url)
  122. self.total_crawler_count += 1
  123. account_name = account_info['data']['data']['account_name']
  124. gh_id = account_info['data']['data']['wx_gh']
  125. # 过滤内部账号
  126. if gh_id in inner_account_gh_id_set:
  127. self.inner_account_count += 1
  128. continue
  129. # 通过账号名称过滤一些bad_account or dangerous account
  130. if self.is_bad_account(account_name):
  131. self.account_name_filter_count += 1
  132. continue
  133. try:
  134. self.insert_account_into_database(
  135. account_name=account_name,
  136. gh_id=gh_id,
  137. category="account_association",
  138. biz_date=biz_date
  139. )
  140. except Exception as e:
  141. self.already_crawler_account_count += 1
  142. print(e)
  143. continue
  144. self.crawler_account_count += 1
  145. except Exception as e:
  146. log(
  147. task="account_association",
  148. function="save_account_into_db",
  149. data={
  150. "biz_date": biz_date,
  151. "article": article,
  152. "trace_back": traceback.format_exc(),
  153. "error": f"{e}"
  154. }
  155. )
  156. continue
  157. else:
  158. continue
  159. def run_account_association(self, biz_date: datetime):
  160. """
  161. 执行账号联想
  162. :param biz_date:
  163. :return:
  164. """
  165. inner_account_gh_id_set = get_inner_account_gh_id()
  166. seed_articles = self.get_seed_titles(biz_date)
  167. for article in tqdm(seed_articles):
  168. try:
  169. # search from weixin
  170. search_response = self.search_account_in_weixin(article)
  171. # save
  172. self.save_account_into_db(
  173. search_response=search_response,
  174. inner_account_gh_id_set=inner_account_gh_id_set,
  175. biz_date=biz_date.strftime("%Y-%m-%d")
  176. )
  177. except Exception as e:
  178. log(
  179. task="account_association",
  180. function="run_account_association",
  181. data={
  182. "biz_date": biz_date,
  183. "article": article,
  184. "trace_back": traceback.format_exc(),
  185. "error": f"{e}"
  186. }
  187. )
  188. bot(
  189. title="账号联想-账号抓取完成",
  190. detail={
  191. "总共联想到的账号数": self.total_crawler_count,
  192. "内部账号过滤": self.inner_account_count,
  193. "账号名称过滤": self.account_name_filter_count,
  194. "已经抓取账号": self.already_crawler_account_count,
  195. "新增账号": self.crawler_account_count
  196. }
  197. )