Преглед на файлове

长文解构接入 aigc 系统

luojunhui преди 5 дни
родител
ревизия
6a7660cd8a

+ 0 - 0
app/__init__.py


+ 13 - 0
app/domains/llm_tasks/decode_article/__init__.py

@@ -0,0 +1,13 @@
+from .create_decode_tasks import (
+    CreateAdPlatformArticlesDecodeTask,
+    CreateInnerArticlesDecodeTask,
+)
+from .fetch_decode_results import FetchDecodeResults
+from .extract_decode_task_detail import ExtractDecodeTaskDetail
+
+__all__ = [
+    "CreateAdPlatformArticlesDecodeTask",
+    "CreateInnerArticlesDecodeTask",
+    "FetchDecodeResults",
+    "ExtractDecodeTaskDetail",
+]

+ 52 - 0
app/domains/llm_tasks/decode_article/_const.py

@@ -0,0 +1,52 @@
+class DecodeArticleConst:
+    """Constant namespace shared by the long-article decode ("解构") pipeline.
+
+    Constants are grouped in inner classes so call sites read like enums,
+    e.g. ``self.TaskStatus.SUCCESS``.
+    """
+
+    CONFIG_ID = 61  # AIGC decode-config id: long-article Toutiao article deconstruction
+    TASK_BATCH = 200  # max rows fetched/processed per run
+    SUBMIT_BATCH = 50  # max posts per submit/query API call
+
+    class TaskStatus:
+        # Lifecycle of a decode-task row in the local DB.
+        INIT = 0
+        PROCESSING = 1
+        SUCCESS = 2
+        FAILED = 99
+
+    class ExtractStatus(TaskStatus): ...  # extract phase reuses the same numeric codes
+
+    class SubmitStatus:
+        # Per-post status strings returned by the submit API.
+        SUCCESS = "SUCCESS"
+        PENDING = "PENDING"
+        FAILED = "FAILED"
+
+    class QueryStatus:
+        # Status strings returned by the result-query API.
+        SUCCESS = "SUCCESS"
+        PENDING = "PENDING"
+        RUNNING = "RUNNING"
+        FAILED = "FAILED"
+
+    class SourceType:
+        # Where the article came from.
+        AD_PLATFORM = 1
+        INNER = 2
+
+    class ContentModal:
+        PICTURE_TEXT = 2  # picture + text post
+        LONG_ARTICLE = 3  # long-form article
+        VIDEO = 4  # video
+        AUDIO = 5  # audio
+
+    class Channel:
+        XIAOHONGSHU = 1  # Xiaohongshu
+        DOUYIN = 2  # Douyin
+        PINTEREST = 3  # Pinterest
+        REDDIT = 4  # Reddit
+        WECHAT = 5  # WeChat official account
+        TOUTIAO = 6  # Toutiao
+        PIAOQUAN = 10  # Piaoquan
+
+    class ProduceModuleType:
+        # produce_plan_module_output.produce_module_type values.
+        COVER = 1  # cover image
+        IMAGE = 2  # in-article image
+        TITLE = 3  # title
+        CONTENT = 4  # body text
+        SUMMARY = 18  # summary
+
+
+__all__ = ["DecodeArticleConst"]

+ 230 - 0
app/domains/llm_tasks/decode_article/_mapper.py

@@ -0,0 +1,230 @@
+from typing import Dict, List
+
+from app.core.database import DatabaseManager
+
+from ._const import DecodeArticleConst
+
+TABLE = "long_articles_decode_tasks_v2"
+
+
+class ArticlesDecodeTaskMapper(DecodeArticleConst):
+    def __init__(self, pool: DatabaseManager):
+        self.pool = pool
+
+    async def insert_decode_task(
+        self,
+        channel_content_id: str,
+        content_id: str,
+        source: int,
+        payload: str,
+        remark: str = None,
+    ) -> int:
+        query = f"""
+            INSERT IGNORE INTO {TABLE}
+                (channel_content_id, config_id, content_id, source, payload, remark)
+            VALUES (%s, %s, %s, %s, %s, %s)
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=(
+                channel_content_id,
+                self.CONFIG_ID,
+                content_id,
+                source,
+                payload,
+                remark,
+            ),
+        )
+
+    async def update_task_status_by_channel(
+        self,
+        channel_content_id: str,
+        ori_status: int,
+        new_status: int,
+        remark: str = None,
+    ) -> int:
+        query = f"""
+            UPDATE {TABLE}
+            SET status = %s, remark = %s
+            WHERE channel_content_id = %s AND status = %s AND config_id = %s
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=(new_status, remark, channel_content_id, ori_status, self.CONFIG_ID),
+        )
+
+    async def set_decode_result(
+        self,
+        channel_content_id: str,
+        result: str,
+        remark: str = None,
+    ) -> int:
+        query = f"""
+            UPDATE {TABLE}
+            SET status = %s, result = %s, remark = %s
+            WHERE channel_content_id = %s AND status IN (%s, %s) AND config_id = %s
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=(
+                self.TaskStatus.SUCCESS,
+                result,
+                remark,
+                channel_content_id,
+                self.TaskStatus.INIT,
+                self.TaskStatus.PROCESSING,
+                self.CONFIG_ID,
+            ),
+        )
+
+    async def fetch_pending_tasks(
+        self, source: int = None
+    ) -> List[Dict]:
+        if source is not None:
+            query = f"""
+                SELECT channel_content_id, content_id
+                FROM {TABLE}
+                WHERE status = %s AND source = %s AND config_id = %s
+                LIMIT %s
+            """
+            params = (self.TaskStatus.INIT, source, self.CONFIG_ID, self.TASK_BATCH)
+        else:
+            query = f"""
+                SELECT channel_content_id, content_id
+                FROM {TABLE}
+                WHERE status = %s AND config_id = %s
+                LIMIT %s
+            """
+            params = (self.TaskStatus.INIT, self.CONFIG_ID, self.TASK_BATCH)
+        return await self.pool.async_fetch(query=query, params=params)
+
+    async def fetch_existing_channel_content_ids(
+        self, channel_content_ids: List[str]
+    ) -> set:
+        """批量查询哪些 channel_content_id 已有任务记录"""
+        if not channel_content_ids:
+            return set()
+        placeholders = ",".join(["%s"] * len(channel_content_ids))
+        query = f"""
+            SELECT channel_content_id FROM {TABLE}
+            WHERE channel_content_id IN ({placeholders}) AND config_id = %s
+        """
+        rows = await self.pool.async_fetch(
+            query=query,
+            params=(*channel_content_ids, self.CONFIG_ID),
+        )
+        return {r["channel_content_id"] for r in rows}
+
+    async def fetch_extract_tasks(self) -> List[Dict]:
+        query = f"""
+            SELECT id, result FROM {TABLE}
+            WHERE extract_status = %s AND status = %s AND config_id = %s
+        """
+        return await self.pool.async_fetch(
+            query=query,
+            params=(self.ExtractStatus.INIT, self.TaskStatus.SUCCESS, self.CONFIG_ID),
+        )
+
+    async def update_extract_status(
+        self, task_id: int, ori_status: int, new_status: int
+    ) -> int:
+        query = f"""
+            UPDATE {TABLE}
+            SET extract_status = %s
+            WHERE extract_status = %s AND id = %s
+        """
+        return await self.pool.async_save(
+            query=query, params=(new_status, ori_status, task_id)
+        )
+
+    async def record_extract_detail(
+        self, decode_task_id: int, detail: Dict
+    ) -> int:
+        query = """
+            INSERT INTO long_articles_decode_task_detail_v2
+                (decode_task_id, inspiration, purpose, key_point, topic)
+            VALUES (%s, %s, %s, %s, %s)
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=(
+                decode_task_id,
+                detail.get("inspiration", ""),
+                detail.get("purpose", ""),
+                detail.get("key_point", ""),
+                detail.get("topic", ""),
+            ),
+        )
+
+
+class AdPlatformArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
+    def __init__(self, pool: DatabaseManager):
+        super().__init__(pool)
+
+    async def update_article_decode_status(
+        self, id_: int, ori_status: int, new_status: int
+    ) -> int:
+        query = """
+            UPDATE ad_platform_accounts_daily_detail
+            SET decode_status = %s
+            WHERE id = %s AND decode_status = %s
+        """
+        return await self.pool.async_save(
+            query=query, params=(new_status, id_, ori_status)
+        )
+
+    async def fetch_decode_articles(self) -> List[Dict]:
+        query = """
+            SELECT id, account_name, gh_id, article_title, article_cover,
+                   article_text, article_images, wx_sn
+            FROM ad_platform_accounts_daily_detail
+            WHERE fetch_status = %s AND decode_status = %s
+            LIMIT %s
+        """
+        return await self.pool.async_fetch(
+            query=query,
+            params=(self.TaskStatus.SUCCESS, self.TaskStatus.INIT, self.TASK_BATCH),
+        )
+
+
+class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
+    def __init__(self, pool: DatabaseManager):
+        super().__init__(pool)
+
+    async def fetch_inner_articles(self) -> List[Dict]:
+        query = """
+            SELECT  title
+                  ,SUM(fans) AS total_fans
+                  ,SUM(view_count) AS total_view
+                  ,SUM(view_count) / SUM(fans) AS avg_read_rate
+                  ,SUM(first_level) AS total_first_level
+                  ,MAX(source_id) as source_id
+                  ,MAX(wx_sn) as wx_sn
+            FROM    datastat_sort_strategy
+            WHERE  date_str >= '20250101'
+            GROUP BY title
+            HAVING total_fans > 100000
+            AND     avg_read_rate > 0.002
+            AND     total_first_level > 0
+        """
+        return await self.pool.async_fetch(query=query)
+
+    async def fetch_inner_articles_produce_detail(
+        self, source_id
+    ) -> List[Dict]:
+        query = """
+            SELECT produce_module_type, output
+            FROM produce_plan_module_output
+            WHERE plan_exe_id = %s
+            AND produce_module_type in (1,2,3,4,18)
+        """
+        return await self.pool.async_fetch(
+            query=query, db_name="aigc", params=(source_id,)
+        )
+
+
+__all__ = [
+    "ArticlesDecodeTaskMapper",
+    "AdPlatformArticlesDecodeTaskMapper",
+    "InnerArticlesDecodeTaskMapper",
+]

+ 168 - 0
app/domains/llm_tasks/decode_article/_utils.py

@@ -0,0 +1,168 @@
+import json
+from typing import Dict, List
+
+from app.infra.internal.aigc_decode_server import AigcDecodeServer
+
+from ._const import DecodeArticleConst
+
+
+class AigcDecodeUtils(DecodeArticleConst):
+    """Batched client helpers for the AIGC decode service plus result parsing."""
+
+    # Shared client instance, created at import time.
+    # NOTE(review): class-level instantiation means every subclass and task
+    # shares one AigcDecodeServer — confirm the client is safe to share.
+    decode_server = AigcDecodeServer()
+
+    async def submit_decode_batch(
+        self, posts: List[Dict]
+    ) -> Dict[str, Dict]:
+        """Submit decode tasks in SUBMIT_BATCH-sized chunks.
+
+        Returns ``{channel_content_id: {status, errorMessage, ...}}``.  When
+        a whole batch call fails (non-zero ``code``), every post in that
+        batch is reported as FAILED.
+        """
+        result = {}
+        for i in range(0, len(posts), self.SUBMIT_BATCH):
+            batch = posts[i : i + self.SUBMIT_BATCH]
+            response = await self.decode_server.submit_decode(
+                config_id=self.CONFIG_ID, posts=batch
+            )
+            if response.get("code") == 0:
+                for item in response.get("data", []):
+                    result[item["channelContentId"]] = item
+            else:
+                # Whole batch failed: mark every post in it as FAILED.
+                for post in batch:
+                    cid = post["channelContentId"]
+                    result[cid] = {
+                        "channelContentId": cid,
+                        "status": "FAILED",
+                        "errorMessage": f"batch submit failed: {response}",
+                    }
+        return result
+
+    async def query_decode_results_batch(
+        self, channel_content_ids: List[str]
+    ) -> Dict[str, Dict]:
+        """Query decode results in SUBMIT_BATCH-sized chunks.
+
+        Returns ``{channel_content_id: {status, dataContent, html, errorMessage}}``.
+        When the API call itself fails, the entry's status is ``API_ERROR``;
+        callers should keep the task in INIT and retry later.
+        """
+        result = {}
+        for i in range(0, len(channel_content_ids), self.SUBMIT_BATCH):
+            batch = channel_content_ids[i : i + self.SUBMIT_BATCH]
+            response = await self.decode_server.query_decode_results(
+                config_id=self.CONFIG_ID, channel_content_ids=batch
+            )
+            if response.get("code") == 0:
+                for item in response.get("data", []):
+                    result[item["channelContentId"]] = item
+            else:
+                for cid in batch:
+                    result[cid] = {
+                        "channelContentId": cid,
+                        "status": "API_ERROR",
+                        "errorMessage": f"query API failed: {response}",
+                    }
+        return result
+
+    @staticmethod
+    def extract_decode_result(result: Dict) -> Dict:
+        """Pull inspiration / purpose / key-point / topic strings out of a decode result.
+
+        Handles both payload formats: v1 wraps the fields in a
+        ``final_normalization_rebuild`` envelope; v2 puts them at top level.
+        The Chinese dictionary keys below are the service's response schema
+        and must not be changed.
+        """
+        final_result = result.get("final_normalization_rebuild") or result
+
+        inspiration_list = final_result.get("inspiration_final_result", {}).get(
+            "最终灵感点列表", []
+        )
+        purpose_list = final_result.get("purpose_final_result", {}).get(
+            "最终目的点列表", []
+        )
+        keypoint_list = final_result.get("keypoint_final", {}).get("最终关键点列表", [])
+
+        topic_fusion = final_result.get("topic_fusion_result", {})
+        # The topic only counts when "最终选题" is a dict holding a "选题" entry.
+        topic_text = (
+            topic_fusion.get("最终选题", {}).get("选题", "")
+            if isinstance(topic_fusion.get("最终选题"), dict)
+            else ""
+        )
+
+        def _join_points(items: list, key: str) -> str:
+            # Keep only dict entries with a truthy value under `key`.
+            parts = [
+                str(p[key]) for p in items if isinstance(p, dict) and p.get(key)
+            ]
+            return ",".join(parts)
+
+        return {
+            "inspiration": _join_points(inspiration_list, "灵感点"),
+            "purpose": _join_points(purpose_list, "目的点"),
+            "key_point": _join_points(keypoint_list, "关键点"),
+            "topic": topic_text,
+        }
+
+
+class AdPlatformArticlesDecodeUtils(AigcDecodeUtils):
+    @staticmethod
+    def format_images(images: str) -> List[str]:
+        if not images or not images.strip():
+            return []
+        try:
+            image_list = json.loads(images)
+        except (json.JSONDecodeError, TypeError):
+            return []
+        if not isinstance(image_list, list):
+            return []
+        return [
+            i.get("image_url")
+            for i in image_list
+            if isinstance(i, dict) and i.get("image_url")
+        ]
+
+    def prepare_posts(self, articles: List[Dict]) -> List[Dict]:
+        posts = []
+        for article in articles:
+            images = self.format_images(article.get("article_images") or "")
+            posts.append(
+                {
+                    "channelContentId": article["wx_sn"],
+                    "title": article.get("article_title", ""),
+                    "bodyText": article.get("article_text", ""),
+                    "images": images,
+                    "video": None,
+                    "contentModal": self.ContentModal.LONG_ARTICLE,
+                    "channel": self.Channel.WECHAT,
+                }
+            )
+        return posts
+
+
+class InnerArticlesDecodeUtils(AigcDecodeUtils):
+    def prepare_posts(
+        self, articles: List[Dict], produce_info_map: Dict[str, List[Dict]]
+    ) -> List[Dict]:
+        posts = []
+        for article in articles:
+            wx_sn = article["wx_sn"]
+            produce_info = produce_info_map.get(wx_sn, [])
+            images = [
+                i["output"]
+                for i in produce_info
+                if i["produce_module_type"]
+                in (self.ProduceModuleType.COVER, self.ProduceModuleType.IMAGE)
+            ]
+            text_parts = [
+                i["output"]
+                for i in produce_info
+                if i["produce_module_type"] == self.ProduceModuleType.CONTENT
+            ]
+            posts.append(
+                {
+                    "channelContentId": wx_sn,
+                    "title": article.get("title", ""),
+                    "bodyText": "\n".join(text_parts),
+                    "images": images,
+                    "video": None,
+                    "contentModal": self.ContentModal.LONG_ARTICLE,
+                    "channel": self.Channel.WECHAT,
+                }
+            )
+        return posts
+
+
+__all__ = [
+    "AigcDecodeUtils",
+    "AdPlatformArticlesDecodeUtils",
+    "InnerArticlesDecodeUtils",
+]

+ 343 - 0
app/domains/llm_tasks/decode_article/create_decode_tasks.py

@@ -0,0 +1,343 @@
+import json
+
+from tqdm import tqdm
+from typing import Dict, List
+
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+
+from ._const import DecodeArticleConst
+from ._mapper import (
+    AdPlatformArticlesDecodeTaskMapper,
+    InnerArticlesDecodeTaskMapper,
+)
+from ._utils import AdPlatformArticlesDecodeUtils, InnerArticlesDecodeUtils
+
+
+class CreateAdPlatformArticlesDecodeTask(DecodeArticleConst):
+    """Create decode tasks for ad-platform articles and record submit outcomes.
+
+    Flow per run: lock a batch of source articles (decode_status INIT ->
+    PROCESSING), submit them to the AIGC decode API, then per article either
+    store an immediately available result, insert a task row for polling, or
+    mark the source article FAILED.  The exact order of DB updates matters:
+    the task row is inserted before the source-article status is finalized.
+    """
+
+    def __init__(self, pool: DatabaseManager, log_service: LogService):
+        self.pool = pool
+        self.log_service = log_service
+        self.mapper = AdPlatformArticlesDecodeTaskMapper(self.pool)
+        self.tool = AdPlatformArticlesDecodeUtils()
+
+    async def _acquire_articles(self) -> List[Dict]:
+        """Fetch pending articles and lock them (decode_status INIT -> PROCESSING).
+
+        Articles whose CAS update fails (another worker holds them) are
+        logged as skipped.
+        """
+        article_list = await self.mapper.fetch_decode_articles()
+        locked = []
+        for article in article_list:
+            article_id = article["id"]
+            acquired = await self.mapper.update_article_decode_status(
+                article_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
+            )
+            if acquired:
+                locked.append(article)
+            else:
+                await self.log_service.log(
+                    contents={
+                        "article_id": article_id,
+                        "task": "create_decode_task_v2",
+                        "status": "skip",
+                        "message": "acquire lock failed",
+                    }
+                )
+        return locked
+
+    async def _submit_and_record(self, articles: List[Dict]):
+        """Submit locked articles to the decode API and persist per-article outcomes.
+
+        NOTE(review): in the PENDING and SUCCESS-without-result branches the
+        source article is marked SUCCESS — here SUCCESS apparently means
+        "handed off to the decode pipeline", not "result stored"; confirm.
+        """
+        if not articles:
+            return
+
+        posts = self.tool.prepare_posts(articles)
+        submit_results = await self.tool.submit_decode_batch(posts)
+        posts_by_wx = {p["channelContentId"]: p for p in posts}
+
+        for article in articles:
+            wx_sn = article["wx_sn"]
+            article_id = article["id"]
+            result = submit_results.get(wx_sn)
+
+            if not result:
+                # The API response did not cover this article at all.
+                await self.mapper.update_article_decode_status(
+                    article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED
+                )
+                await self.log_service.log(
+                    contents={
+                        "article_id": article_id,
+                        "wx_sn": wx_sn,
+                        "task": "create_decode_task_v2",
+                        "status": "fail",
+                        "message": "no response for channel_content_id",
+                    }
+                )
+                continue
+
+            status = result.get("status")
+            if status == self.SubmitStatus.FAILED:
+                await self.mapper.update_article_decode_status(
+                    article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED
+                )
+                await self.log_service.log(
+                    contents={
+                        "article_id": article_id,
+                        "wx_sn": wx_sn,
+                        "task": "create_decode_task_v2",
+                        "status": "fail",
+                        "data": result,
+                    }
+                )
+                continue
+
+            if status == self.SubmitStatus.SUCCESS:
+                # Result already exists server-side: query it and store immediately.
+                query_results = await self.tool.query_decode_results_batch([wx_sn])
+                result_data = query_results.get(wx_sn)
+                if result_data and result_data.get("status") == self.QueryStatus.SUCCESS:
+                    data_content = result_data.get("dataContent") or "{}"
+                    html = result_data.get("html")
+                    await self.mapper.insert_decode_task(
+                        channel_content_id=wx_sn,
+                        content_id=article_id,  # NOTE(review): int, though mapper annotates str — confirm column type
+                        source=self.SourceType.AD_PLATFORM,
+                        payload=json.dumps(
+                            posts_by_wx.get(wx_sn, {}), ensure_ascii=False
+                        ),
+                        remark="提交时已有解构结果,直接落库",
+                    )
+                    await self.mapper.set_decode_result(
+                        channel_content_id=wx_sn,
+                        result=json.dumps(
+                            {"dataContent": data_content, "html": html},
+                            ensure_ascii=False,
+                        ),
+                        remark="提交时已返回 SUCCESS,结果已落库",
+                    )
+                    await self.mapper.update_article_decode_status(
+                        article_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
+                    )
+                    await self.log_service.log(
+                        contents={
+                            "article_id": article_id,
+                            "wx_sn": wx_sn,
+                            "task": "create_decode_task_v2",
+                            "status": "success",
+                            "message": "decode result already available on submit",
+                        }
+                    )
+                else:
+                    # Submit said SUCCESS but the result is not queryable yet:
+                    # insert the task row and let the polling job pick it up.
+                    await self.mapper.insert_decode_task(
+                        channel_content_id=wx_sn,
+                        content_id=article_id,
+                        source=self.SourceType.AD_PLATFORM,
+                        payload=json.dumps(
+                            posts_by_wx.get(wx_sn, {}), ensure_ascii=False
+                        ),
+                        remark="提交返回SUCCESS,查询未果,等待轮询",
+                    )
+                    await self.mapper.update_article_decode_status(
+                        article_id,
+                        self.TaskStatus.PROCESSING,
+                        self.TaskStatus.SUCCESS,
+                    )
+                    await self.log_service.log(
+                        contents={
+                            "article_id": article_id,
+                            "wx_sn": wx_sn,
+                            "task": "create_decode_task_v2",
+                            "status": "pending",
+                            "message": "submit SUCCESS but query not ready, inserted for polling",
+                        }
+                    )
+            elif status == self.SubmitStatus.PENDING:
+                # Task accepted; a separate polling job will fetch the result.
+                await self.mapper.insert_decode_task(
+                    channel_content_id=wx_sn,
+                    content_id=article_id,
+                    source=self.SourceType.AD_PLATFORM,
+                    payload=json.dumps(
+                        posts_by_wx.get(wx_sn, {}), ensure_ascii=False
+                    ),
+                    remark="任务已提交,等待轮询",
+                )
+                await self.mapper.update_article_decode_status(
+                    article_id,
+                    self.TaskStatus.PROCESSING,
+                    self.TaskStatus.SUCCESS,
+                )
+                await self.log_service.log(
+                    contents={
+                        "article_id": article_id,
+                        "wx_sn": wx_sn,
+                        "task": "create_decode_task_v2",
+                        "status": "pending",
+                        "message": "task submitted, waiting for polling",
+                    }
+                )
+            else:
+                # Unknown submit status: treat as failure so the article is not stuck.
+                await self.mapper.update_article_decode_status(
+                    article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED
+                )
+                await self.log_service.log(
+                    contents={
+                        "article_id": article_id,
+                        "wx_sn": wx_sn,
+                        "task": "create_decode_task_v2",
+                        "status": "fail",
+                        "message": f"unexpected submit status: {status}",
+                        "data": result,
+                    }
+                )
+
+    async def deal(self):
+        """Entry point: acquire a batch, submit it, and log a summary."""
+        article_list = await self._acquire_articles()
+        if not article_list:
+            await self.log_service.log(
+                contents={
+                    "task": "create_decode_task_v2",
+                    "message": "No more articles to decode",
+                }
+            )
+            return
+
+        await self._submit_and_record(article_list)
+        await self.log_service.log(
+            contents={
+                "task": "create_decode_task_v2",
+                "message": f"Processed {len(article_list)} articles",
+            }
+        )
+
+
+class CreateInnerArticlesDecodeTask(DecodeArticleConst):
+    """Create decode tasks for high-performing in-house articles.
+
+    Unlike the ad-platform variant there is no per-article lock on the
+    source table; dedup relies on existing task rows (plus INSERT IGNORE).
+    NOTE(review): FAILED submissions leave no task row, so those articles
+    will be re-selected and retried on the next run — confirm intended.
+    """
+
+    def __init__(self, pool: DatabaseManager, log_service: LogService):
+        self.pool = pool
+        self.log_service = log_service
+        self.mapper = InnerArticlesDecodeTaskMapper(self.pool)
+        self.tool = InnerArticlesDecodeUtils()
+
+    async def deal(self):
+        """Entry point: select articles, filter out already-submitted ones, submit the rest."""
+        article_list = await self.mapper.fetch_inner_articles()
+        if not article_list:
+            await self.log_service.log(
+                contents={
+                    "task": "create_inner_decode_task_v2",
+                    "message": "No more articles to decode",
+                }
+            )
+            return
+
+        # Skip articles that already have a task row for this config.
+        all_wx_sns = [a["wx_sn"] for a in article_list]
+        existing = await self.mapper.fetch_existing_channel_content_ids(all_wx_sns)
+        new_articles = [a for a in article_list if a["wx_sn"] not in existing]
+        skipped = len(article_list) - len(new_articles)
+        if skipped > 0:
+            await self.log_service.log(
+                contents={
+                    "task": "create_inner_decode_task_v2",
+                    "message": f"Skipped {skipped} already-submitted articles",
+                }
+            )
+        if not new_articles:
+            await self.log_service.log(
+                contents={
+                    "task": "create_inner_decode_task_v2",
+                    "message": "All articles already submitted",
+                }
+            )
+            return
+
+        # Collect produce-module outputs per article (one aigc query each).
+        produce_info_map: Dict[str, list] = {}
+        for article in new_articles:
+            source_id = article["source_id"]
+            produce_info = await self.mapper.fetch_inner_articles_produce_detail(
+                source_id
+            )
+            produce_info_map[article["wx_sn"]] = produce_info
+
+        posts = self.tool.prepare_posts(new_articles, produce_info_map)
+        submit_results = await self.tool.submit_decode_batch(posts)
+        posts_by_wx = {p["channelContentId"]: p for p in posts}
+
+        for article in tqdm(new_articles):
+            wx_sn = article["wx_sn"]
+            result = submit_results.get(wx_sn)
+            if not result:
+                # The API response did not cover this article; no row is
+                # inserted, so it will be retried next run.
+                await self.log_service.log(
+                    contents={
+                        "wx_sn": wx_sn,
+                        "task": "create_inner_decode_task_v2",
+                        "status": "fail",
+                        "message": "no response for channel_content_id",
+                    }
+                )
+                continue
+
+            status = result.get("status")
+            if status == self.SubmitStatus.FAILED:
+                await self.log_service.log(
+                    contents={
+                        "wx_sn": wx_sn,
+                        "task": "create_inner_decode_task_v2",
+                        "status": "fail",
+                        "data": result,
+                    }
+                )
+            elif status == self.SubmitStatus.PENDING:
+                # Accepted: record the task row for the polling job.
+                await self.mapper.insert_decode_task(
+                    channel_content_id=wx_sn,
+                    content_id=str(article.get("source_id", "")),
+                    source=self.SourceType.INNER,
+                    payload=json.dumps(
+                        posts_by_wx.get(wx_sn, {}), ensure_ascii=False
+                    ),
+                    remark="内部文章解构任务已提交",
+                )
+            elif status == self.SubmitStatus.SUCCESS:
+                # Result may already exist server-side: try to store it now.
+                query_results = await self.tool.query_decode_results_batch([wx_sn])
+                result_data = query_results.get(wx_sn)
+                data_content = result_data.get("dataContent") if result_data else None
+                if data_content:
+                    await self.mapper.insert_decode_task(
+                        channel_content_id=wx_sn,
+                        content_id=str(article.get("source_id", "")),
+                        source=self.SourceType.INNER,
+                        payload=json.dumps(
+                            posts_by_wx.get(wx_sn, {}), ensure_ascii=False
+                        ),
+                        remark="内部文章解构结果已获取",
+                    )
+                    await self.mapper.set_decode_result(
+                        channel_content_id=wx_sn,
+                        result=json.dumps(
+                            {"dataContent": data_content}, ensure_ascii=False
+                        ),
+                    )
+                else:
+                    # Submit said SUCCESS but result not queryable yet: insert
+                    # the row and let the polling job fetch it later.
+                    await self.mapper.insert_decode_task(
+                        channel_content_id=wx_sn,
+                        content_id=str(article.get("source_id", "")),
+                        source=self.SourceType.INNER,
+                        payload=json.dumps(result, ensure_ascii=False),
+                        remark="提交返回SUCCESS,查询未果,等待轮询",
+                    )
+            else:
+                await self.log_service.log(
+                    contents={
+                        "wx_sn": wx_sn,
+                        "task": "create_inner_decode_task_v2",
+                        "status": "fail",
+                        "message": f"unexpected submit status: {status}",
+                        "data": result,
+                    }
+                )
+
+        await self.log_service.log(
+            contents={
+                "task": "create_inner_decode_task_v2",
+                "message": f"Processed {len(new_articles)} articles",
+            }
+        )
+
+
+__all__ = ["CreateAdPlatformArticlesDecodeTask", "CreateInnerArticlesDecodeTask"]

+ 115 - 0
app/domains/llm_tasks/decode_article/extract_decode_task_detail.py

@@ -0,0 +1,115 @@
+import json
+from typing import Dict
+
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+
+from app.infra.shared import run_tasks_with_asyncio_task_group
+
+from ._const import DecodeArticleConst
+from ._mapper import ArticlesDecodeTaskMapper
+from ._utils import AigcDecodeUtils
+
+
+class ExtractDecodeTaskDetail(DecodeArticleConst):
+    """Parse fetched decode results and persist the extracted detail rows.
+
+    Each task row is guarded by an INIT -> PROCESSING status transition that
+    acts as an optimistic lock, so concurrent workers never handle the same
+    row twice. Failures at any stage flip the row to FAILED and are logged.
+    """
+
+    def __init__(self, pool: DatabaseManager, log_service: LogService):
+        self.pool = pool
+        self.log_service = log_service
+        self.mapper = ArticlesDecodeTaskMapper(self.pool)
+        self.tool = AigcDecodeUtils()
+
+    async def extract_single_result(self, task: Dict):
+        """Extract and store the decode detail for one task row.
+
+        ``task`` must carry at least ``id`` and the raw JSON ``result``
+        string fetched earlier from the decode API.
+        """
+        task_id = task["id"]
+
+        # Optimistic lock: only the worker that flips INIT -> PROCESSING
+        # proceeds; all others return silently.
+        acquire_lock = await self.mapper.update_extract_status(
+            task_id, self.ExtractStatus.INIT, self.ExtractStatus.PROCESSING
+        )
+        if not acquire_lock:
+            return
+
+        try:
+            raw_result = json.loads(task["result"])
+            # New API result format: {"dataContent": "{...}", "html": "..."}
+            data_content = raw_result.get("dataContent")
+            if isinstance(data_content, str):
+                # dataContent may itself be a JSON-encoded string; decode it.
+                inner_result = json.loads(data_content)
+            else:
+                inner_result = data_content or {}
+        except (TypeError, KeyError, json.JSONDecodeError) as e:
+            # result is None / not a string / malformed JSON: fail the row.
+            await self.mapper.update_extract_status(
+                task_id,
+                self.ExtractStatus.PROCESSING,
+                self.ExtractStatus.FAILED,
+            )
+            await self.log_service.log(
+                contents={
+                    "task": "extract_decode_result_v2",
+                    "task_id": task_id,
+                    "status": "fail",
+                    "message": f"parse decode result error: {e}",
+                    "raw": task.get("result"),
+                }
+            )
+            return
+
+        detail = self.tool.extract_decode_result(inner_result)
+        if detail.get("error"):
+            # The extractor reported a structured error; record the failure.
+            await self.mapper.update_extract_status(
+                task_id,
+                self.ExtractStatus.PROCESSING,
+                self.ExtractStatus.FAILED,
+            )
+            await self.log_service.log(
+                contents={
+                    "task": "extract_decode_result_v2",
+                    "task_id": task_id,
+                    "status": "fail",
+                    "message": detail["error"],
+                }
+            )
+            return
+
+        saved = await self.mapper.record_extract_detail(task_id, detail)
+        if not saved:
+            # Detail insert failed; mark FAILED so the row is not stuck
+            # in PROCESSING forever.
+            await self.mapper.update_extract_status(
+                task_id,
+                self.ExtractStatus.PROCESSING,
+                self.ExtractStatus.FAILED,
+            )
+            await self.log_service.log(
+                contents={
+                    "task": "extract_decode_result_v2",
+                    "task_id": task_id,
+                    "status": "fail",
+                    "message": "insert long_articles_decode_task_detail failed",
+                    "detail": detail,
+                }
+            )
+            return
+
+        await self.mapper.update_extract_status(
+            task_id,
+            self.ExtractStatus.PROCESSING,
+            self.ExtractStatus.SUCCESS,
+        )
+
+    async def deal(self):
+        """Entry point: extract every pending task concurrently."""
+        tasks = await self.mapper.fetch_extract_tasks()
+        if not tasks:
+            await self.log_service.log(
+                contents={
+                    "task": "extract_decode_result_v2",
+                    "message": "No more tasks to extract",
+                }
+            )
+            return
+
+        await run_tasks_with_asyncio_task_group(
+            task_list=tasks,
+            handler=self.extract_single_result,
+            description="批量解析解构结果",
+            unit="task",
+        )
+
+
+# Public export of this module.
+__all__ = ["ExtractDecodeTaskDetail"]

+ 113 - 0
app/domains/llm_tasks/decode_article/fetch_decode_results.py

@@ -0,0 +1,113 @@
+import json
+from typing import List, Dict
+
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+
+from app.infra.shared import run_tasks_with_asyncio_task_group
+
+from ._const import DecodeArticleConst
+from ._mapper import ArticlesDecodeTaskMapper
+from ._utils import AigcDecodeUtils
+
+
+class FetchDecodeResults(DecodeArticleConst):
+    def __init__(self, pool: DatabaseManager, log_service: LogService):
+        self.pool = pool
+        self.log_service = log_service
+        self.mapper = ArticlesDecodeTaskMapper(self.pool)
+        self.tool = AigcDecodeUtils()
+
+    async def _process_batch(self, tasks: List[Dict]):
+        channel_content_ids = [t["channel_content_id"] for t in tasks]
+        results = await self.tool.query_decode_results_batch(channel_content_ids)
+
+        for task in tasks:
+            channel_content_id = task["channel_content_id"]
+            result = results.get(channel_content_id)
+
+            if not result:
+                await self.mapper.update_task_status_by_channel(
+                    channel_content_id=channel_content_id,
+                    ori_status=self.TaskStatus.INIT,
+                    new_status=self.TaskStatus.FAILED,
+                    remark="解构任务在结果查询中未返回,可能不存在",
+                )
+                await self.log_service.log(
+                    contents={
+                        "task": "fetch_decode_results_v2",
+                        "channel_content_id": channel_content_id,
+                        "status": "fail",
+                        "message": "channel_content_id not in query response",
+                    }
+                )
+                continue
+
+            status = result.get("status")
+            if status == "API_ERROR":
+                # 查询 API 调用失败,保持 INIT 等待重试
+                continue
+            elif status == self.QueryStatus.SUCCESS:
+                data_content = result.get("dataContent") or "{}"
+                html = result.get("html")
+                await self.mapper.set_decode_result(
+                    channel_content_id=channel_content_id,
+                    result=json.dumps(
+                        {"dataContent": data_content, "html": html},
+                        ensure_ascii=False,
+                    ),
+                    remark="解构结果获取成功",
+                )
+            elif status in (self.QueryStatus.PENDING, self.QueryStatus.RUNNING):
+                pass
+            elif status == self.QueryStatus.FAILED:
+                await self.mapper.update_task_status_by_channel(
+                    channel_content_id=channel_content_id,
+                    ori_status=self.TaskStatus.INIT,
+                    new_status=self.TaskStatus.FAILED,
+                    remark=f"解构任务失败: {result.get('errorMessage', '')}",
+                )
+            else:
+                await self.log_service.log(
+                    contents={
+                        "task": "fetch_decode_results_v2",
+                        "channel_content_id": channel_content_id,
+                        "status": "unknown",
+                        "message": f"unexpected query status: {status}",
+                        "data": result,
+                    }
+                )
+
+    async def deal(self):
+        pending_tasks = await self.mapper.fetch_pending_tasks()
+        print(len(pending_tasks))
+        if not pending_tasks:
+            await self.log_service.log(
+                contents={
+                    "task": "fetch_decode_results_v2",
+                    "message": "No more tasks to fetch",
+                }
+            )
+            return
+
+        # 拆成多个批次,并发查询
+        batches = [
+            pending_tasks[i : i + self.SUBMIT_BATCH]
+            for i in range(0, len(pending_tasks), self.SUBMIT_BATCH)
+        ]
+        await run_tasks_with_asyncio_task_group(
+            task_list=batches,
+            handler=self._process_batch,
+            description="批量查询解构结果",
+            unit="batch",
+        )
+
+        await self.log_service.log(
+            contents={
+                "task": "fetch_decode_results_v2",
+                "message": f"Processed {len(pending_tasks)} pending tasks in {len(batches)} batches",
+            }
+        )
+
+
+# Public export of this module.
+__all__ = ["FetchDecodeResults"]

+ 2 - 0
app/infra/internal/__init__.py

@@ -3,6 +3,7 @@ from .piaoquan import change_video_audit_status
 from .piaoquan import publish_video_to_piaoquan
 from .piaoquan import fetch_piaoquan_video_list_detail
 from .piaoquan_decode_server import DecodeServer
+from .aigc_decode_server import AigcDecodeServer
 
 # aigc system api
 from .aigc_system import delete_illegal_gzh_articles
@@ -28,5 +29,6 @@ __all__ = [
     "get_top_article_title_list",
     "get_hot_titles",
     "DecodeServer",
+    "AigcDecodeServer",
     "insert_crawler_plan",
 ]

+ 16 - 0
app/jobs/domains/llm_task.py

@@ -2,6 +2,18 @@ from app.domains.llm_tasks.aigc_decode_task import CreateAdPlatformArticlesDecod
 from app.domains.llm_tasks.aigc_decode_task import CreateInnerArticlesDecodeTask
 from app.domains.llm_tasks.aigc_decode_task import FetchDecodeResults
 from app.domains.llm_tasks.aigc_decode_task import ExtractDecodeTaskDetail
+from app.domains.llm_tasks.decode_article import (
+    CreateAdPlatformArticlesDecodeTask as CreateAdPlatformArticlesDecodeTaskV2,
+)
+from app.domains.llm_tasks.decode_article import (
+    CreateInnerArticlesDecodeTask as CreateInnerArticlesDecodeTaskV2,
+)
+from app.domains.llm_tasks.decode_article import (
+    FetchDecodeResults as FetchDecodeResultsV2,
+)
+from app.domains.llm_tasks.decode_article import (
+    ExtractDecodeTaskDetail as ExtractDecodeTaskDetailV2,
+)
 from app.domains.llm_tasks import TitleRewrite
 from app.domains.llm_tasks import ArticlePoolCategoryGeneration
 from app.domains.llm_tasks import CandidateAccountQualityScoreRecognizer
@@ -13,6 +25,10 @@ __all__ = [
     "CreateInnerArticlesDecodeTask",
     "FetchDecodeResults",
     "ExtractDecodeTaskDetail",
+    "CreateAdPlatformArticlesDecodeTaskV2",
+    "CreateInnerArticlesDecodeTaskV2",
+    "FetchDecodeResultsV2",
+    "ExtractDecodeTaskDetailV2",
     "TitleRewrite",
     "ArticlePoolCategoryGeneration",
     "CandidateAccountQualityScoreRecognizer",

+ 34 - 0
app/jobs/task_handler.py

@@ -467,6 +467,40 @@ class TaskHandler:
         await task.deal()
         return TaskStatus.SUCCESS
 
+    # ====================== V2 decode tasks (new AIGC API) ======================
+
+    @register("create_ad_platform_accounts_decode_task_v2")
+    async def _create_decode_task_v2(self) -> int:
+        """Create decode tasks for ad-platform articles (v2 - new AIGC API)."""
+        task = CreateAdPlatformArticlesDecodeTaskV2(
+            pool=self.db_client, log_service=self.log_client
+        )
+        await task.deal()
+        return TaskStatus.SUCCESS
+
+    @register("create_inner_articles_decode_task_v2")
+    async def _create_inner_decode_task_v2(self) -> int:
+        """Create decode tasks for internal articles (v2 - new AIGC API)."""
+        task = CreateInnerArticlesDecodeTaskV2(
+            pool=self.db_client, log_service=self.log_client
+        )
+        await task.deal()
+        return TaskStatus.SUCCESS
+
+    @register("fetch_decode_result_v2")
+    async def _fetch_decode_result_v2(self) -> int:
+        """Fetch decode task results (v2 - new AIGC API)."""
+        task = FetchDecodeResultsV2(pool=self.db_client, log_service=self.log_client)
+        await task.deal()
+        return TaskStatus.SUCCESS
+
+    @register("extract_decode_result_v2")
+    async def _extract_decode_result_v2(self) -> int:
+        """Extract decode task result details (v2 - new AIGC API)."""
+        task = ExtractDecodeTaskDetailV2(pool=self.db_client, log_service=self.log_client)
+        await task.deal()
+        return TaskStatus.SUCCESS
+
     # ====================== Recommend Tasks=====================
     @register("i2i_recommend_data_sync")
     async def _i2i_recommend_data_sync_handler(self) -> int: