luojunhui 1 неделя назад
Родитель
Сommit
96f557aa6c

+ 8 - 2
app/domains/llm_tasks/decode_cards/_const.py

@@ -2,12 +2,17 @@ class DecodeCardConst:
     TASK_BATCH = 500
     SUBMIT_BATCH = 50
 
-    # auto_reply_top_cards_daily.channel → AIGC config_id
+    # auto_reply_top_cards_daily.channel → (AIGC config_id, source)
     CHANNEL_CONFIG_MAP = {
         "公众号合作-即转-稳定": 67,
         "公众号投流-稳定": 68,
     }
 
+    CHANNEL_SOURCE_MAP = {
+        "公众号合作-即转-稳定": 4,
+        "公众号投流-稳定": 3,
+    }
+
     class TaskStatus:
         INIT = 0
         PROCESSING = 1
@@ -26,7 +31,8 @@ class DecodeCardConst:
         FAILED = "FAILED"
 
     class SourceType:
-        PARTNER_CARD = 4  # 合作方即转卡片
+        PARTNER_CARD_TOULIU = 3    # 公众号投流-即转卡片
+        PARTNER_CARD_COOPERATE = 4  # 公众号合作-即转卡片
 
     class TaskChannel:
         PARTNER_CARD = 4  # long_articles_decode_tasks.channel: 合作方即转卡片

+ 8 - 6
app/domains/llm_tasks/decode_cards/_mapper.py

@@ -56,6 +56,7 @@ class CardDecodeTaskMapper(DecodeCardConst):
         self,
         source_id: str,
         config_id: int,
+        source: int,
         payload: str,
         remark: str = None,
         status: int = None,
@@ -67,7 +68,7 @@ class CardDecodeTaskMapper(DecodeCardConst):
                 VALUES (%s, %s, %s, %s, %s, %s, %s)
             """
             params = (
-                source_id, config_id, self.SourceType.PARTNER_CARD,
+                source_id, config_id, source,
                 self.TaskChannel.PARTNER_CARD, payload, remark, status,
             )
         else:
@@ -77,7 +78,7 @@ class CardDecodeTaskMapper(DecodeCardConst):
                 VALUES (%s, %s, %s, %s, %s, %s)
             """
             params = (
-                source_id, config_id, self.SourceType.PARTNER_CARD,
+                source_id, config_id, source,
                 self.TaskChannel.PARTNER_CARD, payload, remark,
             )
         return await self.pool.async_save(query=query, params=params)
@@ -111,7 +112,7 @@ class CardDecodeTaskMapper(DecodeCardConst):
         query = f"""
             SELECT source_id, config_id
             FROM {TABLE_TASK}
-            WHERE status IN (%s, %s) AND source = %s
+            WHERE status IN (%s, %s) AND source IN (%s, %s)
             ORDER BY config_id
             LIMIT %s
         """
@@ -120,7 +121,8 @@ class CardDecodeTaskMapper(DecodeCardConst):
             params=(
                 self.TaskStatus.INIT,
                 self.TaskStatus.PROCESSING,
-                self.SourceType.PARTNER_CARD,
+                self.SourceType.PARTNER_CARD_TOULIU,
+                self.SourceType.PARTNER_CARD_COOPERATE,
                 self.TASK_BATCH,
             ),
         )
@@ -147,7 +149,7 @@ class CardDecodeTaskMapper(DecodeCardConst):
         )
 
     async def fetch_existing_source_ids(
-        self, source_ids: List[str], config_id: int
+        self, source_ids: List[str], config_id: int, source: int
     ) -> set:
         """批量查询已有任务记录的 source_id,用于去重跳过"""
         if not source_ids:
@@ -167,7 +169,7 @@ class CardDecodeTaskMapper(DecodeCardConst):
                 params=(
                     *source_ids,
                     config_id,
-                    self.SourceType.PARTNER_CARD,
+                    source,
                     self.TaskStatus.INIT,
                     self.TaskStatus.PROCESSING,
                     self.TaskStatus.SUCCESS,

+ 8 - 3
app/domains/llm_tasks/decode_cards/create_decode_tasks.py

@@ -86,7 +86,7 @@ class CreateCardsDecodeTask(DecodeCardConst):
         return dict(grouped)
 
     async def _submit_and_record_for_config(
-        self, cards: List[Dict], config_id: int
+        self, cards: List[Dict], config_id: int, source: int
     ):
         """对同一 config_id 的卡片执行提交与落库"""
         if not cards:
@@ -100,7 +100,7 @@ class CreateCardsDecodeTask(DecodeCardConst):
         # 跨批次去重:查 DB 已有任务
         all_source_ids = [str(c["card_cover_id"]) for c in cards_with_cid]
         existing = await self.mapper.fetch_existing_source_ids(
-            all_source_ids, config_id
+            all_source_ids, config_id, source
         )
 
         # 同批次去重:相同 card_cover_id 只保留第一条
@@ -193,6 +193,7 @@ class CreateCardsDecodeTask(DecodeCardConst):
                     await self.mapper.insert_decode_task(
                         source_id=source_id,
                         config_id=config_id,
+                        source=source,
                         payload=json.dumps(
                             posts_by_cid.get(source_id, {}), ensure_ascii=False
                         ),
@@ -224,6 +225,7 @@ class CreateCardsDecodeTask(DecodeCardConst):
                     await self.mapper.insert_decode_task(
                         source_id=source_id,
                         config_id=config_id,
+                        source=source,
                         payload=json.dumps(
                             posts_by_cid.get(source_id, {}), ensure_ascii=False
                         ),
@@ -247,6 +249,7 @@ class CreateCardsDecodeTask(DecodeCardConst):
                 await self.mapper.insert_decode_task(
                     source_id=source_id,
                     config_id=config_id,
+                    source=source,
                     payload=json.dumps(
                         posts_by_cid.get(source_id, {}), ensure_ascii=False
                     ),
@@ -311,7 +314,9 @@ class CreateCardsDecodeTask(DecodeCardConst):
 
         grouped = self._group_cards_by_channel(valid_cards)
         for config_id, group_cards in grouped.items():
-            await self._submit_and_record_for_config(group_cards, config_id)
+            source_channel = group_cards[0]["channel"]
+            source = self.CHANNEL_SOURCE_MAP[source_channel]
+            await self._submit_and_record_for_config(group_cards, config_id, source)
 
         # 处理不在映射表中的卡片(回滚状态)
         mapped_ids = {id(c) for g in grouped.values() for c in g}

+ 1 - 1
app/domains/llm_tasks/decode_material/_const.py

@@ -1,6 +1,6 @@
 class DecodeMaterialConst:
     CONFIG_ID = 69
-    TASK_BATCH = 100
+    TASK_BATCH = 200
     SUBMIT_BATCH = 50
 
     class TaskStatus:

+ 13 - 0
app/jobs/task_config.py

@@ -134,6 +134,19 @@ TASK_CONFIGS = {
         max_concurrent=3,
         retry_times=2,
     ),
+    # 文章解构任务
+    "create_inner_articles_decode_task": TaskConfig(
+        timeout=3600,
+        max_concurrent=2,
+    ),
+    "fetch_decode_result": TaskConfig(
+        timeout=1800,
+        max_concurrent=3,
+    ),
+    "extract_decode_result": TaskConfig(
+        timeout=1800,
+        max_concurrent=3,
+    ),
     # 卡片解构任务
     "create_cards_decode_task": TaskConfig(
         timeout=3600,