
Add cold-start task

luojunhui 1 month ago
parent
commit
03aab0e697

+ 3 - 1
applications/tasks/__init__.py

@@ -1 +1,3 @@
-from .task_scheduler import TaskScheduler
+# from .task_scheduler import TaskScheduler
+from .task_scheduler_v2 import TaskScheduler
+

+ 96 - 11
applications/tasks/crawler_tasks/crawler_toutiao.py

@@ -2,7 +2,9 @@ from __future__ import annotations
 
 import json
 import time
+import requests
 import traceback
+from datetime import datetime
 from typing import List, Dict
 
 from tqdm import tqdm
@@ -10,6 +12,7 @@ from tqdm import tqdm
 from applications.api import feishu_robot
 from applications.crawler.toutiao import get_toutiao_account_info_list
 from applications.pipeline import CrawlerPipeline
+from applications.utils import proxy
 
 
 class CrawlerToutiaoConst:
@@ -37,8 +40,31 @@ class CrawlerToutiaoConst:
 
 
 class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
-    def __init__(self, pool, log_client):
+    def __init__(self, pool, log_client, trace_id):
         super().__init__(pool, log_client)
+        self.trace_id = trace_id
+
+    async def get_request_params(self, category):
+        """
+        get request params
+        """
+        query = f"""
+            select request_method, request_url, request_headers, post_data
+            from toutiao_request_params
+            where category = %s and expire_flag = %s 
+            order by id desc limit 1;
+        """
+        response = await self.pool.async_fetch(query=query, params=(category, 0))
+        if not response:
+            now = datetime.now()
+            if 10 < now.hour < 21:
+                await feishu_robot.bot(
+                    title="今日头条推荐流,cookie 过期",
+                    detail={"info": "cookie expired"},
+                )
+            return None
+        else:
+            return response[0]
 
     async def get_account_list(self, media_type: str) -> List[dict]:
         """get toutiao account list"""
@@ -85,14 +111,12 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
             key="toutiao_blogger_cookie", output_type="string"
         )
         while has_more:
-            print(account_id, max_cursor)
             response = await get_toutiao_account_info_list(
                 account_id=account_id,
                 cookie=cookie,
                 media_type=media_type,
                 max_behot_time=current_cursor,
             )
-            print(response)
             if not response:
                 break
 
@@ -128,7 +152,9 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
                         case "video":
                             await self.crawler_each_video(info)
                         case "article":
-                            await self.crawler_each_article(info)
+                            await self.crawler_each_article(
+                                method="account", article_raw_data=info
+                            )
                         case _:
                             raise Exception(f"unknown media type: {media_type}")
 
@@ -140,23 +166,42 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
             else:
                 break
 
-    async def crawler_each_article(self, article_raw_data):
+    async def crawler_each_article(self, method, article_raw_data, category=None):
         """
         crawler each article
         """
-        new_article_item = {
+        # common fields shared by both the account and recommend modes
+        base_item = {
             "platform": self.PLATFORM,
-            "mode": "account",
-            "category": "toutiao_account_association",
+            "mode": method,
             "out_account_id": article_raw_data["user_info"]["user_id"],
             "title": article_raw_data["title"],
-            "link": f"https://www.toutiao.com/article/{article_raw_data['group_id']}",
             "read_cnt": article_raw_data["read_count"],
             "like_cnt": article_raw_data["like_count"],
-            "description": article_raw_data["abstract"],
             "publish_time": article_raw_data["publish_time"],
-            "unique_index": article_raw_data["group_id"],
+            "crawler_time": int(time.time()),
         }
+        match method:
+            case "account":
+                new_article_item = {
+                    **base_item,
+                    "category": "toutiao_account_association",
+                    "link": f"https://www.toutiao.com/article/{article_raw_data['group_id']}",
+                    "description": article_raw_data["abstract"],
+                    "unique_index": article_raw_data["group_id"],
+                }
+            case "recommend":
+                new_article_item = {
+                    **base_item,
+                    "category": category,
+                    "title": article_raw_data["title"],
+                    "link": f"https://www.toutiao.com/article/{article_raw_data['item_id']}",
+                    "description": article_raw_data["Abstract"],
+                    "unique_index": article_raw_data["item_id"],
+                }
+            case _:
+                raise Exception(f"unknown method: {method}")
+
         await self.save_item_to_database(media_type="article", item=new_article_item)
 
     async def crawler_each_video(self, video_raw_data):
@@ -230,3 +275,43 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
                         },
                     }
                 )
+
+    async def crawler_recommend_articles(self, category: str) -> None:
+        cookie = await self.get_request_params(category=category)
+        if not cookie:
+            return
+
+        for _ in range(10):  # pull the recommend feed 10 times per category
+            response = requests.request(
+                method=cookie["request_method"],
+                url=cookie["request_url"],
+                headers=json.loads(cookie["request_headers"]),
+                proxies=proxy(),
+            )
+            if not response.text:
+                continue
+            article_list = response.json()["data"]
+            for article in article_list:
+                if article.get("article_url"):
+                    video_flag = article.get("has_video")
+                    if not video_flag:
+                        try:
+                            await self.crawler_each_article(
+                                method="recommend",
+                                article_raw_data=article,
+                                category=category,
+                            )
+                        except Exception as e:
+                            print(f"crawler_recommend_articles error: {e}")
+                    else:
+                        print("this is a video rather than an article")
+                        continue
+                else:
+                    continue
+
+    async def crawl_toutiao_recommend_task(self, category_list: List[str]) -> None:
+        if not category_list:
+            category_list = ["finance", "tech", "history", "entertainment"]
+
+        for category in category_list:
+            await self.crawler_recommend_articles(category=category)
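
A minimal driver for the new recommend flow, assuming an already-initialized connection pool and log client (both fixtures here are placeholders, not part of the commit), might look like:

    import asyncio

    async def run_recommend_crawl(pool, log_client):
        # trace_id is only used for logging; any string works for a manual run
        crawler = CrawlerToutiao(pool, log_client, trace_id="manual-test")
        # each category needs a non-expired row in toutiao_request_params
        # (request_method, request_url, request_headers, expire_flag = 0)
        await crawler.crawl_toutiao_recommend_task(category_list=["finance", "tech"])

    # asyncio.run(run_recommend_crawl(pool, log_client))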

+ 57 - 2
applications/tasks/monitor_tasks/get_off_videos.py

@@ -32,9 +32,10 @@ class GetOffVideosConst:
 
 
 class GetOffVideos(GetOffVideosConst):
-    def __init__(self, db_client, log_client):
+    def __init__(self, db_client, log_client, trace_id):
         self.db_client = db_client
         self.log_client = log_client
+        self.trace_id = trace_id
 
     async def get_task_list(
         self, earliest_timestamp_threshold: int, expire_timestamp_threshold: int
@@ -76,6 +77,14 @@ class GetOffVideos(GetOffVideosConst):
         task_list = await self.get_task_list(
             earliest_timestamp_threshold, expire_timestamp_threshold
         )
+        await self.log_client.log(
+            contents={
+                "task": "get_off_videos",
+                "trace_id": self.trace_id,
+                "message": f"获取{len(task_list)}条待下架视频",
+                "data": task_list,
+            }
+        )
         success_count = 0
         failed_count = 0
         for task in tqdm(task_list):
@@ -90,6 +99,7 @@ class GetOffVideos(GetOffVideosConst):
                         "function": "get_off_job",
                         "status": "fail",
                         "message": "get off video fail",
+                        "trace_id": self.trace_id,
                         "data": {
                             "video_id": video_id,
                             "error": str(e),
@@ -125,11 +135,25 @@ class GetOffVideos(GetOffVideosConst):
         else:
             return self.TASK_SUCCESS_STATUS
 
+    async def deal(self):
+        await self.get_off_job()
+        task_status = await self.check()
+        await self.log_client.log(
+            contents={
+                "task": "get_off_videos",
+                "function": "deal",
+                "trace_id": self.trace_id,
+                "message": "任务执行完成"
+            }
+        )
+        return task_status
+
 
 class CheckVideoAuditStatus(GetOffVideosConst):
-    def __init__(self, db_client, log_client):
+    def __init__(self, db_client, log_client, trace_id):
         self.db_client = db_client
         self.log_client = log_client
+        self.trace_id = trace_id
 
     async def get_video_list_status(self, video_list: List[int]):
         response = await fetch_piaoquan_video_list_detail(video_list)
@@ -170,20 +194,51 @@ class CheckVideoAuditStatus(GetOffVideosConst):
                 yield arr[i : i + chunk_size]
 
         video_id_list = await self.get_unchecked_video_list()
+        if video_id_list:
+            await self.log_client.log(
+                contents={
+                    "task": "check_video_audit_status",
+                    "function": "deal",
+                    "trace_id": self.trace_id,
+                    "message": f"一共获取{len(video_id_list)}条视频",
+                }
+            )
+        else:
+            return self.TASK_SUCCESS_STATUS
+
         video_chunks = chuck_iterator(video_id_list, 10)
 
         bad_videos_count = 0
         fail_list = []
+        batch = 0
         for video_chunk in video_chunks:
+            batch += 1
             bad_video_id_list = await self.get_video_list_status(video_chunk)
             if bad_video_id_list:
                 bad_videos_count += len(bad_video_id_list)
+                await self.log_client.log(
+                    contents={
+                        "task": "check_video_audit_status",
+                        "function": "deal",
+                        "trace_id": self.trace_id,
+                        "message": f"batch: {batch} has {len(bad_video_id_list)} bad videos",
+                        "data": bad_video_id_list,
+                    }
+                )
                 for bad_video_id in tqdm(bad_video_id_list):
                     response = await change_video_audit_status(bad_video_id)
                     if not response:
                         fail_list.append(bad_video_id)
 
             await self.update_check_status(video_chunk)
+            await self.log_client.log(
+                contents={
+                    "task": "check_video_audit_status",
+                    "function": "deal",
+                    "trace_id": self.trace_id,
+                    "message": f"finish process batch: {batch}",
+                }
+            )
 
         if fail_list:
             await feishu_robot.bot(

+ 9 - 5
applications/tasks/task_scheduler.py

@@ -299,12 +299,16 @@ class TaskScheduler(TaskMapper):
                 )
 
             case "crawler_toutiao_articles":
-
                 async def background_crawler_toutiao_articles():
-                    sub_task = CrawlerToutiao(self.db_client, self.log_client)
-                    await sub_task.crawler_task(
-                        media_type=self.data.get("media_type", "article")
-                    )
+                    sub_task = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
+                    media_type = self.data.get("media_type", "article")
+                    method = self.data.get("method", "account")
+                    category_list = self.data.get("category_list", [])
+                    match method:
+                        case "account":
+                            await sub_task.crawler_task(media_type=media_type)
+                        case "recommend":
+                            await sub_task.crawl_toutiao_recommend_task(category_list=category_list)
                     await self.release_task(
                         task_name=task_name, date_string=date_string
                     )
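
With the extra method and category_list parameters, a /run_task request body for this branch would look roughly like the sketch below (field names are taken from the handler above; the concrete categories are illustrative):

    payload = {
        "task_name": "crawler_toutiao_articles",
        "media_type": "article",
        "method": "recommend",                 # "account" keeps the previous behaviour
        "category_list": ["finance", "tech"],  # an empty list falls back to the built-in defaults
    }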

+ 244 - 0
applications/tasks/task_scheduler_v2.py

@@ -0,0 +1,244 @@
+import asyncio
+import time
+from datetime import datetime
+from typing import Awaitable, Callable, Dict
+
+from applications.api import feishu_robot
+from applications.utils import task_schedule_response, generate_task_trace_id
+
+from applications.tasks.cold_start_tasks import ArticlePoolColdStart
+from applications.tasks.crawler_tasks import CrawlerToutiao
+from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
+from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
+from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
+from applications.tasks.llm_tasks import TitleRewrite
+from applications.tasks.monitor_tasks import check_kimi_balance
+from applications.tasks.monitor_tasks import GetOffVideos
+from applications.tasks.monitor_tasks import CheckVideoAuditStatus
+from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
+from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
+from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
+from applications.tasks.monitor_tasks import TaskProcessingMonitor
+from applications.tasks.task_mapper import TaskMapper
+
+
+class TaskScheduler(TaskMapper):
+    """Unified scheduling entry point: callers only need `await TaskScheduler(data, log_cli, db_cli).deal()`"""
+
+    # ---------- initialization ----------
+    def __init__(self, data, log_service, db_client):
+        self.data = data
+        self.log_client = log_service
+        self.db_client = db_client
+        self.table = "long_articles_task_manager"
+        self.trace_id = generate_task_trace_id()
+
+    # ---------- shared database helpers ----------
+    async def _insert_or_ignore_task(self, task_name: str, date_str: str) -> None:
+        """Insert a new record (ignored if the same key already exists)"""
+        query = (
+            f"insert ignore into {self.table} "
+            "(date_string, task_name, start_timestamp, task_status, trace_id) "
+            "values (%s, %s, %s, %s, %s);"
+        )
+        await self.db_client.async_save(
+            query=query,
+            params=(
+                date_str,
+                task_name,
+                int(time.time()),
+                self.TASK_INIT_STATUS,
+                self.trace_id,
+            ),
+        )
+
+    async def _try_lock_task(self, task_name: str, date_str: str) -> bool:
+        """Acquire the lock with a single UPDATE; return True if the lock was obtained"""
+        query = (
+            f"update {self.table} "
+            "set task_status = %s "
+            "where task_name = %s and date_string = %s and task_status = %s;"
+        )
+        res = await self.db_client.async_save(
+            query=query,
+            params=(
+                self.TASK_PROCESSING_STATUS,
+                task_name,
+                date_str,
+                self.TASK_INIT_STATUS,
+            ),
+        )
+        return True if res else False
+
+    async def _release_task(self, task_name: str, date_str: str, status: int) -> None:
+        query = (
+            f"update {self.table} set task_status=%s, finish_timestamp=%s "
+            "where task_name=%s and date_string=%s and task_status=%s;"
+        )
+        await self.db_client.async_save(
+            query=query,
+            params=(
+                status,
+                int(time.time()),
+                task_name,
+                date_str,
+                self.TASK_PROCESSING_STATUS,
+            ),
+        )
+
+    async def _is_processing_overtime(self, task_name: str) -> bool:
+        """Check whether a task with the same name is already running; send a Feishu alert if it has timed out"""
+        query = f"select start_timestamp from {self.table} where task_name=%s and task_status=%s"
+        rows = await self.db_client.async_fetch(
+            query=query, params=(task_name, self.TASK_PROCESSING_STATUS)
+        )
+        if not rows:
+            return False
+        start_ts = rows[0]["start_timestamp"]
+        if int(time.time()) - start_ts >= self.get_task_config(task_name).get(
+            "expire_duration", self.DEFAULT_TIMEOUT
+        ):
+            await feishu_robot.bot(
+                title=f"{task_name} is overtime",
+                detail={"start_ts": start_ts},
+            )
+        return True
+
+    async def _run_with_guard(
+        self, task_name: str, date_str: str, task_coro: Callable[[], Awaitable[int]]
+    ):
+        """Shared flow: check, insert record, acquire lock, run in background"""
+        # 1. overtime check (reject if a task with the same name is still running)
+        if await self._is_processing_overtime(task_name):
+            return await task_schedule_response.fail_response(
+                "5001", "task is processing"
+            )
+
+        # 2. insert the record and try to acquire the lock
+        await self._insert_or_ignore_task(task_name, date_str)
+        if not await self._try_lock_task(task_name, date_str):
+            return await task_schedule_response.fail_response(
+                "5001", "task is processing"
+            )
+
+        # 3. run the task in a background coroutine so the scheduling entry point is not blocked
+        async def _wrapper():
+            status = self.TASK_FAILED_STATUS
+            try:
+                status = (
+                    await task_coro()
+                )  # the task callable must return TASK_SUCCESS_STATUS / TASK_FAILED_STATUS
+            except Exception as e:
+                await self.log_client.log(
+                    contents={
+                        "trace_id": self.trace_id,
+                        "task": task_name,
+                        "err": str(e),
+                    }
+                )
+                await feishu_robot.bot(
+                    title=f"{task_name} is failed",
+                    detail={"task": task_name, "err": str(e)},
+                )
+            finally:
+                await self._release_task(task_name, date_str, status)
+
+        asyncio.create_task(_wrapper(), name=task_name)
+        return await task_schedule_response.success_response(
+            task_name=task_name, data={"code": 0, "message": "task started"}
+        )
+
+    # ---------- main entry point ----------
+    async def deal(self):
+        task_name: str | None = self.data.get("task_name")
+        if not task_name:
+            return await task_schedule_response.fail_response(
+                "4002", "task_name must be input"
+            )
+
+        date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
+
+        # === all tasks are registered here, each mapping to an async callable returning an int status code ===
+        handlers: Dict[str, Callable[[], Awaitable[int]]] = {
+            "check_kimi_balance": lambda: check_kimi_balance(),
+            "get_off_videos": self._get_off_videos_task,
+            "check_publish_video_audit_status": self._check_video_audit_status,
+            "task_processing_monitor": self._task_processing_monitor,
+            "outside_article_monitor": self._outside_monitor_handler,
+            "inner_article_monitor": self._inner_gzh_articles_monitor,
+            "title_rewrite": self._title_rewrite,
+            "daily_publish_articles_recycle": self._recycle_handler,
+            "update_root_source_id": self._update_root_source_id,
+            "crawler_toutiao_articles": self._crawler_toutiao_handler,
+            "article_pool_pool_cold_start": self._article_pool_cold_start_handler,
+        }
+
+        if task_name not in handlers:
+            return await task_schedule_response.fail_response(
+                "4001", "wrong task name input"
+            )
+        return await self._run_with_guard(task_name, date_str, handlers[task_name])
+
+    # ---------- implementations of the composite tasks ----------
+    # kept as standalone methods for readability
+    async def _get_off_videos_task(self):
+        sub_task = GetOffVideos(self.db_client, self.log_client, self.trace_id)
+        return await sub_task.deal()
+
+    async def _check_video_audit_status(self):
+        sub_task = CheckVideoAuditStatus(self.db_client, self.log_client, self.trace_id)
+        return await sub_task.deal()
+
+    async def _task_processing_monitor(self):
+        sub_task = TaskProcessingMonitor(self.db_client)
+        return await sub_task.deal()
+
+    async def _inner_gzh_articles_monitor(self):
+        sub_task = InnerGzhArticlesMonitor(self.db_client)
+        return await sub_task.deal()
+
+    async def _title_rewrite(self):
+        sub_task = TitleRewrite(self.db_client, self.log_client)
+        return await sub_task.deal()
+
+    async def _update_root_source_id(self) -> int:
+        sub_task = UpdateRootSourceIdAndUpdateTimeTask(self.db_client, self.log_client)
+        return await sub_task.deal()
+
+    async def _outside_monitor_handler(self) -> int:
+        collector = OutsideGzhArticlesCollector(self.db_client)
+        await collector.deal()
+        monitor = OutsideGzhArticlesMonitor(self.db_client)
+        return await monitor.deal()  # should return SUCCESS / FAILED
+
+    async def _recycle_handler(self) -> int:
+        date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
+        recycle = RecycleDailyPublishArticlesTask(
+            self.db_client, self.log_client, date_str
+        )
+        await recycle.deal()
+        check = CheckDailyPublishArticlesTask(self.db_client, self.log_client, date_str)
+        return await check.deal()
+
+    async def _crawler_toutiao_handler(self) -> int:
+        sub_task = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
+        media_type = self.data.get("media_type", "article")
+        method = self.data.get("method", "account")
+        category_list = self.data.get("category_list", [])
+
+        if method == "account":
+            await sub_task.crawler_task(media_type=media_type)
+        elif method == "recommend":
+            await sub_task.crawl_toutiao_recommend_task(category_list)
+        else:
+            raise ValueError(f"Unsupported method {method}")
+        return self.TASK_SUCCESS_STATUS
+
+    async def _article_pool_cold_start_handler(self) -> int:
+        cold_start = ArticlePoolColdStart(
+            self.db_client, self.log_client, self.trace_id
+        )
+        platform = self.data.get("platform", "weixin")
+        crawler_methods = self.data.get("crawler_methods", [])
+        await cold_start.deal(platform=platform, crawl_methods=crawler_methods)
+        return self.TASK_SUCCESS_STATUS
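
The v2 scheduler keeps the same external contract as v1: the route builds TaskScheduler(data, log_service, pools) and awaits deal(). For the cold-start handler registered above, the request data would carry something like the sketch below (only task_name, platform and crawler_methods are read by the handler; the values shown are placeholders):

    payload = {
        "task_name": "article_pool_pool_cold_start",
        "platform": "weixin",       # default used when the field is omitted
        "crawler_methods": [],      # forwarded to ArticlePoolColdStart.deal() as crawl_methods
    }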

+ 1 - 1
applications/utils/item.py

@@ -26,7 +26,7 @@ class CrawlerMetaArticle(BaseModel):
         default=None, max_length=255, description="文章简介"
     )
     publish_time: int = Field(default=None, description="文章发布时间")
-    crawler_time: int = Field(default=int(time.time()), description="抓取时间")
+    crawler_time: int = Field(default=None, description="抓取时间")
     score: float = Field(default=None, description="相似度分")
     status: int = Field(default=1, description="文章状态")
     unique_index: str = Field(default=..., description="文章唯一index")
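
The default change fixes a subtle bug: a pydantic Field(default=int(time.time())) is evaluated once, when the model class is defined, so every article would have carried the process start time. A small sketch of the old behaviour (names here are illustrative, not from the repo):

    import time
    from pydantic import BaseModel, Field

    class Stale(BaseModel):
        crawler_time: int = Field(default=int(time.time()))  # frozen at import time

    a = Stale()
    time.sleep(2)
    b = Stale()
    assert a.crawler_time == b.crawler_time  # both instances share the stale value

With default=None, the caller is expected to set crawler_time explicitly, which the new base_item in crawler_toutiao.py now does with int(time.time()) at crawl time.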

+ 2 - 0
routes/blueprint.py

@@ -17,8 +17,10 @@ def server_routes(pools, log_service):
     @server_blueprint.route("/run_task", methods=["POST"])
     async def run_task():
         data = await request.get_json()
+        print("ss", data)
         task_scheduler = TaskScheduler(data, log_service, pools)
         response = await task_scheduler.deal()
+        print(response)
         return jsonify(response)
 
     @server_blueprint.route("/finish_task", methods=["POST"])
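
For completeness, any registered task can be kicked off through this route with a plain HTTP client; a minimal sketch, assuming the service is reachable locally (host, port and blueprint prefix depend on the deployment):

    import requests

    resp = requests.post(
        "http://127.0.0.1:8080/run_task",  # adjust host/port/prefix for your deployment
        json={"task_name": "get_off_videos"},
    )
    print(resp.json())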

+ 11 - 0
task_app.py

@@ -1,6 +1,8 @@
+import asyncio
 import logging
 
 from quart import Quart
+from aiomonitor import start_monitor
 from applications.config import aliyun_log_config
 from applications.database import mysql_manager
 from applications.service import LogService
@@ -14,9 +16,15 @@ app.register_blueprint(routes)
 
 logging.basicConfig(level=logging.INFO)
 
+_monitor = None
+
 
 @app.before_serving
 async def startup():
+    global _monitor
+    loop = asyncio.get_running_loop()
+    _monitor = start_monitor(loop=loop, host="127.0.0.1", port=50101)
+    logging.info(f"Monitor started at {_monitor}")
     logging.info("Starting application...")
     await mysql_manager.init_pools()
     logging.info("Mysql pools init successfully")
@@ -31,3 +39,6 @@ async def shutdown():
     logging.info("Mysql pools close successfully")
     await log_service.stop()
     logging.info("aliyun log service stop successfully")
+    if _monitor:
+        _monitor.close()
+        logging.info("Monitor stopped successfully")
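
Once the app is serving, the aiomonitor console started here can typically be reached on the configured host and port (for example with telnet or nc against 127.0.0.1:50101), which makes it possible to inspect the background coroutines spawned by _run_with_guard while they run.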