outside_gzh_articles_monitor.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. import json
  2. import time
  3. from tqdm import tqdm
  4. from pymysql.cursors import DictCursor
  5. from applications.api import FeishuBotApi
  6. from applications.db import DatabaseConnector
  7. from cold_start.crawler.wechat import get_article_detail
  8. from cold_start.crawler.wechat import get_article_list_from_account
  9. from config import long_articles_config, denet_config
  10. class OutsideGzhArticlesManager:
  11. def __init__(self):
  12. self.long_articles_client = DatabaseConnector(long_articles_config)
  13. self.long_articles_client.connect()
  14. self.denet_client = DatabaseConnector(denet_config)
  15. self.denet_client.connect()
  16. self.feishu_bot_api = FeishuBotApi()
  17. def process_illegal_article(
  18. self, account_name, title, reason, publish_timestamp, account_source
  19. ):
  20. self.feishu_bot_api.bot(
  21. title="文章违规告警",
  22. detail={
  23. "account_name": account_name,
  24. "title": title,
  25. "reason": reason,
  26. "publish_timestamp": publish_timestamp,
  27. "account_source": account_source,
  28. },
  29. env="dev"
  30. )
  31. return
  32. class OutsideGzhArticlesCollector(OutsideGzhArticlesManager):
  33. def fetch_outside_account_list(self):
  34. fetch_query = f"""
  35. select
  36. t2.group_source_name as account_source,
  37. t3.name as account_name,
  38. t3.gh_id as gh_id,
  39. t3.status as status
  40. from wx_statistics_group_source t1
  41. join wx_statistics_group_source_account t2 on t2.group_source_name = t1.account_source_name
  42. join publish_account t3 on t3.id = t2.account_id
  43. where
  44. t1.mode_type = '代运营服务号';
  45. """
  46. return self.denet_client.fetch(query=fetch_query, cursor_type=DictCursor)
  47. def fetch_each_account(self, account: dict):
  48. gh_id = account["gh_id"]
  49. fetch_response = get_article_list_from_account(gh_id)
  50. msg_list = fetch_response.get("data", {}).get("data", [])
  51. if msg_list:
  52. for msg in msg_list[:1]:
  53. self.save_each_msg_to_db(msg, account)
  54. else:
  55. print(f"crawler failed: {account['account_name']}")
  56. def save_each_msg_to_db(self, msg: dict, account: dict):
  57. base_info = msg["AppMsg"]["BaseInfo"]
  58. detail_info = msg["AppMsg"]["DetailInfo"]
  59. app_msg_id = base_info["AppMsgId"]
  60. create_timestamp = base_info["CreateTime"]
  61. update_timestamp = base_info["UpdateTime"]
  62. publish_type = base_info["Type"]
  63. # insert each article
  64. for article in detail_info[:1]:
  65. link = article["ContentUrl"]
  66. article_detail = get_article_detail(link)
  67. response_code = article_detail["code"]
  68. if response_code == 25012:
  69. illegal_reason = article_detail.get("msg")
  70. # bot and return
  71. elif response_code == 0:
  72. insert_query = f"""
  73. insert into outside_gzh_account_monitor
  74. (account_name, gh_id, account_source, account_type, app_msg_id, publish_type, position, title, link,
  75. channel_content_id, crawler_timestamp, publish_timestamp)
  76. values
  77. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  78. """
  79. self.long_articles_client.save(
  80. query=insert_query,
  81. params=(
  82. account["account_name"],
  83. account["gh_id"],
  84. account["account_source"],
  85. "服务号",
  86. app_msg_id,
  87. publish_type,
  88. article["ItemIndex"],
  89. article["Title"],
  90. link,
  91. article_detail["data"]["data"]["channel_content_id"],
  92. create_timestamp,
  93. update_timestamp,
  94. int(time.time()),
  95. int(article_detail["data"]["data"]["publish_timestamp"] / 1000),
  96. ),
  97. )
  98. else:
  99. continue
  100. class OutsideGzhArticlesMonitor(OutsideGzhArticlesCollector):
  101. def fetch_article_list_to_check(self):
  102. publish_timestamp_threshold = int(time.time()) - 7 * 24 * 3600
  103. fetch_query = f"""
  104. select account_name, gh_id, account_source, account_type, title, link
  105. from outside_gzh_account_monitor
  106. where illegal_status = 0 and publish_timestamp > {publish_timestamp_threshold};
  107. """
  108. return self.long_articles_client.fetch(
  109. query=fetch_query, cursor_type=DictCursor
  110. )
  111. def check_each_article(self, article: dict):
  112. """
  113. check each article
  114. """
  115. link = article["link"]
  116. article_detail = get_article_detail(link)
  117. response_code = article_detail["code"]
  118. if response_code == 25012:
  119. illegal_reason = article_detail.get("msg")
  120. self.process_illegal_article(
  121. account_name=article["account_name"],
  122. title=article["title"],
  123. reason=illegal_reason,
  124. publish_timestamp=article["publish_timestamp"],
  125. account_source=article["account_source"],
  126. )
  127. else:
  128. return
  129. def deal(self):
  130. article_list = self.fetch_article_list_to_check()
  131. for article in tqdm(article_list):
  132. self.check_each_article(article)
  133. if __name__ == "__main__":
  134. collector = OutsideGzhArticlesCollector()
  135. accounts = collector.fetch_outside_account_list()
  136. for account_ in tqdm(accounts[1:2]):
  137. collector.fetch_each_account(account_)