Selaa lähdekoodia

aigc-文章解构优化

luojunhui 2 päivää sitten
vanhempi
commit
cc66b553ab

+ 1 - 1
README.md

@@ -141,7 +141,7 @@ docker compose up -d
 │   │   │   └── recycle_outside_account_articles.py
 │   │   ├── llm_tasks
 │   │   │   ├── __init__.py
-│   │   │   ├── aigc_decode_task
+│   │   │   ├── decode_article
 │   │   │   │   ├── __init__.py
 │   │   │   │   ├── _const.py
 │   │   │   │   ├── _mapper.py

+ 0 - 13
app/domains/llm_tasks/aigc_decode_task/__init__.py

@@ -1,13 +0,0 @@
-from .create_decode_tasks import (
-    CreateAdPlatformArticlesDecodeTask,
-    CreateInnerArticlesDecodeTask,
-)
-from .fetch_decode_results import FetchDecodeResults
-from .extract_decode_task_detail import ExtractDecodeTaskDetail
-
-__all__ = [
-    "CreateAdPlatformArticlesDecodeTask",
-    "CreateInnerArticlesDecodeTask",
-    "FetchDecodeResults",
-    "ExtractDecodeTaskDetail",
-]

+ 0 - 47
app/domains/llm_tasks/aigc_decode_task/_const.py

@@ -1,47 +0,0 @@
-class DecodeTaskConst:
-    TASK_BATCH = 100
-
-    class TaskStatus:
-        # 任务状态
-        INIT = 0
-        PROCESSING = 1
-        SUCCESS = 2
-        FAILED = 99
-
-    class ExtractStatus(TaskStatus): ...
-
-    class DecodeStatus:
-        # 解构结果状态
-        PENDING = 0
-        RUNNING = 1
-        SUCCESS = 2
-        FAILED = 3
-
-    class BusinessScene:
-        # 业务场景
-        POINT_PICK = 0
-        CREATE = 1
-        MAKE = 2
-
-    class ContentType:
-        # 内容类型
-        LONG_ARTICLE = 1
-        PICTURE_TEXT = 2
-        VIDEO = 3
-
-    class SourceType:
-        AD_PLATFORM = 1
-        INNER = 2
-
-    class ProduceModuleType:
-        COVER = 1
-        IMAGE = 2
-        TITLE = 3
-        CONTENT = 4
-        SUMMARY = 18
-
-    class RequestDecode:
-        SUCCESS = 0
-
-
-__all__ = ["DecodeTaskConst"]

+ 0 - 179
app/domains/llm_tasks/aigc_decode_task/_mapper.py

@@ -1,179 +0,0 @@
-from typing import List, Dict
-
-from app.core.database import DatabaseManager
-
-from ._const import DecodeTaskConst
-
-
-class ArticlesDecodeTaskMapper(DecodeTaskConst):
-    def __init__(self, pool: DatabaseManager):
-        self.pool = pool
-
-    # 存储解构任务
-    async def record_decode_task(
-        self, task_id: str, wx_sn: str, remark: str = None
-    ) -> int:
-        query = """
-            INSERT INTO long_articles_decode_tasks (task_id, wx_sn, remark)
-            VALUES (%s, %s, %s)
-        """
-        return await self.pool.async_save(query=query, params=(task_id, wx_sn, remark))
-
-    # 更新解构任务状态
-    async def update_decode_task_status(
-        self, task_id: str, ori_status: int, new_status: int, remark: str = None
-    ) -> int:
-        query = """
-            UPDATE long_articles_decode_tasks
-            SET status = %s, remark = %s
-            WHERE task_id = %s AND status = %s;
-        """
-        return await self.pool.async_save(
-            query=query, params=(new_status, remark, task_id, ori_status)
-        )
-
-    # 设置解构结果
-    async def set_decode_result(
-        self, task_id: str, result: str, remark: str = None
-    ) -> int:
-        query = """
-            UPDATE long_articles_decode_tasks
-            SET status = %s, remark = %s, result = %s
-            WHERE task_id = %s AND status = %s;
-        """
-        return await self.pool.async_save(
-            query=query,
-            params=(
-                self.TaskStatus.SUCCESS,
-                remark,
-                result,
-                task_id,
-                self.TaskStatus.PROCESSING,
-            ),
-        )
-
-    # 获取待拉取结果的解构任务(status=INIT,尚未拿到解构结果)
-    async def fetch_decoding_tasks(self) -> List[Dict]:
-        query = """
-            SELECT task_id FROM long_articles_decode_tasks WHERE status = %s LIMIT %s;
-        """
-        return await self.pool.async_fetch(
-            query=query, params=(self.TaskStatus.INIT, self.TASK_BATCH)
-        )
-
-    # 获取待解析的任务(获取处理成功的任务)
-    async def fetch_extract_tasks(self):
-        query = """
-            SELECT id, result FROM long_articles_decode_tasks
-            WHERE extract_status = %s AND status = %s;
-        """
-        return await self.pool.async_fetch(
-            query=query, params=(self.ExtractStatus.INIT, self.TaskStatus.SUCCESS)
-        )
-
-    # 修改解析状态(用于加锁与状态流转)
-    async def update_extract_status(self, task_id, ori_status, new_status):
-        query = """
-            UPDATE long_articles_decode_tasks
-            SET extract_status = %s WHERE extract_status = %s AND id = %s;
-        """
-        return await self.pool.async_save(
-            query=query,
-            params=(
-                new_status,
-                ori_status,
-                task_id,
-            ),
-        )
-
-    # 记录解析结果明细到 long_articles_decode_task_detail
-    async def record_extract_detail(self, decode_task_id: int, detail: Dict) -> int:
-        query = """
-            INSERT INTO long_articles_decode_task_detail
-                (decode_task_id, inspiration, purpose, key_point, topic)
-            VALUES (%s, %s, %s, %s, %s);
-        """
-        return await self.pool.async_save(
-            query=query,
-            params=(
-                decode_task_id,
-                detail.get("inspiration", ""),
-                detail.get("purpose", ""),
-                detail.get("key_point", ""),
-                detail.get("topic", ""),
-            ),
-        )
-
-
-class AdPlatformArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
-    def __init__(self, pool: DatabaseManager):
-        super().__init__(pool)
-
-    # 修改文章解构状态
-    async def update_article_decode_status(
-        self, id_: int, ori_status: int, new_status: int
-    ) -> int:
-        query = """
-            UPDATE ad_platform_accounts_daily_detail
-            SET decode_status = %s
-            WHERE id = %s AND decode_status = %s;
-        """
-        return await self.pool.async_save(
-            query=query, params=(new_status, id_, ori_status)
-        )
-
-    # 获取待解构文章
-    async def fetch_decode_articles(self) -> List[Dict]:
-        query = """
-            SELECT id, account_name, gh_id, article_title, article_cover,
-                   article_text, article_images, wx_sn
-            FROM ad_platform_accounts_daily_detail WHERE fetch_status = %s AND decode_status = %s
-            LIMIT %s;
-        """
-        return await self.pool.async_fetch(
-            query=query,
-            params=(self.TaskStatus.SUCCESS, self.TaskStatus.INIT, self.TASK_BATCH),
-        )
-
-
-class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
-    def __init__(self, pool: DatabaseManager):
-        super().__init__(pool)
-
-    # 获取内部文章
-    async def fetch_inner_articles(self):
-        query = """
-            SELECT  title
-                  ,SUM(fans) AS total_fans
-                  ,SUM(view_count) AS total_view
-                  ,SUM(view_count) / SUM(fans) AS avg_read_rate
-                  ,SUM(first_level) AS total_first_level
-                  ,MAX(source_id) as source_id
-                  ,MAX(wx_sn) as wx_sn
-            FROM    datastat_sort_strategy
-            WHERE  date_str >= '20250101'
-            GROUP BY title
-            HAVING total_fans > 100000
-            AND     avg_read_rate > 0.008
-            AND     total_first_level > 0;
-
-        """
-        return await self.pool.async_fetch(query=query)
-
-    # 获取内部文章生成信息
-    async def fetch_inner_articles_produce_detail(self, source_id) -> List[Dict]:
-        query = """
-            SELECT produce_module_type, output
-            FROM produce_plan_module_output WHERE plan_exe_id = %s
-            AND produce_module_type in (1,2,3,4,18); 
-        """
-        return await self.pool.async_fetch(
-            query=query, db_name="aigc", params=(source_id,)
-        )
-
-
-__all__ = [
-    "ArticlesDecodeTaskMapper",
-    "AdPlatformArticlesDecodeTaskMapper",
-    "InnerArticlesDecodeTaskMapper",
-]

+ 0 - 119
app/domains/llm_tasks/aigc_decode_task/_utils.py

@@ -1,119 +0,0 @@
-import json
-from typing import Dict, List
-
-from app.infra.internal import DecodeServer
-
-from ._const import DecodeTaskConst
-
-
-class DecodeTaskUtil(DecodeTaskConst):
-    decode_server = DecodeServer()
-
-    def prepare_extract_body(self, article: Dict) -> Dict:
-        return {
-            "scene": self.BusinessScene.POINT_PICK,
-            "content_type": self.ContentType.LONG_ARTICLE,
-            "content": {
-                "channel_content_id": article.get("wx_sn", ""),
-                "video_url": "",
-                "images": article.get("article_images"),
-                "body_text": article.get("article_text", ""),
-                "title": article.get("article_title", ""),
-                "channel_account_id": article.get("gh_id", ""),
-                "channel_account_name": article.get("account_name", ""),
-            },
-        }
-
-    @staticmethod
-    def extract_decode_result(result: Dict) -> Dict:
-        """
-        从结构的结果中,解析出灵感点、目的点、关键点;
-        """
-        final_result = result.get("final_normalization_rebuild")
-        if not final_result:
-            return {"error": "解构结果中无 final_normalization_rebuild 信息"}
-        # 灵感点
-        inspiration_list = final_result.get("inspiration_final_result", {}).get(
-            "最终灵感点列表", []
-        )
-        # 目的
-        purpose_list = final_result.get("purpose_final_result", {}).get(
-            "最终目的点列表", []
-        )
-        # 关键点
-        keypoint_list = final_result.get("keypoint_final", {}).get("最终关键点列表", [])
-
-        topic_fusion = final_result.get("topic_fusion_result", {})
-        # 选题
-        topic_text = (
-            topic_fusion.get("最终选题", {}).get("选题", "")
-            if isinstance(topic_fusion.get("最终选题"), dict)
-            else ""
-        )
-
-        def _join_points(items: list, key: str) -> str:
-            parts = [str(p[key]) for p in items if isinstance(p, dict) and p.get(key)]
-            return ",".join(parts)
-
-        return {
-            "inspiration": _join_points(inspiration_list, "灵感点"),
-            "purpose": _join_points(purpose_list, "目的点"),
-            "key_point": _join_points(keypoint_list, "关键点"),
-            "topic": topic_text,
-        }
-
-    async def fetch_decode_result(self, task_id: str):
-        return await self.decode_server.fetch_result(task_id)
-
-
-class AdPlatformArticlesDecodeUtils(DecodeTaskUtil):
-    @staticmethod
-    def format_images(images: str) -> List[str]:
-        """
-        格式化图片字符串,空/非法 JSON 返回空列表。
-        """
-        if not images or not images.strip():
-            return []
-        try:
-            image_list = json.loads(images)
-        except (json.JSONDecodeError, TypeError):
-            return []
-        if not isinstance(image_list, list):
-            return []
-        return [
-            i.get("image_url")
-            for i in image_list
-            if isinstance(i, dict) and i.get("image_url")
-        ]
-
-    async def create_decode_task(self, article: Dict):
-        images = self.format_images(article.get("article_images") or "")
-        article["article_images"] = images
-        request_body = self.prepare_extract_body(article)
-        return await self.decode_server.create_decode_task(request_body)
-
-
-class InnerArticlesDecodeUtils(DecodeTaskUtil):
-    async def create_decode_task(self, article: Dict, article_produce_info: List[Dict]):
-        images = [
-            i["output"]
-            for i in article_produce_info
-            if i["produce_module_type"]
-            in (self.ProduceModuleType.COVER, self.ProduceModuleType.IMAGE)
-        ]
-        article["article_images"] = images
-        text = [
-            i["output"]
-            for i in article_produce_info
-            if i["produce_module_type"] == self.ProduceModuleType.CONTENT
-        ]
-        article["article_text"] = "\n".join(text)
-        request_body = self.prepare_extract_body(article)
-        return await self.decode_server.create_decode_task(request_body)
-
-
-__all__ = [
-    "AdPlatformArticlesDecodeUtils",
-    "InnerArticlesDecodeUtils",
-    "DecodeTaskUtil",
-]

+ 0 - 174
app/domains/llm_tasks/aigc_decode_task/create_decode_tasks.py

@@ -1,174 +0,0 @@
-from typing import Dict
-from tqdm import tqdm
-
-from app.core.database import DatabaseManager
-from app.core.observability import LogService
-
-from ._const import DecodeTaskConst
-from ._mapper import AdPlatformArticlesDecodeTaskMapper, InnerArticlesDecodeTaskMapper
-from ._utils import AdPlatformArticlesDecodeUtils, InnerArticlesDecodeUtils
-
-
-class CreateAdPlatformArticlesDecodeTask(DecodeTaskConst):
-    def __init__(self, pool: DatabaseManager, log_service: LogService):
-        self.pool = pool
-        self.log_service = log_service
-        self.mapper = AdPlatformArticlesDecodeTaskMapper(self.pool)
-        self.tool = AdPlatformArticlesDecodeUtils()
-
-    async def create_single_decode_task(self, article: Dict):
-        # Acquire Lock
-        article_id = article["id"]
-        acquire_lock = await self.mapper.update_article_decode_status(
-            article_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
-        )
-        if not acquire_lock:
-            await self.log_service.log(
-                contents={
-                    "article_id": article_id,
-                    "task": "create_decode_task",
-                    "status": "skip",
-                    "message": "acquire lock failed",
-                }
-            )
-            return
-
-        # 与解构系统交互,创建解构任务
-        response = await self.tool.create_decode_task(article)
-        response_code = response.get("code")
-        if response_code != self.RequestDecode.SUCCESS:
-            # 解构任务创建失败
-            await self.mapper.update_article_decode_status(
-                article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED
-            )
-            await self.log_service.log(
-                contents={
-                    "article_id": article_id,
-                    "task": "create_decode_task",
-                    "status": "fail",
-                    "data": response,
-                }
-            )
-            return
-
-        task_id = response.get("data", {}).get("task_id") or response.get(
-            "data", {}
-        ).get("taskId")
-        if not task_id:
-            # 解构任务创建失败
-            await self.mapper.update_article_decode_status(
-                article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED
-            )
-            await self.log_service.log(
-                contents={
-                    "article_id": article_id,
-                    "task": "create_decode_task",
-                    "status": "fail",
-                    "data": response,
-                }
-            )
-            return
-
-        # 创建 decode 任务成功
-        await self.log_service.log(
-            contents={
-                "article_id": article_id,
-                "task": "create_decode_task",
-                "status": "success",
-                "data": response,
-            }
-        )
-
-        wx_sn = article["wx_sn"]
-        remark = f"task_id: {task_id}-创建解构任务"
-        record_row = await self.mapper.record_decode_task(task_id, wx_sn, remark)
-        if not record_row:
-            # 记录解构任务失败
-            await self.mapper.update_article_decode_status(
-                article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED
-            )
-            await self.log_service.log(
-                contents={
-                    "article_id": article_id,
-                    "task": "record_decode_task",
-                    "status": "fail",
-                    "message": "创建 decode 记录失败",
-                    "data": response,
-                }
-            )
-            return
-
-        # 记录创建成功
-        await self.mapper.update_article_decode_status(
-            article_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
-        )
-
-    async def create_tasks(self):
-        article_list = await self.mapper.fetch_decode_articles()
-        if not article_list:
-            await self.log_service.log(
-                contents={
-                    "task": "create_tasks",
-                    "message": "No more articles to decode",
-                }
-            )
-            return
-
-        for article in tqdm(article_list, desc="Creating decode tasks"):
-            await self.create_single_decode_task(article)
-
-    async def deal(self):
-        await self.create_tasks()
-
-
-class CreateInnerArticlesDecodeTask(DecodeTaskConst):
-    def __init__(self, pool: DatabaseManager, log_service: LogService):
-        self.pool = pool
-        self.log_service = log_service
-        self.mapper = InnerArticlesDecodeTaskMapper(self.pool)
-        self.tool = InnerArticlesDecodeUtils()
-
-    async def create_single_decode_task(self, article: Dict):
-        # Acquire Lock
-        source_id = article["source_id"]
-        article_produce_info = await self.mapper.fetch_inner_articles_produce_detail(
-            source_id
-        )
-
-        # 与解构系统交互,创建解构任务
-        response = await self.tool.create_decode_task(article, article_produce_info)
-        response_code = response.get("code")
-        if response_code != self.RequestDecode.SUCCESS:
-            return
-
-        task_id = response.get("data", {}).get("task_id") or response.get(
-            "data", {}
-        ).get("taskId")
-        if not task_id:
-            return
-
-        wx_sn = article["wx_sn"]
-        remark = f"task_id: {task_id}-创建解构任务"
-        record_row = await self.mapper.record_decode_task(task_id, wx_sn, remark)
-        if not record_row:
-            return
-
-    async def create_tasks(self):
-        article_list = await self.mapper.fetch_inner_articles()
-        if not article_list:
-            await self.log_service.log(
-                contents={
-                    "task": "create_tasks",
-                    "message": "No more articles to decode",
-                }
-            )
-            return
-
-        for article in tqdm(article_list, desc="Creating decode tasks"):
-            await self.create_single_decode_task(article)
-
-    async def deal(self):
-        await self.create_tasks()
-
-
-__all__ = ["CreateAdPlatformArticlesDecodeTask", "CreateInnerArticlesDecodeTask"]

+ 0 - 106
app/domains/llm_tasks/aigc_decode_task/extract_decode_task_detail.py

@@ -1,106 +0,0 @@
-import json
-
-from app.core.database import DatabaseManager
-from app.core.observability import LogService
-
-from app.infra.shared import run_tasks_with_asyncio_task_group
-
-from ._const import DecodeTaskConst
-from ._mapper import ArticlesDecodeTaskMapper
-from ._utils import DecodeTaskUtil
-
-
-class ExtractDecodeTaskDetail(DecodeTaskConst):
-    def __init__(self, pool: DatabaseManager, log_service: LogService):
-        self.pool = pool
-        self.log_service = log_service
-        self.mapper = ArticlesDecodeTaskMapper(self.pool)
-        self.tool = DecodeTaskUtil()
-
-    async def extract_single_result(self, task):
-        task_id = task["id"]
-
-        # acquire lock by extract_status
-        acquire_lock = await self.mapper.update_extract_status(
-            task_id, self.ExtractStatus.INIT, self.ExtractStatus.PROCESSING
-        )
-        if not acquire_lock:
-            return
-
-        try:
-            result = json.loads(task["result"])["result"]
-        except (TypeError, KeyError, json.JSONDecodeError) as e:
-            await self.mapper.update_extract_status(
-                task_id,
-                self.ExtractStatus.PROCESSING,
-                self.ExtractStatus.FAILED,
-            )
-            await self.log_service.log(
-                contents={
-                    "task": "extract_single_result",
-                    "task_id": task_id,
-                    "status": "fail",
-                    "message": f"parse decode result error: {e}",
-                    "raw": task.get("result"),
-                }
-            )
-            return
-
-        detail = self.tool.extract_decode_result(result)
-        # 如果工具返回错误信息,直接标记为失败
-        if detail.get("error"):
-            await self.mapper.update_extract_status(
-                task_id,
-                self.ExtractStatus.PROCESSING,
-                self.ExtractStatus.FAILED,
-            )
-            await self.log_service.log(
-                contents={
-                    "task": "extract_single_result",
-                    "task_id": task_id,
-                    "status": "fail",
-                    "message": detail["error"],
-                }
-            )
-            return
-
-        # 写入明细表
-        saved = await self.mapper.record_extract_detail(task_id, detail)
-        if not saved:
-            await self.mapper.update_extract_status(
-                task_id,
-                self.ExtractStatus.PROCESSING,
-                self.ExtractStatus.FAILED,
-            )
-            await self.log_service.log(
-                contents={
-                    "task": "extract_single_result",
-                    "task_id": task_id,
-                    "status": "fail",
-                    "message": "insert long_articles_decode_task_detail failed",
-                    "detail": detail,
-                }
-            )
-            return
-
-        # 写入成功,更新状态为成功
-        await self.mapper.update_extract_status(
-            task_id,
-            self.ExtractStatus.PROCESSING,
-            self.ExtractStatus.SUCCESS,
-        )
-
-    async def extract_task(self):
-        tasks = await self.mapper.fetch_extract_tasks()
-        await run_tasks_with_asyncio_task_group(
-            task_list=tasks,
-            handler=self.extract_single_result,
-            description="批量解析结构结果",
-            unit="task",
-        )
-
-    async def deal(self):
-        await self.extract_task()
-
-
-__all__ = ["ExtractDecodeTaskDetail"]

+ 0 - 127
app/domains/llm_tasks/aigc_decode_task/fetch_decode_results.py

@@ -1,127 +0,0 @@
-import json
-from typing import Dict
-
-from app.core.database import DatabaseManager
-from app.core.observability import LogService
-
-from ._const import DecodeTaskConst
-from ._mapper import ArticlesDecodeTaskMapper
-from ._utils import DecodeTaskUtil
-
-
-class FetchDecodeResults(DecodeTaskConst):
-    def __init__(self, pool: DatabaseManager, log_service: LogService):
-        self.pool = pool
-        self.log_service = log_service
-        self.mapper = ArticlesDecodeTaskMapper(self.pool)
-        self.tool = DecodeTaskUtil()
-
-    async def fetch_single_task(self, task: Dict):
-        task_id = task["task_id"]
-
-        # acquire lock
-        acquire_lock = await self.mapper.update_decode_task_status(
-            task_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
-        )
-        if not acquire_lock:
-            return
-
-        response = await self.tool.fetch_decode_result(task_id)
-        if not response:
-            await self.mapper.update_decode_task_status(
-                task_id=task_id,
-                ori_status=self.TaskStatus.PROCESSING,
-                new_status=self.TaskStatus.INIT,
-                remark="获取解构结果失败,服务异常,已回滚状态",
-            )
-            return
-
-        # 请求成功
-        response_code = response.get("code")
-        if response_code != self.RequestDecode.SUCCESS:
-            # 解构任务获取失败
-            await self.mapper.update_decode_task_status(
-                task_id=task_id,
-                ori_status=self.TaskStatus.PROCESSING,
-                new_status=self.TaskStatus.FAILED,
-                remark=f"请求解构接口返回异常,标记为失败:{json.dumps(response, ensure_ascii=False)}",
-            )
-            return
-
-        response_data = response.get("data", {})
-        response_task_id = response_data.get("taskId") or response_data.get("task_id")
-        if task_id != response_task_id:
-            # 解构任务获取失败
-            await self.mapper.update_decode_task_status(
-                task_id=task_id,
-                ori_status=self.TaskStatus.PROCESSING,
-                new_status=self.TaskStatus.FAILED,
-                remark=f"请求解构接口TaskId异常:{json.dumps(response, ensure_ascii=False)}",
-            )
-            return
-
-        status = response_data.get("status")
-        match status:
-            case self.DecodeStatus.PENDING:
-                await self.mapper.update_decode_task_status(
-                    task_id=task_id,
-                    ori_status=self.TaskStatus.PROCESSING,
-                    new_status=self.TaskStatus.INIT,
-                    remark=f"解构任务状态为PENDING,继续轮询",
-                )
-
-            case self.DecodeStatus.RUNNING:
-                await self.mapper.update_decode_task_status(
-                    task_id=task_id,
-                    ori_status=self.TaskStatus.PROCESSING,
-                    new_status=self.TaskStatus.INIT,
-                    remark=f"解构任务状态为RUNNING,继续轮询",
-                )
-
-            case self.DecodeStatus.SUCCESS:
-                await self.mapper.set_decode_result(
-                    task_id=task_id,
-                    result=json.dumps(response_data, ensure_ascii=False),
-                )
-
-            case self.DecodeStatus.FAILED:
-                await self.mapper.update_decode_task_status(
-                    task_id=task_id,
-                    ori_status=self.TaskStatus.PROCESSING,
-                    new_status=self.TaskStatus.FAILED,
-                    remark=f"解构任务状态为FAILED,标记为失败",
-                )
-
-            case _:
-                await self.mapper.update_decode_task_status(
-                    task_id=task_id,
-                    ori_status=self.TaskStatus.PROCESSING,
-                    new_status=self.TaskStatus.INIT,
-                    remark=f"解构任务状态未知(status={status}),回滚待重试:{json.dumps(response_data, ensure_ascii=False)}",
-                )
-                await self.log_service.log(
-                    contents={
-                        "task": "fetch_single_task",
-                        "task_id": task_id,
-                        "status": "unknown",
-                        "message": f"unexpected decode status: {status}",
-                        "data": response_data,
-                    }
-                )
-
-    async def fetch_results(self):
-        decoding_tasks = await self.mapper.fetch_decoding_tasks()
-        if not decoding_tasks:
-            await self.log_service.log(
-                contents={"task": "fetch_results", "message": "No more tasks to fetch"}
-            )
-            return
-
-        for task in decoding_tasks:
-            await self.fetch_single_task(task)
-
-    async def deal(self):
-        await self.fetch_results()
-
-
-__all__ = ["FetchDecodeResults"]

+ 2 - 1
app/domains/llm_tasks/decode_article/_const.py

@@ -1,6 +1,7 @@
 class DecodeArticleConst:
-    CONFIG_ID = 61  # 长文头条-文章解构
+    CONFIG_ID = 66  # 长文头条-文章解构-正式
     TASK_BATCH = 200  # 每批处理数
+    # TASK_BATCH = 20  # 每批处理数
     SUBMIT_BATCH = 50  # 提交 API 每批帖子上限
 
     class TaskStatus:

+ 29 - 19
app/domains/llm_tasks/decode_article/_mapper.py

@@ -101,17 +101,19 @@ class ArticlesDecodeTaskMapper(DecodeArticleConst):
     async def fetch_existing_channel_content_ids(
         self, channel_content_ids: List[str]
     ) -> set:
-        """批量查询哪些 channel_content_id 已有任务记录"""
+        """批量查询哪些 channel_content_id 已有成功解构结果,用于去重跳过"""
         if not channel_content_ids:
             return set()
         placeholders = ",".join(["%s"] * len(channel_content_ids))
         query = f"""
             SELECT channel_content_id FROM {TABLE}
-            WHERE channel_content_id IN ({placeholders}) AND config_id = %s
+            WHERE channel_content_id IN ({placeholders})
+              AND config_id = %s
+              AND status = %s
         """
         rows = await self.pool.async_fetch(
             query=query,
-            params=(*channel_content_ids, self.CONFIG_ID),
+            params=(*channel_content_ids, self.CONFIG_ID, self.TaskStatus.SUCCESS),
         )
         return {r["channel_content_id"] for r in rows}
 
@@ -188,26 +190,34 @@ class AdPlatformArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
 
 
 class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
+    TABLE_INNER = "long_articles_decode_articles"
+
     def __init__(self, pool: DatabaseManager):
         super().__init__(pool)
 
     async def fetch_inner_articles(self) -> List[Dict]:
-        query = """
-            SELECT  title
-                  ,SUM(fans) AS total_fans
-                  ,SUM(view_count) AS total_view
-                  ,SUM(view_count) / SUM(fans) AS avg_read_rate
-                  ,SUM(first_level) AS total_first_level
-                  ,MAX(source_id) as source_id
-                  ,MAX(wx_sn) as wx_sn
-            FROM    datastat_sort_strategy
-            WHERE  date_str >= '20250101'
-            GROUP BY title
-            HAVING total_fans > 100000
-            AND     avg_read_rate > 0.002
-            AND     total_first_level > 0
+        query = f"""
+            SELECT id, title, source_id, wx_sn, coverimgurl, article_text, summary, card_title
+            FROM {self.TABLE_INNER}
+            WHERE status = %s
+            ORDER BY max_read_rate DESC
+            LIMIT %s
+        """
+        return await self.pool.async_fetch(
+            query=query, params=(self.TaskStatus.INIT, self.TASK_BATCH)
+        )
+
+    async def update_inner_article_status(
+        self, id_: int, ori_status: int, new_status: int
+    ) -> int:
+        query = f"""
+            UPDATE {self.TABLE_INNER}
+            SET status = %s
+            WHERE id = %s AND status = %s
         """
-        return await self.pool.async_fetch(query=query)
+        return await self.pool.async_save(
+            query=query, params=(new_status, id_, ori_status)
+        )
 
     async def fetch_inner_articles_produce_detail(
         self, source_id
@@ -216,7 +226,7 @@ class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
             SELECT produce_module_type, output
             FROM produce_plan_module_output
             WHERE plan_exe_id = %s
-            AND produce_module_type in (1,2,3,4,18)
+            AND produce_module_type IN (1, 2, 4)
         """
         return await self.pool.async_fetch(
             query=query, db_name="aigc", params=(source_id,)

+ 20 - 17
app/domains/llm_tasks/decode_article/_utils.py

@@ -10,14 +10,15 @@ class AigcDecodeUtils(DecodeArticleConst):
     decode_server = AigcDecodeServer()
 
     async def submit_decode_batch(
-        self, posts: List[Dict]
+        self, posts: List[Dict], *, config_id: int = None, skip_completed: bool = False
     ) -> Dict[str, Dict]:
         """分批提交解构任务,返回 {channel_content_id: {status, errorMessage}}"""
+        cfg_id = config_id or self.CONFIG_ID
         result = {}
         for i in range(0, len(posts), self.SUBMIT_BATCH):
             batch = posts[i : i + self.SUBMIT_BATCH]
             response = await self.decode_server.submit_decode(
-                config_id=self.CONFIG_ID, posts=batch
+                config_id=cfg_id, posts=batch, skip_completed=skip_completed
             )
             if response.get("code") == 0:
                 for item in response.get("data", []):
@@ -34,16 +35,17 @@ class AigcDecodeUtils(DecodeArticleConst):
         return result
 
     async def query_decode_results_batch(
-        self, channel_content_ids: List[str]
+        self, channel_content_ids: List[str], *, config_id: int = None
     ) -> Dict[str, Dict]:
         """分批查询解构结果,返回 {channel_content_id: {status, dataContent, html, errorMessage}}
         当 API 调用失败时,对应条目 status 为 API_ERROR,调用方应保持 INIT 等待重试。
         """
+        cfg_id = config_id or self.CONFIG_ID
         result = {}
         for i in range(0, len(channel_content_ids), self.SUBMIT_BATCH):
             batch = channel_content_ids[i : i + self.SUBMIT_BATCH]
             response = await self.decode_server.query_decode_results(
-                config_id=self.CONFIG_ID, channel_content_ids=batch
+                config_id=cfg_id, channel_content_ids=batch
             )
             if response.get("code") == 0:
                 for item in response.get("data", []):
@@ -136,26 +138,27 @@ class InnerArticlesDecodeUtils(AigcDecodeUtils):
         for article in articles:
             wx_sn = article["wx_sn"]
             produce_info = produce_info_map.get(wx_sn, [])
-            images = [
-                i["output"]
-                for i in produce_info
-                if i["produce_module_type"]
-                in (self.ProduceModuleType.COVER, self.ProduceModuleType.IMAGE)
-            ]
-            text_parts = [
-                i["output"]
-                for i in produce_info
-                if i["produce_module_type"] == self.ProduceModuleType.CONTENT
-            ]
+
+            # 收集图片:封面(coverimgurl) + produce COVER + produce IMAGE
+            images = []
+            if article.get("coverimgurl"):
+                images.append(article["coverimgurl"])
+            for pi in produce_info:
+                if pi["produce_module_type"] == self.ProduceModuleType.COVER:
+                    images.append(pi["output"])
+            for pi in produce_info:
+                if pi["produce_module_type"] == self.ProduceModuleType.IMAGE:
+                    images.append(pi["output"])
+
             posts.append(
                 {
-                    "channelContentId": wx_sn,
                     "title": article.get("title", ""),
-                    "bodyText": "\n".join(text_parts),
+                    "bodyText": article.get("article_text", ""),
                     "images": images,
                     "video": None,
                     "contentModal": self.ContentModal.LONG_ARTICLE,
                     "channel": self.Channel.WECHAT,
+                    "channelContentId": wx_sn,
                 }
             )
         return posts

+ 158 - 89
app/domains/llm_tasks/decode_article/create_decode_tasks.py

@@ -58,7 +58,7 @@ class CreateAdPlatformArticlesDecodeTask(DecodeArticleConst):
 
             if not result:
                 await self.mapper.update_article_decode_status(
-                    article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED
+                    article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
                 )
                 await self.log_service.log(
                     contents={
@@ -66,7 +66,7 @@ class CreateAdPlatformArticlesDecodeTask(DecodeArticleConst):
                         "wx_sn": wx_sn,
                         "task": "create_decode_task_v2",
                         "status": "fail",
-                        "message": "no response for channel_content_id",
+                        "message": "no response for channel_content_id, rolled back to INIT",
                     }
                 )
                 continue
@@ -74,7 +74,7 @@ class CreateAdPlatformArticlesDecodeTask(DecodeArticleConst):
             status = result.get("status")
             if status == self.SubmitStatus.FAILED:
                 await self.mapper.update_article_decode_status(
-                    article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED
+                    article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
                 )
                 await self.log_service.log(
                     contents={
@@ -174,7 +174,7 @@ class CreateAdPlatformArticlesDecodeTask(DecodeArticleConst):
                 )
             else:
                 await self.mapper.update_article_decode_status(
-                    article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED
+                    article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
                 )
                 await self.log_service.log(
                     contents={
@@ -182,7 +182,7 @@ class CreateAdPlatformArticlesDecodeTask(DecodeArticleConst):
                         "wx_sn": wx_sn,
                         "task": "create_decode_task_v2",
                         "status": "fail",
-                        "message": f"unexpected submit status: {status}",
+                        "message": f"unexpected submit status: {status}, rolled back to INIT",
                         "data": result,
                     }
                 )
@@ -208,42 +208,148 @@ class CreateAdPlatformArticlesDecodeTask(DecodeArticleConst):
 
 
 class CreateInnerArticlesDecodeTask(DecodeArticleConst):
+    _TEST_MODE = False
+
     def __init__(self, pool: DatabaseManager, log_service: LogService):
         self.pool = pool
         self.log_service = log_service
         self.mapper = InnerArticlesDecodeTaskMapper(self.pool)
         self.tool = InnerArticlesDecodeUtils()
 
-    async def deal(self):
+    async def _acquire_articles(self) -> List[Dict]:
+        """Fetch articles awaiting decode and lock each one (status INIT -> PROCESSING); articles whose lock fails are logged and skipped."""
         article_list = await self.mapper.fetch_inner_articles()
-        if not article_list:
+        if self._TEST_MODE:
+            return article_list
+
+        locked = []
+        for article in article_list:
+            article_id = article["id"]
+            acquired = await self.mapper.update_inner_article_status(
+                article_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
+            )
+            if acquired:
+                locked.append(article)
+            else:
+                await self.log_service.log(
+                    contents={
+                        "article_id": article_id,
+                        "task": "create_inner_decode_task",
+                        "status": "skip",
+                        "message": "acquire lock failed",
+                    }
+                )
+        return locked
+
+    async def _handle_result(
+        self,
+        article: Dict,
+        channel_content_id: str,
+        result: Dict,
+        posts_by_cid: Dict,
+        config_id: int,
+    ):
+        wx_sn = article["wx_sn"]
+
+        if not result:
             await self.log_service.log(
                 contents={
-                    "task": "create_inner_decode_task_v2",
-                    "message": "No more articles to decode",
+                    "wx_sn": wx_sn,
+                    "task": "create_inner_decode_task",
+                    "status": "fail",
+                    "message": "no response for channel_content_id",
                 }
             )
             return
 
-        # 过滤已有任务记录的文章
-        all_wx_sns = [a["wx_sn"] for a in article_list]
-        existing = await self.mapper.fetch_existing_channel_content_ids(all_wx_sns)
-        new_articles = [a for a in article_list if a["wx_sn"] not in existing]
-        skipped = len(article_list) - len(new_articles)
-        if skipped > 0:
+        status = result.get("status")
+        if status == self.SubmitStatus.FAILED:
             await self.log_service.log(
                 contents={
-                    "task": "create_inner_decode_task_v2",
-                    "message": f"Skipped {skipped} already-submitted articles",
+                    "wx_sn": wx_sn,
+                    "task": "create_inner_decode_task",
+                    "status": "fail",
+                    "data": result,
                 }
             )
-        if not new_articles:
+        elif status == self.SubmitStatus.PENDING:
+            await self.mapper.insert_decode_task(
+                channel_content_id=channel_content_id,
+                content_id=str(article.get("source_id", "")),
+                source=self.SourceType.INNER,
+                payload=json.dumps(
+                    posts_by_cid.get(channel_content_id, {}), ensure_ascii=False
+                ),
+                remark="内部文章解构任务已提交",
+            )
+        elif status == self.SubmitStatus.SUCCESS:
+            query_results = await self.tool.query_decode_results_batch(
+                [channel_content_id], config_id=config_id
+            )
+            result_data = query_results.get(channel_content_id)
+            data_content = result_data.get("dataContent") if result_data else None
+            if data_content:
+                await self.mapper.insert_decode_task(
+                    channel_content_id=channel_content_id,
+                    content_id=str(article.get("source_id", "")),
+                    source=self.SourceType.INNER,
+                    payload=json.dumps(
+                        posts_by_cid.get(channel_content_id, {}), ensure_ascii=False
+                    ),
+                    remark="内部文章解构结果已获取",
+                )
+                await self.mapper.set_decode_result(
+                    channel_content_id=channel_content_id,
+                    result=json.dumps(
+                        {"dataContent": data_content}, ensure_ascii=False
+                    ),
+                )
+            else:
+                await self.mapper.insert_decode_task(
+                    channel_content_id=channel_content_id,
+                    content_id=str(article.get("source_id", "")),
+                    source=self.SourceType.INNER,
+                    payload=json.dumps(result, ensure_ascii=False),
+                    remark="提交返回SUCCESS,查询未果,等待轮询",
+                )
+        else:
             await self.log_service.log(
                 contents={
-                    "task": "create_inner_decode_task_v2",
-                    "message": "All articles already submitted",
+                    "wx_sn": wx_sn,
+                    "task": "create_inner_decode_task",
+                    "status": "fail",
+                    "message": f"unexpected submit status: {status}",
+                    "data": result,
                 }
             )
+
+    async def _submit_and_record(self, articles: List[Dict]):
+        if not articles:
+            return
+
+        # 过滤已有任务记录的文章(测试模式跳过)
+        if not self._TEST_MODE:
+            all_wx_sns = [a["wx_sn"] for a in articles]
+            existing = await self.mapper.fetch_existing_channel_content_ids(all_wx_sns)
+            new_articles = [a for a in articles if a["wx_sn"] not in existing]
+            skipped = len(articles) - len(new_articles)
+            if skipped > 0:
+                await self.log_service.log(
+                    contents={
+                        "task": "create_inner_decode_task",
+                        "message": f"Skipped {skipped} already-submitted articles",
+                    }
+                )
+
+            for article in articles:
+                if article not in new_articles:
+                    await self.mapper.update_inner_article_status(
+                        article["id"], self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
+                    )
+        else:
+            new_articles = articles
+
+        if not new_articles:
             return
 
         # 批量获取 produce 信息
@@ -256,86 +362,49 @@ class CreateInnerArticlesDecodeTask(DecodeArticleConst):
             produce_info_map[article["wx_sn"]] = produce_info
 
         posts = self.tool.prepare_posts(new_articles, produce_info_map)
-        submit_results = await self.tool.submit_decode_batch(posts)
-        posts_by_wx = {p["channelContentId"]: p for p in posts}
+
+        submit_results = await self.tool.submit_decode_batch(
+            posts, config_id=self.CONFIG_ID, skip_completed=True
+        )
+        posts_by_cid = {p["channelContentId"]: p for p in posts}
 
         for article in tqdm(new_articles):
             wx_sn = article["wx_sn"]
+            article_id = article["id"]
+
             result = submit_results.get(wx_sn)
-            if not result:
-                await self.log_service.log(
-                    contents={
-                        "wx_sn": wx_sn,
-                        "task": "create_inner_decode_task_v2",
-                        "status": "fail",
-                        "message": "no response for channel_content_id",
-                    }
-                )
-                continue
+            await self._handle_result(
+                article, wx_sn, result, posts_by_cid, self.CONFIG_ID
+            )
 
-            status = result.get("status")
-            if status == self.SubmitStatus.FAILED:
-                await self.log_service.log(
-                    contents={
-                        "wx_sn": wx_sn,
-                        "task": "create_inner_decode_task_v2",
-                        "status": "fail",
-                        "data": result,
-                    }
-                )
-            elif status == self.SubmitStatus.PENDING:
-                await self.mapper.insert_decode_task(
-                    channel_content_id=wx_sn,
-                    content_id=str(article.get("source_id", "")),
-                    source=self.SourceType.INNER,
-                    payload=json.dumps(
-                        posts_by_wx.get(wx_sn, {}), ensure_ascii=False
-                    ),
-                    remark="内部文章解构任务已提交",
-                )
-            elif status == self.SubmitStatus.SUCCESS:
-                query_results = await self.tool.query_decode_results_batch([wx_sn])
-                result_data = query_results.get(wx_sn)
-                data_content = result_data.get("dataContent") if result_data else None
-                if data_content:
-                    await self.mapper.insert_decode_task(
-                        channel_content_id=wx_sn,
-                        content_id=str(article.get("source_id", "")),
-                        source=self.SourceType.INNER,
-                        payload=json.dumps(
-                            posts_by_wx.get(wx_sn, {}), ensure_ascii=False
-                        ),
-                        remark="内部文章解构结果已获取",
-                    )
-                    await self.mapper.set_decode_result(
-                        channel_content_id=wx_sn,
-                        result=json.dumps(
-                            {"dataContent": data_content}, ensure_ascii=False
-                        ),
+            if not self._TEST_MODE:
+                ok = result and result.get("status") != self.SubmitStatus.FAILED
+                if ok:
+                    await self.mapper.update_inner_article_status(
+                        article_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
                     )
                 else:
-                    await self.mapper.insert_decode_task(
-                        channel_content_id=wx_sn,
-                        content_id=str(article.get("source_id", "")),
-                        source=self.SourceType.INNER,
-                        payload=json.dumps(result, ensure_ascii=False),
-                        remark="提交返回SUCCESS,查询未果,等待轮询",
+                    # 提交失败或无响应,回锁为 INIT 等待下次重试
+                    await self.mapper.update_inner_article_status(
+                        article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
                     )
-            else:
-                await self.log_service.log(
-                    contents={
-                        "wx_sn": wx_sn,
-                        "task": "create_inner_decode_task_v2",
-                        "status": "fail",
-                        "message": f"unexpected submit status: {status}",
-                        "data": result,
-                    }
-                )
 
+    async def deal(self):
+        article_list = await self._acquire_articles()
+        if not article_list:
+            await self.log_service.log(
+                contents={
+                    "task": "create_inner_decode_task",
+                    "message": "No more articles to decode",
+                }
+            )
+            return
+
+        await self._submit_and_record(article_list)
         await self.log_service.log(
             contents={
-                "task": "create_inner_decode_task_v2",
-                "message": f"Processed {len(new_articles)} articles",
+                "task": "create_inner_decode_task",
+                "message": f"Processed {len(article_list)} articles",
             }
         )
 

+ 56 - 0
app/infra/internal/aigc_decode_server.py

@@ -0,0 +1,56 @@
+from typing import Dict, List
+
+from app.infra.shared import AsyncHttpClient
+
+
+class AigcDecodeServer:
+    base_url: str = "https://aigc-api.aiddit.com"
+
+    async def submit_decode(
+        self, config_id: int, posts: List[Dict], skip_completed: bool = False
+    ) -> Dict:
+        """Submit a batch of posts for decoding.
+        POST /aigc/api/task/decode
+        """
+        url = f"{self.base_url}/aigc/api/task/decode"
+        headers = {"Content-Type": "application/json"}
+        payload = {
+            "params": {
+                "configId": config_id,
+                "skipCompleted": skip_completed,
+                "posts": posts,
+            }
+        }
+        async with AsyncHttpClient() as client:
+            return await client.post(url, json=payload, headers=headers)
+
+    async def query_decode_results(
+        self, config_id: int, channel_content_ids: List[str]
+    ) -> Dict:
+        """Query decode results for a batch of channel content ids.
+        POST /aigc/api/task/decode/result
+        """
+        url = f"{self.base_url}/aigc/api/task/decode/result"
+        headers = {"Content-Type": "application/json"}
+        payload = {
+            "params": {"configId": config_id, "channelContentIds": channel_content_ids}
+        }
+        async with AsyncHttpClient() as client:
+            return await client.post(url, json=payload, headers=headers)
+
+    async def cancel_decode_tasks(
+        self, config_id: int, channel_content_ids: List[str]
+    ) -> Dict:
+        """Cancel decode tasks that are still pending execution.
+        POST /aigc/api/task/decode/cancel
+        """
+        url = f"{self.base_url}/aigc/api/task/decode/cancel"
+        headers = {"Content-Type": "application/json"}
+        payload = {
+            "params": {"configId": config_id, "channelContentIds": channel_content_ids}
+        }
+        async with AsyncHttpClient() as client:
+            return await client.post(url, json=payload, headers=headers)
+
+
+__all__ = ["AigcDecodeServer"]

+ 4 - 20
app/jobs/domains/llm_task.py

@@ -1,19 +1,7 @@
-from app.domains.llm_tasks.aigc_decode_task import CreateAdPlatformArticlesDecodeTask
-from app.domains.llm_tasks.aigc_decode_task import CreateInnerArticlesDecodeTask
-from app.domains.llm_tasks.aigc_decode_task import FetchDecodeResults
-from app.domains.llm_tasks.aigc_decode_task import ExtractDecodeTaskDetail
-from app.domains.llm_tasks.decode_article import (
-    CreateAdPlatformArticlesDecodeTask as CreateAdPlatformArticlesDecodeTaskV2,
-)
-from app.domains.llm_tasks.decode_article import (
-    CreateInnerArticlesDecodeTask as CreateInnerArticlesDecodeTaskV2,
-)
-from app.domains.llm_tasks.decode_article import (
-    FetchDecodeResults as FetchDecodeResultsV2,
-)
-from app.domains.llm_tasks.decode_article import (
-    ExtractDecodeTaskDetail as ExtractDecodeTaskDetailV2,
-)
+from app.domains.llm_tasks.decode_article import CreateAdPlatformArticlesDecodeTask
+from app.domains.llm_tasks.decode_article import CreateInnerArticlesDecodeTask
+from app.domains.llm_tasks.decode_article import FetchDecodeResults
+from app.domains.llm_tasks.decode_article import ExtractDecodeTaskDetail
 from app.domains.llm_tasks import TitleRewrite
 from app.domains.llm_tasks import ArticlePoolCategoryGeneration
 from app.domains.llm_tasks import CandidateAccountQualityScoreRecognizer
@@ -25,10 +13,6 @@ __all__ = [
     "CreateInnerArticlesDecodeTask",
     "FetchDecodeResults",
     "ExtractDecodeTaskDetail",
-    "CreateAdPlatformArticlesDecodeTaskV2",
-    "CreateInnerArticlesDecodeTaskV2",
-    "FetchDecodeResultsV2",
-    "ExtractDecodeTaskDetailV2",
     "TitleRewrite",
     "ArticlePoolCategoryGeneration",
     "CandidateAccountQualityScoreRecognizer",

+ 4 - 27
app/jobs/task_handler.py

@@ -469,38 +469,15 @@ class TaskHandler:
 
     # ====================== V2 解构任务(新 AIGC API)======================
 
-    @register("create_ad_platform_accounts_decode_task_v2")
-    async def _create_decode_task_v2(self) -> int:
-        """创建解构任务(v2 - 新 AIGC API)"""
-        task = CreateAdPlatformArticlesDecodeTaskV2(
+    @register("create_inner_articles_decode_task")
+    async def _create_inner_decode_task(self) -> int:
+        """Create the decode task for inner articles."""
+        task = CreateInnerArticlesDecodeTask(
             pool=self.db_client, log_service=self.log_client
         )
         await task.deal()
         return TaskStatus.SUCCESS
 
-    @register("create_inner_articles_decode_task_v2")
-    async def _create_inner_decode_task_v2(self) -> int:
-        """创建内部文章解构任务(v2 - 新 AIGC API)"""
-        task = CreateInnerArticlesDecodeTaskV2(
-            pool=self.db_client, log_service=self.log_client
-        )
-        await task.deal()
-        return TaskStatus.SUCCESS
-
-    @register("fetch_decode_result_v2")
-    async def _fetch_decode_result_v2(self) -> int:
-        """获取解构任务结果(v2 - 新 AIGC API)"""
-        task = FetchDecodeResultsV2(pool=self.db_client, log_service=self.log_client)
-        await task.deal()
-        return TaskStatus.SUCCESS
-
-    @register("extract_decode_result_v2")
-    async def _extract_decode_result_v2(self) -> int:
-        """提取解构任务结果(v2 - 新 AIGC API)"""
-        task = ExtractDecodeTaskDetailV2(pool=self.db_client, log_service=self.log_client)
-        await task.deal()
-        return TaskStatus.SUCCESS
-
     # ====================== Recommend Tasks=====================
     @register("i2i_recommend_data_sync")
     async def _i2i_recommend_data_sync_handler(self) -> int: