Kaynağa Gözat

长文解构任务-补充数据

luojunhui 13 saat önce
ebeveyn
işleme
7d3273ef74

+ 1 - 1
app/domains/llm_tasks/decode_article/_const.py

@@ -1,6 +1,6 @@
 class DecodeArticleConst:
     CONFIG_ID = 66  # 长文头条-文章解构-正式
-    TASK_BATCH = 200  # 每批处理数
+    TASK_BATCH = 500  # 每批处理数
     # TASK_BATCH = 20  # 每批处理数
     SUBMIT_BATCH = 50  # 提交 API 每批帖子上限
 

+ 71 - 51
app/domains/llm_tasks/decode_article/_mapper.py

@@ -4,7 +4,7 @@ from app.core.database import DatabaseManager
 
 from ._const import DecodeArticleConst
 
-TABLE = "long_articles_decode_tasks_v2"
+TABLE = "long_articles_decode_tasks"
 
 
 class ArticlesDecodeTaskMapper(DecodeArticleConst):
@@ -13,32 +13,44 @@ class ArticlesDecodeTaskMapper(DecodeArticleConst):
 
     async def insert_decode_task(
         self,
-        channel_content_id: str,
-        content_id: str,
+        source_id: str,
         source: int,
         payload: str,
         remark: str = None,
+        status: int = None,
     ) -> int:
-        query = f"""
-            INSERT IGNORE INTO {TABLE}
-                (channel_content_id, config_id, content_id, source, payload, remark)
-            VALUES (%s, %s, %s, %s, %s, %s)
-        """
-        return await self.pool.async_save(
-            query=query,
-            params=(
-                channel_content_id,
+        if status is not None:
+            query = f"""
+                INSERT IGNORE INTO {TABLE}
+                    (source_id, config_id, source, payload, remark, status)
+                VALUES (%s, %s, %s, %s, %s, %s)
+            """
+            params = (
+                source_id,
                 self.CONFIG_ID,
-                content_id,
                 source,
                 payload,
                 remark,
-            ),
-        )
+                status,
+            )
+        else:
+            query = f"""
+                INSERT IGNORE INTO {TABLE}
+                    (source_id, config_id, source, payload, remark)
+                VALUES (%s, %s, %s, %s, %s)
+            """
+            params = (
+                source_id,
+                self.CONFIG_ID,
+                source,
+                payload,
+                remark,
+            )
+        return await self.pool.async_save(query=query, params=params)
 
-    async def update_task_status_by_channel(
+    async def update_task_status_by_source_id(
         self,
-        channel_content_id: str,
+        source_id: str,
         ori_status: int,
         new_status: int,
         remark: str = None,
@@ -46,23 +58,23 @@ class ArticlesDecodeTaskMapper(DecodeArticleConst):
         query = f"""
             UPDATE {TABLE}
             SET status = %s, remark = %s
-            WHERE channel_content_id = %s AND status = %s AND config_id = %s
+            WHERE source_id = %s AND status = %s AND config_id = %s
         """
         return await self.pool.async_save(
             query=query,
-            params=(new_status, remark, channel_content_id, ori_status, self.CONFIG_ID),
+            params=(new_status, remark, source_id, ori_status, self.CONFIG_ID),
         )
 
     async def set_decode_result(
         self,
-        channel_content_id: str,
+        source_id: str,
         result: str,
         remark: str = None,
     ) -> int:
         query = f"""
             UPDATE {TABLE}
             SET status = %s, result = %s, remark = %s
-            WHERE channel_content_id = %s AND status IN (%s, %s) AND config_id = %s
+            WHERE source_id = %s AND status IN (%s, %s) AND config_id = %s
         """
         return await self.pool.async_save(
             query=query,
@@ -70,52 +82,65 @@ class ArticlesDecodeTaskMapper(DecodeArticleConst):
                 self.TaskStatus.SUCCESS,
                 result,
                 remark,
-                channel_content_id,
+                source_id,
                 self.TaskStatus.INIT,
                 self.TaskStatus.PROCESSING,
                 self.CONFIG_ID,
             ),
         )
 
-    async def fetch_pending_tasks(
-        self, source: int = None
-    ) -> List[Dict]:
+    async def fetch_pending_tasks(self, source: int = None) -> List[Dict]:
         if source is not None:
             query = f"""
-                SELECT channel_content_id, content_id
+                SELECT source_id
                 FROM {TABLE}
-                WHERE status = %s AND source = %s AND config_id = %s
+                WHERE status IN (%s, %s) AND source = %s AND config_id = %s
                 LIMIT %s
             """
-            params = (self.TaskStatus.INIT, source, self.CONFIG_ID, self.TASK_BATCH)
+            params = (
+                self.TaskStatus.INIT,
+                self.TaskStatus.PROCESSING,
+                source,
+                self.CONFIG_ID,
+                self.TASK_BATCH,
+            )
         else:
             query = f"""
-                SELECT channel_content_id, content_id
+                SELECT source_id
                 FROM {TABLE}
-                WHERE status = %s AND config_id = %s
+                WHERE status IN (%s, %s) AND config_id = %s
                 LIMIT %s
             """
-            params = (self.TaskStatus.INIT, self.CONFIG_ID, self.TASK_BATCH)
+            params = (
+                self.TaskStatus.INIT,
+                self.TaskStatus.PROCESSING,
+                self.CONFIG_ID,
+                self.TASK_BATCH,
+            )
         return await self.pool.async_fetch(query=query, params=params)
 
-    async def fetch_existing_channel_content_ids(
-        self, channel_content_ids: List[str]
-    ) -> set:
-        """批量查询哪些 channel_content_id 已有成功解构结果,用于去重跳过"""
-        if not channel_content_ids:
+    async def fetch_existing_source_ids(self, source_ids: List[str]) -> set:
+        """批量查询哪些 source_id 已有进行中或成功的解构任务,用于去重跳过"""
+        if not source_ids:
             return set()
-        placeholders = ",".join(["%s"] * len(channel_content_ids))
+        placeholders = ",".join(["%s"] * len(source_ids))
         query = f"""
-            SELECT channel_content_id FROM {TABLE}
-            WHERE channel_content_id IN ({placeholders})
+            SELECT source_id FROM {TABLE}
+            WHERE source_id IN ({placeholders})
               AND config_id = %s
-              AND status = %s
+              AND status IN (%s, %s, %s)
         """
         rows = await self.pool.async_fetch(
             query=query,
-            params=(*channel_content_ids, self.CONFIG_ID, self.TaskStatus.SUCCESS),
+            params=(
+                *source_ids,
+                self.CONFIG_ID,
+                self.TaskStatus.INIT,
+                self.TaskStatus.PROCESSING,
+                self.TaskStatus.SUCCESS,
+            ),
         )
-        return {r["channel_content_id"] for r in rows}
+        return {r["source_id"] for r in rows}
 
     async def fetch_extract_tasks(self) -> List[Dict]:
         query = f"""
@@ -139,11 +164,9 @@ class ArticlesDecodeTaskMapper(DecodeArticleConst):
             query=query, params=(new_status, ori_status, task_id)
         )
 
-    async def record_extract_detail(
-        self, decode_task_id: int, detail: Dict
-    ) -> int:
+    async def record_extract_detail(self, decode_task_id: int, detail: Dict) -> int:
         query = """
-            INSERT INTO long_articles_decode_task_detail_v2
+            INSERT INTO long_articles_decode_task_detail
                 (decode_task_id, inspiration, purpose, key_point, topic)
             VALUES (%s, %s, %s, %s, %s)
         """
@@ -197,10 +220,9 @@ class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
 
     async def fetch_inner_articles(self) -> List[Dict]:
         query = f"""
-            SELECT id, title, source_id, wx_sn, coverimgurl, article_text, summary, card_title
+            SELECT id, title, source_id, coverimgurl, article_text, summary, card_title
             FROM {self.TABLE_INNER}
             WHERE status = %s
-            ORDER BY max_read_rate DESC
             LIMIT %s
         """
         return await self.pool.async_fetch(
@@ -219,9 +241,7 @@ class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
             query=query, params=(new_status, id_, ori_status)
         )
 
-    async def fetch_inner_articles_produce_detail(
-        self, source_id
-    ) -> List[Dict]:
+    async def fetch_inner_articles_produce_detail(self, source_id) -> List[Dict]:
         query = """
             SELECT produce_module_type, output
             FROM produce_plan_module_output

+ 9 - 11
app/domains/llm_tasks/decode_article/_utils.py

@@ -12,7 +12,7 @@ class AigcDecodeUtils(DecodeArticleConst):
     async def submit_decode_batch(
         self, posts: List[Dict], *, config_id: int = None, skip_completed: bool = False
     ) -> Dict[str, Dict]:
-        """分批提交解构任务,返回 {channel_content_id: {status, errorMessage}}"""
+        """分批提交解构任务,返回 {content_id: {status, errorMessage}}"""
         cfg_id = config_id or self.CONFIG_ID
         result = {}
         for i in range(0, len(posts), self.SUBMIT_BATCH):
@@ -35,15 +35,15 @@ class AigcDecodeUtils(DecodeArticleConst):
         return result
 
     async def query_decode_results_batch(
-        self, channel_content_ids: List[str], *, config_id: int = None
+        self, content_ids: List[str], *, config_id: int = None
     ) -> Dict[str, Dict]:
-        """分批查询解构结果,返回 {channel_content_id: {status, dataContent, html, errorMessage}}
+        """分批查询解构结果,返回 {content_id: {status, dataContent, html, errorMessage}}
         当 API 调用失败时,对应条目 status 为 API_ERROR,调用方应保持 INIT 等待重试。
         """
         cfg_id = config_id or self.CONFIG_ID
         result = {}
-        for i in range(0, len(channel_content_ids), self.SUBMIT_BATCH):
-            batch = channel_content_ids[i : i + self.SUBMIT_BATCH]
+        for i in range(0, len(content_ids), self.SUBMIT_BATCH):
+            batch = content_ids[i : i + self.SUBMIT_BATCH]
             response = await self.decode_server.query_decode_results(
                 config_id=cfg_id, channel_content_ids=batch
             )
@@ -82,9 +82,7 @@ class AigcDecodeUtils(DecodeArticleConst):
         )
 
         def _join_points(items: list, key: str) -> str:
-            parts = [
-                str(p[key]) for p in items if isinstance(p, dict) and p.get(key)
-            ]
+            parts = [str(p[key]) for p in items if isinstance(p, dict) and p.get(key)]
             return ",".join(parts)
 
         return {
@@ -136,8 +134,8 @@ class InnerArticlesDecodeUtils(AigcDecodeUtils):
     ) -> List[Dict]:
         posts = []
         for article in articles:
-            wx_sn = article["wx_sn"]
-            produce_info = produce_info_map.get(wx_sn, [])
+            source_id = str(article["source_id"])
+            produce_info = produce_info_map.get(source_id, [])
 
             # 收集图片:封面(coverimgurl) + produce COVER + produce IMAGE
             images = []
@@ -158,7 +156,7 @@ class InnerArticlesDecodeUtils(AigcDecodeUtils):
                     "video": None,
                     "contentModal": self.ContentModal.LONG_ARTICLE,
                     "channel": self.Channel.WECHAT,
-                    "channelContentId": wx_sn,
+                    "channelContentId": source_id,
                 }
             )
         return posts

+ 37 - 40
app/domains/llm_tasks/decode_article/create_decode_tasks.py

@@ -66,7 +66,7 @@ class CreateAdPlatformArticlesDecodeTask(DecodeArticleConst):
                         "wx_sn": wx_sn,
                         "task": "create_decode_task_v2",
                         "status": "fail",
-                        "message": "no response for channel_content_id, rolled back to INIT",
+                        "message": "no response for content_id, rolled back to INIT",
                     }
                 )
                 continue
@@ -91,12 +91,14 @@ class CreateAdPlatformArticlesDecodeTask(DecodeArticleConst):
                 # 已有解构结果,直接查询结果并落库
                 query_results = await self.tool.query_decode_results_batch([wx_sn])
                 result_data = query_results.get(wx_sn)
-                if result_data and result_data.get("status") == self.QueryStatus.SUCCESS:
+                if (
+                    result_data
+                    and result_data.get("status") == self.QueryStatus.SUCCESS
+                ):
                     data_content = result_data.get("dataContent") or "{}"
                     html = result_data.get("html")
                     await self.mapper.insert_decode_task(
-                        channel_content_id=wx_sn,
-                        content_id=article_id,
+                        source_id=wx_sn,
                         source=self.SourceType.AD_PLATFORM,
                         payload=json.dumps(
                             posts_by_wx.get(wx_sn, {}), ensure_ascii=False
@@ -104,7 +106,7 @@ class CreateAdPlatformArticlesDecodeTask(DecodeArticleConst):
                         remark="提交时已有解构结果,直接落库",
                     )
                     await self.mapper.set_decode_result(
-                        channel_content_id=wx_sn,
+                        source_id=wx_sn,
                         result=json.dumps(
                             {"dataContent": data_content, "html": html},
                             ensure_ascii=False,
@@ -126,13 +128,13 @@ class CreateAdPlatformArticlesDecodeTask(DecodeArticleConst):
                 else:
                     # 提交返回 SUCCESS 但查询不到结果,插入记录等待轮询
                     await self.mapper.insert_decode_task(
-                        channel_content_id=wx_sn,
-                        content_id=article_id,
+                        source_id=wx_sn,
                         source=self.SourceType.AD_PLATFORM,
                         payload=json.dumps(
                             posts_by_wx.get(wx_sn, {}), ensure_ascii=False
                         ),
                         remark="提交返回SUCCESS,查询未果,等待轮询",
+                        status=self.TaskStatus.PROCESSING,
                     )
                     await self.mapper.update_article_decode_status(
                         article_id,
@@ -150,13 +152,11 @@ class CreateAdPlatformArticlesDecodeTask(DecodeArticleConst):
                     )
             elif status == self.SubmitStatus.PENDING:
                 await self.mapper.insert_decode_task(
-                    channel_content_id=wx_sn,
-                    content_id=article_id,
+                    source_id=wx_sn,
                     source=self.SourceType.AD_PLATFORM,
-                    payload=json.dumps(
-                        posts_by_wx.get(wx_sn, {}), ensure_ascii=False
-                    ),
+                    payload=json.dumps(posts_by_wx.get(wx_sn, {}), ensure_ascii=False),
                     remark="任务已提交,等待轮询",
+                    status=self.TaskStatus.PROCESSING,
                 )
                 await self.mapper.update_article_decode_status(
                     article_id,
@@ -244,20 +244,18 @@ class CreateInnerArticlesDecodeTask(DecodeArticleConst):
     async def _handle_result(
         self,
         article: Dict,
-        channel_content_id: str,
+        source_id: str,
         result: Dict,
         posts_by_cid: Dict,
         config_id: int,
     ):
-        wx_sn = article["wx_sn"]
-
         if not result:
             await self.log_service.log(
                 contents={
-                    "wx_sn": wx_sn,
+                    "source_id": source_id,
                     "task": "create_inner_decode_task",
                     "status": "fail",
-                    "message": "no response for channel_content_id",
+                    "message": "no response for source_id",
                 }
             )
             return
@@ -266,7 +264,7 @@ class CreateInnerArticlesDecodeTask(DecodeArticleConst):
         if status == self.SubmitStatus.FAILED:
             await self.log_service.log(
                 contents={
-                    "wx_sn": wx_sn,
+                    "source_id": source_id,
                     "task": "create_inner_decode_task",
                     "status": "fail",
                     "data": result,
@@ -274,48 +272,45 @@ class CreateInnerArticlesDecodeTask(DecodeArticleConst):
             )
         elif status == self.SubmitStatus.PENDING:
             await self.mapper.insert_decode_task(
-                channel_content_id=channel_content_id,
-                content_id=str(article.get("source_id", "")),
+                source_id=source_id,
                 source=self.SourceType.INNER,
-                payload=json.dumps(
-                    posts_by_cid.get(channel_content_id, {}), ensure_ascii=False
-                ),
+                payload=json.dumps(posts_by_cid.get(source_id, {}), ensure_ascii=False),
                 remark="内部文章解构任务已提交",
+                status=self.TaskStatus.PROCESSING,
             )
         elif status == self.SubmitStatus.SUCCESS:
             query_results = await self.tool.query_decode_results_batch(
-                [channel_content_id], config_id=config_id
+                [source_id], config_id=config_id
             )
-            result_data = query_results.get(channel_content_id)
+            result_data = query_results.get(source_id)
             data_content = result_data.get("dataContent") if result_data else None
             if data_content:
                 await self.mapper.insert_decode_task(
-                    channel_content_id=channel_content_id,
-                    content_id=str(article.get("source_id", "")),
+                    source_id=source_id,
                     source=self.SourceType.INNER,
                     payload=json.dumps(
-                        posts_by_cid.get(channel_content_id, {}), ensure_ascii=False
+                        posts_by_cid.get(source_id, {}), ensure_ascii=False
                     ),
                     remark="内部文章解构结果已获取",
                 )
                 await self.mapper.set_decode_result(
-                    channel_content_id=channel_content_id,
+                    source_id=source_id,
                     result=json.dumps(
                         {"dataContent": data_content}, ensure_ascii=False
                     ),
                 )
             else:
                 await self.mapper.insert_decode_task(
-                    channel_content_id=channel_content_id,
-                    content_id=str(article.get("source_id", "")),
+                    source_id=source_id,
                     source=self.SourceType.INNER,
                     payload=json.dumps(result, ensure_ascii=False),
                     remark="提交返回SUCCESS,查询未果,等待轮询",
+                    status=self.TaskStatus.PROCESSING,
                 )
         else:
             await self.log_service.log(
                 contents={
-                    "wx_sn": wx_sn,
+                    "source_id": source_id,
                     "task": "create_inner_decode_task",
                     "status": "fail",
                     "message": f"unexpected submit status: {status}",
@@ -329,9 +324,9 @@ class CreateInnerArticlesDecodeTask(DecodeArticleConst):
 
         # 过滤已有任务记录的文章(测试模式跳过)
         if not self._TEST_MODE:
-            all_wx_sns = [a["wx_sn"] for a in articles]
-            existing = await self.mapper.fetch_existing_channel_content_ids(all_wx_sns)
-            new_articles = [a for a in articles if a["wx_sn"] not in existing]
+            all_source_ids = [str(a["source_id"]) for a in articles]
+            existing = await self.mapper.fetch_existing_source_ids(all_source_ids)
+            new_articles = [a for a in articles if str(a["source_id"]) not in existing]
             skipped = len(articles) - len(new_articles)
             if skipped > 0:
                 await self.log_service.log(
@@ -344,7 +339,9 @@ class CreateInnerArticlesDecodeTask(DecodeArticleConst):
             for article in articles:
                 if article not in new_articles:
                     await self.mapper.update_inner_article_status(
-                        article["id"], self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
+                        article["id"],
+                        self.TaskStatus.PROCESSING,
+                        self.TaskStatus.SUCCESS,
                     )
         else:
             new_articles = articles
@@ -359,7 +356,7 @@ class CreateInnerArticlesDecodeTask(DecodeArticleConst):
             produce_info = await self.mapper.fetch_inner_articles_produce_detail(
                 source_id
             )
-            produce_info_map[article["wx_sn"]] = produce_info
+            produce_info_map[str(article["source_id"])] = produce_info
 
         posts = self.tool.prepare_posts(new_articles, produce_info_map)
 
@@ -369,12 +366,12 @@ class CreateInnerArticlesDecodeTask(DecodeArticleConst):
         posts_by_cid = {p["channelContentId"]: p for p in posts}
 
         for article in tqdm(new_articles):
-            wx_sn = article["wx_sn"]
+            source_id = str(article["source_id"])
             article_id = article["id"]
 
-            result = submit_results.get(wx_sn)
+            result = submit_results.get(source_id)
             await self._handle_result(
-                article, wx_sn, result, posts_by_cid, self.CONFIG_ID
+                article, source_id, result, posts_by_cid, self.CONFIG_ID
             )
 
             if not self._TEST_MODE:

+ 12 - 13
app/domains/llm_tasks/decode_article/fetch_decode_results.py

@@ -19,16 +19,16 @@ class FetchDecodeResults(DecodeArticleConst):
         self.tool = AigcDecodeUtils()
 
     async def _process_batch(self, tasks: List[Dict]):
-        channel_content_ids = [t["channel_content_id"] for t in tasks]
-        results = await self.tool.query_decode_results_batch(channel_content_ids)
+        source_ids = [t["source_id"] for t in tasks]
+        results = await self.tool.query_decode_results_batch(source_ids)
 
         for task in tasks:
-            channel_content_id = task["channel_content_id"]
-            result = results.get(channel_content_id)
+            source_id = task["source_id"]
+            result = results.get(source_id)
 
             if not result:
-                await self.mapper.update_task_status_by_channel(
-                    channel_content_id=channel_content_id,
+                await self.mapper.update_task_status_by_source_id(
+                    source_id=source_id,
                     ori_status=self.TaskStatus.INIT,
                     new_status=self.TaskStatus.FAILED,
                     remark="解构任务在结果查询中未返回,可能不存在",
@@ -36,9 +36,9 @@ class FetchDecodeResults(DecodeArticleConst):
                 await self.log_service.log(
                     contents={
                         "task": "fetch_decode_results_v2",
-                        "channel_content_id": channel_content_id,
+                        "source_id": source_id,
                         "status": "fail",
-                        "message": "channel_content_id not in query response",
+                        "message": "source_id not in query response",
                     }
                 )
                 continue
@@ -51,7 +51,7 @@ class FetchDecodeResults(DecodeArticleConst):
                 data_content = result.get("dataContent") or "{}"
                 html = result.get("html")
                 await self.mapper.set_decode_result(
-                    channel_content_id=channel_content_id,
+                    source_id=source_id,
                     result=json.dumps(
                         {"dataContent": data_content, "html": html},
                         ensure_ascii=False,
@@ -61,8 +61,8 @@ class FetchDecodeResults(DecodeArticleConst):
             elif status in (self.QueryStatus.PENDING, self.QueryStatus.RUNNING):
                 pass
             elif status == self.QueryStatus.FAILED:
-                await self.mapper.update_task_status_by_channel(
-                    channel_content_id=channel_content_id,
+                await self.mapper.update_task_status_by_source_id(
+                    source_id=source_id,
                     ori_status=self.TaskStatus.INIT,
                     new_status=self.TaskStatus.FAILED,
                     remark=f"解构任务失败: {result.get('errorMessage', '')}",
@@ -71,7 +71,7 @@ class FetchDecodeResults(DecodeArticleConst):
                 await self.log_service.log(
                     contents={
                         "task": "fetch_decode_results_v2",
-                        "channel_content_id": channel_content_id,
+                        "source_id": source_id,
                         "status": "unknown",
                         "message": f"unexpected query status: {status}",
                         "data": result,
@@ -80,7 +80,6 @@ class FetchDecodeResults(DecodeArticleConst):
 
     async def deal(self):
         pending_tasks = await self.mapper.fetch_pending_tasks()
-        print(len(pending_tasks))
         if not pending_tasks:
             await self.log_service.log(
                 contents={