瀏覽代碼

解构任务提交完成

luojunhui 15 小時之前
父節點
當前提交
394191423c

+ 1 - 0
app/domains/decode_task/__init__.py

@@ -0,0 +1 @@
+from .ad_platform_articles_decode import AdPlatformArticlesDecodeTask

+ 1 - 0
app/domains/decode_task/ad_platform_articles_decode/__init__.py

@@ -0,0 +1 @@
+from .entrance import AdPlatformArticlesDecodeTask

+ 8 - 9
app/domains/decode_task/ad_platform_articles_decode/_const.py

@@ -1,11 +1,9 @@
-
-
 class AdPlatformArticlesDecodeConst:
-    # 解构任务状态
-    DECODE_INIT_STATUS = 0
-    DECODE_PROCESSING_STATUS = 1
-    DECODE_SUCCESS_STATUS = 2
-    DECODE_FAILED_STATUS = 99
+    # 任务状态
+    INIT_STATUS = 0
+    PROCESSING_STATUS = 1
+    SUCCESS_STATUS = 2
+    FAILED_STATUS = 99
 
     # 解构结果状态
     PENDING = 0
@@ -26,5 +24,6 @@ class AdPlatformArticlesDecodeConst:
     # 返回 code
     SUCCESS_CODE = 0
 
-
-
+    # 获取详情状态
+    FETCH_DETAIL_SUCCESS = 2
+    TASK_BATCH = 1

+ 79 - 1
app/domains/decode_task/ad_platform_articles_decode/_mapper.py

@@ -1,9 +1,87 @@
+from typing import List, Dict
+
 from app.core.database import DatabaseManager
 
+from ._const import AdPlatformArticlesDecodeConst
 
-class AdPlatformArticlesDecodeMapper:
 
+class AdPlatformArticlesDecodeMapper(AdPlatformArticlesDecodeConst):
     def __init__(self, pool: DatabaseManager):
+        # Database manager through which every query of this mapper is issued.
         self.pool = pool
 
+    # Persist a newly created decode task.
+    async def record_decode_task(
+        self, task_id: str, wx_sn: str, remark: str = None
+    ) -> int:
+        """Insert one row into ``long_articles_decode_tasks``.
+
+        Args:
+            task_id: Task identifier to store.
+            wx_sn: ``wx_sn`` of the article the task belongs to.
+            remark: Optional free-text note stored with the task.
+
+        Returns:
+            The value returned by ``pool.async_save`` (annotated ``int``;
+            presumably the affected-row count -- confirm against
+            ``DatabaseManager``).
+        """
+        insert_sql = """
+            INSERT INTO long_articles_decode_tasks (task_id, wx_sn, remark)
+            VALUES (%s, %s, %s)
+        """
+        row_values = (task_id, wx_sn, remark)
+        return await self.pool.async_save(query=insert_sql, params=row_values)
+
+    # Conditionally move a decode task from one status to another.
+    async def update_decode_task_status(
+        self, task_id: str, ori_status: int, new_status: int, remark: str = None
+    ) -> int:
+        """Set ``status`` and ``remark`` on a task that is still in ``ori_status``.
+
+        The ``WHERE`` clause matches on both ``task_id`` and the current
+        status, so the statement changes nothing when the row has already
+        left ``ori_status``. ``remark`` is always written, including when it
+        is ``None``.
+
+        Returns:
+            The value returned by ``pool.async_save`` (annotated ``int``).
+        """
+        update_sql = """
+            UPDATE long_articles_decode_tasks
+            SET status = %s, remark = %s
+            WHERE task_id = %s AND status = %s;
+        """
+        bind_values = (new_status, remark, task_id, ori_status)
+        return await self.pool.async_save(query=update_sql, params=bind_values)
+
+    # Conditionally change the decode status of one article row.
+    async def update_article_decode_status(
+        self, id_: int, ori_status: int, new_status: int
+    ) -> int:
+        """Set ``decode_status`` on an article that is still in ``ori_status``.
+
+        The row in ``ad_platform_accounts_daily_detail`` is matched on ``id``
+        and on its current ``decode_status``, so nothing is changed when the
+        article is no longer in ``ori_status``.
+
+        Returns:
+            The value returned by ``pool.async_save`` (annotated ``int``).
+        """
+        update_sql = """
+            UPDATE ad_platform_accounts_daily_detail
+            SET decode_status = %s
+            WHERE id = %s AND decode_status = %s;
+        """
+        bind_values = (new_status, id_, ori_status)
+        return await self.pool.async_save(query=update_sql, params=bind_values)
+
+    # Store the decode result and mark the task as succeeded.
+    async def set_decode_result(
+        self, task_id: str, result: str, remark: str = None
+    ) -> int:
+        """Write ``result`` and flip the task from PROCESSING to SUCCESS.
+
+        Only a row whose status is currently ``PROCESSING_STATUS`` is
+        updated. ``remark`` is written as given, including ``None``.
+
+        Args:
+            task_id: Task whose result is being stored.
+            result: Serialized result text to store in the ``result`` column.
+            remark: Optional note stored in the ``remark`` column.
+
+        Returns:
+            The value returned by ``pool.async_save`` (annotated ``int``).
+        """
+        update_sql = """
+            UPDATE long_articles_decode_tasks
+            SET status = %s, remark = %s, result = %s
+            WHERE task_id = %s AND status = %s;
+        """
+        bind_values = (
+            self.SUCCESS_STATUS,
+            remark,
+            result,
+            task_id,
+            self.PROCESSING_STATUS,
+        )
+        return await self.pool.async_save(query=update_sql, params=bind_values)
+
+    # Fetch articles that are ready to be decoded.
+    async def fetch_decode_articles(self) -> List[Dict]:
+        """Return up to ``TASK_BATCH`` articles awaiting a decode task.
+
+        An article qualifies when ``fetch_status`` equals
+        ``FETCH_DETAIL_SUCCESS`` and ``decode_status`` equals ``INIT_STATUS``.
+
+        Returns:
+            The rows returned by ``pool.async_fetch`` for the selected columns.
+        """
+        query = """
+            SELECT id, account_name, gh_id, article_title, article_cover,
+                   article_text, article_images, wx_sn
+            FROM ad_platform_accounts_daily_detail WHERE fetch_status = %s AND decode_status = %s
+            LIMIT %s;
+        """
+        # fetch_status has its own constant; comparing it with the task
+        # SUCCESS_STATUS only worked because both happen to be 2.
+        return await self.pool.async_fetch(
+            query=query,
+            params=(self.FETCH_DETAIL_SUCCESS, self.INIT_STATUS, self.TASK_BATCH),
+        )
 
+    # Fetch decode tasks whose result has not been pulled yet (status = INIT).
+    async def fetch_decoding_tasks(self) -> List[Dict]:
+        """Return up to ``TASK_BATCH`` task rows that are in ``INIT_STATUS``.
+
+        Only the ``task_id`` column is selected.
+        """
+        select_sql = """
+            SELECT task_id FROM long_articles_decode_tasks WHERE status = %s LIMIT %s;
+        """
+        bind_values = (self.INIT_STATUS, self.TASK_BATCH)
+        return await self.pool.async_fetch(query=select_sql, params=bind_values)

+ 34 - 5
app/domains/decode_task/ad_platform_articles_decode/_util.py

@@ -1,18 +1,47 @@
 import json
+from typing import Dict, List
 
 from app.infra.internal import DecodeServer
 
+from ._const import AdPlatformArticlesDecodeConst
 
-class AdPlatformArticlesDecodeUtil:
 
+class AdPlatformArticlesDecodeUtil(AdPlatformArticlesDecodeConst):
     decode_server = DecodeServer()
 
     @staticmethod
-    def format_images(images: str):
+    def format_images(images: str) -> List[str]:
         """
-        格式化图片字符串
+        Parse a JSON image-list string into a list of image URLs; empty, blank, invalid or non-list JSON returns an empty list.
         """
-        image_list = json.loads(images)
-        return [i["image_url"] for i in image_list]
+        # Nothing to parse: None, "" or whitespace-only input.
+        if not images or not images.strip():
+            return []
+        try:
+            image_list = json.loads(images)
+        except (json.JSONDecodeError, TypeError):
+            return []
+        # Valid JSON that is not an array carries no image list.
+        if not isinstance(image_list, list):
+            return []
+        # Keep only dict entries that have a truthy "image_url".
+        return [i.get("image_url") for i in image_list if isinstance(i, dict) and i.get("image_url")]
 
+    async def create_decode_task(self, article: Dict):
+        """Build the request body for ``article`` and submit it to the decode server.
+
+        Returns whatever ``decode_server.create_decode_task`` returns.
+        """
+        payload = self.prepare_extract_body(article)
+        server_reply = await self.decode_server.create_decode_task(payload)
+        return server_reply
 
+    async def fetch_decode_result(self, task_id: str):
+        """Ask the decode server for the current result of ``task_id``.
+
+        Returns whatever ``decode_server.fetch_decode_result`` returns.
+        """
+        server_reply = await self.decode_server.fetch_decode_result(task_id)
+        return server_reply
+
+    def prepare_extract_body(self, article: Dict) -> Dict:
+        """Build the decode-server request body for one article row.
+
+        Args:
+            article: Article row; the keys ``wx_sn``, ``article_images``,
+                ``article_text``, ``article_title``, ``gh_id`` and
+                ``account_name`` are read.
+
+        Returns:
+            A dict with ``scene``, ``content_type`` and a ``content`` mapping.
+            Missing or ``None`` text fields are sent as ``""``.
+        """
+
+        def text(key: str):
+            # dict.get(key, "") only covers a missing key; a NULL column
+            # arrives as an existing key holding None, so normalise it here.
+            value = article.get(key)
+            return "" if value is None else value
+
+        return {
+            "scene": self.POINT_PICK,
+            "content_type": self.LONG_ARTICLE,
+            "content": {
+                "channel_content_id": text("wx_sn"),
+                "video_url": "",
+                "images": self.format_images(article.get("article_images") or ""),
+                "body_text": text("article_text"),
+                "title": text("article_title"),
+                "channel_account_id": text("gh_id"),
+                "channel_account_name": text("account_name"),
+            },
+        }

+ 214 - 1
app/domains/decode_task/ad_platform_articles_decode/entrance.py

@@ -1,3 +1,7 @@
+import json
+from typing import Dict
+from tqdm import tqdm
+
 from app.core.database import DatabaseManager
 from app.core.observability import LogService
 
@@ -7,10 +11,219 @@ from ._util import AdPlatformArticlesDecodeUtil
 
 
 class AdPlatformArticlesDecodeTask(AdPlatformArticlesDecodeConst):
-
     def __init__(self, pool: DatabaseManager, log_service: LogService):
+        # Keep the DB pool and log service, then build the two collaborators
+        # used by this task: a mapper bound to the same pool and a util helper.
         self.pool = pool
         self.log_service = log_service
         self.mapper = AdPlatformArticlesDecodeMapper(self.pool)
         self.tool = AdPlatformArticlesDecodeUtil()
 
+    async def create_single_decode_task(self, article: Dict):
+        """Create a decode task for one article and record the outcome.
+
+        Steps:
+          1. Claim the article with a conditional INIT -> PROCESSING update;
+             a falsy result is treated as "lock not acquired" and skipped.
+          2. Ask the decode server to create a task.
+          3. No usable response: put the article back to INIT for a retry.
+             Error code or missing ``task_id``: mark the article FAILED.
+          4. Otherwise store the task row and move the article to SUCCESS;
+             if the row cannot be stored the article is marked FAILED.
+
+        Args:
+            article: Article row; ``id`` and ``wx_sn`` are read here, the
+                remaining fields are consumed when the request is built.
+        """
+        article_id = article["id"]
+
+        # Acquire lock
+        acquire_lock = await self.mapper.update_article_decode_status(
+            article_id, self.INIT_STATUS, self.PROCESSING_STATUS
+        )
+        if not acquire_lock:
+            await self.log_service.log(
+                contents={
+                    "article_id": article_id,
+                    "task": "create_decode_task",
+                    "status": "skip",
+                    "message": "acquire lock failed",
+                }
+            )
+            return
+
+        # Talk to the decode system to create the task.
+        response = await self.tool.create_decode_task(article)
+        if not isinstance(response, dict):
+            # Without a dict there is nothing to inspect; calling .get() on it
+            # would raise and leave the article stuck in PROCESSING. Release
+            # the lock so that a later run can retry.
+            await self.mapper.update_article_decode_status(
+                article_id, self.PROCESSING_STATUS, self.INIT_STATUS
+            )
+            await self.log_service.log(
+                contents={
+                    "article_id": article_id,
+                    "task": "create_decode_task",
+                    "status": "fail",
+                    "message": "no usable response from decode server, status rolled back",
+                    "data": response,
+                }
+            )
+            return
+
+        # "data" may be present but null, so do not rely on the .get() default.
+        response_data = response.get("data")
+        task_id = (
+            response_data.get("task_id") if isinstance(response_data, dict) else None
+        )
+        if response.get("code") != self.SUCCESS_CODE or not task_id:
+            # Task creation failed.
+            await self._mark_article_failed(
+                article_id,
+                {
+                    "article_id": article_id,
+                    "task": "create_decode_task",
+                    "status": "fail",
+                    "data": response,
+                },
+            )
+            return
+
+        # Decode task created successfully.
+        await self.log_service.log(
+            contents={
+                "article_id": article_id,
+                "task": "create_decode_task",
+                "status": "success",
+                "data": response,
+            }
+        )
+
+        wx_sn = article["wx_sn"]
+        remark = f"task_id: {task_id}-创建解构任务"
+        record_row = await self.mapper.record_decode_task(task_id, wx_sn, remark)
+        if not record_row:
+            # Storing the task row failed.
+            await self._mark_article_failed(
+                article_id,
+                {
+                    "article_id": article_id,
+                    "task": "record_decode_task",
+                    "status": "fail",
+                    "message": "创建 decode 记录失败",
+                    "data": response,
+                },
+            )
+            return
+
+        # Task row stored: the article is done.
+        await self.mapper.update_article_decode_status(
+            article_id, self.PROCESSING_STATUS, self.SUCCESS_STATUS
+        )
+
+    async def _mark_article_failed(self, article_id: int, log_contents: Dict) -> None:
+        """Move a claimed article from PROCESSING to FAILED and log ``log_contents``."""
+        await self.mapper.update_article_decode_status(
+            article_id, self.PROCESSING_STATUS, self.FAILED_STATUS
+        )
+        await self.log_service.log(contents=log_contents)
+
+    async def fetch_single_task(self, task: Dict):
+        """Poll the decode server once for ``task`` and persist what it reports.
+
+        The task is first claimed with a conditional INIT -> PROCESSING
+        update. It then leaves PROCESSING in one of three ways:
+
+        * back to INIT (retry later): no usable response, or a reported
+          status of PENDING, RUNNING or anything unrecognised;
+        * FAILED: non-success response code, a ``task_id`` in the response
+          that does not match, or a reported status of FAILED;
+        * SUCCESS: reported status SUCCESS; the response ``data`` is stored
+          as JSON text.
+
+        Args:
+            task: Task row; only ``task_id`` is read.
+        """
+        task_id = task["task_id"]
+
+        # Acquire lock
+        acquire_lock = await self.mapper.update_decode_task_status(
+            task_id, self.INIT_STATUS, self.PROCESSING_STATUS
+        )
+        if not acquire_lock:
+            return
+
+        response = await self.tool.fetch_decode_result(task_id)
+        if not response or not isinstance(response, dict):
+            # A truthy non-dict would crash on .get() below and leave the task
+            # stuck in PROCESSING, so it is handled like "no response".
+            await self._leave_processing(
+                task_id, self.INIT_STATUS, "获取解构结果失败,服务异常,已回滚状态"
+            )
+            return
+
+        # Request succeeded.
+        response_code = response.get("code")
+        if response_code != self.SUCCESS_CODE:
+            # Fetching the decode task failed.
+            await self._leave_processing(
+                task_id,
+                self.FAILED_STATUS,
+                f"请求解构接口返回异常,标记为失败:{json.dumps(response, ensure_ascii=False)}",
+            )
+            return
+
+        # "data" may be present but null; treat anything that is not a dict as
+        # empty so that it ends up in the task-id mismatch branch.
+        response_data = response.get("data")
+        if not isinstance(response_data, dict):
+            response_data = {}
+
+        response_task_id = response_data.get("task_id")
+        if task_id != response_task_id:
+            # The response does not belong to this task.
+            await self._leave_processing(
+                task_id,
+                self.FAILED_STATUS,
+                f"请求解构接口TaskId异常:{json.dumps(response, ensure_ascii=False)}",
+            )
+            return
+
+        status = response_data.get("status")
+        match status:
+            case self.PENDING:
+                await self._leave_processing(
+                    task_id, self.INIT_STATUS, "解构任务状态为PENDING,继续轮询"
+                )
+
+            case self.RUNNING:
+                await self._leave_processing(
+                    task_id, self.INIT_STATUS, "解构任务状态为RUNNING,继续轮询"
+                )
+
+            case self.SUCCESS:
+                await self.mapper.set_decode_result(
+                    task_id=task_id,
+                    result=json.dumps(response_data, ensure_ascii=False),
+                )
+
+            case self.FAILED:
+                await self._leave_processing(
+                    task_id, self.FAILED_STATUS, "解构任务状态为FAILED,标记为失败"
+                )
+
+            case _:
+                await self._leave_processing(
+                    task_id,
+                    self.INIT_STATUS,
+                    f"解构任务状态未知(status={status}),回滚待重试:{json.dumps(response_data, ensure_ascii=False)}",
+                )
+                await self.log_service.log(
+                    contents={
+                        "task": "fetch_single_task",
+                        "task_id": task_id,
+                        "status": "unknown",
+                        "message": f"unexpected decode status: {status}",
+                        "data": response_data,
+                    }
+                )
+
+    async def _leave_processing(self, task_id: str, new_status: int, remark: str) -> None:
+        """Move a claimed task from PROCESSING to ``new_status`` and store ``remark``."""
+        await self.mapper.update_decode_task_status(
+            task_id=task_id,
+            ori_status=self.PROCESSING_STATUS,
+            new_status=new_status,
+            remark=remark,
+        )
+
+    async def create_tasks(self):
+        """Create decode tasks for the current batch of pending articles.
+
+        Articles are processed one after another; when the mapper returns
+        none, a single log entry is written instead.
+        """
+        pending_articles = await self.mapper.fetch_decode_articles()
+        if pending_articles:
+            for row in tqdm(pending_articles, desc="Creating decode tasks"):
+                await self.create_single_decode_task(row)
+            return
+
+        await self.log_service.log(
+            contents={"task": "create_tasks", "message": "No more articles to decode"}
+        )
+
+    async def fetch_results(self):
+        """Poll the decode server for the current batch of open tasks.
+
+        Tasks are processed one after another; when the mapper returns none,
+        a single log entry is written instead.
+        """
+        open_tasks = await self.mapper.fetch_decoding_tasks()
+        if open_tasks:
+            for row in open_tasks:
+                await self.fetch_single_task(row)
+            return
+
+        await self.log_service.log(
+            contents={"task": "fetch_results", "message": "No more tasks to fetch"}
+        )
+
+    async def deal(self, task_name):
+        """Run the sub-task selected by ``task_name``.
+
+        Args:
+            task_name: ``"create_tasks"`` to create decode tasks, or
+                ``"fetch_results"`` to poll for results. Any other value is
+                logged and otherwise ignored.
+        """
+        match task_name:
+            case "create_tasks":
+                await self.create_tasks()
+
+            case "fetch_results":
+                await self.fetch_results()
+
+            case _:
+                # Make a mistyped or unregistered name visible in the logs
+                # instead of returning as if work had been done.
+                await self.log_service.log(
+                    contents={
+                        "task": "deal",
+                        "status": "fail",
+                        "message": f"unknown task_name: {task_name}",
+                    }
+                )
+
+
+__all__ = ["AdPlatformArticlesDecodeTask"]

+ 20 - 0
app/jobs/task_handler.py

@@ -29,6 +29,8 @@ from app.domains.data_recycle_tasks import (
     UpdateOutsideRootSourceIdAndUpdateTimeTask,
 )
 
+from app.domains.decode_task import AdPlatformArticlesDecodeTask
+
 from app.domains.llm_tasks import TitleRewrite
 from app.domains.llm_tasks import ArticlePoolCategoryGeneration
 from app.domains.llm_tasks import CandidateAccountQualityScoreRecognizer
@@ -462,5 +464,23 @@ class TaskHandler:
         await task.deal()
         return TaskStatus.SUCCESS
 
+    @register("create_ad_platform_accounts_decode_task")
+    async def _create_ad_platform_accounts_decode_task(self) -> int:
+        """Run the ``create_tasks`` step of the ad-platform article decode job."""
+        decode_job = AdPlatformArticlesDecodeTask(
+            log_service=self.log_client, pool=self.db_client
+        )
+        await decode_job.deal(task_name="create_tasks")
+        return TaskStatus.SUCCESS
+
+    @register("fetch_ad_platform_accounts_decode_result")
+    async def _fetch_ad_platform_accounts_decode_result(self) -> int:
+        """Run the ``fetch_results`` step of the ad-platform article decode job."""
+        decode_job = AdPlatformArticlesDecodeTask(
+            log_service=self.log_client, pool=self.db_client
+        )
+        await decode_job.deal(task_name="fetch_results")
+        return TaskStatus.SUCCESS
+
 
 __all__ = ["TaskHandler"]