gzh_article_monitor.py

import time
import datetime
from typing import Optional, List

from tqdm import tqdm

from applications.api import feishu_robot
from applications.api import delete_illegal_gzh_articles
from applications.crawler.wechat import get_article_detail
from applications.crawler.wechat import get_article_list_from_account
from applications.utils import str_to_md5
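
# A minimal sketch of the database-pool contract this module relies on,
# inferred from the call sites below: `async_fetch` returns a `(rows, error)`
# pair and `async_save` returns an affected-row count. The Protocol is an
# illustrative assumption, not part of the original codebase.
from typing import Any, Protocol


class AsyncDBPool(Protocol):
    async def async_fetch(
        self, query: str, params: Optional[tuple] = None, db_name: Optional[str] = None
    ) -> tuple[Any, Any]: ...

    async def async_save(self, query: str, params: Optional[tuple] = None) -> int: ...
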
class MonitorConst:
    # article illegal-status flags
    ILLEGAL_STATUS = 1
    INIT_STATUS = 0

    # monitoring window: only articles from the last 3 days are checked
    MONITOR_CYCLE = 3 * 24 * 3600

    # response codes returned by get_article_detail
    ARTICLE_ILLEGAL_CODE = 25012
    ARTICLE_DELETE_CODE = 25005
    ARTICLE_SUCCESS_CODE = 0
    ARTICLE_UNKNOWN_CODE = 10000

    # task status codes
    TASK_SUCCESS_CODE = 2
    TASK_FAIL_CODE = 99


class OutsideGzhArticlesManager(MonitorConst):
    def __init__(self, pool):
        self.pool = pool

    async def update_article_illegal_status(
        self, article_id: int, illegal_reason: str
    ) -> None:
        query = """
            update outside_gzh_account_monitor
            set illegal_status = %s, illegal_reason = %s
            where id = %s and illegal_status = %s;
        """
        await self.pool.async_save(
            query=query,
            params=(self.ILLEGAL_STATUS, illegal_reason, article_id, self.INIT_STATUS),
        )

    async def whether_published_in_a_week(self, gh_id: str) -> bool:
        """
        Check whether the account has published within the monitoring window;
        if it has, there is no need to crawl it again.
        """
        query = """
            select id, publish_timestamp from outside_gzh_account_monitor
            where gh_id = %s
            order by publish_timestamp desc
            limit %s;
        """
        response, error = await self.pool.async_fetch(query=query, params=(gh_id, 1))
        if response:
            publish_timestamp = response[0]["publish_timestamp"]
            if publish_timestamp is None:
                return False
            return int(time.time()) - publish_timestamp <= self.MONITOR_CYCLE
        return False


class OutsideGzhArticlesCollector(OutsideGzhArticlesManager):
    async def fetch_outside_account_list(self):
        query = """
            select
                t2.group_source_name as account_source,
                t3.name as account_name,
                t3.gh_id as gh_id,
                t3.status as status
            from wx_statistics_group_source t1
            join wx_statistics_group_source_account t2 on t2.group_source_name = t1.account_source_name
            join publish_account t3 on t3.id = t2.account_id
            where t1.mode_type = '代运营服务号';
        """
        response, error = await self.pool.async_fetch(query=query, db_name="aigc")
        return response

    async def fetch_each_account(self, account: dict):
        gh_id = account["gh_id"]
        # skip accounts that already have a post inside the monitoring window
        if await self.whether_published_in_a_week(gh_id):
            return

        fetch_response = get_article_list_from_account(gh_id)
        try:
            msg_list = fetch_response.get("data", {}).get("data", [])
            if msg_list:
                for msg in tqdm(
                    msg_list, desc=f"insert account {account['account_name']}"
                ):
                    await self.save_each_msg_to_db(msg, account)
            else:
                print(f"crawler failed: {account['account_name']}")
        except Exception as e:
            print(
                f"crawler failed: account_name: {account['account_name']}\n"
                f"error: {e}\n"
            )

    async def save_each_msg_to_db(self, msg: dict, account: dict):
        base_info = msg["AppMsg"]["BaseInfo"]
        detail_info = msg["AppMsg"]["DetailInfo"]
        app_msg_id = base_info["AppMsgId"]
        create_timestamp = base_info["CreateTime"]
        publish_type = base_info["Type"]

        # insert each article
        for article in detail_info:
            link = article["ContentUrl"]
            article_detail = get_article_detail(link)
            response_code = article_detail["code"]
            if response_code == self.ARTICLE_ILLEGAL_CODE:
                illegal_reason = article_detail.get("msg")
                # alert via feishu bot; account-level fields come from `account`,
                # article-level fields use the crawler's "Title" key
                await feishu_robot.bot(
                    title="文章违规告警",
                    detail={
                        "账号名称": account["account_name"],
                        "标题": article["Title"],
                        "违规理由": illegal_reason,
                        "发布日期": datetime.datetime.fromtimestamp(
                            create_timestamp
                        ).strftime("%Y-%m-%d %H:%M:%S"),
                        "账号合作商": account["account_source"],
                    },
                    env="outside_gzh_monitor",
                    mention=False,
                )
            elif response_code == self.ARTICLE_SUCCESS_CODE:
                insert_query = """
                    insert ignore into outside_gzh_account_monitor
                    (account_name, gh_id, account_source, account_type, app_msg_id, publish_type, position, title, link,
                     channel_content_id, crawler_timestamp, publish_timestamp)
                    values
                    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                """
                await self.pool.async_save(
                    query=insert_query,
                    params=(
                        account["account_name"],
                        account["gh_id"],
                        account["account_source"],
                        "服务号",
                        app_msg_id,
                        publish_type,
                        article["ItemIndex"],
                        article["Title"],
                        link,
                        article_detail["data"]["data"]["channel_content_id"],
                        int(time.time()),
                        int(article_detail["data"]["data"]["publish_timestamp"] / 1000),
                    ),
                )
            else:
                continue

    async def deal(self):
        account_list = await self.fetch_outside_account_list()
        for account in tqdm(account_list):
            try:
                await self.fetch_each_account(account)
            except Exception as e:
                print(f"crawler failed: {account['account_name']}, error: {e}")


class OutsideGzhArticlesMonitor(OutsideGzhArticlesManager):
    async def fetch_article_list_to_check(self):
        publish_timestamp_threshold = int(time.time()) - self.MONITOR_CYCLE
        fetch_query = f"""
            select id, account_name, gh_id, account_source, account_type,
                title, link, from_unixtime(publish_timestamp) as publish_date
            from outside_gzh_account_monitor
            where illegal_status = {self.INIT_STATUS} and publish_timestamp > {publish_timestamp_threshold};
        """
        response, error = await self.pool.async_fetch(query=fetch_query)
        return response

    async def check_each_article(self, article: dict):
        """
        check each article
        """
        link = article["link"]
        article_detail = get_article_detail(link)
        response_code = article_detail["code"]
        if response_code == self.ARTICLE_ILLEGAL_CODE:
            illegal_reason = article_detail.get("msg")
            # illegal_reason = "test the alert"
            await feishu_robot.bot(
                title="文章违规告警",
                detail={
                    "账号名称": article["account_name"],
                    "标题": article["title"],
                    "违规理由": illegal_reason,
                    "发布日期": str(article["publish_date"]),
                    "账号合作商": article["account_source"],
                },
                env="outside_gzh_monitor",
                mention=False,
            )
            article_id = article["id"]
            await self.update_article_illegal_status(article_id, illegal_reason)

    async def deal(self):
        article_list = await self.fetch_article_list_to_check()
        for article in tqdm(article_list, desc="外部服务号监控"):
            try:
                await self.check_each_article(article)
            except Exception as e:
                print(
                    f"crawler failed: account_name: {article['account_name']}\n"
                    f"link: {article['link']}\n"
                    f"title: {article['title']}\n"
                    f"error: {e}\n"
                )
        return self.TASK_SUCCESS_CODE


class InnerGzhArticlesMonitor(MonitorConst):
    def __init__(self, pool):
        self.pool = pool

    async def whether_title_unsafe(self, title: str) -> bool:
        """
        :param title: gzh article title
        :return: bool
        """
        title_md5 = str_to_md5(title)
        query = """
            select title_md5 from article_unsafe_title where title_md5 = %s;
        """
        response, error = await self.pool.async_fetch(query=query, params=(title_md5,))
        return bool(response)

    async def fetch_article_list_to_check(
        self, run_date: Optional[str] = None
    ) -> Optional[List]:
        """
        :param run_date: execution date in "%Y-%m-%d" format; defaults to today
        """
        if not run_date:
            run_date = datetime.datetime.today().strftime("%Y-%m-%d")

        run_timestamp = int(
            datetime.datetime.strptime(run_date, "%Y-%m-%d").timestamp()
        )
        start_timestamp = run_timestamp - self.MONITOR_CYCLE
        query = f"""
            select ghId, accountName, title, ContentUrl, wx_sn, from_unixtime(publish_timestamp) as publish_timestamp
            from official_articles_v2
            where publish_timestamp >= {start_timestamp}
            order by publish_timestamp desc;
        """
        response, error = await self.pool.async_fetch(
            query=query, db_name="piaoquan_crawler"
        )
        if error:
            await feishu_robot.bot(
                title="站内微信公众号发文监测任务异常",
                detail={"error": error, "message": "查询数据库异常"},
            )
            return None
        return response

    async def check_each_article(self, article: dict):
        gh_id, account_name, title, url, wx_sn, publish_date = article
        try:
            response = get_article_detail(url, is_cache=False)
            response_code = response["code"]
            if response_code == self.ARTICLE_ILLEGAL_CODE:
                # the illegal reason is carried on the crawler response, not the row
                error_detail = response.get("msg")
                query = """
                    insert ignore into illegal_articles
                    (gh_id, account_name, title, wx_sn, publish_date, illegal_reason)
                    values
                    (%s, %s, %s, %s, %s, %s);
                """
                affected_row = await self.pool.async_save(
                    query=query,
                    params=(
                        gh_id,
                        account_name,
                        title,
                        wx_sn,
                        publish_date,
                        error_detail,
                    ),
                )
                if affected_row:
                    # skip alerting for titles already flagged as unsafe
                    if await self.whether_title_unsafe(title):
                        return

                    await feishu_robot.bot(
                        title="文章违规告警",
                        detail={
                            "account_name": account_name,
                            "gh_id": gh_id,
                            "title": title,
                            "wx_sn": wx_sn.decode("utf-8"),
                            "publish_date": str(publish_date),
                            "error_detail": error_detail,
                        },
                        mention=False,
                        env="prod",
                    )
                    await delete_illegal_gzh_articles(gh_id, title)
        except Exception as e:
            print(f"crawler failed: {account_name}, error: {e}")

    async def deal(self):
        article_list = await self.fetch_article_list_to_check()
        if article_list is None:
            # the fetch failed and has already been alerted; report failure
            return self.TASK_FAIL_CODE
        for article in tqdm(article_list, desc="站内文章监测任务"):
            await self.check_each_article(article)
        return self.TASK_SUCCESS_CODE
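

# Hypothetical wiring sketch, not part of the original module: one way the
# three tasks could be run end to end, assuming a pool object that satisfies
# the AsyncDBPool contract above. `create_pool` is an invented placeholder.
#
# import asyncio
#
# async def main():
#     pool = await create_pool()  # hypothetical pool factory
#     await OutsideGzhArticlesCollector(pool).deal()
#     await OutsideGzhArticlesMonitor(pool).deal()
#     return await InnerGzhArticlesMonitor(pool).deal()
#
# if __name__ == "__main__":
#     asyncio.run(main())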