from tqdm import tqdm

from app.core.database import DatabaseManager
from app.core.observability import LogService

from ._const import GzhArticleMonitorConst
from ._mapper import GzhArticleMonitorMapper
from ._utils import GzhArticleMonitorUtils


class InnerArticleMonitorTask(GzhArticleMonitorConst):
    """Check in-house published articles and react to the ones flagged illegal.

    Persistence goes through ``GzhArticleMonitorMapper``; fetching article
    details, alerting and deleting go through ``GzhArticleMonitorUtils``.
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        self.pool = pool
        self.aliyun_log = log_service
        self.mapper = GzhArticleMonitorMapper(pool)
        self.tool = GzhArticleMonitorUtils()

    @staticmethod
    def _wx_sn_for_log(wx_sn) -> str:
        """Return a printable form of ``wx_sn`` without ever raising.

        Used only by the error handler: the value being logged may itself be
        the reason the check failed (not bytes, or not valid UTF-8), so the
        handler must not decode it strictly a second time.
        """
        if isinstance(wx_sn, str):
            return wx_sn
        try:
            return wx_sn.decode("utf-8")
        except (AttributeError, UnicodeError):
            return repr(wx_sn)

    async def _handle_illegal_article(
        self, *, gh_id, account_name, title, wx_sn, publish_date, error_detail
    ) -> None:
        """Record an illegal article, alert on it and delete it when needed.

        ``wx_sn`` is decoded as UTF-8 once for alerts and mode-error
        bookkeeping; the raw value is what gets inserted into the
        illegal-articles table.
        """
        wx_sn_text = wx_sn.decode("utf-8")
        article_detail = {
            "account_name": account_name,
            "gh_id": gh_id,
            "title": title,
            "wx_sn": wx_sn_text,
            "publish_date": str(publish_date),
            "error_detail": error_detail,
        }

        if error_detail == self.ARTICLE_MODE_ERROR:
            # Skip articles whose mode error has already been handled.
            if await self.mapper.fetch_mode_error_wx_sn(wx_sn=wx_sn_text):
                return

            await self.mapper.save_mode_error_article(
                article=(wx_sn_text, title, error_detail)
            )
            await self.tool.feishu_alert(
                title="长文——模式违规报警", detail=article_detail, mention=True
            )
            return

        insert_row = await self.mapper.insert_into_illegal_articles(
            params=(
                gh_id,
                account_name,
                title,
                wx_sn,
                publish_date,
                error_detail,
            )
        )
        if not insert_row:
            return

        # The article was not in illegal_articles yet: first occurrence, alert.
        # NOTE(review): the delete step below is assumed to belong to this
        # first-occurrence branch — confirm against the pre-collapse layout.
        await self.tool.feishu_alert(
            title="长文——文章违规告警", detail=article_detail, mention=True
        )

        # Check whether the article has been deleted before.
        title_md5 = self.tool.title_to_md5(title)
        if await self.mapper.whether_title_unsafe(title_md5=title_md5):
            # Already deleted earlier; no need to delete again.
            return

        await self.tool.delete_illegal_articles(gh_id, title)

    async def check_each_article(self, article):
        """Fetch one article's detail and handle it if it is flagged illegal.

        Args:
            article: mapping with the keys ``ghId``, ``accountName``,
                ``title``, ``ContentUrl``, ``wx_sn`` and
                ``publish_timestamp``.

        Raises:
            KeyError: if ``article`` lacks one of the keys above (the lookup
                happens before the guarded section).

        Any exception raised while checking is written to the log service
        instead of propagating.
        """
        gh_id, account_name, title, url, wx_sn, publish_date = (
            article["ghId"],
            article["accountName"],
            article["title"],
            article["ContentUrl"],
            article["wx_sn"],
            article["publish_timestamp"],
        )

        try:
            response = await self.tool.get_article_detail_by_link(link=url)
            response_code = response.get("code")
            if response_code == self.ArticleCode.ILLEGAL:
                await self._handle_illegal_article(
                    gh_id=gh_id,
                    account_name=account_name,
                    title=title,
                    wx_sn=wx_sn,
                    publish_date=publish_date,
                    error_detail=response.get("msg"),
                )
        except Exception as e:
            await self.aliyun_log.log(
                contents={
                    "task": "check_illegal_articles",
                    "status": "fail",
                    "link": url,
                    "account_name": account_name,
                    # Lenient conversion: a strict decode here could raise
                    # again and hide the original error.
                    "wx_sn": self._wx_sn_for_log(wx_sn),
                    "error": str(e),
                }
            )

    async def deal(self, run_date=None):
        """Check every article published since the computed start timestamp.

        Args:
            run_date: forwarded to ``get_start_timestamp``; defaults to None.

        Returns:
            ``self.TaskStatus.SUCCESS`` in all cases, including when no
            articles were fetched (an alert is sent in that case).
        """
        start_timestamp = self.tool.get_start_timestamp(run_date)
        article_list = await self.mapper.fetch_article_list_to_check(
            start_timestamp=start_timestamp
        )

        if not article_list:
            await self.tool.feishu_alert(
                title="站内微信公众号发文监测任务异常",
                detail={"message": "获取待 check 文章列表失败"},
                mention=True,
            )
            # Nothing to iterate (the list may even be None); stop here with
            # the same status previously returned for an empty list.
            return self.TaskStatus.SUCCESS

        for article in tqdm(article_list, desc="站内文章检测任务"):
            await self.check_each_article(article)

        return self.TaskStatus.SUCCESS


__all__ = ["InnerArticleMonitorTask"]