outside_gzh_articles_monitor.py

import time
import datetime

from tqdm import tqdm
from pymysql.cursors import DictCursor

from applications.api import FeishuBotApi
from applications.db import DatabaseConnector
from cold_start.crawler.wechat import get_article_detail
from cold_start.crawler.wechat import get_article_list_from_account
from config import long_articles_config, denet_config


class Const:
    # Article illegal status
    ILLEGAL_STATUS = 1
    INIT_STATUS = 0

    # Monitor cycle (5 days, in seconds)
    MONITOR_CYCLE = 5 * 60 * 60 * 24

    # Article response codes
    ILLEGAL_CODE = 25012
    SUCCESS_CODE = 0


class OutsideGzhArticlesManager(Const):

    def __init__(self):
        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.long_articles_client.connect()
        self.denet_client = DatabaseConnector(denet_config)
        self.denet_client.connect()
        self.feishu_bot_api = FeishuBotApi()

    def update_article_illegal_status(
        self, article_id: int, illegal_reason: str
    ) -> None:
        update_query = """
            update outside_gzh_account_monitor
            set illegal_status = %s, illegal_reason = %s
            where id = %s and illegal_status = %s
        """
        self.long_articles_client.save(
            query=update_query,
            params=(self.ILLEGAL_STATUS, illegal_reason, article_id, self.INIT_STATUS),
        )

    def whether_published_in_a_week(self, gh_id: str) -> bool:
        """
        Check whether the account has published within the monitor cycle;
        if so, it does not need to be crawled again.
        """
        fetch_query = f"""
            select id, publish_timestamp from outside_gzh_account_monitor
            where gh_id = '{gh_id}'
            order by publish_timestamp desc
            limit 1;
        """
        fetch_response = self.long_articles_client.fetch(
            query=fetch_query, cursor_type=DictCursor
        )
        if fetch_response:
            publish_timestamp = fetch_response[0]["publish_timestamp"]
            if publish_timestamp is None:
                return False
            else:
                return int(time.time()) - publish_timestamp <= self.MONITOR_CYCLE
        else:
            return False


class OutsideGzhArticlesCollector(OutsideGzhArticlesManager):

    def fetch_outside_account_list(self):
        fetch_query = """
            select
                t2.group_source_name as account_source,
                t3.name as account_name,
                t3.gh_id as gh_id,
                t3.status as status
            from wx_statistics_group_source t1
            join wx_statistics_group_source_account t2 on t2.group_source_name = t1.account_source_name
            join publish_account t3 on t3.id = t2.account_id
            where
                t1.mode_type = '代运营服务号';
        """
        return self.denet_client.fetch(query=fetch_query, cursor_type=DictCursor)

    def fetch_each_account(self, account: dict):
        gh_id = account["gh_id"]
        # Skip the account if it has already published within the monitor cycle
        if self.whether_published_in_a_week(gh_id):
            return

        fetch_response = get_article_list_from_account(gh_id)
        try:
            msg_list = fetch_response.get("data", {}).get("data", [])
            if msg_list:
                for msg in tqdm(
                    msg_list, desc=f"insert account {account['account_name']}"
                ):
                    self.save_each_msg_to_db(msg, account)
            else:
                print(f"crawler failed: {account['account_name']}")
        except Exception as e:
            print(
                f"crawler failed: account_name: {account['account_name']}\n"
                f"error: {e}\n"
            )

    def save_each_msg_to_db(self, msg: dict, account: dict):
        base_info = msg["AppMsg"]["BaseInfo"]
        detail_info = msg["AppMsg"]["DetailInfo"]
        app_msg_id = base_info["AppMsgId"]
        create_timestamp = base_info["CreateTime"]
        publish_type = base_info["Type"]

        # Insert each article of the message
        for article in detail_info:
            link = article["ContentUrl"]
            article_detail = get_article_detail(link)
            response_code = article_detail["code"]
            if response_code == self.ILLEGAL_CODE:
                illegal_reason = article_detail.get("msg")
                # Send a Feishu alert for the illegal article
                self.feishu_bot_api.bot(
                    title="文章违规告警",
                    detail={
                        "账号名称": account["account_name"],
                        "标题": article["Title"],
                        "违规理由": illegal_reason,
                        "发布日期": datetime.datetime.fromtimestamp(create_timestamp).strftime("%Y-%m-%d %H:%M:%S"),
                        "账号合作商": account["account_source"],
                    },
                    env="outside_gzh_monitor",
                    mention=False,
                )
            elif response_code == self.SUCCESS_CODE:
                insert_query = """
                    insert ignore into outside_gzh_account_monitor
                    (account_name, gh_id, account_source, account_type, app_msg_id, publish_type, position, title, link,
                    channel_content_id, crawler_timestamp, publish_timestamp)
                    values
                    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                """
                self.long_articles_client.save(
                    query=insert_query,
                    params=(
                        account["account_name"],
                        account["gh_id"],
                        account["account_source"],
                        "服务号",
                        app_msg_id,
                        publish_type,
                        article["ItemIndex"],
                        article["Title"],
                        link,
                        article_detail["data"]["data"]["channel_content_id"],
                        int(time.time()),
                        int(article_detail["data"]["data"]["publish_timestamp"] / 1000),
                    ),
                )
            else:
                continue

    def deal(self):
        account_list = self.fetch_outside_account_list()
        for account in tqdm(account_list):
            try:
                self.fetch_each_account(account)
            except Exception as e:
                print(f"crawler failed: {account['account_name']}, error: {e}")


class OutsideGzhArticlesMonitor(OutsideGzhArticlesManager):

    def fetch_article_list_to_check(self):
        publish_timestamp_threshold = int(time.time()) - self.MONITOR_CYCLE
        fetch_query = f"""
            select id, account_name, gh_id, account_source, account_type,
                title, link, from_unixtime(publish_timestamp) as publish_date
            from outside_gzh_account_monitor
            where illegal_status = {self.INIT_STATUS} and publish_timestamp > {publish_timestamp_threshold};
        """
        return self.long_articles_client.fetch(
            query=fetch_query, cursor_type=DictCursor
        )

    def check_each_article(self, article: dict):
        """
        check each article
        """
        link = article["link"]
        article_detail = get_article_detail(link)
        response_code = article_detail["code"]
        if response_code == self.ILLEGAL_CODE:
            illegal_reason = article_detail.get("msg")
            # illegal_reason = '测试报警功能'
            self.feishu_bot_api.bot(
                title="文章违规告警",
                detail={
                    "账号名称": article["account_name"],
                    "标题": article["title"],
                    "违规理由": illegal_reason,
                    "发布日期": str(article["publish_date"]),
                    "账号合作商": article["account_source"],
                },
                env="outside_gzh_monitor",
                mention=False,
            )
            article_id = article["id"]
            self.update_article_illegal_status(article_id, illegal_reason)
        else:
            return

    def deal(self):
        article_list = self.fetch_article_list_to_check()
        for article in tqdm(article_list):
            try:
                self.check_each_article(article)
            except Exception as e:
                print(
                    f"crawler failed: account_name: {article['account_name']}\n"
                    f"link: {article['link']}\n"
                    f"title: {article['title']}\n"
                    f"error: {e}\n"
                )
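

# Usage sketch (assumption: this entry point is not part of the original
# module, which only defines the classes above). It shows one plausible way
# to run the two jobs back to back: the collector first pulls recent articles
# from the outside gzh accounts, then the monitor re-checks collected
# articles that are still within MONITOR_CYCLE.
if __name__ == "__main__":
    collector = OutsideGzhArticlesCollector()
    collector.deal()

    monitor = OutsideGzhArticlesMonitor()
    monitor.deal()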