Kaynağa Gözat

长文-聚类优化

luojunhui 1 gün önce
ebeveyn
işleme
a49c5803a6

+ 43 - 0
app/domains/decode_task/ad_platform_articles_decode/_mapper.py

@@ -85,3 +85,46 @@ class AdPlatformArticlesDecodeMapper(AdPlatformArticlesDecodeConst):
         return await self.pool.async_fetch(
             query=query, params=(self.INIT_STATUS, self.TASK_BATCH)
         )
+
+    # 获取待解析的任务(获取处理成功的任务)
+    async def fetch_extract_tasks(self):
+        query = """
+            SELECT id, result FROM long_articles_decode_tasks
+            WHERE extract_status = %s AND status = %s;
+        """
+        return await self.pool.async_fetch(
+            query=query, params=(self.INIT_STATUS, self.SUCCESS_STATUS)
+        )
+
+    # 修改解析状态(用于加锁与状态流转)
+    async def update_extract_status(self, task_id, ori_status, new_status):
+        query = """
+            UPDATE long_articles_decode_tasks
+            SET extract_status = %s WHERE extract_status = %s AND id = %s;
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=(
+                new_status,
+                ori_status,
+                task_id,
+            ),
+        )
+
+    # 记录解析结果明细到 long_articles_decode_task_detail
+    async def record_extract_detail(self, decode_task_id: int, detail: Dict) -> int:
+        query = """
+            INSERT INTO long_articles_decode_task_detail
+                (decode_task_id, inspiration, purpose, key_point, topic)
+            VALUES (%s, %s, %s, %s, %s);
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=(
+                decode_task_id,
+                detail.get("inspiration", ""),
+                detail.get("purpose", ""),
+                detail.get("key_point", ""),
+                detail.get("topic", ""),
+            ),
+        )

+ 44 - 2
app/domains/decode_task/ad_platform_articles_decode/_util.py

@@ -22,14 +22,18 @@ class AdPlatformArticlesDecodeUtil(AdPlatformArticlesDecodeConst):
             return []
         if not isinstance(image_list, list):
             return []
-        return [i.get("image_url") for i in image_list if isinstance(i, dict) and i.get("image_url")]
+        return [
+            i.get("image_url")
+            for i in image_list
+            if isinstance(i, dict) and i.get("image_url")
+        ]
 
     async def create_decode_task(self, article: Dict):
         request_body = self.prepare_extract_body(article)
         return await self.decode_server.create_decode_task(request_body)
 
     async def fetch_decode_result(self, task_id: str):
-        return await self.decode_server.fetch_decode_result(task_id)
+        return await self.decode_server.fetch_result(task_id)
 
     def prepare_extract_body(self, article: Dict) -> Dict:
         return {
@@ -45,3 +49,41 @@ class AdPlatformArticlesDecodeUtil(AdPlatformArticlesDecodeConst):
                 "channel_account_name": article.get("account_name", ""),
             },
         }
+
+    @staticmethod
+    def extract_decode_result(result: Dict) -> Dict:
+        """
+        从结构的结果中,解析出灵感点、目的点、关键点;
+        """
+        final_result = result.get("final_normalization_rebuild")
+        if not final_result:
+            return {"error": "解构结果中无 final_normalization_rebuild 信息"}
+        # 灵感点
+        inspiration_list = final_result.get("inspiration_final_result", {}).get(
+            "最终灵感点列表", []
+        )
+        # 目的
+        purpose_list = final_result.get("purpose_final_result", {}).get(
+            "最终目的点列表", []
+        )
+        # 关键点
+        keypoint_list = final_result.get("keypoint_final", {}).get("最终关键点列表", [])
+
+        topic_fusion = final_result.get("topic_fusion_result", {})
+        # 选题
+        topic_text = (
+            topic_fusion.get("最终选题", {}).get("选题", "")
+            if isinstance(topic_fusion.get("最终选题"), dict)
+            else ""
+        )
+
+        def _join_points(items: list, key: str) -> str:
+            parts = [str(p[key]) for p in items if isinstance(p, dict) and p.get(key)]
+            return ",".join(parts)
+
+        return {
+            "inspiration": _join_points(inspiration_list, "灵感点"),
+            "purpose": _join_points(purpose_list, "目的点"),
+            "key_point": _join_points(keypoint_list, "关键点"),
+            "topic": topic_text,
+        }

+ 94 - 2
app/domains/decode_task/ad_platform_articles_decode/entrance.py

@@ -5,6 +5,8 @@ from tqdm import tqdm
 from app.core.database import DatabaseManager
 from app.core.observability import LogService
 
+from app.infra.shared import run_tasks_with_asyncio_task_group
+
 from ._const import AdPlatformArticlesDecodeConst
 from ._mapper import AdPlatformArticlesDecodeMapper
 from ._util import AdPlatformArticlesDecodeUtil
@@ -52,7 +54,9 @@ class AdPlatformArticlesDecodeTask(AdPlatformArticlesDecodeConst):
             )
             return
 
-        task_id = response.get("data", {}).get("task_id") or response.get("data", {}).get("taskId")
+        task_id = response.get("data", {}).get("task_id") or response.get(
+            "data", {}
+        ).get("taskId")
         if not task_id:
             # 解构任务创建失败
             await self.mapper.update_article_decode_status(
@@ -195,11 +199,87 @@ class AdPlatformArticlesDecodeTask(AdPlatformArticlesDecodeConst):
                     }
                 )
 
+    async def extract_single_result(self, task):
+        task_id = task["id"]
+
+        # acquire lock by extract_status
+        acquire_lock = await self.mapper.update_extract_status(
+            task_id, self.INIT_STATUS, self.PROCESSING_STATUS
+        )
+        if not acquire_lock:
+            return
+
+        try:
+            result = json.loads(task["result"])["result"]
+        except (TypeError, KeyError, json.JSONDecodeError) as e:
+            await self.mapper.update_extract_status(
+                task_id,
+                self.PROCESSING_STATUS,
+                self.FAILED_STATUS,
+            )
+            await self.log_service.log(
+                contents={
+                    "task": "extract_single_result",
+                    "task_id": task_id,
+                    "status": "fail",
+                    "message": f"parse decode result error: {e}",
+                    "raw": task.get("result"),
+                }
+            )
+            return
+
+        detail = self.tool.extract_decode_result(result)
+        # 如果工具返回错误信息,直接标记为失败
+        if detail.get("error"):
+            await self.mapper.update_extract_status(
+                task_id,
+                self.PROCESSING_STATUS,
+                self.FAILED_STATUS,
+            )
+            await self.log_service.log(
+                contents={
+                    "task": "extract_single_result",
+                    "task_id": task_id,
+                    "status": "fail",
+                    "message": detail["error"],
+                }
+            )
+            return
+
+        # 写入明细表
+        saved = await self.mapper.record_extract_detail(task_id, detail)
+        if not saved:
+            await self.mapper.update_extract_status(
+                task_id,
+                self.PROCESSING_STATUS,
+                self.FAILED_STATUS,
+            )
+            await self.log_service.log(
+                contents={
+                    "task": "extract_single_result",
+                    "task_id": task_id,
+                    "status": "fail",
+                    "message": "insert long_articles_decode_task_detail failed",
+                    "detail": detail,
+                }
+            )
+            return
+
+        # 写入成功,更新状态为成功
+        await self.mapper.update_extract_status(
+            task_id,
+            self.PROCESSING_STATUS,
+            self.SUCCESS_STATUS,
+        )
+
     async def create_tasks(self):
         article_list = await self.mapper.fetch_decode_articles()
         if not article_list:
             await self.log_service.log(
-                contents={"task": "create_tasks", "message": "No more articles to decode"}
+                contents={
+                    "task": "create_tasks",
+                    "message": "No more articles to decode",
+                }
             )
             return
 
@@ -217,6 +297,15 @@ class AdPlatformArticlesDecodeTask(AdPlatformArticlesDecodeConst):
         for task in decoding_tasks:
             await self.fetch_single_task(task)
 
+    async def extract_task(self):
+        tasks = await self.mapper.fetch_extract_tasks()
+        await run_tasks_with_asyncio_task_group(
+            task_list=tasks,
+            handler=self.extract_single_result,
+            description="批量解析结构结果",
+            unit="task",
+        )
+
     async def deal(self, task_name):
         match task_name:
             case "create_tasks":
@@ -225,5 +314,8 @@ class AdPlatformArticlesDecodeTask(AdPlatformArticlesDecodeConst):
             case "fetch_results":
                 await self.fetch_results()
 
+            case "extract":
+                await self.extract_task()
+
 
 __all__ = ["AdPlatformArticlesDecodeTask"]

+ 1 - 1
app/infra/internal/piaoquan.py

@@ -110,7 +110,7 @@ class DecodeServer:
         return response
 
     # 获取解构结果
-    async def fetch_decode_result(self, task_id: str) -> Dict:
+    async def fetch_result(self, task_id: str) -> Dict:
         """
         INPUT: TaskId
         OUTPUT: Dict