outside_gzh_articles_monitor.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. import time
  2. from tqdm import tqdm
  3. from pymysql.cursors import DictCursor
  4. from applications.api import FeishuBotApi
  5. from applications.db import DatabaseConnector
  6. from cold_start.crawler.wechat import get_article_detail
  7. from cold_start.crawler.wechat import get_article_list_from_account
  8. from config import long_articles_config, denet_config
  9. class OutsideGzhArticlesManager:
  10. def __init__(self):
  11. self.long_articles_client = DatabaseConnector(long_articles_config)
  12. self.long_articles_client.connect()
  13. self.denet_client = DatabaseConnector(denet_config)
  14. self.denet_client.connect()
  15. self.feishu_bot_api = FeishuBotApi()
  16. def update_article_illegal_status(self, article_id: int, illegal_reason: str) -> None:
  17. update_query = f"""
  18. update outside_gzh_account_monitor
  19. set illegal_status = %s, illegal_reason = %s
  20. where id = %s and illegal_reason = %s
  21. """
  22. self.long_articles_client.save(
  23. query=update_query,
  24. params=(1, illegal_reason, article_id, 0)
  25. )
  26. def whether_published_in_a_week(self, gh_id: str) -> bool:
  27. """
  28. 判断该账号一周内是否有发文,如有,则说无需抓
  29. """
  30. fetch_query = f"""
  31. select id, publish_timestamp from outside_gzh_account_monitor
  32. where gh_id = '{gh_id}'
  33. order by publish_timestamp desc
  34. limit 1;
  35. """
  36. fetch_response = self.long_articles_client.fetch(query=fetch_query, cursor_type=DictCursor)
  37. if fetch_response:
  38. publish_timestamp = fetch_response[0]['publish_timestamp']
  39. if publish_timestamp is None:
  40. return False
  41. else:
  42. return int(time.time()) - publish_timestamp <= 5 * 24 * 3600
  43. else:
  44. return False
  45. class OutsideGzhArticlesCollector(OutsideGzhArticlesManager):
  46. def fetch_outside_account_list(self):
  47. fetch_query = f"""
  48. select
  49. t2.group_source_name as account_source,
  50. t3.name as account_name,
  51. t3.gh_id as gh_id,
  52. t3.status as status
  53. from wx_statistics_group_source t1
  54. join wx_statistics_group_source_account t2 on t2.group_source_name = t1.account_source_name
  55. join publish_account t3 on t3.id = t2.account_id
  56. where
  57. t1.mode_type = '代运营服务号';
  58. """
  59. return self.denet_client.fetch(query=fetch_query, cursor_type=DictCursor)
  60. def fetch_each_account(self, account: dict):
  61. gh_id = account["gh_id"]
  62. # 判断该账号本周是否已经发布过
  63. if self.whether_published_in_a_week(gh_id):
  64. return
  65. fetch_response = get_article_list_from_account(gh_id)
  66. try:
  67. msg_list = fetch_response.get("data", {}).get("data", [])
  68. if msg_list:
  69. for msg in tqdm(msg_list, desc=f"insert account {account['account_name']}"):
  70. self.save_each_msg_to_db(msg, account)
  71. else:
  72. print(f"crawler failed: {account['account_name']}")
  73. except Exception as e:
  74. print(
  75. f"crawler failed: account_name: {account['account_name']}\n"
  76. f"error: {e}\n"
  77. )
  78. def save_each_msg_to_db(self, msg: dict, account: dict):
  79. base_info = msg["AppMsg"]["BaseInfo"]
  80. detail_info = msg["AppMsg"]["DetailInfo"]
  81. app_msg_id = base_info["AppMsgId"]
  82. create_timestamp = base_info["CreateTime"]
  83. publish_type = base_info["Type"]
  84. # insert each article
  85. for article in detail_info:
  86. link = article["ContentUrl"]
  87. article_detail = get_article_detail(link)
  88. response_code = article_detail["code"]
  89. if response_code == 25012:
  90. illegal_reason = article_detail.get("msg")
  91. # bot and return
  92. self.feishu_bot_api.bot(
  93. title="文章违规告警",
  94. detail={
  95. "account_name": article["account_name"],
  96. "title": article['title'],
  97. "reason": illegal_reason,
  98. "publish_timestamp": create_timestamp,
  99. "account_source": article["account_source"]
  100. },
  101. env="outside_gzh_monitor"
  102. )
  103. elif response_code == 0:
  104. insert_query = f"""
  105. insert ignore into outside_gzh_account_monitor
  106. (account_name, gh_id, account_source, account_type, app_msg_id, publish_type, position, title, link,
  107. channel_content_id, crawler_timestamp, publish_timestamp)
  108. values
  109. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  110. """
  111. self.long_articles_client.save(
  112. query=insert_query,
  113. params=(
  114. account["account_name"],
  115. account["gh_id"],
  116. account["account_source"],
  117. "服务号",
  118. app_msg_id,
  119. publish_type,
  120. article["ItemIndex"],
  121. article["Title"],
  122. link,
  123. article_detail["data"]["data"]["channel_content_id"],
  124. int(time.time()),
  125. int(article_detail["data"]["data"]["publish_timestamp"] / 1000),
  126. ),
  127. )
  128. else:
  129. continue
  130. def deal(self):
  131. account_list = self.fetch_outside_account_list()
  132. for account in tqdm(account_list):
  133. try:
  134. self.fetch_each_account(account)
  135. except Exception as e:
  136. print(
  137. f"crawler failed: {account['account_name']}, error: {e}"
  138. )
  139. class OutsideGzhArticlesMonitor(OutsideGzhArticlesManager):
  140. def fetch_article_list_to_check(self):
  141. publish_timestamp_threshold = int(time.time()) - 7 * 24 * 3600
  142. fetch_query = f"""
  143. select id, account_name, gh_id, account_source, account_type,
  144. title, link, from_unixtime(publish_timestamp) as publish_date
  145. from outside_gzh_account_monitor
  146. where illegal_status = 0 and publish_timestamp > {publish_timestamp_threshold};
  147. """
  148. return self.long_articles_client.fetch(
  149. query=fetch_query, cursor_type=DictCursor
  150. )
  151. def check_each_article(self, article: dict):
  152. """
  153. check each article
  154. """
  155. link = article["link"]
  156. article_detail = get_article_detail(link)
  157. response_code = article_detail["code"]
  158. if response_code == 25012:
  159. illegal_reason = article_detail.get("msg")
  160. self.feishu_bot_api.bot(
  161. title="文章违规告警",
  162. detail={
  163. "account_name": article["account_name"],
  164. "title": article['title'],
  165. "reason": illegal_reason,
  166. "publish_date": article["publish_date"],
  167. "account_source": article["account_source"]
  168. },
  169. env="outside_gzh_monitor"
  170. )
  171. article_id = article["id"]
  172. self.update_article_illegal_status(article_id, illegal_reason)
  173. else:
  174. return
  175. def deal(self):
  176. article_list = self.fetch_article_list_to_check()
  177. for article in tqdm(article_list):
  178. try:
  179. self.check_each_article(article)
  180. except Exception as e:
  181. print(
  182. f"crawler failed: account_name: {article['account_name']}\n"
  183. f"link: {article['link']}\n"
  184. f"title: {article['title']}\n"
  185. f"error: {e}\n"
  186. )