Quellcode durchsuchen

Merge branch 'master' into feature/luojunhui/20260408-save-daily-rank-log

# Conflicts:
#	.gitignore
#	app/jobs/task_handler.py
luojunhui vor 3 Wochen
Ursprung
Commit
7e13ff0af6

+ 2 - 3
.gitignore

@@ -62,6 +62,5 @@ docs/_build/
 # PyBuilder
 target/
 
-.cursor
-
-.claude
+.claude
+.cursor

+ 1 - 1
app/domains/analysis_task/rate_limited_article_filter.py

@@ -84,7 +84,7 @@ class RateLimitedArticleFilter(RateLimitedArticleMapper):
                 article_tuple=(title_md5, title, remark)
             )
             if insert_rows:
-                await delete_illegal_gzh_articles(gh_id=gh_id, title=title)
+                await delete_illegal_gzh_articles(gh_id=gh_id, title=title, delete_flag=2)
             else:
                 print("该文章已经删过")
 

+ 4 - 0
app/domains/recommend/i2i_recommend/__init__.py

@@ -0,0 +1,4 @@
+from .entrance import I2IRecommendDataSyncTask
+
+
+__all__ = ["I2IRecommendDataSyncTask"]

+ 14 - 0
app/domains/recommend/i2i_recommend/_const.py

@@ -0,0 +1,14 @@
+class I2IRecommendDataSyncConst:
+
+    RANK_BOT = "rank_bot"
+
+    class TaskStatus:
+        INIT = 0
+        RUNNING = 1
+        SUCCESS = 2
+        FAILED = 99
+
+    class VersionStatus:
+        INIT = 0
+        ONLINE = 1
+        ARCHIVED = 2

+ 70 - 0
app/domains/recommend/i2i_recommend/_mapper.py

@@ -0,0 +1,70 @@
+from app.core.database import DatabaseManager
+
+from ._const import I2IRecommendDataSyncConst
+
+
+class I2IRecommendDataSyncMapper(I2IRecommendDataSyncConst):
+    def __init__(self, pool: DatabaseManager):
+        self.pool = pool
+
+    async def fetch_all_accounts(self):
+        """查询有待激活数据的账号"""
+        query = """
+            SELECT gh_id FROM i2i_recommend WHERE status = %s GROUP BY gh_id;
+        """
+        return await self.pool.async_fetch(
+            query=query,
+            params=(self.VersionStatus.INIT,),
+        )
+
+    async def fetch_max_version_for_account(self, gh_id: str):
+        """
+        查询单个账号的最大version及其status
+        返回: {max_version, status} or None
+        """
+        query = """
+            SELECT version AS max_version, status
+            FROM i2i_recommend
+            WHERE gh_id = %s
+            ORDER BY version DESC
+            LIMIT 1;
+        """
+        rows = await self.pool.async_fetch(query=query, params=(gh_id,))
+        return rows[0] if rows else None
+
+    async def activate_new_version(self, gh_id: str, version: str):
+        """将该账号最新version的数据 status 0 -> 1"""
+        query = """
+            UPDATE i2i_recommend
+            SET status = %s
+            WHERE gh_id = %s AND version = %s AND status = %s;
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=(
+                self.VersionStatus.ONLINE,
+                gh_id,
+                version,
+                self.VersionStatus.INIT,
+            ),
+        )
+
+    async def deactivate_old_versions(self, gh_id: str, latest_version: str):
+        """将该账号旧version的数据 status 1 -> 2(归档)"""
+        query = """
+            UPDATE i2i_recommend
+            SET status = %s
+            WHERE gh_id = %s AND version != %s AND status = %s;
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=(
+                self.VersionStatus.ARCHIVED,
+                gh_id,
+                latest_version,
+                self.VersionStatus.ONLINE,
+            ),
+        )
+
+
+__all__ = ["I2IRecommendDataSyncMapper"]

+ 27 - 0
app/domains/recommend/i2i_recommend/_utils.py

@@ -0,0 +1,27 @@
+from typing import Dict
+
+from app.infra.external import feishu_robot
+
+from ._const import I2IRecommendDataSyncConst
+
+
+class I2IRecommendDataSyncUtil(I2IRecommendDataSyncConst):
+
+    @staticmethod
+    def filter_accounts_to_sync(account_version_list: list[dict]) -> list[dict]:
+        """
+        筛选需要切换的账号:最大version的status为INIT(0)
+        """
+        return [
+            row for row in account_version_list
+            if row["status"] == I2IRecommendDataSyncConst.VersionStatus.INIT
+        ]
+
+    # 飞书通知
+    async def bot(self, title: str, detail: Dict, mention: bool=False):
+        return await feishu_robot.bot(
+            title=title,
+            detail=detail,
+            env=self.RANK_BOT,
+            mention=mention,
+        )

+ 116 - 0
app/domains/recommend/i2i_recommend/entrance.py

@@ -0,0 +1,116 @@
+import traceback
+
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+
+from ._const import I2IRecommendDataSyncConst
+from ._mapper import I2IRecommendDataSyncMapper
+from ._utils import I2IRecommendDataSyncUtil
+
+
+class I2IRecommendDataSyncTask(I2IRecommendDataSyncConst):
+    def __init__(self, pool: DatabaseManager, log_service: LogService):
+        self.mapper = I2IRecommendDataSyncMapper(pool)
+        self.tool = I2IRecommendDataSyncUtil()
+        self.log_service = log_service
+
+    async def _sync_single_account(self, gh_id: str, version: str):
+        """单个账号的版本切换:先激活新版本,再关闭旧版本"""
+        await self.mapper.activate_new_version(gh_id=gh_id, version=version)
+        await self.mapper.deactivate_old_versions(gh_id=gh_id, latest_version=version)
+
+    async def deal(self):
+        """任务入口"""
+        failed_accounts = []
+
+        try:
+            # 1. 查询有待激活数据的账号
+            accounts = await self.mapper.fetch_all_accounts()
+            if not accounts:
+                # 上游数据未更新,无待激活账号
+                await self.tool.bot(
+                    title="i2i推荐数据同步-上游数据未更新",
+                    detail={"message": "未查询到status=0的待激活数据,请检查上游大数据任务是否正常"},
+                    mention=True,
+                )
+                return
+
+            # 2. 逐账号判断并切换
+            sync_count = 0
+            for account in accounts:
+                gh_id = account["gh_id"]
+                try:
+                    version_info = await self.mapper.fetch_max_version_for_account(gh_id)
+                    if not version_info:
+                        continue
+
+                    # 最大version的status=0才需要切换
+                    if version_info["status"] != self.VersionStatus.INIT:
+                        continue
+
+                    sync_count += 1
+                    await self._sync_single_account(
+                        gh_id=gh_id, version=version_info["max_version"]
+                    )
+                except Exception as e:
+                    failed_accounts.append(gh_id)
+                    await self.log_service.log(
+                        contents={
+                            "task": "i2i_recommend_data_sync",
+                            "status": "fail",
+                            "gh_id": gh_id,
+                            "error": str(e),
+                            "traceback": traceback.format_exc(),
+                        }
+                    )
+
+            # 3. 汇总结果
+            await self.log_service.log(
+                contents={
+                    "task": "i2i_recommend_data_sync",
+                    "status": "success",
+                    "total": sync_count,
+                    "failed": len(failed_accounts),
+                    "failed_accounts": failed_accounts,
+                }
+            )
+
+            if failed_accounts:
+                await self.tool.bot(
+                    title="i2i推荐数据同步-部分账号失败",
+                    detail={
+                        "total": sync_count,
+                        "failed": len(failed_accounts),
+                        "failed_accounts": failed_accounts[:20],
+                    },
+                    mention=True,
+                )
+            else:
+                await self.tool.bot(
+                    title="i2i推荐数据同步成功",
+                    detail={"total": sync_count},
+                    mention=False,
+                )
+
+        except Exception as e:
+            await self.log_service.log(
+                contents={
+                    "task": "i2i_recommend_data_sync",
+                    "status": "fail",
+                    "message": "deal 入口异常",
+                    "error": str(e),
+                    "traceback": traceback.format_exc(),
+                }
+            )
+            await self.tool.bot(
+                title="i2i推荐数据同步-入口异常",
+                detail={
+                    "error": str(e),
+                    "traceback": traceback.format_exc(),
+                },
+                mention=True,
+            )
+            raise
+
+
+__all__ = ["I2IRecommendDataSyncTask"]

+ 5 - 0
app/infra/external/feishu.py

@@ -22,6 +22,9 @@ class Feishu:
     # cookie 监测机器人
     cookie_monitor_bot = "https://open.feishu.cn/open-apis/bot/v2/hook/51b9c83a-f50d-44dd-939f-bcd10ac6c55a"
 
+    # rank_bot
+    rank_monitor_bot = "https://open.feishu.cn/open-apis/bot/v2/hook/f9dae7ba-decf-436b-b438-41994c35af1e"
+
     def __init__(self):
         self.token = None
         self.headers = {"Content-Type": "application/json"}
@@ -213,6 +216,8 @@ class FeishuBotApi(Feishu):
                 url = self.long_articles_task_bot
             case "cookie_monitor":
                 url = self.cookie_monitor_bot
+            case "rank_bot":
+                url = self.rank_monitor_bot
             case _:
                 url = self.long_articles_bot_dev
 

+ 3 - 1
app/infra/internal/aigc_system.py

@@ -32,16 +32,18 @@ class RelationDict(TypedDict):
     platform: str
 
 
-async def delete_illegal_gzh_articles(gh_id: str, title: str):
+async def delete_illegal_gzh_articles(gh_id: str, title: str, delete_flag: int = 0):
     """
     Delete illegal gzh articles
     :param gh_id: gzh id
     :param title: article title
+    :param delete_flag: 0: 违规检查删除; 1:html手动删除; 2: 文章限流删除
     """
     url = "http://101.37.174.139:80/articleAudit/titleDangerFindDelete"
     payload = {
         "title": title,
         "ghId": gh_id,
+        "deleteFlag": delete_flag,
     }
     headers = {"Content-Type": "application/json;charset=UTF-8"}
     async with AsyncHttpClient(timeout=600) as client:

+ 8 - 0
app/jobs/domains/__init__.py

@@ -0,0 +1,8 @@
+from .algorithm import *
+from .anaylsis import *
+from .crawler_tasks import *
+from .data_recycle import *
+from .recommend import *
+from .monitor_task import *
+from .llm_task import *
+from .cold_start import *

+ 1 - 0
app/jobs/domains/algorithm.py

@@ -0,0 +1 @@
+from app.domains.algorithm_tasks import AccountCategoryAnalysis

+ 5 - 0
app/jobs/domains/anaylsis.py

@@ -0,0 +1,5 @@
+from app.domains.analysis_task import CrawlerDetailDeal
+from app.domains.analysis_task import AccountPositionReadRateAvg
+from app.domains.analysis_task import AccountPositionReadAvg
+from app.domains.analysis_task import AccountPositionOpenRateAvg
+from app.domains.analysis_task import RateLimitedArticleFilter

+ 2 - 0
app/jobs/domains/cold_start.py

@@ -0,0 +1,2 @@
+from app.domains.cold_start_tasks import ArticlePoolColdStart
+from app.domains.cold_start_tasks import AdPlatformArticlePublishTask

+ 4 - 0
app/jobs/domains/crawler_tasks.py

@@ -0,0 +1,4 @@
+from app.domains.crawler_tasks import CrawlerToutiao
+from app.domains.crawler_tasks import WeixinAccountManager
+from app.domains.crawler_tasks import CrawlerGzhAccountArticles
+from app.domains.crawler_tasks import CrawlerGzhSearchArticles

+ 10 - 0
app/jobs/domains/data_recycle.py

@@ -0,0 +1,10 @@
+from app.domains.data_recycle_tasks import ArticleDetailStat
+from app.domains.data_recycle_tasks import RecycleDailyPublishArticlesTask
+from app.domains.data_recycle_tasks import RecycleOutsideAccountArticlesTask
+from app.domains.data_recycle_tasks import CheckDailyPublishArticlesTask
+from app.domains.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
+from app.domains.data_recycle_tasks import RecycleFwhDailyPublishArticlesTask
+from app.domains.data_recycle_tasks import RecycleMiniProgramDetailTask
+from app.domains.data_recycle_tasks import (
+    UpdateOutsideRootSourceIdAndUpdateTimeTask,
+)

+ 8 - 0
app/jobs/domains/llm_task.py

@@ -0,0 +1,8 @@
+from app.domains.llm_tasks.aigc_decode_task import CreateAdPlatformArticlesDecodeTask
+from app.domains.llm_tasks.aigc_decode_task import CreateInnerArticlesDecodeTask
+from app.domains.llm_tasks.aigc_decode_task import FetchDecodeResults
+from app.domains.llm_tasks.aigc_decode_task import ExtractDecodeTaskDetail
+from app.domains.llm_tasks import TitleRewrite
+from app.domains.llm_tasks import ArticlePoolCategoryGeneration
+from app.domains.llm_tasks import CandidateAccountQualityScoreRecognizer
+from app.domains.llm_tasks import ExtractTitleFeatures

+ 11 - 0
app/jobs/domains/monitor_task.py

@@ -0,0 +1,11 @@
+from app.domains.monitor_tasks import AutoReplyCardsMonitor
+from app.domains.monitor_tasks import check_kimi_balance
+from app.domains.monitor_tasks import GetOffVideos
+from app.domains.monitor_tasks import CheckVideoAuditStatus
+from app.domains.monitor_tasks import CooperateAccountsMonitorTask
+from app.domains.monitor_tasks import InnerGzhArticlesMonitor
+from app.domains.monitor_tasks import OutsideGzhArticlesMonitor
+from app.domains.monitor_tasks import OutsideGzhArticlesCollector
+from app.domains.monitor_tasks import TaskProcessingMonitor
+from app.domains.monitor_tasks import LimitedAccountAnalysisTask
+from app.domains.monitor_tasks import AdPlatformAccountsMonitorTask

+ 6 - 0
app/jobs/domains/recommend.py

@@ -0,0 +1,6 @@
+from app.domains.recommend.i2i_recommend import I2IRecommendDataSyncTask
+
+
+__all__ = [
+    "I2IRecommendDataSyncTask",
+]

+ 9 - 54
app/jobs/task_handler.py

@@ -3,55 +3,7 @@ from typing import Callable, Dict, Optional
 
 from app.core.config import GlobalConfigSettings
 
-from app.domains.analysis_task import CrawlerDetailDeal
-from app.domains.analysis_task import AccountPositionReadRateAvg
-from app.domains.analysis_task import AccountPositionReadAvg
-from app.domains.analysis_task import AccountPositionOpenRateAvg
-from app.domains.analysis_task import RateLimitedArticleFilter
-
-from app.domains.algorithm_tasks import AccountCategoryAnalysis
-
-from app.domains.cold_start_tasks import ArticlePoolColdStart
-from app.domains.cold_start_tasks import AdPlatformArticlePublishTask
-
-
-from app.domains.crawler_tasks import CrawlerToutiao
-from app.domains.crawler_tasks import WeixinAccountManager
-from app.domains.crawler_tasks import CrawlerGzhAccountArticles
-from app.domains.crawler_tasks import CrawlerGzhSearchArticles
-
-from app.domains.data_recycle_tasks import ArticleDetailStat
-from app.domains.data_recycle_tasks import RecycleDailyPublishArticlesTask
-from app.domains.data_recycle_tasks import RecycleOutsideAccountArticlesTask
-from app.domains.data_recycle_tasks import CheckDailyPublishArticlesTask
-from app.domains.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
-from app.domains.data_recycle_tasks import RecycleFwhDailyPublishArticlesTask
-from app.domains.data_recycle_tasks import RecycleMiniProgramDetailTask
-from app.domains.data_recycle_tasks import (
-    UpdateOutsideRootSourceIdAndUpdateTimeTask,
-)
-
-from app.domains.llm_tasks.aigc_decode_task import CreateAdPlatformArticlesDecodeTask
-from app.domains.llm_tasks.aigc_decode_task import CreateInnerArticlesDecodeTask
-from app.domains.llm_tasks.aigc_decode_task import FetchDecodeResults
-from app.domains.llm_tasks.aigc_decode_task import ExtractDecodeTaskDetail
-from app.domains.llm_tasks import TitleRewrite
-from app.domains.llm_tasks import ArticlePoolCategoryGeneration
-from app.domains.llm_tasks import CandidateAccountQualityScoreRecognizer
-from app.domains.llm_tasks import ExtractTitleFeatures
-
-from app.domains.monitor_tasks import AutoReplyCardsMonitor
-from app.domains.monitor_tasks import check_kimi_balance
-from app.domains.monitor_tasks import GetOffVideos
-from app.domains.monitor_tasks import CheckVideoAuditStatus
-from app.domains.monitor_tasks import CooperateAccountsMonitorTask
-from app.domains.monitor_tasks import InnerGzhArticlesMonitor
-from app.domains.monitor_tasks import OutsideGzhArticlesMonitor
-from app.domains.monitor_tasks import OutsideGzhArticlesCollector
-from app.domains.monitor_tasks import TaskProcessingMonitor
-from app.domains.monitor_tasks import LimitedAccountAnalysisTask
-from app.domains.monitor_tasks import AdPlatformAccountsMonitorTask
-from app.domains.monitor_tasks import RankLogMonitor
+from app.jobs.domains import *
 
 from app.jobs.task_config import TaskStatus
 from app.jobs.task_utils import TaskValidationError
@@ -110,11 +62,6 @@ class TaskHandler:
         await self.log_client.log(contents=log_data)
 
     # ==================== 监控类任务 ====================
-    @register("rank_log_monitor")
-    async def _rank_log_monitor_handler(self) -> int:
-        """迁移排序日志"""
-        sub_task = RankLogMonitor(self.db_client, self.log_client)
-        return await sub_task.deal()
 
     @register("check_kimi_balance")
     async def _check_kimi_balance_handler(self) -> int:
@@ -508,5 +455,13 @@ class TaskHandler:
         await task.deal()
         return TaskStatus.SUCCESS
 
+    # ====================== Recommend Tasks=====================
+    @register("i2i_recommend_data_sync")
+    async def _i2i_recommend_data_sync_handler(self) -> int:
+        task = I2IRecommendDataSyncTask(
+            pool=self.db_client, log_service=self.log_client
+        )
+        await task.deal()
+        return TaskStatus.SUCCESS
 
 __all__ = ["TaskHandler"]

+ 1 - 1
app/jobs/task_mapper.py

@@ -52,7 +52,7 @@ class TaskMapper(Const):
             case "crawler_toutiao_articles":
                 expire_duration = self.CRAWLER_TOUTIAO_ARTICLES_TIMEOUT
 
-            case "article_pool_pool_cold_start":
+            case "article_pool_cold_start":
                 expire_duration = self.ARTICLE_POOL_COLD_START_TIMEOUT
 
             case _: