瀏覽代碼

Merge branch 'feature/luojunhui/20260529-cooperate-auto-reply-improve' of Server/LongArticleTaskServer into master

luojunhui 1 周之前
父節點
當前提交
ac13490d30

+ 2 - 1
.gitignore

@@ -64,4 +64,5 @@ target/
 
 .claude
 .cursor
-scripts
+scripts
+CLAUDE.md

+ 6 - 0
app/domains/monitor_tasks/auto_reply_cards_monitor/_const.py

@@ -24,3 +24,9 @@ class AutoReplyCardsMonitorConst:
 
     # 封面下载失败率超过此阈值触发飞书报警
     FEISHU_ALERT_COVER_FAIL_RATE = 0.1
+
+    # 任务优先级分档(AIGC 侧 ORDER BY priority DESC 消费)
+    class TaskPriority:
+        HIGH = 3
+        MID = 2
+        LOW = 1

+ 3 - 3
app/domains/monitor_tasks/auto_reply_cards_monitor/_mapper.py

@@ -53,14 +53,14 @@ class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorConst):
         )
 
     # 插入AIGC自动回复任务
-    async def insert_aigc_auto_reply_task(self, task_id, account_name):
+    async def insert_aigc_auto_reply_task(self, task_id, account_name, priority=1):
         timestamp = int(time.time() * 1000)
         query = """
-            INSERT INTO gzh_msg_record (task_id, task_params, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s); 
+            INSERT INTO gzh_msg_record (task_id, task_params, priority, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s, %s);
         """
         return await self.pool.async_save(
             query=query,
-            params=(task_id, account_name, timestamp, timestamp),
+            params=(task_id, account_name, priority, timestamp, timestamp),
             db_name="aigc",
         )
 

+ 26 - 0
app/domains/monitor_tasks/auto_reply_cards_monitor/_utils.py

@@ -19,6 +19,8 @@ from app.infra.shared import OssUtils
 from app.infra.shared.tools import fetch_from_odps
 from app.infra.crawler.wechat import get_article_detail
 
+from ._const import AutoReplyCardsMonitorConst
+
 from app.schemas import ImagePath
 
 
@@ -35,6 +37,30 @@ class AutoReplyCardsMonitorUtils:
             case _:
                 return f"{task_name}_{uuid.uuid4()}"
 
+    @staticmethod
+    def assign_priority_tiers(sorted_accounts, top_pct=0.3, mid_pct=0.4):
+        """按UV百分比分档分配优先级。
+
+        sorted_accounts: [(account_name, uv), ...] 已按 uv 降序排列
+        返回: [(account_name, uv, priority), ...]
+        """
+        n = len(sorted_accounts)
+        if n == 0:
+            return []
+
+        top_n = int(n * top_pct)
+        mid_n = int(n * (top_pct + mid_pct))
+
+        result = []
+        for i, (name, uv) in enumerate(sorted_accounts):
+            if i < top_n:
+                result.append((name, uv, AutoReplyCardsMonitorConst.TaskPriority.HIGH))
+            elif i < mid_n:
+                result.append((name, uv, AutoReplyCardsMonitorConst.TaskPriority.MID))
+            else:
+                result.append((name, uv, AutoReplyCardsMonitorConst.TaskPriority.LOW))
+        return result
+
     @staticmethod
     def parse_fields(root, fields, default=""):
         result = {}

+ 26 - 5
app/domains/monitor_tasks/auto_reply_cards_monitor/entrance.py

@@ -457,13 +457,13 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                 )
 
     # 创建单个账号自动回复任务
-    async def create_auto_reply_single_account_task(self, gh_id, account_name):
+    async def create_auto_reply_single_account_task(self, gh_id, account_name, priority=0):
         task_id = self.tool.generate_task_id(task_name="auto_reply", gh_id=gh_id)
         # 先插入 task, 再创建自动回复任务
         create_row = await self.mapper.create_auto_reply_task(task_id, gh_id)
         if create_row:
             affected_rows = await self.mapper.insert_aigc_auto_reply_task(
-                task_id, gh_id
+                task_id, gh_id, priority
             )
             if not affected_rows:
                 await self.log_service.log(
@@ -515,8 +515,28 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
             )
             return
 
-        for account in account_list:
-            account_name = getattr(account, "公众号名", None)
+        # 按 UV 降序排列 + 分档分配优先级
+        sorted_accounts = sorted(
+            (
+                (getattr(a, "公众号名", None), getattr(a, "uv", 0) or 0)
+                for a in account_list
+            ),
+            key=lambda x: x[1],
+            reverse=True,
+        )
+        tiered_accounts = self.tool.assign_priority_tiers(sorted_accounts)
+
+        top_n = min(10, len(tiered_accounts))
+        await self.log_service.log(
+            contents={
+                "task": "auto_reply_cards_monitor",
+                "function": "follow_gzh_task",
+                "status": "info",
+                "message": f"账号按UV分档完成, total={len(tiered_accounts)}, top{top_n}={[(name, uv, p) for name, uv, p in tiered_accounts[:top_n]]}",
+            }
+        )
+
+        for account_name, uv, priority in tiered_accounts[:1]:
             if not account_name:
                 await self.log_service.log(
                     contents={
@@ -585,7 +605,8 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                     )
                     continue
 
-                await self.create_auto_reply_single_account_task(gh_id, account_name)
+                print(gh_id, account_name, uv, priority)
+                await self.create_auto_reply_single_account_task(gh_id, account_name, priority)
 
             except Exception as e:
                 await self.log_service.log(