published_articles_monitor.py

  1. """
  2. 监测已发布文章
  3. """
  4. from datetime import datetime
  5. from argparse import ArgumentParser
  6. from concurrent.futures import ThreadPoolExecutor
  7. from tqdm import tqdm
  8. from applications import bot
  9. from applications.db import DatabaseConnector
  10. from applications.const import updatePublishedMsgTaskConst
  11. from applications import WeixinSpider
  12. from config import piaoquan_crawler_config, long_articles_config
  13. const = updatePublishedMsgTaskConst()
  14. spider = WeixinSpider()


def monitor_article(article):
    """
    Check whether a single published article has been flagged as illegal;
    if so, record the violation and send an alert.
    """
    gh_id, account_name, title, url, wx_sn, publish_date = article
    try:
        response = spider.get_article_text(url, is_cache=False)
        response_code = response["code"]
        if response_code == const.ARTICLE_ILLEGAL_CODE:
            error_detail = response.get("msg")
            # Record the violation; INSERT IGNORE keeps repeated runs from
            # re-inserting an already-recorded article.
            insert_sql = """
                INSERT IGNORE INTO illegal_articles
                (gh_id, account_name, title, wx_sn, publish_date, illegal_reason)
                VALUES
                (%s, %s, %s, %s, %s, %s);
            """
            affected_rows = long_articles_db_client.save(
                query=insert_sql,
                params=(gh_id, account_name, title, wx_sn, publish_date, error_detail),
            )
            # Alert only when a new row was inserted, i.e. a newly detected violation.
            if affected_rows:
                bot(
                    title="文章违规告警",  # article violation alert
                    detail={
                        "account_name": account_name,
                        "gh_id": gh_id,
                        "title": title,
                        "wx_sn": wx_sn.decode("utf-8"),
                        "publish_date": str(publish_date),
                        "error_detail": error_detail,
                    },
                )
    except Exception as e:
        print(e)


def get_article_list(run_date):
    """
    Monitoring task: fetch articles published within the monitoring window
    (const.MONITOR_PERIOD, 7 days) ending on run_date, so each one can be
    checked for violations and alerted on.
    :param run_date: date string in YYYY-MM-DD format; defaults to today.
    :return: rows of (gh_id, account_name, title, url, wx_sn, publish_date).
    """
    if not run_date:
        run_date = datetime.today().strftime("%Y-%m-%d")

    # Start of the monitoring window as a unix timestamp.
    monitor_start_timestamp = (
        int(datetime.strptime(run_date, "%Y-%m-%d").timestamp()) - const.MONITOR_PERIOD
    )
    select_sql = f"""
        SELECT ghId, accountName, title, ContentUrl, wx_sn, from_unixtime(publish_timestamp) AS publish_timestamp
        FROM official_articles_v2
        WHERE publish_timestamp >= {monitor_start_timestamp}
        ORDER BY publish_timestamp DESC;
    """
    article_list = piaoquan_crawler_db_client.fetch(select_sql)
    return article_list


if __name__ == "__main__":
    parser = ArgumentParser()
    parser.add_argument(
        "--run_date",
        help="run date in YYYY-MM-DD format; defaults to today",
    )
    args = parser.parse_args()
    # argparse yields None when --run_date is omitted; get_article_list then uses today.
    run_date = args.run_date

    piaoquan_crawler_db_client = DatabaseConnector(db_config=piaoquan_crawler_config)
    piaoquan_crawler_db_client.connect()
    long_articles_db_client = DatabaseConnector(db_config=long_articles_config)
    long_articles_db_client.connect()

    # Number of concurrent threads
    MAX_WORKERS = 4

    article_list = get_article_list(run_date=run_date)

    # Check the articles concurrently; tqdm shows progress, list() drains the iterator.
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        list(
            tqdm(
                executor.map(monitor_article, article_list),
                total=len(article_list),
                desc="Monitor Article List",
            )
        )
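
# Example invocation (an assumption: run from the project root so that the
# applications and config packages are importable and both databases are reachable):
#   python published_articles_monitor.py --run_date 2025-01-01
# Omitting --run_date monitors the 7-day window ending today.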