luojunhui 2 недель назад
Родитель
Сommit
7a09c304c8

+ 2 - 2
app/domains/monitor_tasks/__init__.py

@@ -3,7 +3,7 @@ from .get_off_videos import GetOffVideos
 from .get_off_videos import CheckVideoAuditStatus
 from .gzh_article_monitor import OutsideGzhArticlesMonitor
 from .gzh_article_monitor import OutsideGzhArticlesCollector
-from .gzh_article_monitor import InnerGzhArticlesMonitor
+from .gzh_article_monitor import InnerArticleMonitorTask
 from .limited_account_analysis import LimitedAccountAnalysisTask
 from .task_processing_monitor import TaskProcessingMonitor
 from .auto_reply_cards_monitor import AutoReplyCardsMonitor
@@ -18,7 +18,7 @@ __all__ = [
     "CheckVideoAuditStatus",
     "OutsideGzhArticlesMonitor",
     "OutsideGzhArticlesCollector",
-    "InnerGzhArticlesMonitor",
+    "InnerArticleMonitorTask",
     "TaskProcessingMonitor",
     "LimitedAccountAnalysisTask",
     "AutoReplyCardsMonitor",

+ 10 - 0
app/domains/monitor_tasks/gzh_article_monitor/__init__.py

@@ -0,0 +1,10 @@
+from .entrance import InnerArticleMonitorTask
+from .outside import OutsideGzhArticlesCollector
+from .outside import OutsideGzhArticlesMonitor
+
+
+__all__ = [
+    "InnerArticleMonitorTask",
+    "OutsideGzhArticlesCollector",
+    "OutsideGzhArticlesMonitor",
+]

+ 29 - 0
app/domains/monitor_tasks/gzh_article_monitor/_const.py

@@ -0,0 +1,29 @@
+class GzhArticleMonitorConst:
+    """Constants shared by the official-account (gzh) article monitor classes."""
+
+    # Monitoring window: 2 days, expressed in seconds.
+    MONITOR_CYCLE = 2 * 24 * 3600
+
+    # Message text that identifies a "mode violation" (as opposed to an
+    # article violation). It is compared for exact equality against the
+    # "msg" field of an article-detail response, so it must not be edited.
+    ARTICLE_MODE_ERROR = "此内容因违规无法查看: 此内容因违规无法查看 由用户投诉并经平台审核,存在非真人自动化创作行为,查看对应规则 微信公众平台运营中心"
+
+    # Values passed as `delete_flag` when requesting deletion of articles.
+    # NOTE(review): "HTML_MANUL" looks like a misspelling of "MANUAL" —
+    # confirm with the owner before renaming; the name is part of the API.
+    class DeleteFlag:
+        ILLEGAL = 0
+        HTML_MANUL = 1
+        RATE_LIMITED = 2
+
+    # Article violation status.
+    class ArticleIllegalStatus:
+        ILLEGAL = 1
+        INIT = 0
+
+    # Codes found in the "code" field of an article-detail response.
+    class ArticleCode:
+        ILLEGAL = 25012
+        SELF_DELETE = 25005
+        SUCCESS = 0
+        UNKNOWN = 10000
+
+    # Task status values (e.g. the value returned by a task's `deal()`).
+    class TaskStatus:
+        INIT = 0
+        PROCESSING = 1
+        SUCCESS = 2
+        FAIL = 99

+ 58 - 0
app/domains/monitor_tasks/gzh_article_monitor/_mapper.py

@@ -0,0 +1,58 @@
+from app.core.database import DatabaseManager
+
+from ._const import GzhArticleMonitorConst
+
+
+class GzhArticleMonitorMapper(GzhArticleMonitorConst):
+    """Database access layer used by the official-account article monitor."""
+
+    def __init__(self, pool: DatabaseManager):
+        # Database manager through which every query below is issued.
+        self.pool = pool
+
+    async def whether_title_unsafe(self, title_md5: str) -> bool:
+        """Tell whether a title digest is present in `article_unsafe_title`.
+
+        :param title_md5: md5 digest of the article title
+        :return: True when the lookup returns a non-empty result
+        """
+        sql = """
+            select title_md5 from article_unsafe_title where title_md5 = %s;
+        """
+        rows = await self.pool.async_fetch(query=sql, params=(title_md5,))
+        return bool(rows)
+
+    async def fetch_article_list_to_check(self, start_timestamp: int):
+        """Fetch articles published at or after `start_timestamp`, newest first.
+
+        :param start_timestamp: earliest publish time, as a unix timestamp in seconds
+        :return: whatever `async_fetch` returns for the `piaoquan_crawler` database
+        """
+        sql = """
+            select ghId, accountName, title, ContentUrl, wx_sn, from_unixtime(publish_timestamp) as publish_timestamp
+            from official_articles_v2
+            where publish_timestamp >= %s
+            order by publish_timestamp desc;
+        """
+        rows = await self.pool.async_fetch(
+            query=sql, db_name="piaoquan_crawler", params=(start_timestamp,)
+        )
+        return rows
+
+    async def insert_into_illegal_articles(self, params: tuple) -> int:
+        """Record an illegal article; `INSERT IGNORE` skips rows that already exist.
+
+        :param params: (gh_id, account_name, title, wx_sn, publish_date, illegal_reason)
+        :return: the value returned by `async_save`
+        """
+        sql = """
+            INSERT IGNORE INTO illegal_articles
+                (gh_id, account_name, title, wx_sn, publish_date, illegal_reason)
+            VALUES 
+                (%s, %s, %s, %s, %s, %s); 
+        """
+        saved = await self.pool.async_save(query=sql, params=params)
+        return saved
+
+    async def fetch_mode_error_wx_sn(self, wx_sn: str):
+        """Tell whether `wx_sn` is already stored in `publish_mode_error_articles`.
+
+        :param wx_sn: article identifier to look up
+        :return: True when the lookup returns a non-empty result
+        """
+        sql = """
+            SELECT wx_sn FROM publish_mode_error_articles WHERE wx_sn = %s;
+        """
+        rows = await self.pool.async_fetch(query=sql, params=(wx_sn,))
+        return bool(rows)
+
+    async def save_mode_error_article(self, article: tuple):
+        """Store a mode-violation article; `INSERT IGNORE` skips existing rows.
+
+        :param article: (wx_sn, title, error)
+        :return: the value returned by `async_save`
+        """
+        sql = """
+            INSERT IGNORE INTO publish_mode_error_articles
+                (wx_sn, title, error)
+            VALUES 
+                (%s, %s, %s);
+        """
+        saved = await self.pool.async_save(query=sql, params=article)
+        return saved

+ 48 - 0
app/domains/monitor_tasks/gzh_article_monitor/_utils.py

@@ -0,0 +1,48 @@
+import datetime
+
+from typing import Dict
+
+from app.infra.external import feishu_robot
+from app.infra.internal import delete_illegal_gzh_articles
+from app.infra.crawler.wechat import get_article_detail
+from app.infra.crawler.wechat import get_article_list_from_account
+from app.infra.shared.tools import str_to_md5
+
+from ._const import GzhArticleMonitorConst
+
+
+class GzhArticleMonitorUtils(GzhArticleMonitorConst):
+    """Thin helpers around the external services used by the article monitor."""
+
+    @staticmethod
+    def title_to_md5(title: str):
+        """Return the digest of `title` as computed by `str_to_md5`."""
+        digest = str_to_md5(title)
+        return digest
+
+    def get_start_timestamp(self, run_date: str = None) -> int:
+        """Return the unix timestamp at which the monitoring window starts.
+
+        :param run_date: day in "%Y-%m-%d" format; today's date when empty
+        :return: midnight of `run_date` minus `MONITOR_CYCLE`, in seconds
+        """
+        day = run_date or datetime.datetime.today().strftime("%Y-%m-%d")
+        day_start = datetime.datetime.strptime(day, "%Y-%m-%d")
+        return int(day_start.timestamp()) - self.MONITOR_CYCLE
+
+    @staticmethod
+    async def feishu_alert(title, detail: Dict, mention: bool = False):
+        """Send an alert through `feishu_robot.bot` with `env="prod"`."""
+        result = await feishu_robot.bot(
+            title=title, detail=detail, mention=mention, env="prod"
+        )
+        return result
+
+    async def delete_illegal_articles(self, gh_id, title):
+        """Request deletion of an article, flagged as `DeleteFlag.ILLEGAL`."""
+        flag = self.DeleteFlag.ILLEGAL
+        return await delete_illegal_gzh_articles(gh_id, title, delete_flag=flag)
+
+    @staticmethod
+    async def get_article_detail_by_link(link: str):
+        """Fetch the detail of the article at `link` with `is_cache=False`."""
+        detail = await get_article_detail(link, is_cache=False)
+        return detail
+
+    @staticmethod
+    async def get_gzh_article_list(gh_id: str):
+        """Fetch the article list of an account; None unless the id starts with "gh_"."""
+        if gh_id.startswith("gh_"):
+            return await get_article_list_from_account(gh_id)
+
+        return None

+ 112 - 0
app/domains/monitor_tasks/gzh_article_monitor/entrance.py

@@ -0,0 +1,112 @@
+from tqdm import tqdm
+
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+
+from ._const import GzhArticleMonitorConst
+from ._mapper import GzhArticleMonitorMapper
+from ._utils import GzhArticleMonitorUtils
+
+
+class InnerArticleMonitorTask(GzhArticleMonitorConst):
+    """Monitor recently published in-house official-account articles.
+
+    Every article inside the monitoring window has its detail fetched.
+    Articles reported as illegal are recorded, alerted on and deleted;
+    "mode violations" are only recorded and alerted on.
+    """
+
+    def __init__(self, pool: DatabaseManager, log_service: LogService):
+        """
+        :param pool: database manager handed to the mapper
+        :param log_service: log service used to report per-article failures
+        """
+        self.pool = pool
+        self.aliyun_log = log_service
+        self.mapper = GzhArticleMonitorMapper(pool)
+        self.tool = GzhArticleMonitorUtils()
+
+    @staticmethod
+    def _wx_sn_as_text(wx_sn):
+        """Return `wx_sn` as text; never raises.
+
+        Bytes are decoded as UTF-8 (undecodable bytes are replaced) and any
+        other value is returned unchanged, so this is safe to call from an
+        exception handler.
+        """
+        if isinstance(wx_sn, (bytes, bytearray)):
+            return wx_sn.decode("utf-8", errors="replace")
+        return wx_sn
+
+    async def _handle_mode_violation(self, wx_sn_text, title, article_detail):
+        """Record and alert on a mode violation unless it was handled before."""
+        # Skip articles that were already processed.
+        if await self.mapper.fetch_mode_error_wx_sn(wx_sn=wx_sn_text):
+            return
+
+        await self.mapper.save_mode_error_article(
+            article=(wx_sn_text, title, article_detail["error_detail"])
+        )
+        await self.tool.feishu_alert(
+            title="模式违规报警", detail=article_detail, mention=False
+        )
+
+    async def _handle_illegal_article(self, article_detail, wx_sn, publish_date):
+        """Record an illegal article, then alert and delete it if it is new."""
+        gh_id = article_detail["gh_id"]
+        title = article_detail["title"]
+        insert_row = await self.mapper.insert_into_illegal_articles(
+            params=(
+                gh_id,
+                article_detail["account_name"],
+                title,
+                wx_sn,
+                publish_date,
+                article_detail["error_detail"],
+            )
+        )
+        # A falsy result means nothing was inserted; skip alerting and deleting.
+        if not insert_row:
+            return
+
+        # A title already marked unsafe was deleted earlier: no alert, no delete.
+        title_md5 = self.tool.title_to_md5(title)
+        if await self.mapper.whether_title_unsafe(title_md5=title_md5):
+            return
+
+        await self.tool.feishu_alert(
+            title="文章违规告警", detail=article_detail, mention=False
+        )
+        await self.tool.delete_illegal_articles(gh_id, title)
+
+    async def check_each_article(self, article):
+        """Check one article and react when it is reported as illegal.
+
+        :param article: row with the keys ghId, accountName, title,
+            ContentUrl, wx_sn and publish_timestamp
+
+        Any exception raised while checking is reported to the log service
+        instead of being propagated, so one bad article cannot stop the run.
+        """
+        gh_id = article["ghId"]
+        account_name = article["accountName"]
+        title = article["title"]
+        url = article["ContentUrl"]
+        wx_sn = article["wx_sn"]
+        publish_date = article["publish_timestamp"]
+        # Normalise once, outside the try block, so the except handler below
+        # can use it without risking a second exception.
+        wx_sn_text = self._wx_sn_as_text(wx_sn)
+
+        try:
+            response = await self.tool.get_article_detail_by_link(link=url)
+            if response.get("code") != self.ArticleCode.ILLEGAL:
+                return
+
+            error_detail = response.get("msg")
+            article_detail = {
+                "account_name": account_name,
+                "gh_id": gh_id,
+                "title": title,
+                "wx_sn": wx_sn_text,
+                "publish_date": str(publish_date),
+                "error_detail": error_detail,
+            }
+            if error_detail == self.ARTICLE_MODE_ERROR:
+                await self._handle_mode_violation(wx_sn_text, title, article_detail)
+                return
+
+            await self._handle_illegal_article(article_detail, wx_sn, publish_date)
+
+        except Exception as e:
+            await self.aliyun_log.log(
+                contents={
+                    "task": "check_illegal_articles",
+                    "status": "fail",
+                    "link": url,
+                    "account_name": account_name,
+                    "wx_sn": wx_sn_text,
+                    "error": str(e),
+                }
+            )
+
+    async def deal(self, run_date=None):
+        """Run the monitor over every article in the monitoring window.
+
+        :param run_date: optional day in "%Y-%m-%d" format; defaults to today
+        :return: `TaskStatus.SUCCESS` after all articles were checked, or
+            `TaskStatus.FAIL` when no article list could be fetched
+        """
+        start_timestamp = self.tool.get_start_timestamp(run_date)
+        article_list = await self.mapper.fetch_article_list_to_check(
+            start_timestamp=start_timestamp
+        )
+        if not article_list:
+            await self.tool.feishu_alert(
+                title="站内微信公众号发文监测任务异常",
+                detail={"message": "获取待 check 文章列表失败"},
+                mention=True,
+            )
+            # Stop here: iterating a missing list would raise, and reporting
+            # SUCCESS right after a failure alert would be misleading.
+            return self.TaskStatus.FAIL
+
+        for article in tqdm(article_list, desc="站内文章检测任务"):
+            await self.check_each_article(article)
+
+        return self.TaskStatus.SUCCESS
+
+
+__all__ = ["InnerArticleMonitorTask"]

+ 23 - 11
app/domains/monitor_tasks/gzh_article_monitor.py → app/domains/monitor_tasks/gzh_article_monitor/outside.py

@@ -290,9 +290,6 @@ class InnerGzhArticlesMonitor(MonitorConst):
             response_code = response["code"]
             if response_code == self.ARTICLE_ILLEGAL_CODE:
                 error_detail = response.get("msg")
-                if error_detail == '此内容因违规无法查看: 此内容因违规无法查看 由用户投诉并经平台审核,存在非真人自动化创作行为,查看对应规则 微信公众平台运营中心':
-                    return
-
                 query = """
                     INSERT IGNORE INTO illegal_articles
                         (gh_id, account_name, title, wx_sn, publish_date, illegal_reason)
@@ -311,19 +308,34 @@ class InnerGzhArticlesMonitor(MonitorConst):
                     ),
                 )
                 if affected_row:
+                    article_detail = {
+                        "account_name": account_name,
+                        "gh_id": gh_id,
+                        "title": title,
+                        "wx_sn": wx_sn.decode("utf-8"),
+                        "publish_date": str(publish_date),
+                        "error_detail": error_detail,
+                    }
+                    if (
+                        error_detail
+                        == "此内容因违规无法查看: 此内容因违规无法查看 由用户投诉并经平台审核,存在非真人自动化创作行为,查看对应规则 微信公众平台运营中心"
+                    ):
+                        # 模式违规而不是文章违规,只报警,不处理
+                        await feishu_robot.bot(
+                            title="模式违规报警",
+                            detail=article_detail,
+                            mention=False,
+                            env="prod",
+                        )
+                        return
+
                     if await self.whether_title_unsafe(title):
+                        # 说明文章已经删除过,无需处理和报警
                         return
 
                     await feishu_robot.bot(
                         title="文章违规告警",
-                        detail={
-                            "account_name": account_name,
-                            "gh_id": gh_id,
-                            "title": title,
-                            "wx_sn": wx_sn.decode("utf-8"),
-                            "publish_date": str(publish_date),
-                            "error_detail": error_detail,
-                        },
+                        detail=article_detail,
                         mention=False,
                         env="prod",
                     )

+ 2 - 2
app/jobs/domains/monitor_task.py

@@ -3,7 +3,7 @@ from app.domains.monitor_tasks import check_kimi_balance
 from app.domains.monitor_tasks import GetOffVideos
 from app.domains.monitor_tasks import CheckVideoAuditStatus
 from app.domains.monitor_tasks import CooperateAccountsMonitorTask
-from app.domains.monitor_tasks import InnerGzhArticlesMonitor
+from app.domains.monitor_tasks import InnerArticleMonitorTask
 from app.domains.monitor_tasks import OutsideGzhArticlesMonitor
 from app.domains.monitor_tasks import OutsideGzhArticlesCollector
 from app.domains.monitor_tasks import TaskProcessingMonitor
@@ -18,7 +18,7 @@ __all__ = [
     "GetOffVideos",
     "CheckVideoAuditStatus",
     "CooperateAccountsMonitorTask",
-    "InnerGzhArticlesMonitor",
+    "InnerArticleMonitorTask",
     "OutsideGzhArticlesMonitor",
     "OutsideGzhArticlesCollector",
     "TaskProcessingMonitor",

+ 1 - 1
app/jobs/task_handler.py

@@ -97,7 +97,7 @@ class TaskHandler:
     @register("inner_article_monitor")
     async def _inner_gzh_articles_monitor_handler(self) -> int:
         """Monitor in-house official-account (gzh) articles."""
-        sub_task = InnerGzhArticlesMonitor(self.db_client)
+        # NOTE(review): InnerArticleMonitorTask.__init__ is declared as
+        # (pool, log_service) in gzh_article_monitor/entrance.py, but only the
+        # pool is passed here — confirm which TaskHandler attribute holds the
+        # LogService and pass it as the second argument.
+        sub_task = InnerArticleMonitorTask(self.db_client)
         return await sub_task.deal()
 
     @register("outside_article_monitor")