outside_gzh_articles_monitor.py

import time

from tqdm import tqdm
from pymysql.cursors import DictCursor

from applications.api import FeishuBotApi
from applications.db import DatabaseConnector
from cold_start.crawler.wechat import get_article_detail, get_article_list_from_account
from config import long_articles_config, denet_config

class Const:
    # Illegal-status flags for an article
    ILLEGAL_STATUS = 1
    INIT_STATUS = 0

    # Monitoring window: 5 days, in seconds
    MONITOR_CYCLE = 5 * 60 * 60 * 24

    # Response codes returned by get_article_detail
    ILLEGAL_CODE = 25012
    SUCCESS_CODE = 0

class OutsideGzhArticlesManager(Const):
    def __init__(self):
        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.long_articles_client.connect()
        self.denet_client = DatabaseConnector(denet_config)
        self.denet_client.connect()
        self.feishu_bot_api = FeishuBotApi()

    def update_article_illegal_status(
        self, article_id: int, illegal_reason: str
    ) -> None:
        # Only flip rows still in INIT_STATUS, so an existing
        # illegal_reason is never overwritten
        update_query = """
            update outside_gzh_account_monitor
            set illegal_status = %s, illegal_reason = %s
            where id = %s and illegal_status = %s;
        """
        self.long_articles_client.save(
            query=update_query,
            params=(self.ILLEGAL_STATUS, illegal_reason, article_id, self.INIT_STATUS),
        )

    def whether_published_in_a_week(self, gh_id: str) -> bool:
        """
        Check whether the account has published within the monitoring window
        (MONITOR_CYCLE); if it has, there is no need to crawl it again.
        """
        fetch_query = f"""
            select id, publish_timestamp from outside_gzh_account_monitor
            where gh_id = '{gh_id}'
            order by publish_timestamp desc
            limit 1;
        """
        fetch_response = self.long_articles_client.fetch(
            query=fetch_query, cursor_type=DictCursor
        )
        if fetch_response:
            publish_timestamp = fetch_response[0]["publish_timestamp"]
            if publish_timestamp is None:
                return False
            return int(time.time()) - publish_timestamp <= self.MONITOR_CYCLE
        return False

class OutsideGzhArticlesCollector(OutsideGzhArticlesManager):
    def fetch_outside_account_list(self):
        fetch_query = """
            select
                t2.group_source_name as account_source,
                t3.name as account_name,
                t3.gh_id as gh_id,
                t3.status as status
            from wx_statistics_group_source t1
            join wx_statistics_group_source_account t2
                on t2.group_source_name = t1.account_source_name
            join publish_account t3 on t3.id = t2.account_id
            where t1.mode_type = '代运营服务号';
        """
        return self.denet_client.fetch(query=fetch_query, cursor_type=DictCursor)
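
    # Each row returned by the query above is a dict shaped roughly like the
    # sketch below (inferred from the SELECT list, not a verified payload):
    # {"account_source": "...", "account_name": "...", "gh_id": "gh_xxx", "status": 1}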

    def fetch_each_account(self, account: dict):
        gh_id = account["gh_id"]
        # Skip accounts that already published within the monitoring window
        if self.whether_published_in_a_week(gh_id):
            return

        fetch_response = get_article_list_from_account(gh_id)
        try:
            msg_list = fetch_response.get("data", {}).get("data", [])
            if msg_list:
                for msg in tqdm(
                    msg_list, desc=f"insert account {account['account_name']}"
                ):
                    self.save_each_msg_to_db(msg, account)
            else:
                print(f"crawler failed: {account['account_name']}")
        except Exception as e:
            print(
                f"crawler failed: account_name: {account['account_name']}\n"
                f"error: {e}\n"
            )

    def save_each_msg_to_db(self, msg: dict, account: dict):
        base_info = msg["AppMsg"]["BaseInfo"]
        detail_info = msg["AppMsg"]["DetailInfo"]
        app_msg_id = base_info["AppMsgId"]
        create_timestamp = base_info["CreateTime"]
        publish_type = base_info["Type"]

        # insert each article of the message
        for article in detail_info:
            link = article["ContentUrl"]
            article_detail = get_article_detail(link)
            response_code = article_detail["code"]
            if response_code == self.ILLEGAL_CODE:
                illegal_reason = article_detail.get("msg")
                # alert via Feishu bot; an illegal article is not stored
                self.feishu_bot_api.bot(
                    title="文章违规告警",
                    detail={
                        "account_name": account["account_name"],
                        "title": article["Title"],
                        "reason": illegal_reason,
                        "publish_timestamp": create_timestamp,
                        "account_source": account["account_source"],
                    },
                    env="outside_gzh_monitor",
                )
            elif response_code == self.SUCCESS_CODE:
                insert_query = """
                    insert ignore into outside_gzh_account_monitor
                        (account_name, gh_id, account_source, account_type,
                         app_msg_id, publish_type, position, title, link,
                         channel_content_id, crawler_timestamp, publish_timestamp)
                    values
                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                """
                self.long_articles_client.save(
                    query=insert_query,
                    params=(
                        account["account_name"],
                        account["gh_id"],
                        account["account_source"],
                        "服务号",
                        app_msg_id,
                        publish_type,
                        article["ItemIndex"],
                        article["Title"],
                        link,
                        article_detail["data"]["data"]["channel_content_id"],
                        int(time.time()),
                        int(article_detail["data"]["data"]["publish_timestamp"] / 1000),
                    ),
                )
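
    # A minimal sketch of the message shape consumed by save_each_msg_to_db,
    # inferred from the key accesses above (an assumption, not taken from
    # official WeChat API documentation):
    # msg = {
    #     "AppMsg": {
    #         "BaseInfo": {"AppMsgId": 123, "CreateTime": 1700000000, "Type": 9},
    #         "DetailInfo": [
    #             {"ItemIndex": 1, "Title": "...", "ContentUrl": "https://mp.weixin.qq.com/..."},
    #         ],
    #     },
    # }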

    def deal(self):
        account_list = self.fetch_outside_account_list()
        for account in tqdm(account_list):
            try:
                self.fetch_each_account(account)
            except Exception as e:
                print(f"crawler failed: {account['account_name']}, error: {e}")

class OutsideGzhArticlesMonitor(OutsideGzhArticlesManager):
    def fetch_article_list_to_check(self):
        publish_timestamp_threshold = int(time.time()) - self.MONITOR_CYCLE
        fetch_query = f"""
            select id, account_name, gh_id, account_source, account_type,
                   title, link, from_unixtime(publish_timestamp) as publish_date
            from outside_gzh_account_monitor
            where illegal_status = {self.INIT_STATUS}
              and publish_timestamp > {publish_timestamp_threshold};
        """
        return self.long_articles_client.fetch(
            query=fetch_query, cursor_type=DictCursor
        )

    def check_each_article(self, article: dict):
        """
        Re-check one stored article; if the platform now reports it as
        illegal, send a Feishu alert and flag the row in the database.
        """
        link = article["link"]
        article_detail = get_article_detail(link)
        response_code = article_detail["code"]
        if response_code == self.ILLEGAL_CODE:
            illegal_reason = article_detail.get("msg")
            self.feishu_bot_api.bot(
                title="文章违规告警",
                detail={
                    "account_name": article["account_name"],
                    "title": article["title"],
                    "reason": illegal_reason,
                    "publish_date": article["publish_date"],
                    "account_source": article["account_source"],
                },
                env="outside_gzh_monitor",
            )
            self.update_article_illegal_status(article["id"], illegal_reason)

    def deal(self):
        article_list = self.fetch_article_list_to_check()
        for article in tqdm(article_list):
            try:
                self.check_each_article(article)
            except Exception as e:
                print(
                    f"crawler failed: account_name: {article['account_name']}\n"
                    f"link: {article['link']}\n"
                    f"title: {article['title']}\n"
                    f"error: {e}\n"
                )
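

if __name__ == "__main__":
    # Minimal runner sketch; an assumption, since the original file defines
    # no entry point. Collect new articles first, then re-check recent ones.
    collector = OutsideGzhArticlesCollector()
    collector.deal()

    monitor = OutsideGzhArticlesMonitor()
    monitor.deal()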