
Add cold-start task

luojunhui 1 month ago
parent commit 4f819b7c6d

+ 17 - 10
applications/tasks/crawler_tasks/crawler_toutiao.py

@@ -92,7 +92,7 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
                 "function": "get_account_list",
                 "message": f"get toutiao account list, media_type: {media_type}",
                 "status": "success",
-                "data": response
+                "data": response,
             }
         )
         if not response:
@@ -283,12 +283,12 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
                 )
                 await self.log_client.log(
                     contents={
-                    "trace_id": self.trace_id,
-                    "task": "crawler_toutiao_account_info",
-                    "function": "crawler_task",
-                    "message": f"crawler account: {account_id} successfully, media type: {media_type}",
-                    "status": "success"
-                }
+                        "trace_id": self.trace_id,
+                        "task": "crawler_toutiao_account_info",
+                        "function": "crawler_task",
+                        "message": f"crawler account: {account_id} successfully, media type: {media_type}",
+                        "status": "success",
+                    }
                 )
 
             except Exception as e:
@@ -315,14 +315,16 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
         for crawler_time in range(self.RECOMMEND_TIMES):
             try:
                 proxy_url = async_proxy()["url"]
-                proxy_auth = aiohttp.BasicAuth(async_proxy()["username"], async_proxy()["password"])
+                proxy_auth = aiohttp.BasicAuth(
+                    async_proxy()["username"], async_proxy()["password"]
+                )
                 async with aiohttp.ClientSession() as session:
                     async with session.request(
                         method=cookie["request_method"],
                         url=cookie["request_url"],
                         headers=json.loads(cookie["request_headers"]),
                         proxy=proxy_url,
-                        proxy_auth=proxy_auth
+                        proxy_auth=proxy_auth,
                     ) as response:
                         response.raise_for_status()
                         response_json = await response.json()
@@ -345,11 +347,16 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
                         "message": f"crawler {category} articles, crawler time: {crawler_time + 1}",
                         "status": "fail",
                         "trace_id": self.trace_id,
-                        "data": {"error": str(e), "traceback": traceback.format_exc(),},
+                        "data": {
+                            "error": str(e),
+                            "traceback": traceback.format_exc(),
+                        },
                     }
                 )
                 continue
 
+            if not response_json:
+                continue
             article_list = response_json["data"]
             for article in article_list:
                 if article.get("article_url"):
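
The only behavioral change in this file is the new "if not response_json" guard before response_json["data"] is read; the rest is line wrapping and trailing commas. A minimal sketch of the retry loop with that guard, assuming a hypothetical fetch_recommend_page helper in place of the real aiohttp request:

import asyncio
from typing import Any, Dict, List, Optional


async def fetch_recommend_page(category: str) -> Optional[Dict[str, Any]]:
    # hypothetical stand-in for the aiohttp request in crawl_toutiao_recommend_task;
    # may return None when the feed gives nothing back
    return {"data": [{"article_url": f"https://example.com/{category}/1"}]}


async def crawl_recommend(category: str, times: int) -> List[str]:
    urls: List[str] = []
    for _ in range(times):
        try:
            response_json = await fetch_recommend_page(category)
        except Exception:
            continue  # request failed: log and move on to the next round
        if not response_json:
            # the new guard: a round without a JSON body is skipped instead of
            # raising on response_json["data"]
            continue
        for article in response_json["data"]:
            if article.get("article_url"):
                urls.append(article["article_url"])
    return urls


print(asyncio.run(crawl_recommend("tech", 3)))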

+ 1 - 1
applications/tasks/monitor_tasks/get_off_videos.py

@@ -143,7 +143,7 @@ class GetOffVideos(GetOffVideosConst):
                 "task": "get_off_videos",
                 "function": "deal",
                 "trace_id": self.trace_id,
-                "message": "任务执行完成"
+                "message": "任务执行完成",
             }
         )
         return task_status

+ 7 - 2
applications/tasks/task_scheduler.py

@@ -299,8 +299,11 @@ class TaskScheduler(TaskMapper):
                 )
 
             case "crawler_toutiao_articles":
+
                 async def background_crawler_toutiao_articles():
-                    sub_task = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
+                    sub_task = CrawlerToutiao(
+                        self.db_client, self.log_client, self.trace_id
+                    )
                     media_type = self.data.get("media_type", "article")
                     method = self.data.get("method", "account")
                     category_list = self.data.get("category_list", [])
@@ -308,7 +311,9 @@ class TaskScheduler(TaskMapper):
                         case "account":
                             await sub_task.crawler_task(media_type=media_type)
                         case "recommend":
-                            await sub_task.crawl_toutiao_recommend_task(category_list=category_list)
+                            await sub_task.crawl_toutiao_recommend_task(
+                                category_list=category_list
+                            )
                     await self.release_task(
                         task_name=task_name, date_string=date_string
                     )
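
Apart from the line wrapping, this dispatcher branch is unchanged: it reads media_type, method, and category_list from self.data, chooses between account crawling and recommend-feed crawling, then releases the task. A sketch of the payload shapes it expects; the keys come from the diff, while the task_name field and the concrete values are assumptions:

# hypothetical trigger payloads for the "crawler_toutiao_articles" task
account_mode = {
    "task_name": "crawler_toutiao_articles",
    "media_type": "video",  # defaults to "article" when omitted
    "method": "account",    # defaults to "account": crawl the configured accounts
}

recommend_mode = {
    "task_name": "crawler_toutiao_articles",
    "method": "recommend",                 # crawl the recommend feed instead
    "category_list": ["tech", "finance"],  # hypothetical category names
}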

+ 49 - 25
applications/tasks/task_scheduler_v2.py

@@ -1,5 +1,6 @@
 import asyncio
 import time
+import traceback
 from datetime import datetime
 from typing import Awaitable, Callable, Dict
 
@@ -125,27 +126,26 @@ class TaskScheduler(TaskMapper):
         async def _wrapper():
             status = self.TASK_FAILED_STATUS
             try:
-                status = (
-                    await task_coro()
-                )  # your task function must return TASK_SUCCESS_STATUS / FAILED_STATUS
+                status = await task_coro()
             except Exception as e:
                 await self.log_client.log(
                     contents={
                         "trace_id": self.trace_id,
+                        "function": "cor_wrapper",
                         "task": task_name,
-                        "err": str(e),
+                        "error": str(e),
                     }
                 )
                 await feishu_robot.bot(
                     title=f"{task_name} is failed",
-                    detail={"task": task_name, "err": str(e)},
+                    detail={"task": task_name, "err": str(e), "traceback": traceback.format_exc()},
                 )
             finally:
                 await self._release_task(task_name, date_str, status)
 
         asyncio.create_task(_wrapper(), name=task_name)
         return await task_schedule_response.success_response(
-            task_name=task_name, data={"code": 0, "message": "task started"}
+            task_name=task_name, data={"code": 0, "message": "task started", "trace_id": self.trace_id}
         )
 
     # ---------- main entry point ----------
@@ -160,17 +160,28 @@ class TaskScheduler(TaskMapper):
 
         # === all tasks are registered here: each maps to an async function that returns an int status code ===
         handlers: Dict[str, Callable[[], Awaitable[int]]] = {
-            "check_kimi_balance": lambda: check_kimi_balance(),
-            "get_off_videos": self._get_off_videos_task,
-            "check_publish_video_audit_status": self._check_video_audit_status,
-            "task_processing_monitor": self._task_processing_monitor,
+            # check kimi balance
+            "check_kimi_balance": self._check_kimi_balance_handler,
+            # take long-article videos offline three days after publishing
+            "get_off_videos": self._get_off_videos_task_handler,
+            # keep long-article videos visible for three days after publishing
+            "check_publish_video_audit_status": self._check_video_audit_status_handler,
+            # monitor articles published on external service accounts
             "outside_article_monitor": self._outside_monitor_handler,
-            "inner_article_monitor": self._inner_gzh_articles_monitor,
-            "title_rewrite": self._title_rewrite,
-            "daily_publish_articles_recycle": self._recycle_handler,
-            "update_root_source_id": self._update_root_source_id,
+            # monitor in-house article publishing
+            "inner_article_monitor": self._inner_gzh_articles_monitor_handler,
+            # title rewrite (to be tested)
+            "title_rewrite": self._title_rewrite_handler,
+            # recycle daily publishing data
+            "daily_publish_articles_recycle": self._recycle_article_data_handler,
+            # update root_source_id for daily published articles
+            "update_root_source_id": self._update_root_source_id_handler,
+            # crawl Toutiao articles and videos
             "crawler_toutiao_articles": self._crawler_toutiao_handler,
+            # article pool cold-start publishing
             "article_pool_pool_cold_start": self._article_pool_cold_start_handler,
+            # monitor task timeouts
+            "task_processing_monitor": self._task_processing_monitor_handler,
         }
 
         if task_name not in handlers:
@@ -180,30 +191,42 @@ class TaskScheduler(TaskMapper):
         return await self._run_with_guard(task_name, date_str, handlers[task_name])
 
     # ---------- local implementations of the composite tasks ----------
-    # written as standalone methods to keep things clean
-    async def _get_off_videos_task(self):
+    async def _check_kimi_balance_handler(self) -> int:
+        response = await check_kimi_balance()
+        await self.log_client.log(
+            contents={
+                "trace_id": self.trace_id,
+                "task": "check_kimi_balance",
+                "data": response,
+            }
+        )
+        return self.TASK_SUCCESS_STATUS
+
+    async def _get_off_videos_task_handler(self) -> int:
         sub_task = GetOffVideos(self.db_client, self.log_client, self.trace_id)
         return await sub_task.deal()
 
-    async def _check_video_audit_status(self):
+    async def _check_video_audit_status_handler(self) -> int:
         sub_task = CheckVideoAuditStatus(self.db_client, self.log_client, self.trace_id)
         return await sub_task.deal()
 
-    async def _task_processing_monitor(self):
+    async def _task_processing_monitor_handler(self) -> int:
         sub_task = TaskProcessingMonitor(self.db_client)
-        return await sub_task.deal()
+        await sub_task.deal()
+        return self.TASK_SUCCESS_STATUS
 
-    async def _inner_gzh_articles_monitor(self):
+    async def _inner_gzh_articles_monitor_handler(self) -> int:
         sub_task = InnerGzhArticlesMonitor(self.db_client)
         return await sub_task.deal()
 
-    async def _title_rewrite(self):
+    async def _title_rewrite_handler(self):
         sub_task = TitleRewrite(self.db_client, self.log_client)
         return await sub_task.deal()
 
-    async def _update_root_source_id(self) -> int:
+    async def _update_root_source_id_handler(self) -> int:
         sub_task = UpdateRootSourceIdAndUpdateTimeTask(self.db_client, self.log_client)
-        return await sub_task.deal()
+        await sub_task.deal()
+        return self.TASK_SUCCESS_STATUS
 
     async def _outside_monitor_handler(self) -> int:
         collector = OutsideGzhArticlesCollector(self.db_client)
@@ -211,14 +234,15 @@ class TaskScheduler(TaskMapper):
         monitor = OutsideGzhArticlesMonitor(self.db_client)
         return await monitor.deal()  # should return SUCCESS / FAILED
 
-    async def _recycle_handler(self) -> int:
+    async def _recycle_article_data_handler(self) -> int:
         date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
         recycle = RecycleDailyPublishArticlesTask(
             self.db_client, self.log_client, date_str
         )
         await recycle.deal()
         check = CheckDailyPublishArticlesTask(self.db_client, self.log_client, date_str)
-        return await check.deal()
+        await check.deal()
+        return self.TASK_SUCCESS_STATUS
 
     async def _crawler_toutiao_handler(self) -> int:
         sub_task = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
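
The v2 scheduler now registers every task as a zero-argument async handler that returns an int status code, and _run_with_guard runs the chosen handler, reports failures with a traceback, and always releases the task lock. A minimal, self-contained sketch of that registry-plus-guard pattern, with hypothetical status codes and print standing in for log_client and feishu_robot:

import asyncio
import traceback
from typing import Awaitable, Callable, Dict

# hypothetical status codes; the real values come from TaskMapper
TASK_SUCCESS_STATUS = 2
TASK_FAILED_STATUS = 99


async def run_with_guard(
    task_name: str,
    handler: Callable[[], Awaitable[int]],
    release: Callable[[str, int], Awaitable[None]],
) -> None:
    # run the handler, report failures with a traceback, always release the lock
    status = TASK_FAILED_STATUS
    try:
        status = await handler()
    except Exception as e:
        print(f"{task_name} failed: {e}\n{traceback.format_exc()}")
    finally:
        await release(task_name, status)


async def main() -> None:
    async def check_kimi_balance_handler() -> int:  # hypothetical handler
        return TASK_SUCCESS_STATUS

    handlers: Dict[str, Callable[[], Awaitable[int]]] = {
        "check_kimi_balance": check_kimi_balance_handler,
    }

    async def release(name: str, status: int) -> None:
        print(f"released {name} with status {status}")

    await run_with_guard("check_kimi_balance", handlers["check_kimi_balance"], release)


asyncio.run(main())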