outside_gzh_articles_monitor.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. import json
  2. import time
  3. from tqdm import tqdm
  4. from pymysql.cursors import DictCursor
  5. from applications.api import FeishuBotApi
  6. from applications.db import DatabaseConnector
  7. from cold_start.crawler.wechat import get_article_detail
  8. from cold_start.crawler.wechat import get_article_list_from_account
  9. from config import long_articles_config, denet_config
  10. class OutsideGzhArticlesManager:
  11. def __init__(self):
  12. self.long_articles_client = DatabaseConnector(long_articles_config)
  13. self.long_articles_client.connect()
  14. self.denet_client = DatabaseConnector(denet_config)
  15. self.denet_client.connect()
  16. self.feishu_bot_api = FeishuBotApi()
  17. def update_article_illegal_status(self, article_id, illegal_reason):
  18. update_query = f"""
  19. update outside_gzh_account_monitor
  20. set illegal_status = %s, illegal_reason = %s
  21. where id = %s and illegal_reason = %s
  22. """
  23. self.long_articles_client.save(
  24. query=update_query,
  25. params=(1, illegal_reason, article_id, 0)
  26. )
  27. class OutsideGzhArticlesCollector(OutsideGzhArticlesManager):
  28. def fetch_outside_account_list(self):
  29. fetch_query = f"""
  30. select
  31. t2.group_source_name as account_source,
  32. t3.name as account_name,
  33. t3.gh_id as gh_id,
  34. t3.status as status
  35. from wx_statistics_group_source t1
  36. join wx_statistics_group_source_account t2 on t2.group_source_name = t1.account_source_name
  37. join publish_account t3 on t3.id = t2.account_id
  38. where
  39. t1.mode_type = '代运营服务号';
  40. """
  41. return self.denet_client.fetch(query=fetch_query, cursor_type=DictCursor)
  42. def fetch_each_account(self, account: dict):
  43. gh_id = account["gh_id"]
  44. fetch_response = get_article_list_from_account(gh_id)
  45. msg_list = fetch_response.get("data", {}).get("data", [])
  46. if msg_list:
  47. for msg in tqdm(msg_list, desc=f"insert account {account['account_name']}"):
  48. self.save_each_msg_to_db(msg, account)
  49. else:
  50. print(f"crawler failed: {account['account_name']}")
  51. def save_each_msg_to_db(self, msg: dict, account: dict):
  52. base_info = msg["AppMsg"]["BaseInfo"]
  53. detail_info = msg["AppMsg"]["DetailInfo"]
  54. app_msg_id = base_info["AppMsgId"]
  55. create_timestamp = base_info["CreateTime"]
  56. publish_type = base_info["Type"]
  57. # insert each article
  58. for article in detail_info:
  59. link = article["ContentUrl"]
  60. article_detail = get_article_detail(link)
  61. response_code = article_detail["code"]
  62. if response_code == 25012:
  63. illegal_reason = article_detail.get("msg")
  64. # bot and return
  65. self.feishu_bot_api.bot(
  66. title="文章违规告警",
  67. detail={
  68. "account_name": article["account_name"],
  69. "title": article['title'],
  70. "reason": illegal_reason,
  71. "publish_timestamp": create_timestamp,
  72. "account_source": article["account_source"]
  73. },
  74. env="dev"
  75. )
  76. elif response_code == 0:
  77. insert_query = f"""
  78. insert ignore into outside_gzh_account_monitor
  79. (account_name, gh_id, account_source, account_type, app_msg_id, publish_type, position, title, link,
  80. channel_content_id, crawler_timestamp, publish_timestamp)
  81. values
  82. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  83. """
  84. self.long_articles_client.save(
  85. query=insert_query,
  86. params=(
  87. account["account_name"],
  88. account["gh_id"],
  89. account["account_source"],
  90. "服务号",
  91. app_msg_id,
  92. publish_type,
  93. article["ItemIndex"],
  94. article["Title"],
  95. link,
  96. article_detail["data"]["data"]["channel_content_id"],
  97. int(time.time()),
  98. int(article_detail["data"]["data"]["publish_timestamp"] / 1000),
  99. ),
  100. )
  101. else:
  102. continue
  103. def deal(self):
  104. account_list = self.fetch_outside_account_list()
  105. for account in tqdm(account_list[:10]):
  106. self.fetch_each_account(account)
  107. class OutsideGzhArticlesMonitor(OutsideGzhArticlesManager):
  108. def fetch_article_list_to_check(self):
  109. publish_timestamp_threshold = int(time.time()) - 7 * 24 * 3600
  110. fetch_query = f"""
  111. select id, account_name, gh_id, account_source, account_type,
  112. title, link, from_unixtime(publish_timestamp) as publish_date
  113. from outside_gzh_account_monitor
  114. where illegal_status = 0 and publish_timestamp > {publish_timestamp_threshold};
  115. """
  116. return self.long_articles_client.fetch(
  117. query=fetch_query, cursor_type=DictCursor
  118. )
  119. def check_each_article(self, article: dict):
  120. """
  121. check each article
  122. """
  123. link = article["link"]
  124. article_detail = get_article_detail(link)
  125. response_code = article_detail["code"]
  126. if response_code == 25012:
  127. illegal_reason = article_detail.get("msg")
  128. self.feishu_bot_api.bot(
  129. title="文章违规告警",
  130. detail={
  131. "account_name": article["account_name"],
  132. "title": article['title'],
  133. "reason": illegal_reason,
  134. "publish_date": article["publish_date"],
  135. "account_source": article["account_source"]
  136. },
  137. env="dev"
  138. )
  139. article_id = article["id"]
  140. self.update_article_illegal_status(article_id, illegal_reason)
  141. else:
  142. return
  143. def deal(self):
  144. article_list = self.fetch_article_list_to_check()
  145. for article in tqdm(article_list):
  146. self.check_each_article(article)
  147. if __name__ == "__main__":
  148. # collector = OutsideGzhArticlesCollector()
  149. # collector.deal()
  150. monitor = OutsideGzhArticlesMonitor()
  151. monitor.deal()