Explorar el Código

Inner Decode Tasks

luojunhui hace 1 mes
padre
commit
924818119b

+ 55 - 0
app/domains/llm_tasks/aigc_decode_task/_const.py

@@ -1,6 +1,60 @@
 class DecodeTaskConst:
     TASK_BATCH = 100
 
+    SECONDS_PER_MINUTE = 60
+    INNER_DECODE_LOCK_TIMEOUT_MINUTES = 30
+    INNER_DECODE_LOCK_TIMEOUT_SECONDS = (
+        SECONDS_PER_MINUTE * INNER_DECODE_LOCK_TIMEOUT_MINUTES
+    )
+
+    class InnerDecodeCreate:
+        """内部文章解构任务创建:调度名、重试、并发、日志等"""
+
+        SCHEDULER_TASK_NAME = "create_inner_articles_decode_task"
+        MAX_RETRY_TIMES = 3
+        DEFAULT_MAX_CONCURRENCY = 5
+        DEFAULT_GOOD_READ_DATE = "20260401"
+        ERROR_MESSAGE_MAX_CHARS = 500
+        DUPLICATE_SKIP_REMOTE_TASK_ID = "existing_task"
+
+        # MySQL INSERT IGNORE:0 表示未插入(重复等),1 表示插入 1 行
+        INSERT_IGNORE_AFFECTED_NOOP = 0
+        INSERT_IGNORE_AFFECTED_INSERTED = 1
+        ASYNC_BATCH_DESCRIPTION = "Creating inner decode tasks"
+        ASYNC_BATCH_UNIT = "task"
+        ASYNC_BATCH_FAIL_FAST = False
+
+    class InnerCreateState:
+        """内部创建控制表字段初值与查询限制"""
+
+        INITIAL_RETRY_COUNT = 0
+        INITIAL_LOCKED_AT = 0
+        LOCKED_AT_CLEARED = 0
+        FETCH_STATE_ROW_LIMIT = 1
+
+    class AdPlatformDecodeTask:
+        """广告平台解构入队:占位 payload"""
+
+        EMPTY_PAYLOAD_JSON = "{}"
+
+    class AdPlatformDecodeBatch:
+        """广告平台批量创建解构任务:进度条文案"""
+
+        TQDM_DESCRIPTION = "Creating decode tasks"
+
+    class HttpDecodeCreateRetry:
+        """解构创建 HTTP 调用 tenacity 退避参数(秒)"""
+
+        MIN_DELAY_SECONDS = 1
+        MAX_DELAY_SECONDS = 4
+
+    class LogTaskKey:
+        """广告平台创建解构相关日志 task 字段"""
+
+        CREATE_SINGLE = "create_decode_task"
+        CREATE_BATCH = "create_tasks"
+        RECORD_QUEUE = "record_decode_task"
+
     class TaskStatus:
         # 任务状态
         INIT = 0
@@ -41,6 +95,7 @@ class DecodeTaskConst:
         2: 发文标题和封面
         3: 首个小程序卡片封面标题
         """
+
         SOURCE_IMAGES_TEXT = 1
         PUBLISH_TITLE_COVER = 2
         MINI_TITLE_CARD = 3

+ 49 - 25
app/domains/llm_tasks/aigc_decode_task/_mapper.py

@@ -6,7 +6,7 @@ from ._const import DecodeTaskConst
 
 
 class ArticlesDecodeTaskMapper(DecodeTaskConst):
-    DECODE_TASK_QUEUE = "long_articles_new_decode_tasks"
+    DECODE_TASK_QUEUE = "long_articles_decode_task_detail"
     INNER_DECODE_CREATE_STATE = "long_articles_inner_decode_create_state"
 
     def __init__(self, pool: DatabaseManager):
@@ -14,16 +14,28 @@ class ArticlesDecodeTaskMapper(DecodeTaskConst):
 
     # 存储解构任务
     async def record_decode_task(
-        self, task_id: str,  content_id: str, task_type: int, payload: str, remark: str = None
+        self,
+        task_id: str,
+        content_id: str,
+        task_type: int,
+        payload: str,
+        remark: str = None,
     ) -> int:
         query = f"""
             INSERT INTO {self.DECODE_TASK_QUEUE} (task_id, content_id, task_type, payload, remark)
             VALUES (%s, %s, %s, %s, %s)
         """
-        return await self.pool.async_save(query=query, params=(task_id, content_id, task_type, payload, remark))
+        return await self.pool.async_save(
+            query=query, params=(task_id, content_id, task_type, payload, remark)
+        )
 
     async def record_decode_task_if_absent(
-        self, task_id: str, content_id: str, task_type: int, payload: str, remark: str = None
+        self,
+        task_id: str,
+        content_id: str,
+        task_type: int,
+        payload: str,
+        remark: str = None,
     ) -> int:
         query = f"""
             INSERT IGNORE INTO {self.DECODE_TASK_QUEUE} (task_id, content_id, task_type, payload, remark)
@@ -124,9 +136,7 @@ class ArticlesDecodeTaskMapper(DecodeTaskConst):
             SELECT id FROM {self.DECODE_TASK_QUEUE}
             WHERE content_id = %s AND task_type = %s;
         """
-        return await self.pool.async_fetch(
-            query=query, params=(source_id, task_type)
-        )
+        return await self.pool.async_fetch(query=query, params=(source_id, task_type))
 
 
 class AdPlatformArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
@@ -140,7 +150,7 @@ class AdPlatformArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
             task_id=task_id,
             content_id=wx_sn,
             task_type=self.TaskType.SOURCE_IMAGES_TEXT,
-            payload="{}",
+            payload=self.AdPlatformDecodeTask.EMPTY_PAYLOAD_JSON,
             remark=remark,
         )
 
@@ -176,25 +186,32 @@ class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
         super().__init__(pool)
 
     # 获取内部文章
-    async def fetch_inner_articles(self, date_string="20260401"):
+    async def fetch_inner_articles(self, date_string=None):
+        if date_string is None:
+            date_string = self.InnerDecodeCreate.DEFAULT_GOOD_READ_DATE
         query = """
             SELECT title, source_id, wx_sn, cover_img_url FROM long_articles_good_read_article WHERE dt = %s
-            AND source_id IN ('20260222161421405412536', '20241011051312526697231', '20241125113847098601958', '20241011042712648349812')
-            ORDER by max_read_rate DESC LIMIT %s;
+            ORDER by max_read_rate DESC;
         """
-        return await self.pool.async_fetch(
-            query=query, params=(date_string, 20)
-        )
+        return await self.pool.async_fetch(query=query, params=(date_string,))
 
     # 获取内部文章生成信息
     async def fetch_inner_articles_produce_detail(self, source_id) -> List[Dict]:
-        query = """
+        mod_types = (
+            self.ProduceModuleType.COVER,
+            self.ProduceModuleType.IMAGE,
+            self.ProduceModuleType.TITLE,
+            self.ProduceModuleType.CONTENT,
+            self.ProduceModuleType.SUMMARY,
+        )
+        placeholders = ",".join(["%s"] * len(mod_types))
+        query = f"""
             SELECT produce_module_type, output
             FROM produce_plan_module_output WHERE plan_exe_id = %s
-            AND produce_module_type in (1,2,3,4,18); 
+            AND produce_module_type in ({placeholders});
         """
         return await self.pool.async_fetch(
-            query=query, db_name="aigc", params=(source_id,)
+            query=query, db_name="aigc", params=(source_id, *mod_types)
         )
 
     # 获取文章源信息
@@ -224,8 +241,8 @@ class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
                 source_id,
                 task_type,
                 self.TaskStatus.INIT,
-                0,
-                0,
+                self.InnerCreateState.INITIAL_RETRY_COUNT,
+                self.InnerCreateState.INITIAL_LOCKED_AT,
                 now_ts,
                 now_ts,
             ),
@@ -236,9 +253,12 @@ class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
             SELECT source_id, task_type, status, retry_count, locked_at, remote_task_id, last_error
             FROM {self.INNER_DECODE_CREATE_STATE}
             WHERE source_id = %s AND task_type = %s
-            LIMIT 1;
+            LIMIT %s;
         """
-        rows = await self.pool.async_fetch(query=query, params=(source_id, task_type))
+        rows = await self.pool.async_fetch(
+            query=query,
+            params=(source_id, task_type, self.InnerCreateState.FETCH_STATE_ROW_LIMIT),
+        )
         if not rows:
             return None
         return rows[0]
@@ -259,7 +279,7 @@ class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
               AND (
                     status = %s
                  OR (status = %s AND retry_count < %s)
-                 OR (status = %s AND locked_at > 0 AND locked_at < %s)
+                 OR (status = %s AND locked_at > %s AND locked_at < %s)
               );
         """
         return await self.pool.async_save(
@@ -274,6 +294,7 @@ class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
                 self.TaskStatus.FAILED,
                 max_retry_times,
                 self.TaskStatus.PROCESSING,
+                self.InnerCreateState.LOCKED_AT_CLEARED,
                 lock_expire_before,
             ),
         )
@@ -291,7 +312,7 @@ class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
             SET status = %s,
                 remote_task_id = %s,
                 last_error = %s,
-                locked_at = 0,
+                locked_at = %s,
                 updated_at = %s
             WHERE source_id = %s AND task_type = %s AND status = %s;
         """
@@ -301,6 +322,7 @@ class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
                 self.TaskStatus.SUCCESS,
                 remote_task_id,
                 remark,
+                self.InnerCreateState.LOCKED_AT_CLEARED,
                 now_ts,
                 source_id,
                 task_type,
@@ -320,7 +342,7 @@ class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
             SET status = %s,
                 retry_count = retry_count + 1,
                 last_error = %s,
-                locked_at = 0,
+                locked_at = %s,
                 updated_at = %s
             WHERE source_id = %s AND task_type = %s AND status = %s;
         """
@@ -329,6 +351,7 @@ class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
             params=(
                 self.TaskStatus.INIT,
                 error_message,
+                self.InnerCreateState.LOCKED_AT_CLEARED,
                 now_ts,
                 source_id,
                 task_type,
@@ -348,7 +371,7 @@ class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
             SET status = %s,
                 retry_count = retry_count + 1,
                 last_error = %s,
-                locked_at = 0,
+                locked_at = %s,
                 updated_at = %s
             WHERE source_id = %s AND task_type = %s AND status = %s;
         """
@@ -357,6 +380,7 @@ class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
             params=(
                 self.TaskStatus.FAILED,
                 error_message,
+                self.InnerCreateState.LOCKED_AT_CLEARED,
                 now_ts,
                 source_id,
                 task_type,

+ 7 - 4
app/domains/llm_tasks/aigc_decode_task/_utils.py

@@ -134,7 +134,6 @@ class InnerArticlesDecodeUtils(DecodeTaskUtil):
         body_text = "\n".join(body_lines)
         return body_text, images
 
-
     async def create_decode_task(self, payload):
         request_body = {
             "scene": self.BusinessScene.POINT_PICK,
@@ -155,9 +154,13 @@ class InnerArticlesDecodeUtils(DecodeTaskUtil):
         self,
         payload: Dict[str, Any],
         retry_times: int,
-        min_retry_delay: int = 1,
-        max_retry_delay: int = 4,
+        min_retry_delay: int = None,
+        max_retry_delay: int = None,
     ):
+        if min_retry_delay is None:
+            min_retry_delay = self.HttpDecodeCreateRetry.MIN_DELAY_SECONDS
+        if max_retry_delay is None:
+            max_retry_delay = self.HttpDecodeCreateRetry.MAX_DELAY_SECONDS
         retry_kwargs = request_retry(
             retry_times=retry_times,
             min_retry_delay=min_retry_delay,
@@ -172,4 +175,4 @@ __all__ = [
     "AdPlatformArticlesDecodeUtils",
     "InnerArticlesDecodeUtils",
     "DecodeTaskUtil",
-]
+]

+ 55 - 46
app/domains/llm_tasks/aigc_decode_task/create_decode_tasks.py

@@ -32,7 +32,7 @@ class CreateAdPlatformArticlesDecodeTask(DecodeTaskConst):
             await self.log_service.log(
                 contents={
                     "article_id": article_id,
-                    "task": "create_decode_task",
+                    "task": self.LogTaskKey.CREATE_SINGLE,
                     "status": "skip",
                     "message": "acquire lock failed",
                 }
@@ -50,7 +50,7 @@ class CreateAdPlatformArticlesDecodeTask(DecodeTaskConst):
             await self.log_service.log(
                 contents={
                     "article_id": article_id,
-                    "task": "create_decode_task",
+                    "task": self.LogTaskKey.CREATE_SINGLE,
                     "status": "fail",
                     "data": response,
                 }
@@ -68,7 +68,7 @@ class CreateAdPlatformArticlesDecodeTask(DecodeTaskConst):
             await self.log_service.log(
                 contents={
                     "article_id": article_id,
-                    "task": "create_decode_task",
+                    "task": self.LogTaskKey.CREATE_SINGLE,
                     "status": "fail",
                     "data": response,
                 }
@@ -79,7 +79,7 @@ class CreateAdPlatformArticlesDecodeTask(DecodeTaskConst):
         await self.log_service.log(
             contents={
                 "article_id": article_id,
-                "task": "create_decode_task",
+                "task": self.LogTaskKey.CREATE_SINGLE,
                 "status": "success",
                 "data": response,
             }
@@ -96,7 +96,7 @@ class CreateAdPlatformArticlesDecodeTask(DecodeTaskConst):
             await self.log_service.log(
                 contents={
                     "article_id": article_id,
-                    "task": "record_decode_task",
+                    "task": self.LogTaskKey.RECORD_QUEUE,
                     "status": "fail",
                     "message": "创建 decode 记录失败",
                     "data": response,
@@ -114,13 +114,15 @@ class CreateAdPlatformArticlesDecodeTask(DecodeTaskConst):
         if not article_list:
             await self.log_service.log(
                 contents={
-                    "task": "create_tasks",
+                    "task": self.LogTaskKey.CREATE_BATCH,
                     "message": "No more articles to decode",
                 }
             )
             return
 
-        for article in tqdm(article_list, desc="Creating decode tasks"):
+        for article in tqdm(
+            article_list, desc=self.AdPlatformDecodeBatch.TQDM_DESCRIPTION
+        ):
             await self.create_single_decode_task(article)
 
     async def deal(self):
@@ -128,11 +130,6 @@ class CreateAdPlatformArticlesDecodeTask(DecodeTaskConst):
 
 
 class CreateInnerArticlesDecodeTask(DecodeTaskConst):
-    CREATE_TASK_NAME = "create_inner_articles_decode_task"
-    MAX_CREATE_RETRY_TIMES = 3
-    LOCK_TIMEOUT_SECONDS = 30 * 60
-    CREATE_MAX_CONCURRENCY = 5
-
     def __init__(self, pool: DatabaseManager, log_service: LogService):
         self.pool = pool
         self.log_service = log_service
@@ -141,11 +138,13 @@ class CreateInnerArticlesDecodeTask(DecodeTaskConst):
 
     async def _log_create_event(self, **contents):
         await self.log_service.log(
-            contents={"task": self.CREATE_TASK_NAME, **contents}
+            contents={"task": self.InnerDecodeCreate.SCHEDULER_TASK_NAME, **contents}
         )
 
     @staticmethod
-    def _trim_error_message(message: str, limit: int = 500):
+    def _trim_error_message(message: str, limit: int = None):
+        if limit is None:
+            limit = DecodeTaskConst.InnerDecodeCreate.ERROR_MESSAGE_MAX_CHARS
         if not message:
             return ""
         return message[:limit]
@@ -160,7 +159,9 @@ class CreateInnerArticlesDecodeTask(DecodeTaskConst):
     ):
         now_ts = int(time.time())
         retry_count = (state or {}).get("retry_count", 0)
-        should_retry = retryable and retry_count < self.MAX_CREATE_RETRY_TIMES
+        should_retry = (
+            retryable and retry_count < self.InnerDecodeCreate.MAX_RETRY_TIMES
+        )
         error_message = self._trim_error_message(error_message)
 
         if should_retry:
@@ -202,27 +203,31 @@ class CreateInnerArticlesDecodeTask(DecodeTaskConst):
                     "title": article["title"],
                     "cover_img": article["cover_img_url"],
                     "channel_content_id": article.get("wx_sn", source_id),
-                    "content_type": self.ContentType.TITLE_COVER
+                    "content_type": self.ContentType.TITLE_COVER,
                 }
 
             case self.TaskType.SOURCE_IMAGES_TEXT:
-                crawl_source_info = await self.mapper.fetch_article_crawler_source_info(source_id)
+                crawl_source_info = await self.mapper.fetch_article_crawler_source_info(
+                    source_id
+                )
                 if not crawl_source_info:
                     raise ValueError("未找到文章抓取源信息")
 
                 crawl_info = crawl_source_info[0]
                 channel_content_id = crawl_info["channel_content_id"]
                 raw_body_text = crawl_info["body_text"]
-                body_text, images = self.tool.extract_body_text_and_images(raw_body_text)
+                body_text, images = self.tool.extract_body_text_and_images(
+                    raw_body_text
+                )
                 if not body_text and not images:
                     raise ValueError("文章正文和图片均为空,无法创建解构任务")
 
                 return {
                     "source_id": source_id,
                     "images": images,
-                    "body_text": body_text,
+                    "body_text": raw_body_text,
                     "channel_content_id": channel_content_id or source_id,
-                    "content_type": self.ContentType.LONG_ARTICLE
+                    "content_type": self.ContentType.LONG_ARTICLE,
                 }
 
             case self.TaskType.MINI_TITLE_CARD:
@@ -235,15 +240,32 @@ class CreateInnerArticlesDecodeTask(DecodeTaskConst):
         article = task["article"]
         task_type = task["task_type"]
         source_id = article["source_id"]
+        exist_task = await self.mapper.fetch_exist_source_id(source_id, task_type)
+        if exist_task:
+            await self.mapper.mark_create_success(
+                source_id=source_id,
+                task_type=task_type,
+                remote_task_id=self.InnerDecodeCreate.DUPLICATE_SKIP_REMOTE_TASK_ID,
+                now_ts=int(time.time()),
+                remark="任务已存在,跳过重复创建",
+            )
+            await self._log_create_event(
+                source_id=source_id,
+                task_type=task_type,
+                status="skip",
+                message="decode task already exists",
+            )
+            return
+
         now_ts = int(time.time())
-        lock_expire_before = now_ts - self.LOCK_TIMEOUT_SECONDS
+        lock_expire_before = now_ts - self.INNER_DECODE_LOCK_TIMEOUT_SECONDS
 
         await self.mapper.init_create_state(source_id, task_type, now_ts)
         acquire_lock = await self.mapper.acquire_create_lock(
             source_id=source_id,
             task_type=task_type,
             now_ts=now_ts,
-            max_retry_times=self.MAX_CREATE_RETRY_TIMES,
+            max_retry_times=self.InnerDecodeCreate.MAX_RETRY_TIMES,
             lock_expire_before=lock_expire_before,
         )
         if not acquire_lock:
@@ -258,28 +280,11 @@ class CreateInnerArticlesDecodeTask(DecodeTaskConst):
         state = await self.mapper.fetch_create_state(source_id, task_type)
 
         try:
-            # exist_task = await self.mapper.fetch_exist_source_id(source_id, task_type)
-            # if exist_task:
-            #     await self.mapper.mark_create_success(
-            #         source_id=source_id,
-            #         task_type=task_type,
-            #         remote_task_id="existing_task",
-            #         now_ts=int(time.time()),
-            #         remark="任务已存在,跳过重复创建",
-            #     )
-            #     await self._log_create_event(
-            #         source_id=source_id,
-            #         task_type=task_type,
-            #         status="skip",
-            #         message="decode task already exists",
-            #     )
-            #     return
-
             payload = await self._build_payload(article, task_type)
 
             response = await self.tool.create_decode_task_with_retry(
                 payload=payload,
-                retry_times=self.MAX_CREATE_RETRY_TIMES,
+                retry_times=self.InnerDecodeCreate.MAX_RETRY_TIMES,
             )
             response_code = response.get("code")
             if response_code != self.RequestDecode.SUCCESS:
@@ -313,7 +318,10 @@ class CreateInnerArticlesDecodeTask(DecodeTaskConst):
                 payload=json.dumps(payload, ensure_ascii=False),
                 remark=remark,
             )
-            if record_row not in (0, 1):
+            if record_row not in (
+                self.InnerDecodeCreate.INSERT_IGNORE_AFFECTED_NOOP,
+                self.InnerDecodeCreate.INSERT_IGNORE_AFFECTED_INSERTED,
+            ):
                 await self._mark_retry_or_failed(
                     source_id=source_id,
                     task_type=task_type,
@@ -363,7 +371,7 @@ class CreateInnerArticlesDecodeTask(DecodeTaskConst):
             )
 
     async def create_tasks(self, date_string: str = None, max_concurrency: int = None):
-        article_list = await self.mapper.fetch_inner_articles(date_string or "20260401")
+        article_list = await self.mapper.fetch_inner_articles(date_string)
         if not article_list:
             await self._log_create_event(
                 status="empty",
@@ -384,10 +392,11 @@ class CreateInnerArticlesDecodeTask(DecodeTaskConst):
         result = await run_tasks_with_asyncio_task_group(
             task_list=task_list,
             handler=self.create_single_decode_task,
-            description="Creating inner decode tasks",
-            unit="task",
-            max_concurrency=max_concurrency or self.CREATE_MAX_CONCURRENCY,
-            fail_fast=False,
+            description=self.InnerDecodeCreate.ASYNC_BATCH_DESCRIPTION,
+            unit=self.InnerDecodeCreate.ASYNC_BATCH_UNIT,
+            max_concurrency=max_concurrency
+            or self.InnerDecodeCreate.DEFAULT_MAX_CONCURRENCY,
+            fail_fast=self.InnerDecodeCreate.ASYNC_BATCH_FAIL_FAST,
         )
         if result["errors"]:
             await self._log_create_event(