Browse Source

fix README.md

luojunhui 1 month ago
parent
commit
40e79d1092
4 changed files with 258 additions and 593 deletions
  1. 35 0
      README.md
  2. 1 2
      applications/tasks/__init__.py
  3. 222 302
      applications/tasks/task_scheduler.py
  4. 0 289
      applications/tasks/task_scheduler_v2.py

+ 35 - 0
README.md

@@ -117,3 +117,38 @@ docker compose up -d
 tree -I "__pycache__|*.pyc"
 ```
 
+## 数据任务
+#### daily发文数据回收
+```aiignore
+curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "daily_publish_articles_recycle"}'
+```
+#### 账号质量处理
+```aiignore
+curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "candidate_account_quality_analysis"}'
+```
+## 抓取任务
+
+#### 今日头条账号内文章抓取
+```aiignore
+curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "crawler_toutiao"}'
+```
+#### 今日头条推荐抓取文章
+```aiignore
+curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "crawler_toutiao", "method": "recommend"}'
+```
+#### 今日头条搜索抓取账号
+```aiignore
+curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "crawler_toutiao", "method": "search"}'
+```
+
+## 冷启动发布任务
+#### 发布头条文章
+```aiignore
+curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "article_pool_cold_start", "platform": "toutiao", "crawler_methods": ["toutiao_account_association"]}'
+```
+#### 发布公众号文章
+```aiignore
+curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "article_pool_cold_start"}'
+```
+
+

+ 1 - 2
applications/tasks/__init__.py

@@ -1,2 +1 @@
-# from .task_scheduler import TaskScheduler
-from .task_scheduler_v2 import TaskScheduler
+from .task_scheduler import TaskScheduler

+ 222 - 302
applications/tasks/task_scheduler.py

@@ -1,16 +1,20 @@
 import asyncio
+import json
 import time
+import traceback
 from datetime import datetime
+from typing import Awaitable, Callable, Dict
 
 from applications.api import feishu_robot
 from applications.utils import task_schedule_response, generate_task_trace_id
 
 from applications.tasks.cold_start_tasks import ArticlePoolColdStart
 from applications.tasks.crawler_tasks import CrawlerToutiao
-from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
 from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
+from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
 from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
 from applications.tasks.llm_tasks import TitleRewrite
+from applications.tasks.llm_tasks import CandidateAccountQualityScoreRecognizer
 from applications.tasks.monitor_tasks import check_kimi_balance
 from applications.tasks.monitor_tasks import GetOffVideos
 from applications.tasks.monitor_tasks import CheckVideoAuditStatus
@@ -22,6 +26,9 @@ from applications.tasks.task_mapper import TaskMapper
 
 
 class TaskScheduler(TaskMapper):
+    """统一调度入口:外部只需调用 `await TaskScheduler(data, log_cli, db_cli).deal()`"""
+
+    # ---------- 初始化 ----------
     def __init__(self, data, log_service, db_client):
         self.data = data
         self.log_client = log_service
@@ -29,341 +36,254 @@ class TaskScheduler(TaskMapper):
         self.table = "long_articles_task_manager"
         self.trace_id = generate_task_trace_id()
 
-    async def whether_task_processing(self, task_name: str) -> bool:
-        """whether task is processing"""
-        query = f"""
-            select start_timestamp from {self.table} where task_name = %s and task_status = %s;
-        """
-        response = await self.db_client.async_fetch(query=query, params=(task_name, 1))
-        if not response:
-            # no task is processing
-            return False
-        else:
-            start_timestamp = response[0]["start_timestamp"]
-
-            if int(time.time()) - start_timestamp >= self.get_task_config(
-                task_name
-            ).get("expire_duration", self.DEFAULT_TIMEOUT):
-                await feishu_robot.bot(
-                    title=f"{task_name} has been processing over timeout",
-                    detail={"timestamp": start_timestamp},
-                    env="long_articles_task",
-                )
-            return True
-
-    async def record_task(self, task_name, date_string):
-        """record task"""
-        query = f"""insert into {self.table} (date_string, task_name, start_timestamp, trace_id) values (%s, %s, %s, %s);"""
+    # ---------- 公共数据库工具 ----------
+    async def _insert_or_ignore_task(self, task_name: str, date_str: str) -> None:
+        """新建记录(若同键已存在则忽略)"""
+        query = (
+            f"insert ignore into {self.table} "
+            "(date_string, task_name, start_timestamp, task_status, trace_id, data) "
+            "values (%s, %s, %s, %s, %s, %s);"
+        )
         await self.db_client.async_save(
             query=query,
-            params=(date_string, task_name, int(time.time()), self.trace_id),
+            params=(
+                date_str,
+                task_name,
+                int(time.time()),
+                self.TASK_INIT_STATUS,
+                self.trace_id,
+                json.dumps(self.data, ensure_ascii=False),
+            ),
         )
 
-    async def lock_task(self, task_name, date_string):
-        query = f"""update {self.table} set task_status = %s where task_name = %s and date_string = %s and task_status = %s;"""
-        return await self.db_client.async_save(
+    async def _try_lock_task(self) -> bool:
+        """一次 UPDATE 抢锁;返回 True 表示成功上锁"""
+        query = (
+            f"update {self.table} "
+            "set task_status = %s "
+            "where trace_id = %s  and task_status = %s;"
+        )
+        res = await self.db_client.async_save(
             query=query,
             params=(
                 self.TASK_PROCESSING_STATUS,
-                task_name,
-                date_string,
+                self.trace_id,
                 self.TASK_INIT_STATUS,
             ),
         )
+        return True if res else False
 
-    async def release_task(self, task_name, date_string, final_status=None):
-        """
-        任务执行完成之后,将任务状态设置为完成状态/失败状态
-        """
-        if not final_status:
-            final_status = self.TASK_SUCCESS_STATUS
-        query = f"""
-           update {self.table} 
-           set task_status = %s, finish_timestamp = %s
-           where task_name = %s and date_string = %s and task_status = %s;
-        """
-        return await self.db_client.async_save(
+    async def _release_task(self, status: int) -> None:
+        query = (
+            f"update {self.table} set task_status=%s, finish_timestamp=%s "
+            "where trace_id=%s and task_status=%s;"
+        )
+        await self.db_client.async_save(
             query=query,
             params=(
-                final_status,
+                status,
                 int(time.time()),
-                task_name,
-                date_string,
+                self.trace_id,
                 self.TASK_PROCESSING_STATUS,
             ),
         )
 
-    async def deal(self):
-        task_name = self.data.get("task_name")
-        date_string = self.data.get("date_string")
-        if not task_name:
-            await self.log_client.log(
-                contents={
-                    "task": task_name,
-                    "function": "task_scheduler_deal",
-                    "message": "not task name in params",
-                    "status": "fail",
-                    "data": self.data,
-                }
-            )
-            return await task_schedule_response.fail_response(
-                error_code="4002", error_message="task_name must be input"
+    async def _is_processing_overtime(self, task_name) -> bool:
+        """检测在处理任务是否超时,或者超过最大并行数,若超时会发飞书告警"""
+        query = f"select trace_id from {self.table} where task_status = %s and task_name = %s;"
+        rows = await self.db_client.async_fetch(
+            query=query, params=(self.TASK_PROCESSING_STATUS, task_name)
+        )
+        if not rows:
+            return False
+
+        processing_task_num = len(rows)
+        if processing_task_num >= self.get_task_config(task_name).get(
+            "task_max_num", self.TASK_MAX_NUM
+        ):
+            await feishu_robot.bot(
+                title=f"multi {task_name} is processing ",
+                detail={"detail": rows},
             )
+            return True
 
-        if not date_string:
-            date_string = datetime.today().strftime("%Y-%m-%d")
+        return False
 
-        # prepare for task
-        if await self.whether_task_processing(task_name):
+    async def _run_with_guard(
+        self, task_name: str, date_str: str, task_coro: Callable[[], Awaitable[int]]
+    ):
+        """公共:检查、建记录、抢锁、后台运行"""
+        # 1. 超时检测
+        if await self._is_processing_overtime(task_name):
             return await task_schedule_response.fail_response(
-                error_code="5001", error_message="task is processing"
+                "5005", "muti tasks with same task_name is processing"
             )
 
-        await self.log_client.log(
-            contents={
-                "trace_id": self.trace_id,
-                "task": task_name,
-                "message": "start processing",
-                "data": self.data,
-            }
-        )
-
-        await self.record_task(task_name=task_name, date_string=date_string)
-
-        await self.lock_task(task_name, date_string)
+        # 2. 记录并尝试抢锁
+        await self._insert_or_ignore_task(task_name, date_str)
+        if not await self._try_lock_task():
+            return await task_schedule_response.fail_response(
+                "5001", "task is processing"
+            )
 
-        match task_name:
-            case "check_kimi_balance":
-                response = await check_kimi_balance()
+        # 3. 真正执行任务 —— 使用后台协程保证不阻塞调度入口
+        async def _wrapper():
+            status = self.TASK_FAILED_STATUS
+            try:
+                status = await task_coro()
+            except Exception as e:
                 await self.log_client.log(
                     contents={
+                        "trace_id": self.trace_id,
+                        "function": "cor_wrapper",
                         "task": task_name,
-                        "function": "task_scheduler_deal",
-                        "message": "check_kimi_balance task execute successfully",
-                        "status": "success",
-                        "data": response,
+                        "error": str(e),
                     }
                 )
-                await self.release_task(
-                    task_name=task_name,
-                    date_string=date_string,
-                    final_status=response["code"],
-                )
-                return await task_schedule_response.success_response(
-                    task_name=task_name, data=response
-                )
-
-            case "get_off_videos":
-
-                async def background_get_off_videos():
-                    sub_task = GetOffVideos(self.db_client, self.log_client)
-                    await sub_task.get_off_job()
-                    task_status = await sub_task.check()
-                    await self.release_task(
-                        task_name=task_name,
-                        date_string=date_string,
-                        final_status=task_status,
-                    )
-
-                asyncio.create_task(background_get_off_videos())
-                return await task_schedule_response.success_response(
-                    task_name=task_name,
-                    data={"code": 0, "message": "get off_videos started background"},
-                )
-
-            case "check_publish_video_audit_status":
-
-                async def background_check_publish_video_audit_status():
-                    sub_task = CheckVideoAuditStatus(self.db_client, self.log_client)
-                    task_status = await sub_task.deal()
-                    await self.release_task(
-                        task_name=task_name,
-                        date_string=date_string,
-                        final_status=task_status,
-                    )
-                    print("finish task status: ", task_status)
-
-                asyncio.create_task(background_check_publish_video_audit_status())
-                return await task_schedule_response.success_response(
-                    task_name=task_name,
-                    data={
-                        "code": 0,
-                        "message": "check publish video audit status started",
-                    },
-                )
-
-            case "outside_article_monitor":
-
-                async def background_outside_article_monitor():
-                    collect_task = OutsideGzhArticlesCollector(self.db_client)
-                    await collect_task.deal()
-                    monitor_task = OutsideGzhArticlesMonitor(self.db_client)
-                    final_status = await monitor_task.deal()
-                    await self.release_task(
-                        task_name, date_string, final_status=final_status
-                    )
-
-                asyncio.create_task(background_outside_article_monitor())
-                return await task_schedule_response.success_response(
-                    task_name=task_name,
-                    data={
-                        "code": 0,
-                        "message": "outside_article_monitor started background",
-                    },
-                )
-
-            case "inner_article_monitor":
-
-                async def background_inner_article_monitor():
-                    task = InnerGzhArticlesMonitor(self.db_client)
-                    final_status = await task.deal()
-                    await self.release_task(
-                        task_name, date_string, final_status=final_status
-                    )
-
-                asyncio.create_task(background_inner_article_monitor())
-                return await task_schedule_response.success_response(
-                    task_name=task_name,
-                    data={"code": 0, "message": "task started background"},
-                )
-
-            case "title_rewrite":
-
-                async def background_title_rewrite():
-                    sub_task = TitleRewrite(self.db_client, self.log_client)
-                    await sub_task.deal()
-                    await self.release_task(
-                        task_name=task_name, date_string=date_string
-                    )
-
-                asyncio.create_task(background_title_rewrite())
-                return await task_schedule_response.success_response(
-                    task_name=task_name,
-                    data={
-                        "code": 0,
-                        "message": "inner_article_monitor started background",
+                await feishu_robot.bot(
+                    title=f"{task_name} is failed",
+                    detail={
+                        "task": task_name,
+                        "err": str(e),
+                        "traceback": traceback.format_exc(),
                     },
                 )
+            finally:
+                await self._release_task(status)
 
-            case "daily_publish_articles_recycle":
-
-                async def background_daily_publish_articles_recycle():
-                    sub_task = RecycleDailyPublishArticlesTask(
-                        self.db_client, self.log_client, date_string
-                    )
-                    await sub_task.deal()
-                    task = CheckDailyPublishArticlesTask(
-                        self.db_client, self.log_client, date_string
-                    )
-                    await task.deal()
-                    await self.release_task(
-                        task_name=task_name, date_string=date_string
-                    )
-
-                asyncio.create_task(background_daily_publish_articles_recycle())
-                return await task_schedule_response.success_response(
-                    task_name=task_name,
-                    data={"code": 0, "message": "task started background"},
-                )
-
-            case "update_root_source_id":
-
-                async def background_update_root_source_id():
-                    sub_task = UpdateRootSourceIdAndUpdateTimeTask(
-                        self.db_client, self.log_client
-                    )
-                    await sub_task.deal()
-                    await self.release_task(
-                        task_name=task_name, date_string=date_string
-                    )
-
-                asyncio.create_task(background_update_root_source_id())
-                return await task_schedule_response.success_response(
-                    task_name=task_name,
-                    data={"code": 0, "message": "task started background"},
-                )
-
-            case "task_processing_monitor":
-
-                async def background_task_processing_monitor():
-                    sub_task = TaskProcessingMonitor(self.db_client)
-                    await sub_task.deal()
-                    await self.release_task(
-                        task_name=task_name, date_string=date_string
-                    )
-
-                asyncio.create_task(background_task_processing_monitor())
-                return await task_schedule_response.success_response(
-                    task_name=task_name,
-                    data={"code": 0, "message": "task started background"},
-                )
+        asyncio.create_task(_wrapper(), name=task_name)
+        return await task_schedule_response.success_response(
+            task_name=task_name,
+            data={"code": 0, "message": "task started", "trace_id": self.trace_id},
+        )
 
-            case "crawler_toutiao_articles":
-
-                async def background_crawler_toutiao_articles():
-                    sub_task = CrawlerToutiao(
-                        self.db_client, self.log_client, self.trace_id
-                    )
-                    media_type = self.data.get("media_type", "article")
-                    method = self.data.get("method", "account")
-                    category_list = self.data.get("category_list", [])
-                    match method:
-                        case "account":
-                            await sub_task.crawler_task(media_type=media_type)
-                        case "recommend":
-                            await sub_task.crawl_toutiao_recommend_task(
-                                category_list=category_list
-                            )
-                    await self.release_task(
-                        task_name=task_name, date_string=date_string
-                    )
-
-                asyncio.create_task(background_crawler_toutiao_articles())
-                return await task_schedule_response.success_response(
-                    task_name=task_name,
-                    data={"code": 0, "message": "task started background"},
-                )
+    # ---------- 主入口 ----------
+    async def deal(self):
+        task_name: str | None = self.data.get("task_name")
+        if not task_name:
+            return await task_schedule_response.fail_response(
+                "4003", "task_name must be input"
+            )
 
-            case "article_pool_pool_cold_start":
-
-                async def background_article_pool_pool_cold_start():
-                    sub_task = ArticlePoolColdStart(
-                        self.db_client, self.log_client, self.trace_id
-                    )
-                    crawler_methods = self.data.get("crawler_methods", [])
-                    platform = self.data.get("platform", "weixin")
-                    await sub_task.deal(
-                        platform=platform, crawl_methods=crawler_methods
-                    )
-                    await self.release_task(
-                        task_name=task_name, date_string=date_string
-                    )
-                    await self.log_client.log(
-                        contents={
-                            "trace_id": self.trace_id,
-                            "task": task_name,
-                            "message": "finish processed",
-                            "data": self.data,
-                        }
-                    )
-
-                asyncio.create_task(background_article_pool_pool_cold_start())
-                return await task_schedule_response.success_response(
-                    task_name=task_name,
-                    data={"code": 0, "message": "task started background"},
-                )
+        date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
+
+        # === 所有任务在此注册:映射到一个返回 int 状态码的异步函数 ===
+        handlers: Dict[str, Callable[[], Awaitable[int]]] = {
+            # 校验kimi余额
+            "check_kimi_balance": self._check_kimi_balance_handler,
+            # 长文视频发布之后,三天后下架
+            "get_off_videos": self._get_off_videos_task_handler,
+            # 长文视频发布之后,三天内保持视频可见状态
+            "check_publish_video_audit_status": self._check_video_audit_status_handler,
+            # 外部服务号发文监测
+            "outside_article_monitor": self._outside_monitor_handler,
+            # 站内发文监测
+            "inner_article_monitor": self._inner_gzh_articles_monitor_handler,
+            # 标题重写(代测试)
+            "title_rewrite": self._title_rewrite_handler,
+            # 每日发文数据回收
+            "daily_publish_articles_recycle": self._recycle_article_data_handler,
+            # 每日发文更新root_source_id
+            "update_root_source_id": self._update_root_source_id_handler,
+            # 头条文章,视频抓取
+            "crawler_toutiao": self._crawler_toutiao_handler,
+            # 文章池冷启动发布
+            "article_pool_cold_start": self._article_pool_cold_start_handler,
+            # 任务超时监控
+            "task_processing_monitor": self._task_processing_monitor_handler,
+            # 候选账号质量分析
+            "candidate_account_quality_analysis": self._candidate_account_quality_score_handler,
+        }
+
+        if task_name not in handlers:
+            return await task_schedule_response.fail_response(
+                "4001", "wrong task name input"
+            )
+        return await self._run_with_guard(task_name, date_str, handlers[task_name])
 
+    # ---------- 下面是若干复合任务的局部实现 ----------
+    async def _check_kimi_balance_handler(self) -> int:
+        response = await check_kimi_balance()
+        await self.log_client.log(
+            contents={
+                "trace_id": self.trace_id,
+                "task": "check_kimi_balance",
+                "data": response,
+            }
+        )
+        return self.TASK_SUCCESS_STATUS
+
+    async def _get_off_videos_task_handler(self) -> int:
+        sub_task = GetOffVideos(self.db_client, self.log_client, self.trace_id)
+        return await sub_task.deal()
+
+    async def _check_video_audit_status_handler(self) -> int:
+        sub_task = CheckVideoAuditStatus(self.db_client, self.log_client, self.trace_id)
+        return await sub_task.deal()
+
+    async def _task_processing_monitor_handler(self) -> int:
+        sub_task = TaskProcessingMonitor(self.db_client)
+        await sub_task.deal()
+        return self.TASK_SUCCESS_STATUS
+
+    async def _inner_gzh_articles_monitor_handler(self) -> int:
+        sub_task = InnerGzhArticlesMonitor(self.db_client)
+        return await sub_task.deal()
+
+    async def _title_rewrite_handler(self):
+        sub_task = TitleRewrite(self.db_client, self.log_client)
+        return await sub_task.deal()
+
+    async def _update_root_source_id_handler(self) -> int:
+        sub_task = UpdateRootSourceIdAndUpdateTimeTask(self.db_client, self.log_client)
+        await sub_task.deal()
+        return self.TASK_SUCCESS_STATUS
+
+    async def _outside_monitor_handler(self) -> int:
+        collector = OutsideGzhArticlesCollector(self.db_client)
+        await collector.deal()
+        monitor = OutsideGzhArticlesMonitor(self.db_client)
+        return await monitor.deal()  # 应返回 SUCCESS / FAILED
+
+    async def _recycle_article_data_handler(self) -> int:
+        date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
+        recycle = RecycleDailyPublishArticlesTask(
+            self.db_client, self.log_client, date_str
+        )
+        await recycle.deal()
+        check = CheckDailyPublishArticlesTask(self.db_client, self.log_client, date_str)
+        await check.deal()
+        return self.TASK_SUCCESS_STATUS
+
+    async def _crawler_toutiao_handler(self) -> int:
+        sub_task = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
+        method = self.data.get("method", "account")
+        media_type = self.data.get("media_type", "article")
+        category_list = self.data.get("category_list", [])
+
+        match method:
+            case "account":
+                await sub_task.crawler_task(media_type=media_type)
+            case "recommend":
+                await sub_task.crawl_toutiao_recommend_task(category_list)
+            case "search":
+                await sub_task.search_candidate_accounts()
             case _:
-                await self.log_client.log(
-                    contents={
-                        "task": task_name,
-                        "function": "task_scheduler_deal",
-                        "message": "wrong task input",
-                        "status": "success",
-                        "data": self.data,
-                    }
-                )
-                await self.release_task(task_name, date_string, self.TASK_FAILED_STATUS)
-                return await task_schedule_response.fail_response(
-                    error_code="4001", error_message="wrong task name input"
-                )
+                raise ValueError(f"Unsupported method {method}")
+        return self.TASK_SUCCESS_STATUS
+
+    async def _article_pool_cold_start_handler(self) -> int:
+        cold_start = ArticlePoolColdStart(
+            self.db_client, self.log_client, self.trace_id
+        )
+        platform = self.data.get("platform", "weixin")
+        crawler_methods = self.data.get("crawler_methods", [])
+        await cold_start.deal(platform=platform, crawl_methods=crawler_methods)
+        return self.TASK_SUCCESS_STATUS
+
+    async def _candidate_account_quality_score_handler(self) -> int:
+        task = CandidateAccountQualityScoreRecognizer(
+            self.db_client, self.log_client, self.trace_id
+        )
+        await task.deal()
+        return self.TASK_SUCCESS_STATUS

+ 0 - 289
applications/tasks/task_scheduler_v2.py

@@ -1,289 +0,0 @@
-import asyncio
-import json
-import time
-import traceback
-from datetime import datetime
-from typing import Awaitable, Callable, Dict
-
-from applications.api import feishu_robot
-from applications.utils import task_schedule_response, generate_task_trace_id
-
-from applications.tasks.cold_start_tasks import ArticlePoolColdStart
-from applications.tasks.crawler_tasks import CrawlerToutiao
-from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
-from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
-from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
-from applications.tasks.llm_tasks import TitleRewrite
-from applications.tasks.llm_tasks import CandidateAccountQualityScoreRecognizer
-from applications.tasks.monitor_tasks import check_kimi_balance
-from applications.tasks.monitor_tasks import GetOffVideos
-from applications.tasks.monitor_tasks import CheckVideoAuditStatus
-from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
-from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
-from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
-from applications.tasks.monitor_tasks import TaskProcessingMonitor
-from applications.tasks.task_mapper import TaskMapper
-
-
-class TaskScheduler(TaskMapper):
-    """统一调度入口:外部只需调用 `await TaskScheduler(data, log_cli, db_cli).deal()`"""
-
-    # ---------- 初始化 ----------
-    def __init__(self, data, log_service, db_client):
-        self.data = data
-        self.log_client = log_service
-        self.db_client = db_client
-        self.table = "long_articles_task_manager"
-        self.trace_id = generate_task_trace_id()
-
-    # ---------- 公共数据库工具 ----------
-    async def _insert_or_ignore_task(self, task_name: str, date_str: str) -> None:
-        """新建记录(若同键已存在则忽略)"""
-        query = (
-            f"insert ignore into {self.table} "
-            "(date_string, task_name, start_timestamp, task_status, trace_id, data) "
-            "values (%s, %s, %s, %s, %s, %s);"
-        )
-        await self.db_client.async_save(
-            query=query,
-            params=(
-                date_str,
-                task_name,
-                int(time.time()),
-                self.TASK_INIT_STATUS,
-                self.trace_id,
-                json.dumps(self.data, ensure_ascii=False),
-            ),
-        )
-
-    async def _try_lock_task(self) -> bool:
-        """一次 UPDATE 抢锁;返回 True 表示成功上锁"""
-        query = (
-            f"update {self.table} "
-            "set task_status = %s "
-            "where trace_id = %s  and task_status = %s;"
-        )
-        res = await self.db_client.async_save(
-            query=query,
-            params=(
-                self.TASK_PROCESSING_STATUS,
-                self.trace_id,
-                self.TASK_INIT_STATUS,
-            ),
-        )
-        return True if res else False
-
-    async def _release_task(self, status: int) -> None:
-        query = (
-            f"update {self.table} set task_status=%s, finish_timestamp=%s "
-            "where trace_id=%s and task_status=%s;"
-        )
-        await self.db_client.async_save(
-            query=query,
-            params=(
-                status,
-                int(time.time()),
-                self.trace_id,
-                self.TASK_PROCESSING_STATUS,
-            ),
-        )
-
-    async def _is_processing_overtime(self, task_name) -> bool:
-        """检测在处理任务是否超时,或者超过最大并行数,若超时会发飞书告警"""
-        query = f"select trace_id from {self.table} where task_status = %s and task_name = %s;"
-        rows = await self.db_client.async_fetch(
-            query=query, params=(self.TASK_PROCESSING_STATUS, task_name)
-        )
-        if not rows:
-            return False
-
-        processing_task_num = len(rows)
-        if processing_task_num >= self.get_task_config(task_name).get(
-            "task_max_num", self.TASK_MAX_NUM
-        ):
-            await feishu_robot.bot(
-                title=f"multi {task_name} is processing ",
-                detail={"detail": rows},
-            )
-            return True
-
-        return False
-
-    async def _run_with_guard(
-        self, task_name: str, date_str: str, task_coro: Callable[[], Awaitable[int]]
-    ):
-        """公共:检查、建记录、抢锁、后台运行"""
-        # 1. 超时检测
-        if await self._is_processing_overtime(task_name):
-            return await task_schedule_response.fail_response(
-                "5005", "muti tasks with same task_name is processing"
-            )
-
-        # 2. 记录并尝试抢锁
-        await self._insert_or_ignore_task(task_name, date_str)
-        if not await self._try_lock_task():
-            return await task_schedule_response.fail_response(
-                "5001", "task is processing"
-            )
-
-        # 3. 真正执行任务 —— 使用后台协程保证不阻塞调度入口
-        async def _wrapper():
-            status = self.TASK_FAILED_STATUS
-            try:
-                status = await task_coro()
-            except Exception as e:
-                await self.log_client.log(
-                    contents={
-                        "trace_id": self.trace_id,
-                        "function": "cor_wrapper",
-                        "task": task_name,
-                        "error": str(e),
-                    }
-                )
-                await feishu_robot.bot(
-                    title=f"{task_name} is failed",
-                    detail={
-                        "task": task_name,
-                        "err": str(e),
-                        "traceback": traceback.format_exc(),
-                    },
-                )
-            finally:
-                await self._release_task(status)
-
-        asyncio.create_task(_wrapper(), name=task_name)
-        return await task_schedule_response.success_response(
-            task_name=task_name,
-            data={"code": 0, "message": "task started", "trace_id": self.trace_id},
-        )
-
-    # ---------- 主入口 ----------
-    async def deal(self):
-        task_name: str | None = self.data.get("task_name")
-        if not task_name:
-            return await task_schedule_response.fail_response(
-                "4003", "task_name must be input"
-            )
-
-        date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
-
-        # === 所有任务在此注册:映射到一个返回 int 状态码的异步函数 ===
-        handlers: Dict[str, Callable[[], Awaitable[int]]] = {
-            # 校验kimi余额
-            "check_kimi_balance": self._check_kimi_balance_handler,
-            # 长文视频发布之后,三天后下架
-            "get_off_videos": self._get_off_videos_task_handler,
-            # 长文视频发布之后,三天内保持视频可见状态
-            "check_publish_video_audit_status": self._check_video_audit_status_handler,
-            # 外部服务号发文监测
-            "outside_article_monitor": self._outside_monitor_handler,
-            # 站内发文监测
-            "inner_article_monitor": self._inner_gzh_articles_monitor_handler,
-            # 标题重写(代测试)
-            "title_rewrite": self._title_rewrite_handler,
-            # 每日发文数据回收
-            "daily_publish_articles_recycle": self._recycle_article_data_handler,
-            # 每日发文更新root_source_id
-            "update_root_source_id": self._update_root_source_id_handler,
-            # 头条文章,视频抓取
-            "crawler_toutiao": self._crawler_toutiao_handler,
-            # 文章池冷启动发布
-            "article_pool_cold_start": self._article_pool_cold_start_handler,
-            # 任务超时监控
-            "task_processing_monitor": self._task_processing_monitor_handler,
-            # 候选账号质量分析
-            "candidate_account_quality_analysis": self._candidate_account_quality_score_handler,
-        }
-
-        if task_name not in handlers:
-            return await task_schedule_response.fail_response(
-                "4001", "wrong task name input"
-            )
-        return await self._run_with_guard(task_name, date_str, handlers[task_name])
-
-    # ---------- 下面是若干复合任务的局部实现 ----------
-    async def _check_kimi_balance_handler(self) -> int:
-        response = await check_kimi_balance()
-        await self.log_client.log(
-            contents={
-                "trace_id": self.trace_id,
-                "task": "check_kimi_balance",
-                "data": response,
-            }
-        )
-        return self.TASK_SUCCESS_STATUS
-
-    async def _get_off_videos_task_handler(self) -> int:
-        sub_task = GetOffVideos(self.db_client, self.log_client, self.trace_id)
-        return await sub_task.deal()
-
-    async def _check_video_audit_status_handler(self) -> int:
-        sub_task = CheckVideoAuditStatus(self.db_client, self.log_client, self.trace_id)
-        return await sub_task.deal()
-
-    async def _task_processing_monitor_handler(self) -> int:
-        sub_task = TaskProcessingMonitor(self.db_client)
-        await sub_task.deal()
-        return self.TASK_SUCCESS_STATUS
-
-    async def _inner_gzh_articles_monitor_handler(self) -> int:
-        sub_task = InnerGzhArticlesMonitor(self.db_client)
-        return await sub_task.deal()
-
-    async def _title_rewrite_handler(self):
-        sub_task = TitleRewrite(self.db_client, self.log_client)
-        return await sub_task.deal()
-
-    async def _update_root_source_id_handler(self) -> int:
-        sub_task = UpdateRootSourceIdAndUpdateTimeTask(self.db_client, self.log_client)
-        await sub_task.deal()
-        return self.TASK_SUCCESS_STATUS
-
-    async def _outside_monitor_handler(self) -> int:
-        collector = OutsideGzhArticlesCollector(self.db_client)
-        await collector.deal()
-        monitor = OutsideGzhArticlesMonitor(self.db_client)
-        return await monitor.deal()  # 应返回 SUCCESS / FAILED
-
-    async def _recycle_article_data_handler(self) -> int:
-        date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
-        recycle = RecycleDailyPublishArticlesTask(
-            self.db_client, self.log_client, date_str
-        )
-        await recycle.deal()
-        check = CheckDailyPublishArticlesTask(self.db_client, self.log_client, date_str)
-        await check.deal()
-        return self.TASK_SUCCESS_STATUS
-
-    async def _crawler_toutiao_handler(self) -> int:
-        sub_task = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
-        method = self.data.get("method", "account")
-        media_type = self.data.get("media_type", "article")
-        category_list = self.data.get("category_list", [])
-
-        match method:
-            case "account":
-                await sub_task.crawler_task(media_type=media_type)
-            case "recommend":
-                await sub_task.crawl_toutiao_recommend_task(category_list)
-            case "search":
-                await sub_task.search_candidate_accounts()
-            case _:
-                raise ValueError(f"Unsupported method {method}")
-        return self.TASK_SUCCESS_STATUS
-
-    async def _article_pool_cold_start_handler(self) -> int:
-        cold_start = ArticlePoolColdStart(
-            self.db_client, self.log_client, self.trace_id
-        )
-        platform = self.data.get("platform", "weixin")
-        crawler_methods = self.data.get("crawler_methods", [])
-        await cold_start.deal(platform=platform, crawl_methods=crawler_methods)
-        return self.TASK_SUCCESS_STATUS
-
-    async def _candidate_account_quality_score_handler(self) -> int:
-        task = CandidateAccountQualityScoreRecognizer(
-            self.db_client, self.log_client, self.trace_id
-        )
-        await task.deal()
-        return self.TASK_SUCCESS_STATUS