| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- from tqdm import tqdm
- from app.core.database import DatabaseManager
- from app.core.observability import LogService
- from ._const import GzhArticleMonitorConst
- from ._mapper import GzhArticleMonitorMapper
- from ._utils import GzhArticleMonitorUtils
class InnerArticleMonitorTask(GzhArticleMonitorConst):
    """Check recently published articles and handle the ones reported as illegal.

    Articles are fetched through ``GzhArticleMonitorMapper`` and each link is
    checked via ``GzhArticleMonitorUtils.get_article_detail_by_link``.
    Violations are alerted to Feishu, recorded through the mapper and, for
    non-mode errors, deleted unless they were already handled.
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        self.pool = pool
        self.aliyun_log = log_service
        self.mapper = GzhArticleMonitorMapper(pool)
        self.tool = GzhArticleMonitorUtils()

    @staticmethod
    def _decode_wx_sn(wx_sn, errors: str = "strict"):
        """Return ``wx_sn`` as text.

        ``bytes``/``bytearray`` values are decoded as UTF-8 using the given
        ``errors`` policy; any other value (e.g. an already-decoded ``str`` or
        ``None``) is returned unchanged instead of failing on ``.decode``.
        """
        if isinstance(wx_sn, (bytes, bytearray)):
            return wx_sn.decode("utf-8", errors)
        return wx_sn

    async def check_each_article(self, article):
        """Check one article's link and act on it if it is reported illegal.

        ``article`` is read through the keys ``ghId``, ``accountName``,
        ``title``, ``ContentUrl``, ``wx_sn`` and ``publish_timestamp``. Only a
        response whose ``code`` equals ``ArticleCode.ILLEGAL`` is acted on:

        * ``msg == ARTICLE_MODE_ERROR``: saved via ``save_mode_error_article``
          and alerted to Feishu, unless ``fetch_mode_error_wx_sn`` already
          finds the wx_sn (so each article alerts once).
        * any other ``msg``: inserted via ``insert_into_illegal_articles``
          (alerting when the insert result is truthy), then deleted via
          ``delete_illegal_articles`` unless the title's MD5 is already marked
          unsafe.

        Exceptions raised after the fields are read are logged to the Aliyun
        log service and swallowed, so the caller's loop keeps going.
        """
        gh_id, account_name, title, url, wx_sn, publish_date = (
            article["ghId"],
            article["accountName"],
            article["title"],
            article["ContentUrl"],
            article["wx_sn"],
            article["publish_timestamp"],
        )
        try:
            response = await self.tool.get_article_detail_by_link(link=url)
            # Only articles reported as illegal need any handling.
            if response.get("code") != self.ArticleCode.ILLEGAL:
                return

            wx_sn_text = self._decode_wx_sn(wx_sn)
            error_detail = response.get("msg")
            article_detail = {
                "account_name": account_name,
                "gh_id": gh_id,
                "title": title,
                "wx_sn": wx_sn_text,
                "publish_date": str(publish_date),
                "error_detail": error_detail,
            }

            if error_detail == self.ARTICLE_MODE_ERROR:
                # Skip articles that were already recorded (already handled).
                if await self.mapper.fetch_mode_error_wx_sn(wx_sn=wx_sn_text):
                    return
                await self.mapper.save_mode_error_article(
                    article=(wx_sn_text, title, error_detail)
                )
                await self.tool.feishu_alert(
                    title="长文——模式违规报警", detail=article_detail, mention=True
                )
                return

            # NOTE(review): the raw (undecoded) wx_sn is stored here, unlike the
            # mode-error path above; kept unchanged — confirm against the schema.
            insert_row = await self.mapper.insert_into_illegal_articles(
                params=(
                    gh_id,
                    account_name,
                    title,
                    wx_sn,
                    publish_date,
                    error_detail,
                )
            )
            if insert_row:
                # The article was not yet in illegal_articles: alert on first sight.
                await self.tool.feishu_alert(
                    title="长文——文章违规告警", detail=article_detail, mention=True
                )

            # A title already marked unsafe means the article was deleted before,
            # so there is nothing left to delete.
            title_md5 = self.tool.title_to_md5(title)
            if await self.mapper.whether_title_unsafe(title_md5=title_md5):
                return
            await self.tool.delete_illegal_articles(gh_id, title)
        except Exception as e:
            # Best-effort per article: log the failure and let the batch continue.
            # wx_sn is decoded leniently here so this handler itself cannot raise
            # on a non-bytes or non-UTF-8 value and abort the whole run.
            await self.aliyun_log.log(
                contents={
                    "task": "check_illegal_articles",
                    "status": "fail",
                    "link": url,
                    "account_name": account_name,
                    "wx_sn": self._decode_wx_sn(wx_sn, errors="replace"),
                    "error": str(e),
                }
            )

    async def deal(self, run_date=None):
        """Check every article returned for the run and report task status.

        ``run_date`` is passed to ``get_start_timestamp`` to compute the
        ``start_timestamp`` bound for ``fetch_article_list_to_check``. If no
        articles come back, a Feishu alert is sent and the run ends early.

        Returns:
            ``TaskStatus.SUCCESS``.
        """
        start_timestamp = self.tool.get_start_timestamp(run_date)
        article_list = await self.mapper.fetch_article_list_to_check(
            start_timestamp=start_timestamp
        )
        if not article_list:
            await self.tool.feishu_alert(
                title="站内微信公众号发文监测任务异常",
                detail={"message": "获取待 check 文章列表失败"},
                mention=True,
            )
            # Stop here: iterating a None result would raise TypeError.
            # NOTE(review): a failure status may be more appropriate; SUCCESS is
            # kept because the original returned it for an empty list.
            return self.TaskStatus.SUCCESS

        for article in tqdm(article_list, desc="站内文章检测任务"):
            await self.check_each_article(article)
        return self.TaskStatus.SUCCESS


__all__ = ["InnerArticleMonitorTask"]
|