Jelajahi Sumber

save-daily-rank-log

luojunhui 3 minggu lalu
induk
melakukan
9f44b80923

+ 1 - 1
app/domains/analysis_task/rate_limited_article_filter.py

@@ -84,7 +84,7 @@ class RateLimitedArticleFilter(RateLimitedArticleMapper):
                 article_tuple=(title_md5, title, remark)
             )
             if insert_rows:
-                await delete_illegal_gzh_articles(gh_id=gh_id, title=title)
+                await delete_illegal_gzh_articles(gh_id=gh_id, title=title, delete_flag=2)
             else:
                 print("该文章已经删过")
 

+ 4 - 0
app/domains/recommend/i2i_recommend/__init__.py

@@ -0,0 +1,4 @@
+from .entrance import I2IRecommendDataSyncTask
+
+
+__all__ = ["I2IRecommendDataSyncTask"]

+ 14 - 0
app/domains/recommend/i2i_recommend/_const.py

@@ -0,0 +1,14 @@
+class I2IRecommendDataSyncConst:
+
+    RANK_BOT = "rank_bot"
+
+    class TaskStatus:
+        INIT = 0
+        RUNNING = 1
+        SUCCESS = 2
+        FAILED = 99
+
+    class VersionStatus:
+        INIT = 0
+        ONLINE = 1
+        ARCHIVED = 2

+ 70 - 0
app/domains/recommend/i2i_recommend/_mapper.py

@@ -0,0 +1,70 @@
+from app.core.database import DatabaseManager
+
+from ._const import I2IRecommendDataSyncConst
+
+
+class I2IRecommendDataSyncMapper(I2IRecommendDataSyncConst):
+    def __init__(self, pool: DatabaseManager):
+        self.pool = pool
+
+    async def fetch_all_accounts(self):
+        """查询有待激活数据的账号"""
+        query = """
+            SELECT gh_id FROM i2i_recommend WHERE status = %s GROUP BY gh_id;
+        """
+        return await self.pool.async_fetch(
+            query=query,
+            params=(self.VersionStatus.INIT,),
+        )
+
+    async def fetch_max_version_for_account(self, gh_id: str):
+        """
+        查询单个账号的最大version及其status
+        返回: {max_version, status} or None
+        """
+        query = """
+            SELECT version AS max_version, status
+            FROM i2i_recommend
+            WHERE gh_id = %s
+            ORDER BY version DESC
+            LIMIT 1;
+        """
+        rows = await self.pool.async_fetch(query=query, params=(gh_id,))
+        return rows[0] if rows else None
+
+    async def activate_new_version(self, gh_id: str, version: str):
+        """将该账号最新version的数据 status 0 -> 1"""
+        query = """
+            UPDATE i2i_recommend
+            SET status = %s
+            WHERE gh_id = %s AND version = %s AND status = %s;
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=(
+                self.VersionStatus.ONLINE,
+                gh_id,
+                version,
+                self.VersionStatus.INIT,
+            ),
+        )
+
+    async def deactivate_old_versions(self, gh_id: str, latest_version: str):
+        """将该账号旧version的数据 status 1 -> 2(归档)"""
+        query = """
+            UPDATE i2i_recommend
+            SET status = %s
+            WHERE gh_id = %s AND version != %s AND status = %s;
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=(
+                self.VersionStatus.ARCHIVED,
+                gh_id,
+                latest_version,
+                self.VersionStatus.ONLINE,
+            ),
+        )
+
+
+__all__ = ["I2IRecommendDataSyncMapper"]

+ 27 - 0
app/domains/recommend/i2i_recommend/_utils.py

@@ -0,0 +1,27 @@
+from typing import Dict
+
+from app.infra.external import feishu_robot
+
+from ._const import I2IRecommendDataSyncConst
+
+
+class I2IRecommendDataSyncUtil(I2IRecommendDataSyncConst):
+
+    @staticmethod
+    def filter_accounts_to_sync(account_version_list: list[dict]) -> list[dict]:
+        """
+        筛选需要切换的账号:最大version的status为OFFLINE(0)
+        """
+        return [
+            row for row in account_version_list
+            if row["status"] == I2IRecommendDataSyncConst.VersionStatus.INIT
+        ]
+
+    # 飞书通知
+    async def bot(self, title: str, detail: Dict, mention: bool=False):
+        return await feishu_robot.bot(
+            title=title,
+            detail=detail,
+            env=self.RANK_BOT,
+            mention=mention,
+        )

+ 116 - 0
app/domains/recommend/i2i_recommend/entrance.py

@@ -0,0 +1,116 @@
+import traceback
+
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+
+from ._const import I2IRecommendDataSyncConst
+from ._mapper import I2IRecommendDataSyncMapper
+from ._utils import I2IRecommendDataSyncUtil
+
+
+class I2IRecommendDataSyncTask(I2IRecommendDataSyncConst):
+    def __init__(self, pool: DatabaseManager, log_service: LogService):
+        self.mapper = I2IRecommendDataSyncMapper(pool)
+        self.tool = I2IRecommendDataSyncUtil()
+        self.log_service = log_service
+
+    async def _sync_single_account(self, gh_id: str, version: str):
+        """单个账号的版本切换:先激活新版本,再关闭旧版本"""
+        await self.mapper.activate_new_version(gh_id=gh_id, version=version)
+        await self.mapper.deactivate_old_versions(gh_id=gh_id, latest_version=version)
+
+    async def deal(self):
+        """任务入口"""
+        failed_accounts = []
+
+        try:
+            # 1. 查询有待激活数据的账号
+            accounts = await self.mapper.fetch_all_accounts()
+            if not accounts:
+                # 上游数据未更新,无待激活账号
+                await self.tool.bot(
+                    title="i2i推荐数据同步-上游数据未更新",
+                    detail={"message": "未查询到status=0的待激活数据,请检查上游大数据任务是否正常"},
+                    mention=True,
+                )
+                return
+
+            # 2. 逐账号判断并切换
+            sync_count = 0
+            for account in accounts:
+                gh_id = account["gh_id"]
+                try:
+                    version_info = await self.mapper.fetch_max_version_for_account(gh_id)
+                    if not version_info:
+                        continue
+
+                    # 最大version的status=0才需要切换
+                    if version_info["status"] != self.VersionStatus.INIT:
+                        continue
+
+                    sync_count += 1
+                    await self._sync_single_account(
+                        gh_id=gh_id, version=version_info["max_version"]
+                    )
+                except Exception as e:
+                    failed_accounts.append(gh_id)
+                    await self.log_service.log(
+                        contents={
+                            "task": "i2i_recommend_data_sync",
+                            "status": "fail",
+                            "gh_id": gh_id,
+                            "error": str(e),
+                            "traceback": traceback.format_exc(),
+                        }
+                    )
+
+            # 3. 汇总结果
+            await self.log_service.log(
+                contents={
+                    "task": "i2i_recommend_data_sync",
+                    "status": "success",
+                    "total": sync_count,
+                    "failed": len(failed_accounts),
+                    "failed_accounts": failed_accounts,
+                }
+            )
+
+            if failed_accounts:
+                await self.tool.bot(
+                    title="i2i推荐数据同步-部分账号失败",
+                    detail={
+                        "total": sync_count,
+                        "failed": len(failed_accounts),
+                        "failed_accounts": failed_accounts[:20],
+                    },
+                    mention=True,
+                )
+            else:
+                await self.tool.bot(
+                    title="i2i推荐数据同步成功",
+                    detail={"total": sync_count},
+                    mention=False,
+                )
+
+        except Exception as e:
+            await self.log_service.log(
+                contents={
+                    "task": "i2i_recommend_data_sync",
+                    "status": "fail",
+                    "message": "deal 入口异常",
+                    "error": str(e),
+                    "traceback": traceback.format_exc(),
+                }
+            )
+            await self.tool.bot(
+                title="i2i推荐数据同步-入口异常",
+                detail={
+                    "error": str(e),
+                    "traceback": traceback.format_exc(),
+                },
+                mention=True,
+            )
+            raise
+
+
+__all__ = ["I2IRecommendDataSyncTask"]

+ 5 - 0
app/infra/external/feishu.py

@@ -22,6 +22,9 @@ class Feishu:
     # cookie 监测机器人
     cookie_monitor_bot = "https://open.feishu.cn/open-apis/bot/v2/hook/51b9c83a-f50d-44dd-939f-bcd10ac6c55a"
 
+    # rank_bot
+    rank_monitor_bot = "https://open.feishu.cn/open-apis/bot/v2/hook/f9dae7ba-decf-436b-b438-41994c35af1e"
+
     def __init__(self):
         self.token = None
         self.headers = {"Content-Type": "application/json"}
@@ -213,6 +216,8 @@ class FeishuBotApi(Feishu):
                 url = self.long_articles_task_bot
             case "cookie_monitor":
                 url = self.cookie_monitor_bot
+            case "rank_bot":
+                url = self.rank_monitor_bot
             case _:
                 url = self.long_articles_bot_dev
 

+ 3 - 1
app/infra/internal/aigc_system.py

@@ -32,16 +32,18 @@ class RelationDict(TypedDict):
     platform: str
 
 
-async def delete_illegal_gzh_articles(gh_id: str, title: str):
+async def delete_illegal_gzh_articles(gh_id: str, title: str, delete_flag: int = 0):
     """
     Delete illegal gzh articles
     :param gh_id: gzh id
     :param title: article title
+    :param delete_flag: 0: 违规检查删除; 1:html手动删除; 2: 文章限流删除
     """
     url = "http://101.37.174.139:80/articleAudit/titleDangerFindDelete"
     payload = {
         "title": title,
         "ghId": gh_id,
+        "deleteFlag": delete_flag,
     }
     headers = {"Content-Type": "application/json;charset=UTF-8"}
     async with AsyncHttpClient(timeout=600) as client: