entrance.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. from tqdm import tqdm
  2. from app.core.database import DatabaseManager
  3. from app.core.observability import LogService
  4. from ._const import GzhArticleMonitorConst
  5. from ._mapper import GzhArticleMonitorMapper
  6. from ._utils import GzhArticleMonitorUtils
  7. class InnerArticleMonitorTask(GzhArticleMonitorConst):
  8. def __init__(self, pool: DatabaseManager, log_service: LogService):
  9. self.pool = pool
  10. self.aliyun_log = log_service
  11. self.mapper = GzhArticleMonitorMapper(pool)
  12. self.tool = GzhArticleMonitorUtils()
  13. async def check_each_article(self, article):
  14. gh_id, account_name, title, url, wx_sn, publish_date = (
  15. article["ghId"],
  16. article["accountName"],
  17. article["title"],
  18. article["ContentUrl"],
  19. article["wx_sn"],
  20. article["publish_timestamp"],
  21. )
  22. try:
  23. response = await self.tool.get_article_detail_by_link(link=url)
  24. response_code = response.get("code")
  25. match response_code:
  26. case self.ArticleCode.ILLEGAL:
  27. error_detail = response.get("msg")
  28. article_detail = {
  29. "account_name": account_name,
  30. "gh_id": gh_id,
  31. "title": title,
  32. "wx_sn": wx_sn.decode("utf-8"),
  33. "publish_date": str(publish_date),
  34. "error_detail": error_detail,
  35. }
  36. if error_detail == self.ARTICLE_MODE_ERROR:
  37. # 判断该文章是否处理过
  38. if await self.mapper.fetch_mode_error_wx_sn(
  39. wx_sn=wx_sn.decode("utf-8")
  40. ):
  41. return
  42. await self.mapper.save_mode_error_article(
  43. article=(wx_sn.decode("utf-8"), title, error_detail)
  44. )
  45. await self.tool.feishu_alert(
  46. title="长文——模式违规报警", detail=article_detail, mention=True
  47. )
  48. return
  49. insert_row = await self.mapper.insert_into_illegal_articles(
  50. params=(
  51. gh_id,
  52. account_name,
  53. title,
  54. wx_sn,
  55. publish_date,
  56. error_detail,
  57. )
  58. )
  59. if insert_row:
  60. # illegal_articles 里面没有违规文章、首次出现报警
  61. await self.tool.feishu_alert(
  62. title="长文——文章违规告警", detail=article_detail, mention=True
  63. )
  64. # 判断文章是否删过
  65. title_md5 = self.tool.title_to_md5(title)
  66. if await self.mapper.whether_title_unsafe(title_md5=title_md5):
  67. # 说明文章已经删过,无需再删文处理
  68. return
  69. await self.tool.delete_illegal_articles(gh_id, title)
  70. case _:
  71. pass
  72. except Exception as e:
  73. await self.aliyun_log.log(
  74. contents={
  75. "task": "check_illegal_articles",
  76. "status": "fail",
  77. "link": url,
  78. "account_name": account_name,
  79. "wx_sn": wx_sn.decode("utf-8"),
  80. "error": str(e),
  81. }
  82. )
  83. async def deal(self, run_date=None):
  84. start_timestamp = self.tool.get_start_timestamp(run_date)
  85. article_list = await self.mapper.fetch_article_list_to_check(
  86. start_timestamp=start_timestamp
  87. )
  88. if not article_list:
  89. await self.tool.feishu_alert(
  90. title="站内微信公众号发文监测任务异常",
  91. detail={"message": "获取待 check 文章列表失败"},
  92. mention=True,
  93. )
  94. for article in tqdm(article_list, desc="站内文章检测任务"):
  95. await self.check_each_article(article)
  96. return self.TaskStatus.SUCCESS
# Public API: only the task class is exported from this module.
__all__ = ["InnerArticleMonitorTask"]