Prechádzať zdrojové kódy

Merge branch 'feature/luojunhui/20260306-extract-gzh-articles-by-AI' of Server/LongArticleTaskServer into master

luojunhui 17 hodín pred
rodič
commit
9eadb46e7b

+ 1 - 0
app/domains/decode_task/__init__.py

@@ -0,0 +1 @@
+from .ad_platform_articles_decode import AdPlatformArticlesDecodeTask

+ 1 - 0
app/domains/decode_task/ad_platform_articles_decode/__init__.py

@@ -0,0 +1 @@
+from .entrance import AdPlatformArticlesDecodeTask

+ 29 - 0
app/domains/decode_task/ad_platform_articles_decode/_const.py

@@ -0,0 +1,29 @@
+class AdPlatformArticlesDecodeConst:
+    """Constants shared by the ad-platform article decode mapper, util and task classes."""
+
+    # Task status (written to the status / decode_status columns)
+    INIT_STATUS = 0
+    PROCESSING_STATUS = 1
+    SUCCESS_STATUS = 2
+    FAILED_STATUS = 99
+
+    # Decode result status (compared with the "status" field of the decode service response)
+    PENDING = 0
+    RUNNING = 1
+    SUCCESS = 2
+    FAILED = 3
+
+    # Business scene
+    POINT_PICK = 0
+    CREATE = 1
+    MAKE = 2
+
+    # Content type
+    LONG_ARTICLE = 1
+    PICTURE_TEXT = 2
+    VIDEO = 3
+
+    # Response code
+    SUCCESS_CODE = 0
+
+    # Fetch-detail status
+    FETCH_DETAIL_SUCCESS = 2
+    # NOTE(review): TASK_BATCH is not a status; the mapper passes it as the SQL LIMIT value.
+    TASK_BATCH = 1

+ 87 - 0
app/domains/decode_task/ad_platform_articles_decode/_mapper.py

@@ -0,0 +1,87 @@
+from typing import List, Dict
+
+from app.core.database import DatabaseManager
+
+from ._const import AdPlatformArticlesDecodeConst
+
+
+class AdPlatformArticlesDecodeMapper(AdPlatformArticlesDecodeConst):
+    def __init__(self, pool: DatabaseManager):
+        self.pool = pool
+
+    # 存储解构任务
+    async def record_decode_task(
+        self, task_id: str, wx_sn: str, remark: str = None
+    ) -> int:
+        query = """
+            INSERT INTO long_articles_decode_tasks (task_id, wx_sn, remark)
+            VALUES (%s, %s, %s)
+        """
+        return await self.pool.async_save(query=query, params=(task_id, wx_sn, remark))
+
+    # 更新解构任务状态
+    async def update_decode_task_status(
+        self, task_id: str, ori_status: int, new_status: int, remark: str = None
+    ) -> int:
+        query = """
+            UPDATE long_articles_decode_tasks
+            SET status = %s, remark = %s
+            WHERE task_id = %s AND status = %s;
+        """
+        return await self.pool.async_save(
+            query=query, params=(new_status, remark, task_id, ori_status)
+        )
+
+    # 修改文章解构状态
+    async def update_article_decode_status(
+        self, id_: int, ori_status: int, new_status: int
+    ) -> int:
+        query = """
+            UPDATE ad_platform_accounts_daily_detail
+            SET decode_status = %s
+            WHERE id = %s AND decode_status = %s;
+        """
+        return await self.pool.async_save(
+            query=query, params=(new_status, id_, ori_status)
+        )
+
+    # 设置解构结果
+    async def set_decode_result(
+        self, task_id: str, result: str, remark: str = None
+    ) -> int:
+        query = """
+            UPDATE long_articles_decode_tasks
+            SET status = %s, remark = %s, result = %s
+            WHERE task_id = %s AND status = %s;
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=(
+                self.SUCCESS_STATUS,
+                remark,
+                result,
+                task_id,
+                self.PROCESSING_STATUS,
+            ),
+        )
+
+    # 获取待解构文章
+    async def fetch_decode_articles(self) -> List[Dict]:
+        query = """
+            SELECT id, account_name, gh_id, article_title, article_cover,
+                   article_text, article_images, wx_sn
+            FROM ad_platform_accounts_daily_detail WHERE fetch_status = %s AND decode_status = %s
+            LIMIT %s;
+        """
+        return await self.pool.async_fetch(
+            query=query, params=(self.SUCCESS_STATUS, self.INIT_STATUS, self.TASK_BATCH)
+        )
+
+    # 获取待拉取结果的解构任务(status=INIT,尚未拿到解构结果)
+    async def fetch_decoding_tasks(self) -> List[Dict]:
+        query = """
+            SELECT task_id FROM long_articles_decode_tasks WHERE status = %s LIMIT %s;
+        """
+        return await self.pool.async_fetch(
+            query=query, params=(self.INIT_STATUS, self.TASK_BATCH)
+        )

+ 47 - 0
app/domains/decode_task/ad_platform_articles_decode/_util.py

@@ -0,0 +1,47 @@
+import json
+from typing import Dict, List
+
+from app.infra.internal import DecodeServer
+
+from ._const import AdPlatformArticlesDecodeConst
+
+
+class AdPlatformArticlesDecodeUtil(AdPlatformArticlesDecodeConst):
+    decode_server = DecodeServer()
+
+    @staticmethod
+    def format_images(images: str) -> List[str]:
+        """
+        格式化图片字符串,空/非法 JSON 返回空列表。
+        """
+        if not images or not images.strip():
+            return []
+        try:
+            image_list = json.loads(images)
+        except (json.JSONDecodeError, TypeError):
+            return []
+        if not isinstance(image_list, list):
+            return []
+        return [i.get("image_url") for i in image_list if isinstance(i, dict) and i.get("image_url")]
+
+    async def create_decode_task(self, article: Dict):
+        request_body = self.prepare_extract_body(article)
+        return await self.decode_server.create_decode_task(request_body)
+
+    async def fetch_decode_result(self, task_id: str):
+        return await self.decode_server.fetch_decode_result(task_id)
+
+    def prepare_extract_body(self, article: Dict) -> Dict:
+        return {
+            "scene": self.POINT_PICK,
+            "content_type": self.LONG_ARTICLE,
+            "content": {
+                "channel_content_id": article.get("wx_sn", ""),
+                "video_url": "",
+                "images": self.format_images(article.get("article_images") or ""),
+                "body_text": article.get("article_text", ""),
+                "title": article.get("article_title", ""),
+                "channel_account_id": article.get("gh_id", ""),
+                "channel_account_name": article.get("account_name", ""),
+            },
+        }

+ 229 - 0
app/domains/decode_task/ad_platform_articles_decode/entrance.py

@@ -0,0 +1,229 @@
+import json
+from typing import Dict
+from tqdm import tqdm
+
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+
+from ._const import AdPlatformArticlesDecodeConst
+from ._mapper import AdPlatformArticlesDecodeMapper
+from ._util import AdPlatformArticlesDecodeUtil
+
+
+class AdPlatformArticlesDecodeTask(AdPlatformArticlesDecodeConst):
+    def __init__(self, pool: DatabaseManager, log_service: LogService):
+        self.pool = pool
+        self.log_service = log_service
+        self.mapper = AdPlatformArticlesDecodeMapper(self.pool)
+        self.tool = AdPlatformArticlesDecodeUtil()
+
+    async def create_single_decode_task(self, article: Dict):
+        # Acquire Lock
+        article_id = article["id"]
+        acquire_lock = await self.mapper.update_article_decode_status(
+            article_id, self.INIT_STATUS, self.PROCESSING_STATUS
+        )
+        if not acquire_lock:
+            await self.log_service.log(
+                contents={
+                    "article_id": article_id,
+                    "task": "create_decode_task",
+                    "status": "skip",
+                    "message": "acquire lock failed",
+                }
+            )
+            return
+
+        # 与解构系统交互,创建解构任务
+        response = await self.tool.create_decode_task(article)
+        response_code = response.get("code")
+        if response_code != self.SUCCESS_CODE:
+            # 解构任务创建失败
+            await self.mapper.update_article_decode_status(
+                article_id, self.PROCESSING_STATUS, self.FAILED_STATUS
+            )
+            await self.log_service.log(
+                contents={
+                    "article_id": article_id,
+                    "task": "create_decode_task",
+                    "status": "fail",
+                    "data": response,
+                }
+            )
+            return
+
+        task_id = response.get("data", {}).get("task_id") or response.get("data", {}).get("taskId")
+        if not task_id:
+            # 解构任务创建失败
+            await self.mapper.update_article_decode_status(
+                article_id, self.PROCESSING_STATUS, self.FAILED_STATUS
+            )
+            await self.log_service.log(
+                contents={
+                    "article_id": article_id,
+                    "task": "create_decode_task",
+                    "status": "fail",
+                    "data": response,
+                }
+            )
+            return
+
+        # 创建 decode 任务成功
+        await self.log_service.log(
+            contents={
+                "article_id": article_id,
+                "task": "create_decode_task",
+                "status": "success",
+                "data": response,
+            }
+        )
+
+        wx_sn = article["wx_sn"]
+        remark = f"task_id: {task_id}-创建解构任务"
+        record_row = await self.mapper.record_decode_task(task_id, wx_sn, remark)
+        if not record_row:
+            # 记录解构任务失败
+            await self.mapper.update_article_decode_status(
+                article_id, self.PROCESSING_STATUS, self.FAILED_STATUS
+            )
+            await self.log_service.log(
+                contents={
+                    "article_id": article_id,
+                    "task": "record_decode_task",
+                    "status": "fail",
+                    "message": "创建 decode 记录失败",
+                    "data": response,
+                }
+            )
+            return
+
+        # 记录创建成功
+        await self.mapper.update_article_decode_status(
+            article_id, self.PROCESSING_STATUS, self.SUCCESS_STATUS
+        )
+
+    async def fetch_single_task(self, task: Dict):
+        task_id = task["task_id"]
+
+        # acquire lock
+        acquire_lock = await self.mapper.update_decode_task_status(
+            task_id, self.INIT_STATUS, self.PROCESSING_STATUS
+        )
+        if not acquire_lock:
+            return
+
+        response = await self.tool.fetch_decode_result(task_id)
+        if not response:
+            await self.mapper.update_decode_task_status(
+                task_id=task_id,
+                ori_status=self.PROCESSING_STATUS,
+                new_status=self.INIT_STATUS,
+                remark="获取解构结果失败,服务异常,已回滚状态",
+            )
+            return
+
+        # 请求成功
+        response_code = response.get("code")
+        if response_code != self.SUCCESS_CODE:
+            # 解构任务获取失败
+            await self.mapper.update_decode_task_status(
+                task_id=task_id,
+                ori_status=self.PROCESSING_STATUS,
+                new_status=self.FAILED_STATUS,
+                remark=f"请求解构接口返回异常,标记为失败:{json.dumps(response, ensure_ascii=False)}",
+            )
+            return
+
+        response_data = response.get("data", {})
+        response_task_id = response_data.get("taskId") or response_data.get("task_id")
+        if task_id != response_task_id:
+            # 解构任务获取失败
+            await self.mapper.update_decode_task_status(
+                task_id=task_id,
+                ori_status=self.PROCESSING_STATUS,
+                new_status=self.FAILED_STATUS,
+                remark=f"请求解构接口TaskId异常:{json.dumps(response, ensure_ascii=False)}",
+            )
+            return
+
+        status = response_data.get("status")
+        match status:
+            case self.PENDING:
+                await self.mapper.update_decode_task_status(
+                    task_id=task_id,
+                    ori_status=self.PROCESSING_STATUS,
+                    new_status=self.INIT_STATUS,
+                    remark=f"解构任务状态为PENDING,继续轮询",
+                )
+
+            case self.RUNNING:
+                await self.mapper.update_decode_task_status(
+                    task_id=task_id,
+                    ori_status=self.PROCESSING_STATUS,
+                    new_status=self.INIT_STATUS,
+                    remark=f"解构任务状态为RUNNING,继续轮询",
+                )
+
+            case self.SUCCESS:
+                await self.mapper.set_decode_result(
+                    task_id=task_id,
+                    result=json.dumps(response_data, ensure_ascii=False),
+                )
+
+            case self.FAILED:
+                await self.mapper.update_decode_task_status(
+                    task_id=task_id,
+                    ori_status=self.PROCESSING_STATUS,
+                    new_status=self.FAILED_STATUS,
+                    remark=f"解构任务状态为FAILED,标记为失败",
+                )
+
+            case _:
+                await self.mapper.update_decode_task_status(
+                    task_id=task_id,
+                    ori_status=self.PROCESSING_STATUS,
+                    new_status=self.INIT_STATUS,
+                    remark=f"解构任务状态未知(status={status}),回滚待重试:{json.dumps(response_data, ensure_ascii=False)}",
+                )
+                await self.log_service.log(
+                    contents={
+                        "task": "fetch_single_task",
+                        "task_id": task_id,
+                        "status": "unknown",
+                        "message": f"unexpected decode status: {status}",
+                        "data": response_data,
+                    }
+                )
+
+    async def create_tasks(self):
+        article_list = await self.mapper.fetch_decode_articles()
+        if not article_list:
+            await self.log_service.log(
+                contents={"task": "create_tasks", "message": "No more articles to decode"}
+            )
+            return
+
+        for article in tqdm(article_list, desc="Creating decode tasks"):
+            await self.create_single_decode_task(article)
+
+    async def fetch_results(self):
+        decoding_tasks = await self.mapper.fetch_decoding_tasks()
+        if not decoding_tasks:
+            await self.log_service.log(
+                contents={"task": "fetch_results", "message": "No more tasks to fetch"}
+            )
+            return
+
+        for task in decoding_tasks:
+            await self.fetch_single_task(task)
+
+    async def deal(self, task_name):
+        match task_name:
+            case "create_tasks":
+                await self.create_tasks()
+
+            case "fetch_results":
+                await self.fetch_results()
+
+
+__all__ = ["AdPlatformArticlesDecodeTask"]

+ 2 - 0
app/infra/internal/__init__.py

@@ -2,6 +2,7 @@
 from .piaoquan import change_video_audit_status
 from .piaoquan import publish_video_to_piaoquan
 from .piaoquan import fetch_piaoquan_video_list_detail
+from .piaoquan import DecodeServer
 
 # aigc system api
 from .aigc_system import delete_illegal_gzh_articles
@@ -25,4 +26,5 @@ __all__ = [
     "get_titles_from_produce_plan",
     "get_top_article_title_list",
     "get_hot_titles",
+    "DecodeServer",
 ]

+ 65 - 0
app/infra/internal/piaoquan.py

@@ -76,3 +76,68 @@ async def publish_video_to_piaoquan(oss_path: str, uid: str, title: str) -> Dict
         response = await client.post(url, data=payload, headers=headers)
 
     return response
+
+
+class DecodeServer:
+
+    base_url: str = "http://supply-content-deconstruction-api.piaoquantv.com"
+
+    # 创建解构任务
+    async def create_decode_task(self, data: Dict) -> Dict:
+        """
+        scene: 业务场景:0 选题; 1 创作; 2 制作;
+        content_type: 内容类型: 1 长文; 2 图文; 3 视频;
+        content:
+            channel_content_id:
+            video_url:
+            images: [ image_url_1, image_url_2, ...]
+            body_text: 长文内容
+            title: 文章标题
+            channel_account_id: 作者 id
+            channel_account_name: 作者名称
+        output_type: {
+
+        }
+        """
+        url = f"{self.base_url}/api/v1/content/tasks/decode"
+        headers = {
+            "User-Agent": "PQSpeed/486 CFNetwork/1410.1 Darwin/22.6.0",
+            "Content-Type": "application/json",
+        }
+        async with AsyncHttpClient() as client:
+            response = await client.post(url, json=data, headers=headers)
+
+        return response
+
+    # 获取解构结果
+    async def fetch_decode_result(self, task_id: str) -> Dict:
+        """
+        INPUT: TaskId
+        OUTPUT: Dict
+        {
+          code: 0 | 404(task not exist),
+          msg: 'ok',
+          data: {
+              "taskId": "123",
+              "status": 3, 0 PENDING, 1 RUNNING, 2 SUCCESS, 3 FAILED
+              "result": None | JSON,
+              "reason": "" | 失败原因,
+              "url": {
+                  "pointUrl": "", 选题点结构结果页地址(仅解构任务有
+                  "weightUrl": "", 权重页地址(解构聚类都有)
+                  "patternUrl": "" 选题点聚类结果页地址(仅聚类任务有)
+              }
+          },
+        }
+        """
+        url = f"{self.base_url}/api/v1/content/tasks/{task_id}"
+        headers = {
+            "User-Agent": "PQSpeed/486 CFNetwork/1410.1 Darwin/22.6.0",
+            "Content-Type": "application/json",
+        }
+        async with AsyncHttpClient() as client:
+            response = await client.get(url, headers=headers)
+
+        return response
+
+

+ 20 - 0
app/jobs/task_handler.py

@@ -29,6 +29,8 @@ from app.domains.data_recycle_tasks import (
     UpdateOutsideRootSourceIdAndUpdateTimeTask,
 )
 
+from app.domains.decode_task import AdPlatformArticlesDecodeTask
+
 from app.domains.llm_tasks import TitleRewrite
 from app.domains.llm_tasks import ArticlePoolCategoryGeneration
 from app.domains.llm_tasks import CandidateAccountQualityScoreRecognizer
@@ -462,5 +464,23 @@ class TaskHandler:
         await task.deal()
         return TaskStatus.SUCCESS
 
+    @register("create_ad_platform_accounts_decode_task")
+    async def _create_ad_platform_accounts_decode_task(self) -> int:
+        """创建解构任务"""
+        task = AdPlatformArticlesDecodeTask(
+            pool=self.db_client, log_service=self.log_client
+        )
+        await task.deal(task_name="create_tasks")
+        return TaskStatus.SUCCESS
+
+    @register("fetch_ad_platform_accounts_decode_result")
+    async def _fetch_ad_platform_accounts_decode_result(self) -> int:
+        """获取解构任务结果"""
+        task = AdPlatformArticlesDecodeTask(
+            pool=self.db_client, log_service=self.log_client
+        )
+        await task.deal(task_name="fetch_results")
+        return TaskStatus.SUCCESS
+
 
 __all__ = ["TaskHandler"]