luojunhui 1 month ago
parent
commit
ae192b4f66

+ 1 - 0
.gitignore

@@ -62,3 +62,4 @@ docs/_build/
 # PyBuilder
 target/
 
+.cursor

+ 12 - 0
app/domains/llm_tasks/aigc_decode_task/_const.py

@@ -28,11 +28,23 @@ class DecodeTaskConst:
         LONG_ARTICLE = 1
         PICTURE_TEXT = 2
         VIDEO = 3
+        TITLE_COVER = 4
 
     class SourceType:
         AD_PLATFORM = 1
         INNER = 2
 
+    class TaskType:
+        """
+        Decode task types:
+        1: source article and image gallery
+        2: published title and cover
+        3: first mini-program card cover/title
+        """
+        SOURCE_IMAGES_TEXT = 1
+        PUBLISH_TITLE_COVER = 2
+        MINI_TITLE_CARD = 3
+
     class ProduceModuleType:
         COVER = 1
         IMAGE = 2

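Reviewer note: the new TaskType values are paired with ContentType values when the request payload is built later in create_decode_tasks.py. An illustrative summary of that pairing, mirroring _build_payload (a sketch only, not part of the change):

    # Sketch of the TaskType -> ContentType pairing used by _build_payload (illustrative only).
    from app.domains.llm_tasks.aigc_decode_task._const import DecodeTaskConst

    TASK_TYPE_TO_CONTENT_TYPE = {
        DecodeTaskConst.TaskType.SOURCE_IMAGES_TEXT: DecodeTaskConst.ContentType.LONG_ARTICLE,
        DecodeTaskConst.TaskType.PUBLISH_TITLE_COVER: DecodeTaskConst.ContentType.TITLE_COVER,
        # TaskType.MINI_TITLE_CARD has no payload builder yet (raises NotImplementedError downstream).
    }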
+ 225 - 32
app/domains/llm_tasks/aigc_decode_task/_mapper.py

@@ -6,25 +6,39 @@ from ._const import DecodeTaskConst
 
 
 class ArticlesDecodeTaskMapper(DecodeTaskConst):
+    DECODE_TASK_QUEUE = "long_articles_new_decode_tasks"
+    INNER_DECODE_CREATE_STATE = "long_articles_inner_decode_create_state"
+
     def __init__(self, pool: DatabaseManager):
         self.pool = pool
 
     # Persist a decode task
     async def record_decode_task(
-        self, task_id: str, wx_sn: str, remark: str = None
+        self, task_id: str, content_id: str, task_type: int, payload: str, remark: str = None
     ) -> int:
-        query = """
-            INSERT INTO long_articles_decode_tasks (task_id, wx_sn, remark)
-            VALUES (%s, %s, %s)
+        query = f"""
+            INSERT INTO {self.DECODE_TASK_QUEUE} (task_id, content_id, task_type, payload, remark)
+            VALUES (%s, %s, %s, %s, %s)
+        """
+        return await self.pool.async_save(query=query, params=(task_id, content_id, task_type, payload, remark))
+
+    async def record_decode_task_if_absent(
+        self, task_id: str, content_id: str, task_type: int, payload: str, remark: str = None
+    ) -> int:
+        query = f"""
+            INSERT IGNORE INTO {self.DECODE_TASK_QUEUE} (task_id, content_id, task_type, payload, remark)
+            VALUES (%s, %s, %s, %s, %s)
         """
-        return await self.pool.async_save(query=query, params=(task_id, wx_sn, remark))
+        return await self.pool.async_save(
+            query=query, params=(task_id, content_id, task_type, payload, remark)
+        )
 
     # Update decode task status
     async def update_decode_task_status(
         self, task_id: str, ori_status: int, new_status: int, remark: str = None
     ) -> int:
-        query = """
-            UPDATE long_articles_decode_tasks
+        query = f"""
+            UPDATE {self.DECODE_TASK_QUEUE}
             SET status = %s, remark = %s
             WHERE task_id = %s AND status = %s;
         """
@@ -36,8 +50,8 @@ class ArticlesDecodeTaskMapper(DecodeTaskConst):
     async def set_decode_result(
         self, task_id: str, result: str, remark: str = None
     ) -> int:
-        query = """
-            UPDATE long_articles_decode_tasks
+        query = f"""
+            UPDATE {self.DECODE_TASK_QUEUE}
             SET status = %s, remark = %s, result = %s
             WHERE task_id = %s AND status = %s;
         """
@@ -54,8 +68,8 @@ class ArticlesDecodeTaskMapper(DecodeTaskConst):
 
     # Fetch decode tasks still awaiting results (status=INIT, result not yet retrieved)
     async def fetch_decoding_tasks(self) -> List[Dict]:
-        query = """
-            SELECT task_id FROM long_articles_decode_tasks WHERE status = %s LIMIT %s;
+        query = f"""
+            SELECT task_id FROM {self.DECODE_TASK_QUEUE} WHERE status = %s LIMIT %s;
         """
         return await self.pool.async_fetch(
             query=query, params=(self.TaskStatus.INIT, self.TASK_BATCH)
@@ -63,8 +77,8 @@ class ArticlesDecodeTaskMapper(DecodeTaskConst):
 
     # Fetch tasks awaiting extraction (i.e. tasks that completed successfully)
     async def fetch_extract_tasks(self):
-        query = """
-            SELECT id, result FROM long_articles_decode_tasks
+        query = f"""
+            SELECT id, result FROM {self.DECODE_TASK_QUEUE}
             WHERE extract_status = %s AND status = %s;
         """
         return await self.pool.async_fetch(
@@ -73,8 +87,8 @@ class ArticlesDecodeTaskMapper(DecodeTaskConst):
 
     # Update extraction status (used for locking and state transitions)
     async def update_extract_status(self, task_id, ori_status, new_status):
-        query = """
-            UPDATE long_articles_decode_tasks
+        query = f"""
+            UPDATE {self.DECODE_TASK_QUEUE}
             SET extract_status = %s WHERE extract_status = %s AND id = %s;
         """
         return await self.pool.async_save(
@@ -104,11 +118,32 @@ class ArticlesDecodeTaskMapper(DecodeTaskConst):
             ),
         )
 
+    # Check whether a decode task already exists for this content_id and task_type
+    async def fetch_exist_source_id(self, source_id, task_type):
+        query = f"""
+            SELECT id FROM {self.DECODE_TASK_QUEUE}
+            WHERE content_id = %s AND task_type = %s;
+        """
+        return await self.pool.async_fetch(
+            query=query, params=(source_id, task_type)
+        )
+
 
 class AdPlatformArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
     def __init__(self, pool: DatabaseManager):
         super().__init__(pool)
 
+    async def record_decode_task(
+        self, task_id: str, wx_sn: str, remark: str = None
+    ) -> int:
+        return await super().record_decode_task(
+            task_id=task_id,
+            content_id=wx_sn,
+            task_type=self.TaskType.SOURCE_IMAGES_TEXT,
+            payload="{}",
+            remark=remark,
+        )
+
     # Update article decode status
     async def update_article_decode_status(
         self, id_: int, ori_status: int, new_status: int
@@ -141,24 +176,15 @@ class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
         super().__init__(pool)
 
     # Fetch internal articles
-    async def fetch_inner_articles(self):
+    async def fetch_inner_articles(self, date_string="20260401"):
         query = """
-            SELECT  title
-                  ,SUM(fans) AS total_fans
-                  ,SUM(view_count) AS total_view
-                  ,SUM(view_count) / SUM(fans) AS avg_read_rate
-                  ,SUM(first_level) AS total_first_level
-                  ,MAX(source_id) as source_id
-                  ,MAX(wx_sn) as wx_sn
-            FROM    datastat_sort_strategy
-            WHERE  date_str >= '20250101'
-            GROUP BY title
-            HAVING total_fans > 100000
-            AND     avg_read_rate > 0.008
-            AND     total_first_level > 0;
-
-        """
-        return await self.pool.async_fetch(query=query)
+            SELECT title, source_id, wx_sn, cover_img_url FROM long_articles_good_read_article WHERE dt = %s
+            AND source_id IN ('20260222161421405412536', '20241011051312526697231', '20241125113847098601958', '20241011042712648349812')
+            ORDER BY max_read_rate DESC LIMIT %s;
+        """
+        return await self.pool.async_fetch(
+            query=query, params=(date_string, 20)
+        )
 
     # Fetch production details for an internal article
     async def fetch_inner_articles_produce_detail(self, source_id) -> List[Dict]:
@@ -171,6 +197,173 @@ class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
             query=query, db_name="aigc", params=(source_id,)
         )
 
+    # Fetch the article's crawler source info
+    async def fetch_article_crawler_source_info(self, source_id: str):
+        query = """
+            SELECT 
+                t2.channel_content_id, t3.body_text
+            FROM produce_plan_exe_record t1
+                LEFT JOIN produce_plan_exe_refer_content t2 ON t2.plan_exe_id = t1.plan_exe_id
+                LEFT JOIN crawler_content_blob t3 ON t3.channel_content_id = t2.channel_content_id
+            WHERE
+                t1.plan_exe_id = %s;
+        """
+        return await self.pool.async_fetch(
+            query=query, db_name="aigc", params=(source_id,)
+        )
+
+    async def init_create_state(self, source_id: str, task_type: int, now_ts: int):
+        query = f"""
+            INSERT IGNORE INTO {self.INNER_DECODE_CREATE_STATE}
+                (source_id, task_type, status, retry_count, locked_at, created_at, updated_at)
+            VALUES (%s, %s, %s, %s, %s, %s, %s);
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=(
+                source_id,
+                task_type,
+                self.TaskStatus.INIT,
+                0,
+                0,
+                now_ts,
+                now_ts,
+            ),
+        )
+
+    async def fetch_create_state(self, source_id: str, task_type: int):
+        query = f"""
+            SELECT source_id, task_type, status, retry_count, locked_at, remote_task_id, last_error
+            FROM {self.INNER_DECODE_CREATE_STATE}
+            WHERE source_id = %s AND task_type = %s
+            LIMIT 1;
+        """
+        rows = await self.pool.async_fetch(query=query, params=(source_id, task_type))
+        if not rows:
+            return None
+        return rows[0]
+
+    async def acquire_create_lock(
+        self,
+        source_id: str,
+        task_type: int,
+        now_ts: int,
+        max_retry_times: int,
+        lock_expire_before: int,
+    ):
+        query = f"""
+            UPDATE {self.INNER_DECODE_CREATE_STATE}
+            SET status = %s, locked_at = %s, updated_at = %s, last_error = NULL
+            WHERE source_id = %s
+              AND task_type = %s
+              AND (
+                    status = %s
+                 OR (status = %s AND retry_count < %s)
+                 OR (status = %s AND locked_at > 0 AND locked_at < %s)
+              );
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=(
+                self.TaskStatus.PROCESSING,
+                now_ts,
+                now_ts,
+                source_id,
+                task_type,
+                self.TaskStatus.INIT,
+                self.TaskStatus.FAILED,
+                max_retry_times,
+                self.TaskStatus.PROCESSING,
+                lock_expire_before,
+            ),
+        )
+
+    async def mark_create_success(
+        self,
+        source_id: str,
+        task_type: int,
+        remote_task_id: str,
+        now_ts: int,
+        remark: str = None,
+    ):
+        query = f"""
+            UPDATE {self.INNER_DECODE_CREATE_STATE}
+            SET status = %s,
+                remote_task_id = %s,
+                last_error = %s,
+                locked_at = 0,
+                updated_at = %s
+            WHERE source_id = %s AND task_type = %s AND status = %s;
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=(
+                self.TaskStatus.SUCCESS,
+                remote_task_id,
+                remark,
+                now_ts,
+                source_id,
+                task_type,
+                self.TaskStatus.PROCESSING,
+            ),
+        )
+
+    async def mark_create_retry(
+        self,
+        source_id: str,
+        task_type: int,
+        now_ts: int,
+        error_message: str,
+    ):
+        query = f"""
+            UPDATE {self.INNER_DECODE_CREATE_STATE}
+            SET status = %s,
+                retry_count = retry_count + 1,
+                last_error = %s,
+                locked_at = 0,
+                updated_at = %s
+            WHERE source_id = %s AND task_type = %s AND status = %s;
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=(
+                self.TaskStatus.INIT,
+                error_message,
+                now_ts,
+                source_id,
+                task_type,
+                self.TaskStatus.PROCESSING,
+            ),
+        )
+
+    async def mark_create_failed(
+        self,
+        source_id: str,
+        task_type: int,
+        now_ts: int,
+        error_message: str,
+    ):
+        query = f"""
+            UPDATE {self.INNER_DECODE_CREATE_STATE}
+            SET status = %s,
+                retry_count = retry_count + 1,
+                last_error = %s,
+                locked_at = 0,
+                updated_at = %s
+            WHERE source_id = %s AND task_type = %s AND status = %s;
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=(
+                self.TaskStatus.FAILED,
+                error_message,
+                now_ts,
+                source_id,
+                task_type,
+                self.TaskStatus.PROCESSING,
+            ),
+        )
+
 
 __all__ = [
     "ArticlesDecodeTaskMapper",

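Reviewer note: the *_create_state helpers above implement a small per-(source_id, task_type) state machine on long_articles_inner_decode_create_state. A compact Python summary of the intended transitions, inferred from the queries (the TaskStatus names INIT/PROCESSING/SUCCESS/FAILED come from DecodeTaskConst; this snippet is illustrative and not part of the change):

    # Summary of create-state transitions (illustrative only).
    CREATE_STATE_TRANSITIONS = {
        # acquire_create_lock: claim the row for this worker
        "acquire_create_lock": ("INIT, or FAILED with retry_count < max, or PROCESSING with an expired lock", "PROCESSING"),
        # mark_create_success: remote decode task created and recorded locally
        "mark_create_success": ("PROCESSING", "SUCCESS"),
        # mark_create_retry: transient failure, hand the row back with retry_count += 1
        "mark_create_retry": ("PROCESSING", "INIT"),
        # mark_create_failed: permanent failure (or retries exhausted), retry_count += 1
        "mark_create_failed": ("PROCESSING", "FAILED"),
    }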
+ 72 - 16
app/domains/llm_tasks/aigc_decode_task/_utils.py

@@ -1,7 +1,13 @@
 import json
+import re
+from typing import Any
 from typing import Dict, List
 
+import aiohttp
+from tenacity import AsyncRetrying
+
 from app.infra.internal import DecodeServer
+from app.infra.shared.tools import request_retry
 
 from ._const import DecodeTaskConst
 
@@ -94,26 +100,76 @@ class AdPlatformArticlesDecodeUtils(DecodeTaskUtil):
 
 
 class InnerArticlesDecodeUtils(DecodeTaskUtil):
-    async def create_decode_task(self, article: Dict, article_produce_info: List[Dict]):
-        images = [
-            i["output"]
-            for i in article_produce_info
-            if i["produce_module_type"]
-            in (self.ProduceModuleType.COVER, self.ProduceModuleType.IMAGE)
-        ]
-        article["article_images"] = images
-        text = [
-            i["output"]
-            for i in article_produce_info
-            if i["produce_module_type"] == self.ProduceModuleType.CONTENT
-        ]
-        article["article_text"] = "\n".join(text)
-        request_body = self.prepare_extract_body(article)
+    RETRYABLE_EXCEPTIONS = (aiohttp.ClientError, TimeoutError)
+
+    @staticmethod
+    def extract_body_text_and_images(raw_text):
+        """
+        Extract image URLs and body text from raw crawled text.
+        """
+        if not raw_text:
+            return "", []
+
+        image_pattern = re.compile(r"^\[image:\s*(https?://[^\]]+)\s*\]$")
+        body_lines = []
+        images = []
+
+        for line in raw_text.splitlines():
+            stripped_line = line.strip()
+            if not stripped_line:
+                if body_lines and body_lines[-1] != "":
+                    body_lines.append("")
+                continue
+
+            match = image_pattern.match(stripped_line)
+            if match:
+                images.append(match.group(1).strip())
+                continue
+
+            body_lines.append(stripped_line)
+
+        while body_lines and body_lines[-1] == "":
+            body_lines.pop()
+
+        body_text = "\n".join(body_lines)
+        return body_text, images
+
+    async def create_decode_task(self, payload):
+        request_body = {
+            "scene": self.BusinessScene.POINT_PICK,
+            "content_type": payload["content_type"],
+            "content": {
+                "channel_content_id": payload.get("channel_content_id", ""),
+                "video_url": "",
+                "images": payload.get("images", []),
+                "body_text": payload.get("body_text", ""),
+                "title": payload.get("title", ""),
+                "channel_account_id": payload.get("gh_id", ""),
+                "channel_account_name": payload.get("account_name", ""),
+            },
+        }
         return await self.decode_server.create_decode_task(request_body)
 
+    async def create_decode_task_with_retry(
+        self,
+        payload: Dict[str, Any],
+        retry_times: int,
+        min_retry_delay: int = 1,
+        max_retry_delay: int = 4,
+    ):
+        retry_kwargs = request_retry(
+            retry_times=retry_times,
+            min_retry_delay=min_retry_delay,
+            max_retry_delay=max_retry_delay,
+        )
+        async for attempt in AsyncRetrying(**retry_kwargs):
+            with attempt:
+                return await self.create_decode_task(payload)
+
 
 __all__ = [
     "AdPlatformArticlesDecodeUtils",
     "InnerArticlesDecodeUtils",
     "DecodeTaskUtil",
-]
+]

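For reference, a small usage example of the new extract_body_text_and_images helper. The "[image: <url>]" line format is inferred from the regex above, and the sample text is made up; the import path follows the file's location in the repo:

    from app.domains.llm_tasks.aigc_decode_task._utils import InnerArticlesDecodeUtils

    raw = (
        "First paragraph of body text\n"
        "\n"
        "[image: https://example.com/a.jpg]\n"
        "Second paragraph\n"
        "[image: https://example.com/b.jpg ]\n"
    )
    body, images = InnerArticlesDecodeUtils.extract_body_text_and_images(raw)
    assert body == "First paragraph of body text\n\nSecond paragraph"
    assert images == ["https://example.com/a.jpg", "https://example.com/b.jpg"]

Consecutive blank lines collapse to a single separator and trailing blanks are dropped, so the body text goes into the decode payload without stray whitespace.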
+ 265 - 29
app/domains/llm_tasks/aigc_decode_task/create_decode_tasks.py

@@ -1,8 +1,14 @@
+import asyncio
+import json
+import time
 from typing import Dict
+
+import aiohttp
 from tqdm import tqdm
 
 from app.core.database import DatabaseManager
 from app.core.observability import LogService
+from app.infra.shared.async_tasks import run_tasks_with_asyncio_task_group
 
 from ._const import DecodeTaskConst
 from ._mapper import AdPlatformArticlesDecodeTaskMapper, InnerArticlesDecodeTaskMapper
@@ -122,53 +128,283 @@ class CreateAdPlatformArticlesDecodeTask(DecodeTaskConst):
 
 
 class CreateInnerArticlesDecodeTask(DecodeTaskConst):
+    CREATE_TASK_NAME = "create_inner_articles_decode_task"
+    MAX_CREATE_RETRY_TIMES = 3
+    LOCK_TIMEOUT_SECONDS = 30 * 60
+    CREATE_MAX_CONCURRENCY = 5
+
     def __init__(self, pool: DatabaseManager, log_service: LogService):
         self.pool = pool
         self.log_service = log_service
         self.mapper = InnerArticlesDecodeTaskMapper(self.pool)
         self.tool = InnerArticlesDecodeUtils()
 
-    async def create_single_decode_task(self, article: Dict):
-        # Acquire Lock
-        source_id = article["source_id"]
-        article_produce_info = await self.mapper.fetch_inner_articles_produce_detail(
-            source_id
+    async def _log_create_event(self, **contents):
+        await self.log_service.log(
+            contents={"task": self.CREATE_TASK_NAME, **contents}
         )
 
-        # Call the decode service to create the decode task
-        response = await self.tool.create_decode_task(article, article_produce_info)
-        response_code = response.get("code")
-        if response_code != self.RequestDecode.SUCCESS:
-            return
+    @staticmethod
+    def _trim_error_message(message: str, limit: int = 500):
+        if not message:
+            return ""
+        return message[:limit]
 
-        task_id = response.get("data", {}).get("task_id") or response.get(
-            "data", {}
-        ).get("taskId")
-        if not task_id:
+    async def _mark_retry_or_failed(
+        self,
+        source_id: str,
+        task_type: int,
+        error_message: str,
+        retryable: bool,
+        state: Dict | None,
+    ):
+        now_ts = int(time.time())
+        retry_count = (state or {}).get("retry_count", 0)
+        should_retry = retryable and retry_count < self.MAX_CREATE_RETRY_TIMES
+        error_message = self._trim_error_message(error_message)
+
+        if should_retry:
+            await self.mapper.mark_create_retry(
+                source_id=source_id,
+                task_type=task_type,
+                now_ts=now_ts,
+                error_message=error_message,
+            )
+            await self._log_create_event(
+                source_id=source_id,
+                task_type=task_type,
+                status="retry",
+                retry_count=retry_count + 1,
+                message=error_message,
+            )
             return
 
-        wx_sn = article["wx_sn"]
-        remark = f"task_id: {task_id}-创建解构任务"
-        record_row = await self.mapper.record_decode_task(task_id, wx_sn, remark)
-        if not record_row:
+        await self.mapper.mark_create_failed(
+            source_id=source_id,
+            task_type=task_type,
+            now_ts=now_ts,
+            error_message=error_message,
+        )
+        await self._log_create_event(
+            source_id=source_id,
+            task_type=task_type,
+            status="failed",
+            retry_count=retry_count + 1,
+            message=error_message,
+        )
+
+    async def _build_payload(self, article: Dict, task_type: int):
+        source_id = article["source_id"]
+        match task_type:
+            case self.TaskType.PUBLISH_TITLE_COVER:
+                return {
+                    "source_id": source_id,
+                    "title": article["title"],
+                    "cover_img": article["cover_img_url"],
+                    "channel_content_id": article.get("wx_sn", source_id),
+                    "content_type": self.ContentType.TITLE_COVER
+                }
+
+            case self.TaskType.SOURCE_IMAGES_TEXT:
+                crawl_source_info = await self.mapper.fetch_article_crawler_source_info(source_id)
+                if not crawl_source_info:
+                    raise ValueError("未找到文章抓取源信息")
+
+                crawl_info = crawl_source_info[0]
+                channel_content_id = crawl_info["channel_content_id"]
+                raw_body_text = crawl_info["body_text"]
+                body_text, images = self.tool.extract_body_text_and_images(raw_body_text)
+                if not body_text and not images:
+                    raise ValueError("文章正文和图片均为空,无法创建解构任务")
+
+                return {
+                    "source_id": source_id,
+                    "images": images,
+                    "body_text": body_text,
+                    "channel_content_id": channel_content_id or source_id,
+                    "content_type": self.ContentType.LONG_ARTICLE
+                }
+
+            case self.TaskType.MINI_TITLE_CARD:
+                raise NotImplementedError("MINI_TITLE_CARD 数据未完善")
+
+            case _:
+                raise ValueError(f"unsupported task type: {task_type}")
+
+    async def create_single_decode_task(self, task: Dict):
+        article = task["article"]
+        task_type = task["task_type"]
+        source_id = article["source_id"]
+        now_ts = int(time.time())
+        lock_expire_before = now_ts - self.LOCK_TIMEOUT_SECONDS
+
+        await self.mapper.init_create_state(source_id, task_type, now_ts)
+        acquire_lock = await self.mapper.acquire_create_lock(
+            source_id=source_id,
+            task_type=task_type,
+            now_ts=now_ts,
+            max_retry_times=self.MAX_CREATE_RETRY_TIMES,
+            lock_expire_before=lock_expire_before,
+        )
+        if not acquire_lock:
+            await self._log_create_event(
+                source_id=source_id,
+                task_type=task_type,
+                status="skip",
+                message="acquire create lock failed",
+            )
             return
 
-    async def create_tasks(self):
-        article_list = await self.mapper.fetch_inner_articles()
+        state = await self.mapper.fetch_create_state(source_id, task_type)
+
+        try:
+            # exist_task = await self.mapper.fetch_exist_source_id(source_id, task_type)
+            # if exist_task:
+            #     await self.mapper.mark_create_success(
+            #         source_id=source_id,
+            #         task_type=task_type,
+            #         remote_task_id="existing_task",
+            #         now_ts=int(time.time()),
+            #         remark="任务已存在,跳过重复创建",
+            #     )
+            #     await self._log_create_event(
+            #         source_id=source_id,
+            #         task_type=task_type,
+            #         status="skip",
+            #         message="decode task already exists",
+            #     )
+            #     return
+
+            payload = await self._build_payload(article, task_type)
+
+            response = await self.tool.create_decode_task_with_retry(
+                payload=payload,
+                retry_times=self.MAX_CREATE_RETRY_TIMES,
+            )
+            response_code = response.get("code")
+            if response_code != self.RequestDecode.SUCCESS:
+                await self._mark_retry_or_failed(
+                    source_id=source_id,
+                    task_type=task_type,
+                    error_message=f"解构任务创建失败: {json.dumps(response, ensure_ascii=False)}",
+                    retryable=False,
+                    state=state,
+                )
+                return
+
+            task_id = response.get("data", {}).get("task_id") or response.get(
+                "data", {}
+            ).get("taskId")
+            if not task_id:
+                await self._mark_retry_or_failed(
+                    source_id=source_id,
+                    task_type=task_type,
+                    error_message=f"解构任务返回缺少 task_id: {json.dumps(response, ensure_ascii=False)}",
+                    retryable=False,
+                    state=state,
+                )
+                return
+
+            remark = f"task_id: {task_id}-创建解构任务"
+            record_row = await self.mapper.record_decode_task_if_absent(
+                task_id=task_id,
+                content_id=source_id,
+                task_type=task_type,
+                payload=json.dumps(payload, ensure_ascii=False),
+                remark=remark,
+            )
+            if record_row not in (0, 1):
+                await self._mark_retry_or_failed(
+                    source_id=source_id,
+                    task_type=task_type,
+                    error_message="创建 decode 记录失败",
+                    retryable=True,
+                    state=state,
+                )
+                return
+
+            await self.mapper.mark_create_success(
+                source_id=source_id,
+                task_type=task_type,
+                remote_task_id=task_id,
+                now_ts=int(time.time()),
+                remark=remark,
+            )
+            await self._log_create_event(
+                source_id=source_id,
+                task_type=task_type,
+                status="success",
+                retry_count=(state or {}).get("retry_count", 0),
+                data=response,
+            )
+        except (aiohttp.ClientError, TimeoutError, asyncio.TimeoutError) as exc:
+            await self._mark_retry_or_failed(
+                source_id=source_id,
+                task_type=task_type,
+                error_message=f"解构服务调用异常: {exc}",
+                retryable=True,
+                state=state,
+            )
+        except (ValueError, NotImplementedError) as exc:
+            await self._mark_retry_or_failed(
+                source_id=source_id,
+                task_type=task_type,
+                error_message=str(exc),
+                retryable=False,
+                state=state,
+            )
+        except Exception as exc:
+            await self._mark_retry_or_failed(
+                source_id=source_id,
+                task_type=task_type,
+                error_message=f"创建解构任务异常: {exc}",
+                retryable=True,
+                state=state,
+            )
+
+    async def create_tasks(self, date_string: str = None, max_concurrency: int = None):
+        article_list = await self.mapper.fetch_inner_articles(date_string or "20260401")
         if not article_list:
-            await self.log_service.log(
-                contents={
-                    "task": "create_tasks",
-                    "message": "No more articles to decode",
-                }
+            await self._log_create_event(
+                status="empty",
+                message="No more articles to decode",
             )
             return
 
-        for article in tqdm(article_list, desc="Creating decode tasks"):
-            await self.create_single_decode_task(article)
+        decode_types = [
+            self.TaskType.SOURCE_IMAGES_TEXT,
+            # self.TaskType.PUBLISH_TITLE_COVER
+        ]
+        task_list = [
+            {"article": article, "task_type": task_type}
+            for article in article_list
+            for task_type in decode_types
+        ]
 
-    async def deal(self):
-        await self.create_tasks()
+        result = await run_tasks_with_asyncio_task_group(
+            task_list=task_list,
+            handler=self.create_single_decode_task,
+            description="Creating inner decode tasks",
+            unit="task",
+            max_concurrency=max_concurrency or self.CREATE_MAX_CONCURRENCY,
+            fail_fast=False,
+        )
+        if result["errors"]:
+            await self._log_create_event(
+                status="partial_error",
+                message="some inner decode tasks raised uncaught errors",
+                data={
+                    "total_task": result["total_task"],
+                    "processed_task": result["processed_task"],
+                    "error_count": len(result["errors"]),
+                },
+            )
+
+    async def deal(self, date_string: str = None, max_concurrency: int = None):
+        await self.create_tasks(
+            date_string=date_string,
+            max_concurrency=max_concurrency,
+        )
 
 
 __all__ = ["CreateAdPlatformArticlesDecodeTask", "CreateInnerArticlesDecodeTask"]

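For manual verification outside the scheduler, the new task can also be driven directly. A minimal sketch, assuming DatabaseManager and LogService can be constructed without arguments (their real initialization may differ; normally TaskHandler injects them):

    import asyncio

    from app.core.database import DatabaseManager
    from app.core.observability import LogService
    from app.domains.llm_tasks.aigc_decode_task.create_decode_tasks import (
        CreateInnerArticlesDecodeTask,
    )

    async def main():
        pool = DatabaseManager()    # assumption: no-arg construction; may need an explicit connect/init step
        log_service = LogService()  # assumption: no-arg construction
        task = CreateInnerArticlesDecodeTask(pool=pool, log_service=log_service)
        # date_string selects the dt partition of long_articles_good_read_article;
        # max_concurrency caps the asyncio task-group fan-out (defaults to CREATE_MAX_CONCURRENCY = 5).
        await task.deal(date_string="20260401", max_concurrency=2)

    asyncio.run(main())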
+ 5 - 0
app/jobs/task_config.py

@@ -134,6 +134,11 @@ TASK_CONFIGS = {
         max_concurrent=3,
         retry_times=2,
     ),
+    "create_inner_articles_decode_task": TaskConfig(
+        timeout=1800,
+        max_concurrent=2,
+        retry_times=1,
+    ),
     # Statistics / analytics tasks
     "update_account_read_rate_avg": TaskConfig(
         timeout=1800,

+ 12 - 0
app/jobs/task_handler.py

@@ -484,6 +484,18 @@ class TaskHandler:
         await task.deal()
         return TaskStatus.SUCCESS
 
+    @register("create_inner_articles_decode_task")
+    async def _create_inner_articles_decode_task(self) -> int:
+        """Create decode tasks for internal articles."""
+        task = CreateInnerArticlesDecodeTask(
+            pool=self.db_client, log_service=self.log_client
+        )
+        await task.deal(
+            date_string=self.data.get("date_string"),
+            max_concurrency=self.data.get("max_concurrency"),
+        )
+        return TaskStatus.SUCCESS
+
     @register("fetch_decode_result")
     async def _fetch_decode_result(self) -> int:
         """Fetch decode task results."""