ソースを参照

decode_article
decode_material

优化

luojunhui 6 時間 前
コミット
db5d5cf8c1

+ 2 - 2
app/domains/llm_tasks/decode_article/_const.py

@@ -1,7 +1,7 @@
 class DecodeArticleConst:
     CONFIG_ID = 66  # 长文头条-文章解构-正式
-    TASK_BATCH = 500  # 每批处理数
-    # TASK_BATCH = 20  # 每批处理数
+
+    TASK_BATCH = 500  # 每批处理上限(队列消费能力约束)
     SUBMIT_BATCH = 50  # 提交 API 每批帖子上限
 
     class TaskStatus:

+ 32 - 4
app/domains/llm_tasks/decode_article/_mapper.py

@@ -93,6 +93,34 @@ class ArticlesDecodeTaskMapper(DecodeArticleConst):
             ),
         )
 
+    async def count_pending_tasks(self, source: int = None) -> int:
+        """统计队列中正在进行(INIT + PROCESSING)的任务数量,用于控制消费端压力"""
+        if source is not None:
+            query = f"""
+                SELECT COUNT(*) AS cnt
+                FROM {TABLE}
+                WHERE status IN (%s, %s) AND source = %s AND config_id = %s
+            """
+            params = (
+                self.TaskStatus.INIT,
+                self.TaskStatus.PROCESSING,
+                source,
+                self.CONFIG_ID,
+            )
+        else:
+            query = f"""
+                SELECT COUNT(*) AS cnt
+                FROM {TABLE}
+                WHERE status IN (%s, %s) AND config_id = %s
+            """
+            params = (
+                self.TaskStatus.INIT,
+                self.TaskStatus.PROCESSING,
+                self.CONFIG_ID,
+            )
+        rows = await self.pool.async_fetch(query=query, params=params)
+        return rows[0]["cnt"] if rows else 0
+
     async def fetch_pending_tasks(self, source: int = None) -> List[Dict]:
         if source is not None:
             query = f"""
@@ -202,7 +230,7 @@ class AdPlatformArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
             query=query, params=(new_status, id_, ori_status)
         )
 
-    async def fetch_decode_articles(self) -> List[Dict]:
+    async def fetch_decode_articles(self, limit: int = None) -> List[Dict]:
         query = """
             SELECT id, account_name, gh_id, article_title, article_cover,
                    article_text, article_images, wx_sn
@@ -212,7 +240,7 @@ class AdPlatformArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
         """
         return await self.pool.async_fetch(
             query=query,
-            params=(self.TaskStatus.SUCCESS, self.TaskStatus.INIT, self.TASK_BATCH),
+            params=(self.TaskStatus.SUCCESS, self.TaskStatus.INIT, limit if limit is not None else self.TASK_BATCH),
         )
 
 
@@ -222,7 +250,7 @@ class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
     def __init__(self, pool: DatabaseManager):
         super().__init__(pool)
 
-    async def fetch_inner_articles(self) -> List[Dict]:
+    async def fetch_inner_articles(self, limit: int = None) -> List[Dict]:
         query = f"""
             SELECT id, title, source_id, coverimgurl, article_text, summary, card_title
             FROM {self.TABLE_INNER}
@@ -230,7 +258,7 @@ class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
             LIMIT %s
         """
         return await self.pool.async_fetch(
-            query=query, params=(self.TaskStatus.INIT, self.TASK_BATCH)
+            query=query, params=(self.TaskStatus.INIT, limit if limit is not None else self.TASK_BATCH),
         )
 
     async def update_inner_article_status(

+ 22 - 2
app/domains/llm_tasks/decode_article/create_decode_tasks.py

@@ -236,8 +236,28 @@ class CreateInnerArticlesDecodeTask(DecodeArticleConst):
         )
 
     async def _acquire_articles(self) -> List[Dict]:
-        """获取待解构文章,并加锁(status INIT → PROCESSING)"""
-        article_list = await self.mapper.fetch_inner_articles()
+        """获取待解构文章,并加锁(status INIT → PROCESSING)
+
+        先检查队列中已有多少进行中任务(INIT + PROCESSING),控制消费端压力:
+        - 软上限:可创建数 ≈ TASK_BATCH - 已有进行中任务数
+        - 计数与取数之间不原子,其他 worker 可能同时插入,由后续 CAS 锁兜底
+        - 若队列已满(pending >= TASK_BATCH),跳过本轮
+        """
+        pending_count = await self.mapper.count_pending_tasks(
+            source=self.SourceType.INNER
+        )
+        available_slots = max(0, self.TASK_BATCH - pending_count)
+
+        if available_slots == 0:
+            await self.log_service.log(
+                contents={
+                    "task": "create_inner_decode_task",
+                    "message": f"队列已满:进行中任务 {pending_count} >= {self.TASK_BATCH},跳过本轮创建",
+                }
+            )
+            return []
+
+        article_list = await self.mapper.fetch_inner_articles(limit=available_slots)
         if self._TEST_MODE:
             return article_list
 

+ 21 - 2
app/domains/llm_tasks/decode_material/_mapper.py

@@ -16,7 +16,25 @@ class MaterialDecodeTaskMapper(DecodeMaterialConst):
 
     # ——— growth_daily_material ———
 
-    async def fetch_materials(self) -> List[Dict]:
+    async def count_pending_tasks(self) -> int:
+        """统计队列中正在进行(INIT + PROCESSING)的素材任务数,用于控制消费端压力"""
+        query = f"""
+            SELECT COUNT(*) AS cnt
+            FROM {TABLE_TASK}
+            WHERE status IN (%s, %s) AND source = %s AND config_id = %s
+        """
+        rows = await self.pool.async_fetch(
+            query=query,
+            params=(
+                self.TaskStatus.INIT,
+                self.TaskStatus.PROCESSING,
+                self.SourceType.MATERIAL,
+                self.CONFIG_ID,
+            ),
+        )
+        return rows[0]["cnt"] if rows else 0
+
+    async def fetch_materials(self, limit: int = None) -> List[Dict]:
         """获取待解构素材:status=INIT"""
         query = f"""
             SELECT id, material_id, material_title, material_cover
@@ -25,7 +43,8 @@ class MaterialDecodeTaskMapper(DecodeMaterialConst):
             LIMIT %s
         """
         return await self.pool.async_fetch(
-            query=query, params=(self.TaskStatus.INIT, self.TASK_BATCH)
+            query=query,
+            params=(self.TaskStatus.INIT, limit if limit is not None else self.TASK_BATCH),
         )
 
     async def update_material_status(

+ 20 - 2
app/domains/llm_tasks/decode_material/create_decode_tasks.py

@@ -17,8 +17,26 @@ class CreateMaterialsDecodeTask(DecodeMaterialConst):
         self.tool = MaterialDecodeUtils()
 
     async def _acquire_materials(self) -> List[Dict]:
-        """获取待解构素材并加锁(status INIT → PROCESSING)"""
-        materials = await self.mapper.fetch_materials()
+        """获取待解构素材并加锁(status INIT → PROCESSING)
+
+        先检查队列中已有多少进行中任务(INIT + PROCESSING),控制消费端压力:
+        - 软上限:可创建数 ≈ TASK_BATCH - 已有进行中任务数
+        - 计数与取数之间不原子,其他 worker 可能同时插入,由后续 CAS 锁兜底
+        - 若队列已满(pending >= TASK_BATCH),跳过本轮
+        """
+        pending_count = await self.mapper.count_pending_tasks()
+        available_slots = max(0, self.TASK_BATCH - pending_count)
+
+        if available_slots == 0:
+            await self.log_service.log(
+                contents={
+                    "task": "create_material_decode_task",
+                    "message": f"队列已满:进行中任务 {pending_count} >= {self.TASK_BATCH},跳过本轮创建",
+                }
+            )
+            return []
+
+        materials = await self.mapper.fetch_materials(limit=available_slots)
         locked = []
         for m in materials:
             mid = m["id"]