published_articles_monitor.py

from datetime import datetime
from argparse import ArgumentParser
from concurrent.futures import ThreadPoolExecutor

from tqdm import tqdm

from applications.db import DatabaseConnector
from applications.const import updatePublishedMsgTaskConst
from applications import WeixinSpider
from config import piaoquan_crawler_config, long_articles_config

const = updatePublishedMsgTaskConst()
spider = WeixinSpider()
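
# Expected response shape from spider.get_article_text, inferred from its usage
# in monitor_article below (fields beyond "code" and "msg" are not assumed):
#   {"code": <int; equals const.ARTICLE_ILLEGAL_CODE when the article is flagged>,
#    "msg": <illegal-reason string>}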
def monitor_article(article):
    """
    Check whether a single published article has been flagged as illegal.
    """
    gh_id, account_name, title, url, wx_sn, publish_date = article
    try:
        response = spider.get_article_text(url, is_cache=False)
        response_code = response["code"]
        if response_code == const.ARTICLE_ILLEGAL_CODE:
            error_detail = response.get("msg")
            insert_sql = """
                INSERT IGNORE INTO illegal_articles
                    (gh_id, account_name, title, wx_sn, publish_date, illegal_reason)
                VALUES
                    (%s, %s, %s, %s, %s, %s);
            """
            # long_articles_db_client is created in the __main__ block below
            # and is visible here as a module-level global.
            long_articles_db_client.save(
                query=insert_sql,
                params=(gh_id, account_name, title, wx_sn, publish_date, error_detail),
            )
    except Exception as e:
        # Log and continue so one failing article does not abort the batch.
        print(e)
def get_article_list(run_date):
    """
    Monitoring task with a 7-day look-back window: fetch articles published
    within the window so they can be checked for violations; a violating
    article triggers an alert.
    :return: list of article rows published since the window start
    """
    if not run_date:
        run_date = datetime.today().strftime("%Y-%m-%d")
    monitor_start_timestamp = (
        int(datetime.strptime(run_date, "%Y-%m-%d").timestamp()) - const.MONITOR_PERIOD
    )
    # monitor_start_timestamp is an int computed locally, so interpolating it
    # directly into the SQL is safe here.
    select_sql = f"""
        SELECT ghId, accountName, title, ContentUrl, wx_sn, from_unixtime(publish_timestamp) AS publish_timestamp
        FROM official_articles_v2
        WHERE publish_timestamp >= {monitor_start_timestamp}
        ORDER BY publish_timestamp DESC;
    """
    article_list = piaoquan_crawler_db_client.fetch(select_sql)
    return article_list
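
# Each row returned by fetch() is assumed to be a tuple in SELECT-column order:
#   (ghId, accountName, title, ContentUrl, wx_sn, publish_timestamp)
# monitor_article unpacks it as (gh_id, account_name, title, url, wx_sn, publish_date).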
if __name__ == "__main__":
    parser = ArgumentParser()
    parser.add_argument(
        "--run_date",
        # %% escapes argparse's %-formatting of help strings; a bare %Y
        # would raise a ValueError when --help is rendered.
        help="run date in %%Y-%%m-%%d format; defaults to today",
    )
    args = parser.parse_args()
    if args.run_date:
        run_date = args.run_date
    else:
        run_date = None

    piaoquan_crawler_db_client = DatabaseConnector(db_config=piaoquan_crawler_config)
    piaoquan_crawler_db_client.connect()
    long_articles_db_client = DatabaseConnector(db_config=long_articles_config)
    long_articles_db_client.connect()

    # Number of concurrent threads
    MAX_WORKERS = 4

    # Pass the parsed run_date through (hard-coding None here would silently
    # ignore the --run_date argument).
    article_list = get_article_list(run_date=run_date)
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # Wrapping the lazy executor.map in list() drives it to completion
        # while tqdm renders progress per finished article.
        list(
            tqdm(
                executor.map(monitor_article, article_list),
                total=len(article_list),
                desc="Monitor Article List",
            )
        )
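
# Example invocation (illustrative; the date value is arbitrary):
#   python published_articles_monitor.py --run_date 2025-01-01
# Omitting --run_date defaults the run date to today.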