# gzh_article_monitor.py
  1. import time
  2. import datetime
  3. from tqdm import tqdm
  4. from applications.api import feishu_robot
  5. from applications.crawler.wechat import get_article_detail
  6. from applications.crawler.wechat import get_article_list_from_account
  7. class MonitorConst:
  8. # 文章违规状态
  9. ILLEGAL_STATUS = 1
  10. INIT_STATUS = 0
  11. # 监测周期
  12. MONITOR_CYCLE = 5 * 24 * 3600
  13. # article code
  14. ARTICLE_ILLEGAL_CODE = 25012
  15. ARTICLE_DELETE_CODE = 25005
  16. ARTICLE_SUCCESS_CODE = 0
  17. ARTICLE_UNKNOWN_CODE = 10000
  18. # Task status
  19. TASK_SUCCESS_CODE = 2
  20. TASK_FAIL_CODE = 99
  21. class OutsideGzhArticlesManager(MonitorConst):
  22. def __init__(self, pool):
  23. self.pool = pool
  24. async def update_article_illegal_status(
  25. self, article_id: int, illegal_reason: str
  26. ) -> None:
  27. query = f"""
  28. update outside_gzh_account_monitor
  29. set illegal_status = %s, illegal_reason = %s
  30. where id = %s and illegal_status = %s
  31. """
  32. await self.pool.async_save(
  33. query=query,
  34. params=(self.ILLEGAL_STATUS, illegal_reason, article_id, self.INIT_STATUS),
  35. )
  36. async def whether_published_in_a_week(self, gh_id: str) -> bool:
  37. """
  38. 判断该账号一周内是否有发文,如有,则说无需抓
  39. """
  40. query = f"""
  41. select id, publish_timestamp from outside_gzh_account_monitor
  42. where gh_id = %s
  43. order by publish_timestamp desc
  44. limit %s;
  45. """
  46. response, error = await self.pool.async_fetch(query=query, params=(gh_id, 1))
  47. if response:
  48. publish_timestamp = response[0]["publish_timestamp"]
  49. if publish_timestamp is None:
  50. return False
  51. else:
  52. return int(time.time()) - publish_timestamp <= self.MONITOR_CYCLE
  53. else:
  54. return False
  55. class OutsideGzhArticlesCollector(OutsideGzhArticlesManager):
  56. async def fetch_outside_account_list(self):
  57. query = f"""
  58. select
  59. t2.group_source_name as account_source,
  60. t3.name as account_name,
  61. t3.gh_id as gh_id,
  62. t3.status as status
  63. from wx_statistics_group_source t1
  64. join wx_statistics_group_source_account t2 on t2.group_source_name = t1.account_source_name
  65. join publish_account t3 on t3.id = t2.account_id
  66. where
  67. t1.mode_type = '代运营服务号';
  68. """
  69. response, error = await self.pool.async_fetch(query=query, db_name="aigc_db_pool")
  70. return response
  71. async def fetch_each_account(self, account: dict):
  72. gh_id = account["gh_id"]
  73. # 判断该账号本周是否已经发布过
  74. if await self.whether_published_in_a_week(gh_id):
  75. return
  76. fetch_response = get_article_list_from_account(gh_id)
  77. try:
  78. msg_list = fetch_response.get("data", {}).get("data", [])
  79. if msg_list:
  80. for msg in tqdm(
  81. msg_list, desc=f"insert account {account['account_name']}"
  82. ):
  83. await self.save_each_msg_to_db(msg, account)
  84. else:
  85. print(f"crawler failed: {account['account_name']}")
  86. except Exception as e:
  87. print(
  88. f"crawler failed: account_name: {account['account_name']}\n"
  89. f"error: {e}\n"
  90. )
  91. async def save_each_msg_to_db(self, msg: dict, account: dict):
  92. base_info = msg["AppMsg"]["BaseInfo"]
  93. detail_info = msg["AppMsg"]["DetailInfo"]
  94. app_msg_id = base_info["AppMsgId"]
  95. create_timestamp = base_info["CreateTime"]
  96. publish_type = base_info["Type"]
  97. # insert each article
  98. for article in detail_info:
  99. link = article["ContentUrl"]
  100. article_detail = get_article_detail(link)
  101. response_code = article_detail["code"]
  102. if response_code == self.ARTICLE_ILLEGAL_CODE:
  103. illegal_reason = article_detail.get("msg")
  104. # bot and return
  105. await feishu_robot.bot(
  106. title="文章违规告警",
  107. detail={
  108. "账号名称": article["account_name"],
  109. "标题": article["title"],
  110. "违规理由": illegal_reason,
  111. "发布日期": datetime.datetime.fromtimestamp(create_timestamp).strftime('%Y-%m-%d %H:%M:%S'),
  112. "账号合作商": article["account_source"],
  113. },
  114. env="outside_gzh_monitor",
  115. mention=False
  116. )
  117. elif response_code == self.ARTICLE_SUCCESS_CODE:
  118. insert_query = f"""
  119. insert ignore into outside_gzh_account_monitor
  120. (account_name, gh_id, account_source, account_type, app_msg_id, publish_type, position, title, link,
  121. channel_content_id, crawler_timestamp, publish_timestamp)
  122. values
  123. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  124. """
  125. await self.pool.async_save(
  126. query=insert_query,
  127. params=(
  128. account["account_name"],
  129. account["gh_id"],
  130. account["account_source"],
  131. "服务号",
  132. app_msg_id,
  133. publish_type,
  134. article["ItemIndex"],
  135. article["Title"],
  136. link,
  137. article_detail["data"]["data"]["channel_content_id"],
  138. int(time.time()),
  139. int(article_detail["data"]["data"]["publish_timestamp"] / 1000),
  140. ),
  141. )
  142. else:
  143. continue
  144. async def deal(self):
  145. account_list = await self.fetch_outside_account_list()
  146. for account in tqdm(account_list):
  147. try:
  148. await self.fetch_each_account(account)
  149. except Exception as e:
  150. print(f"crawler failed: {account['account_name']}, error: {e}")
  151. class OutsideGzhArticlesMonitor(OutsideGzhArticlesManager):
  152. async def fetch_article_list_to_check(self):
  153. publish_timestamp_threshold = int(time.time()) - self.MONITOR_CYCLE
  154. fetch_query = f"""
  155. select id, account_name, gh_id, account_source, account_type,
  156. title, link, from_unixtime(publish_timestamp) as publish_date
  157. from outside_gzh_account_monitor
  158. where illegal_status = {self.INIT_STATUS} and publish_timestamp > {publish_timestamp_threshold};
  159. """
  160. response, error = await self.pool.async_fetch(query=fetch_query)
  161. return response
  162. async def check_each_article(self, article: dict):
  163. """
  164. check each article
  165. """
  166. link = article["link"]
  167. article_detail = get_article_detail(link)
  168. response_code = article_detail["code"]
  169. if response_code == self.ARTICLE_ILLEGAL_CODE:
  170. illegal_reason = article_detail.get("msg")
  171. # illegal_reason = '测试报警功能'
  172. feishu_robot.bot(
  173. title="文章违规告警",
  174. detail={
  175. "账号名称": article["account_name"],
  176. "标题": article["title"],
  177. "违规理由": illegal_reason,
  178. "发布日期": str(article["publish_date"]),
  179. "账号合作商": article["account_source"],
  180. },
  181. env="outside_gzh_monitor",
  182. mention=False
  183. )
  184. article_id = article["id"]
  185. await self.update_article_illegal_status(article_id, illegal_reason)
  186. else:
  187. return
  188. async def deal(self):
  189. article_list = await self.fetch_article_list_to_check()
  190. for article in tqdm(article_list):
  191. try:
  192. await self.check_each_article(article)
  193. except Exception as e:
  194. print(
  195. f"crawler failed: account_name: {article['account_name']}\n"
  196. f"link: {article['link']}\n"
  197. f"title: {article['title']}\n"
  198. f"error: {e}\n"
  199. )
  200. return self.TASK_SUCCESS_CODE