ソースを参照

Merge branch 'feature/luojunhui/coroutine-manager-improve' of Server/LongArticleTaskServer into feature/luojunhui/2025-08-04-add-title-process-task

luojunhui 1 ヶ月 前
コミット
b9c7847eb7

+ 1 - 1
applications/tasks/__init__.py

@@ -1 +1 @@
-from .task_scheduler import TaskScheduler
+from .task_scheduler import TaskScheduler

+ 112 - 0
applications/tasks/task_handler.py

@@ -0,0 +1,112 @@
+from datetime import datetime
+
+from applications.tasks.cold_start_tasks import ArticlePoolColdStart
+from applications.tasks.crawler_tasks import CrawlerToutiao
+from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
+from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
+from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
+from applications.tasks.llm_tasks import TitleRewrite
+from applications.tasks.llm_tasks import CandidateAccountQualityScoreRecognizer
+from applications.tasks.monitor_tasks import check_kimi_balance
+from applications.tasks.monitor_tasks import GetOffVideos
+from applications.tasks.monitor_tasks import CheckVideoAuditStatus
+from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
+from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
+from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
+from applications.tasks.monitor_tasks import TaskProcessingMonitor
+from applications.tasks.task_mapper import TaskMapper
+
+
+class TaskHandler(TaskMapper):
+    """Collection of per-task handler coroutines, built on ``TaskMapper``.
+
+    Every ``_*_handler`` coroutine constructs the matching task object from
+    the clients stored on the instance and awaits it. Handlers either pass
+    the task's own result through or return ``self.TASK_SUCCESS_STATUS``.
+
+    NOTE(review): ``TASK_SUCCESS_STATUS`` is not defined in this class;
+    presumably it is inherited from ``TaskMapper`` -- confirm.
+    """
+
+    def __init__(self, data, log_service, db_client, trace_id):
+        """Store the request payload, the clients and the trace id.
+
+        Args:
+            data: Request payload; handlers read it with ``.get(...)``, so a
+                dict-like object is expected.
+            log_service: Logging client, stored as ``self.log_client``.
+            db_client: Database client handed to every sub-task.
+            trace_id: Identifier attached to logs and passed to sub-tasks.
+        """
+        # NOTE(review): TaskMapper.__init__ is not invoked here -- confirm the
+        # base class needs no initialization of its own.
+        self.data = data
+        self.log_client = log_service
+        self.db_client = db_client
+        self.trace_id = trace_id
+
+    # ---------- Implementations of the individual composite tasks ----------
+    async def _check_kimi_balance_handler(self) -> int:
+        """Await ``check_kimi_balance()``, log its response, report success."""
+        response = await check_kimi_balance()
+        await self.log_client.log(
+            contents={
+                "trace_id": self.trace_id,
+                "task": "check_kimi_balance",
+                "data": response,
+            }
+        )
+        return self.TASK_SUCCESS_STATUS
+
+    async def _get_off_videos_task_handler(self) -> int:
+        """Run ``GetOffVideos`` and return whatever its ``deal()`` returns."""
+        sub_task = GetOffVideos(self.db_client, self.log_client, self.trace_id)
+        return await sub_task.deal()
+
+    async def _check_video_audit_status_handler(self) -> int:
+        """Run ``CheckVideoAuditStatus`` and return its ``deal()`` result."""
+        sub_task = CheckVideoAuditStatus(self.db_client, self.log_client, self.trace_id)
+        return await sub_task.deal()
+
+    async def _task_processing_monitor_handler(self) -> int:
+        """Run ``TaskProcessingMonitor``; its result is ignored."""
+        sub_task = TaskProcessingMonitor(self.db_client)
+        await sub_task.deal()
+        return self.TASK_SUCCESS_STATUS
+
+    async def _inner_gzh_articles_monitor_handler(self) -> int:
+        """Run ``InnerGzhArticlesMonitor`` and return its ``deal()`` result."""
+        sub_task = InnerGzhArticlesMonitor(self.db_client)
+        return await sub_task.deal()
+
+    async def _title_rewrite_handler(self):
+        """Run ``TitleRewrite`` and return its ``deal()`` result.
+
+        NOTE(review): unlike the sibling handlers, this one carries no
+        ``-> int`` return annotation.
+        """
+        sub_task = TitleRewrite(self.db_client, self.log_client)
+        return await sub_task.deal()
+
+    async def _update_root_source_id_handler(self) -> int:
+        """Run ``UpdateRootSourceIdAndUpdateTimeTask``; its result is ignored."""
+        sub_task = UpdateRootSourceIdAndUpdateTimeTask(self.db_client, self.log_client)
+        await sub_task.deal()
+        return self.TASK_SUCCESS_STATUS
+
+    async def _outside_monitor_handler(self) -> int:
+        """Run the outside-GZH collector, then the monitor.
+
+        The collector's return value is discarded; only the monitor's result
+        is returned to the caller.
+        """
+        collector = OutsideGzhArticlesCollector(self.db_client)
+        await collector.deal()
+        monitor = OutsideGzhArticlesMonitor(self.db_client)
+        return await monitor.deal()  # should return SUCCESS / FAILED
+
+    async def _recycle_article_data_handler(self) -> int:
+        """Run the daily recycle task, then the daily check task.
+
+        Both tasks receive the same date string: ``data["date_string"]`` when
+        it is present and truthy, otherwise today's date from
+        ``datetime.now()`` formatted as ``YYYY-MM-DD``.
+        """
+        date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
+        recycle = RecycleDailyPublishArticlesTask(
+            self.db_client, self.log_client, date_str
+        )
+        await recycle.deal()
+        check = CheckDailyPublishArticlesTask(self.db_client, self.log_client, date_str)
+        await check.deal()
+        return self.TASK_SUCCESS_STATUS
+
+    async def _crawler_toutiao_handler(self) -> int:
+        """Dispatch a ``CrawlerToutiao`` job according to ``data["method"]``.
+
+        Payload keys read: ``method`` (default ``"account"``), ``media_type``
+        (default ``"article"``) and ``category_list`` (default ``[]``).
+
+        Raises:
+            ValueError: If ``method`` is not ``"account"``, ``"recommend"``
+                or ``"search"``.
+        """
+        sub_task = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
+        method = self.data.get("method", "account")
+        media_type = self.data.get("media_type", "article")
+        category_list = self.data.get("category_list", [])
+
+        # media_type is used only by "account"; category_list only by "recommend".
+        match method:
+            case "account":
+                await sub_task.crawler_task(media_type=media_type)
+            case "recommend":
+                await sub_task.crawl_toutiao_recommend_task(category_list)
+            case "search":
+                await sub_task.search_candidate_accounts()
+            case _:
+                raise ValueError(f"Unsupported method {method}")
+        return self.TASK_SUCCESS_STATUS
+
+    async def _article_pool_cold_start_handler(self) -> int:
+        """Run ``ArticlePoolColdStart`` with the platform and crawl methods.
+
+        Payload keys read: ``platform`` (default ``"weixin"``) and
+        ``crawler_methods`` (default ``[]``), the latter forwarded as the
+        ``crawl_methods`` keyword argument.
+        """
+        cold_start = ArticlePoolColdStart(
+            self.db_client, self.log_client, self.trace_id
+        )
+        platform = self.data.get("platform", "weixin")
+        crawler_methods = self.data.get("crawler_methods", [])
+        await cold_start.deal(platform=platform, crawl_methods=crawler_methods)
+        return self.TASK_SUCCESS_STATUS
+
+    async def _candidate_account_quality_score_handler(self) -> int:
+        """Run ``CandidateAccountQualityScoreRecognizer``; its result is ignored."""
+        task = CandidateAccountQualityScoreRecognizer(
+            self.db_client, self.log_client, self.trace_id
+        )
+        await task.deal()
+        return self.TASK_SUCCESS_STATUS

+ 6 - 107
applications/tasks/task_scheduler.py

@@ -6,35 +6,21 @@ from datetime import datetime
 from typing import Awaitable, Callable, Dict
 
 from applications.api import feishu_robot
-from applications.utils import task_schedule_response, generate_task_trace_id
+from applications.utils import task_schedule_response
+from applications.tasks.task_handler import TaskHandler
 
-from applications.tasks.cold_start_tasks import ArticlePoolColdStart
-from applications.tasks.crawler_tasks import CrawlerToutiao
-from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
-from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
-from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
-from applications.tasks.llm_tasks import TitleRewrite
-from applications.tasks.llm_tasks import CandidateAccountQualityScoreRecognizer
-from applications.tasks.monitor_tasks import check_kimi_balance
-from applications.tasks.monitor_tasks import GetOffVideos
-from applications.tasks.monitor_tasks import CheckVideoAuditStatus
-from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
-from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
-from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
-from applications.tasks.monitor_tasks import TaskProcessingMonitor
-from applications.tasks.task_mapper import TaskMapper
 
-
-class TaskScheduler(TaskMapper):
+class TaskScheduler(TaskHandler):
     """Unified scheduling entry point: callers only need `await TaskScheduler(data, log_cli, db_cli, trace_id).deal()`"""
 
     # ---------- Initialization ----------
-    def __init__(self, data, log_service, db_client):
+    def __init__(self, data, log_service, db_client, trace_id):
+        super().__init__(data, log_service, db_client, trace_id)
         self.data = data
         self.log_client = log_service
         self.db_client = db_client
         self.table = "long_articles_task_manager"
-        self.trace_id = generate_task_trace_id()
+        self.trace_id = trace_id
 
     # ---------- Shared database helpers ----------
     async def _insert_or_ignore_task(self, task_name: str, date_str: str) -> None:
@@ -200,90 +186,3 @@ class TaskScheduler(TaskMapper):
                 "4001", "wrong task name input"
             )
         return await self._run_with_guard(task_name, date_str, handlers[task_name])
-
-    # ---------- 下面是若干复合任务的局部实现 ----------
-    async def _check_kimi_balance_handler(self) -> int:
-        response = await check_kimi_balance()
-        await self.log_client.log(
-            contents={
-                "trace_id": self.trace_id,
-                "task": "check_kimi_balance",
-                "data": response,
-            }
-        )
-        return self.TASK_SUCCESS_STATUS
-
-    async def _get_off_videos_task_handler(self) -> int:
-        sub_task = GetOffVideos(self.db_client, self.log_client, self.trace_id)
-        return await sub_task.deal()
-
-    async def _check_video_audit_status_handler(self) -> int:
-        sub_task = CheckVideoAuditStatus(self.db_client, self.log_client, self.trace_id)
-        return await sub_task.deal()
-
-    async def _task_processing_monitor_handler(self) -> int:
-        sub_task = TaskProcessingMonitor(self.db_client)
-        await sub_task.deal()
-        return self.TASK_SUCCESS_STATUS
-
-    async def _inner_gzh_articles_monitor_handler(self) -> int:
-        sub_task = InnerGzhArticlesMonitor(self.db_client)
-        return await sub_task.deal()
-
-    async def _title_rewrite_handler(self):
-        sub_task = TitleRewrite(self.db_client, self.log_client)
-        return await sub_task.deal()
-
-    async def _update_root_source_id_handler(self) -> int:
-        sub_task = UpdateRootSourceIdAndUpdateTimeTask(self.db_client, self.log_client)
-        await sub_task.deal()
-        return self.TASK_SUCCESS_STATUS
-
-    async def _outside_monitor_handler(self) -> int:
-        collector = OutsideGzhArticlesCollector(self.db_client)
-        await collector.deal()
-        monitor = OutsideGzhArticlesMonitor(self.db_client)
-        return await monitor.deal()  # 应返回 SUCCESS / FAILED
-
-    async def _recycle_article_data_handler(self) -> int:
-        date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
-        recycle = RecycleDailyPublishArticlesTask(
-            self.db_client, self.log_client, date_str
-        )
-        await recycle.deal()
-        check = CheckDailyPublishArticlesTask(self.db_client, self.log_client, date_str)
-        await check.deal()
-        return self.TASK_SUCCESS_STATUS
-
-    async def _crawler_toutiao_handler(self) -> int:
-        sub_task = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
-        method = self.data.get("method", "account")
-        media_type = self.data.get("media_type", "article")
-        category_list = self.data.get("category_list", [])
-
-        match method:
-            case "account":
-                await sub_task.crawler_task(media_type=media_type)
-            case "recommend":
-                await sub_task.crawl_toutiao_recommend_task(category_list)
-            case "search":
-                await sub_task.search_candidate_accounts()
-            case _:
-                raise ValueError(f"Unsupported method {method}")
-        return self.TASK_SUCCESS_STATUS
-
-    async def _article_pool_cold_start_handler(self) -> int:
-        cold_start = ArticlePoolColdStart(
-            self.db_client, self.log_client, self.trace_id
-        )
-        platform = self.data.get("platform", "weixin")
-        crawler_methods = self.data.get("crawler_methods", [])
-        await cold_start.deal(platform=platform, crawl_methods=crawler_methods)
-        return self.TASK_SUCCESS_STATUS
-
-    async def _candidate_account_quality_score_handler(self) -> int:
-        task = CandidateAccountQualityScoreRecognizer(
-            self.db_client, self.log_client, self.trace_id
-        )
-        await task.deal()
-        return self.TASK_SUCCESS_STATUS

+ 5 - 5
routes/blueprint.py

@@ -1,5 +1,6 @@
 from quart import Blueprint, jsonify, request
 from applications.ab_test import GetCoverService
+from applications.utils import generate_task_trace_id
 
 from applications.tasks import TaskScheduler
 
@@ -16,15 +17,14 @@ def server_routes(pools, log_service):
 
     @server_blueprint.route("/run_task", methods=["POST"])
     async def run_task():
+        trace_id = generate_task_trace_id()
         data = await request.get_json()
-        print("ss", data)
-        task_scheduler = TaskScheduler(data, log_service, pools)
+        task_scheduler = TaskScheduler(data, log_service, pools, trace_id)
         response = await task_scheduler.deal()
-        print(response)
         return jsonify(response)
 
-    @server_blueprint.route("/finish_task", methods=["GET"])
-    async def finish_task():
+    @server_blueprint.route("/health", methods=["GET"])
+    async def hello_world():
         # data = await request.get_json()
         return jsonify({"message": "hello world"})