
Rate limiting and violation alerting improvements

luojunhui 1 week ago
parent
commit
c7b2404338

+ 4 - 1
.gitignore

@@ -63,4 +63,7 @@ docs/_build/
 target/
 
 .claude
-.cursor
+.cursor
+.env
+.env.local
+.env.*.local

+ 62 - 13
app/api/service/task_scheduler.py

@@ -1,10 +1,10 @@
 import asyncio
 import json
+import logging
 import time
 from datetime import datetime, timedelta
 from typing import Optional, Dict, Any, List
 
-from app.infra.external import feishu_robot
 from app.infra.shared import task_schedule_response
 from app.jobs.task_handler import TaskHandler
 from app.jobs.task_config import (
@@ -18,9 +18,12 @@ from app.jobs.task_utils import (
     TaskConcurrencyError,
     TaskUtils,
 )
+from app.jobs.task_lifecycle import TaskLifecycleManager
 from app.core.config import GlobalConfigSettings
 from app.core.database import DatabaseManager
-from app.core.observability import LogService
+from app.core.observability import LogService, AlertService
+
+logger = logging.getLogger(__name__)
 
 
 class TaskScheduler(TaskHandler):
@@ -43,6 +46,12 @@ class TaskScheduler(TaskHandler):
         super().__init__(data, log_service, db_client, trace_id, config)
         self.table = TaskUtils.validate_table_name(TaskConstants.TASK_TABLE)
 
+    async def _send_alert(self, title: str, detail: dict, dedup_key: Optional[str] = None):
+        """Send an alert (async and decoupled, so it never blocks the main task path)."""
+        alert = AlertService.get_instance()
+        if alert:
+            await alert.send_alert(title=title, detail=detail, dedup_key=dedup_key)
+
     # ==================== Database operations ====================
 
     async def _insert_or_ignore_task(self, task_name: str, date_str: str) -> None:
@@ -85,7 +94,7 @@ class TaskScheduler(TaskHandler):
         query = f"""
             UPDATE {self.table}
             SET task_status = %s, finish_timestamp = %s
-            WHERE trace_id = %s AND task_status = %s
+            WHERE trace_id = %s AND task_status IN (%s, %s)
         """
         await self.db_client.async_save(
             query=query,
@@ -94,6 +103,7 @@ class TaskScheduler(TaskHandler):
                 int(time.time()),
                 self.trace_id,
                 TaskStatus.PROCESSING,
+                TaskStatus.CANCEL_REQUESTED,
             ),
         )
 
@@ -148,7 +158,7 @@ class TaskScheduler(TaskHandler):
                 timeout_tasks=[t["trace_id"] for t in timeout_tasks],
             )
 
-            await feishu_robot.bot(
+            await self._send_alert(
                 title=f"Task Timeout Alert: {task_name}",
                 detail={
                     "task_name": task_name,
@@ -162,6 +172,7 @@ class TaskScheduler(TaskHandler):
                         for t in timeout_tasks
                     ],
                 },
+                dedup_key=f"timeout_{task_name}",
             )
 
             # Optional: auto-release timed-out tasks (use with caution)
@@ -183,7 +194,7 @@ class TaskScheduler(TaskHandler):
                 max_concurrent=config.max_concurrent,
             )
 
-            await feishu_robot.bot(
+            await self._send_alert(
                 title=f"Task Concurrency Limit: {task_name}",
                 detail={
                     "task_name": task_name,
@@ -191,6 +202,7 @@ class TaskScheduler(TaskHandler):
                     "max_concurrent": config.max_concurrent,
                     "active_tasks": [t["trace_id"] for t in active_tasks],
                 },
+                dedup_key=f"concurrency_{task_name}",
             )
 
             raise TaskConcurrencyError(
@@ -266,7 +278,7 @@ class TaskScheduler(TaskHandler):
 
                 # Decide whether to alert based on the error type
                 if config.alert_on_failure:
-                    await feishu_robot.bot(
+                    await self._send_alert(
                         title=f"Task Failed: {task_name}",
                         detail={
                             "task_name": task_name,
@@ -275,12 +287,24 @@ class TaskScheduler(TaskHandler):
                             "duration": duration,
                             "retryable": e.retryable,
                         },
+                        dedup_key=f"task_failed_{task_name}_{self.trace_id}",
                     )
 
                 # TODO: implement retry logic
                 # if e.retryable and retry_count < config.retry_times:
                 #     await self._schedule_retry(task_name, retry_count + 1)
 
+            except asyncio.CancelledError:
+                # The task was cancelled
+                status = TaskStatus.CANCELLED
+                duration = time.time() - start_time
+                await self._log_task_event(
+                    "task_cancelled",
+                    task_name=task_name,
+                    duration=duration,
+                )
+                raise
+
             except Exception as e:
                 # Unexpected error
                 duration = time.time() - start_time
@@ -293,7 +317,7 @@ class TaskScheduler(TaskHandler):
                     duration=duration,
                 )
 
-                await feishu_robot.bot(
+                await self._send_alert(
                     title=f"Task Error: {task_name}",
                     detail={
                         "task_name": task_name,
@@ -301,13 +325,22 @@ class TaskScheduler(TaskHandler):
                         "error": error_detail,
                         "duration": duration,
                     },
+                    dedup_key=f"task_error_{task_name}_{self.trace_id}",
                 )
 
             finally:
                 await self._release_task(status)
+                lifecycle = TaskLifecycleManager.get_instance()
+                if lifecycle:
+                    await lifecycle.unregister(self.trace_id)
 
         # Spawn the background task
-        asyncio.create_task(_task_wrapper(), name=f"{task_name}_{self.trace_id}")
+        task = asyncio.create_task(
+            _task_wrapper(), name=f"{task_name}_{self.trace_id}"
+        )
+        lifecycle = TaskLifecycleManager.get_instance()
+        if lifecycle:
+            await lifecycle.register(self.trace_id, task)
 
         return await task_schedule_response.success_response(
             task_name=task_name,
@@ -334,12 +367,15 @@ class TaskScheduler(TaskHandler):
         """
         trace_id = trace_id or self.trace_id
         query = f"SELECT * FROM {self.table} WHERE trace_id = %s"
-        result = await self.db_client.async_fetch_one(query, (trace_id,))
+        result = await self.db_client.async_fetch_one(query, params=(trace_id,))
         return result
 
     async def cancel_task(self, trace_id: Optional[str] = None) -> bool:
         """
-        Cancel the task (sets the status to FAILED)
+        Cancel a task.
+
+        INIT tasks go straight to CANCELLED; PROCESSING tasks are set to
+        CANCEL_REQUESTED and the poller then cancels the local coroutine.
 
         Args:
             trace_id: task trace ID; defaults to this instance's trace_id
@@ -350,13 +386,26 @@
         trace_id = trace_id or self.trace_id
         query = f"""
             UPDATE {self.table}
-            SET task_status = %s, finish_timestamp = %s
+            SET finish_timestamp = CASE
+                    WHEN task_status = %s THEN %s
+                    ELSE finish_timestamp
+                END,
+                task_status = CASE
+                    WHEN task_status = %s THEN %s
+                    WHEN task_status = %s THEN %s
+                END
             WHERE trace_id = %s AND task_status IN (%s, %s)
         """
+        # finish_timestamp is assigned before task_status on purpose: MySQL
+        # applies SET clauses left to right, so the CASE must read the old value.
         result = await self.db_client.async_save(
             query,
             (
-                TaskStatus.FAILED,
+                TaskStatus.INIT,
                 int(time.time()),
+                TaskStatus.INIT,
+                TaskStatus.CANCELLED,
+                TaskStatus.PROCESSING,
+                TaskStatus.CANCEL_REQUESTED,
                 trace_id,
                 TaskStatus.INIT,
@@ -365,7 +412,7 @@ class TaskScheduler(TaskHandler):
         )
 
         if result:
-            await self._log_task_event("task_cancelled", trace_id=trace_id)
+            await self._log_task_event("task_cancel_requested", trace_id=trace_id)
 
         return bool(result)
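
The CASE update above encodes a two-step cancellation protocol. Below is a minimal Python sketch of the same transition table (status codes mirror TaskStatus in app/jobs/task_config.py; the function itself is illustrative, not part of the codebase):

from typing import Optional

# Status codes as defined in app/jobs/task_config.py
INIT, PROCESSING, SUCCESS, CANCELLED, CANCEL_REQUESTED, FAILED = 0, 1, 2, 3, 4, 99

def cancel_transition(current: int) -> Optional[int]:
    """Status a cancel request moves a task to, or None if it is not cancellable."""
    if current == INIT:
        return CANCELLED         # never started: terminal right away, finish_timestamp set
    if current == PROCESSING:
        return CANCEL_REQUESTED  # running somewhere: signal the owning process via polling
    return None                  # SUCCESS / FAILED / CANCELLED: nothing to do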
 

+ 26 - 2
app/api/v1/endpoints/tasks.py

@@ -1,11 +1,11 @@
 from __future__ import annotations
 
 from pydantic import ValidationError
-from quart import Blueprint, jsonify
+from quart import Blueprint, jsonify, current_app
 
 from app.api.service import TaskManager, TaskScheduler
 from app.api.v1.utils import ApiDependencies
-from app.api.v1.utils import RunTaskRequest, TaskListRequest
+from app.api.v1.utils import RunTaskRequest, TaskListRequest, CancelTaskRequest
 from app.api.v1.utils import parse_json, validation_error_response
 from app.infra.shared.tools import generate_task_trace_id
 
@@ -15,6 +15,10 @@ def create_tasks_bp(deps: ApiDependencies) -> Blueprint:
 
     @bp.route("/run_task", methods=["POST"])
     async def run_task():
+        # Reject new tasks while the server is shutting down
+        if not current_app.config.get("ACCEPTING_TASKS", True):
+            return jsonify({"code": 5003, "message": "Server is shutting down"}), 503
+
         trace_id = generate_task_trace_id()
 
         try:
@@ -39,4 +43,24 @@ def create_tasks_bp(deps: ApiDependencies) -> Blueprint:
         result = await manager.list_tasks()
         return jsonify(result)
 
+    @bp.route("/cancel_task", methods=["POST"])
+    async def cancel_task():
+        try:
+            _, body = await parse_json(CancelTaskRequest)
+        except ValidationError as e:
+            payload, status = validation_error_response(e)
+            return jsonify(payload), status
+
+        trace_id = body["trace_id"]
+        scheduler = TaskScheduler(body, deps.log, deps.db, trace_id, deps.config)
+        success = await scheduler.cancel_task(trace_id)
+
+        return jsonify(
+            {
+                "code": 0 if success else 1,
+                "message": "cancel requested" if success else "task not found or already finished",
+                "trace_id": trace_id,
+            }
+        )
+
     return bp
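
A hypothetical client call against the new endpoint (port 6060 comes from docker-compose; an empty URL prefix on the blueprint route is an assumption):

import asyncio
import aiohttp

async def request_cancel(trace_id: str) -> dict:
    # code 0 means the cancel request was recorded; PROCESSING tasks are only
    # cancelled once the lifecycle poller picks up the CANCEL_REQUESTED row.
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "http://localhost:6060/cancel_task",  # route prefix assumed empty
            json={"trace_id": trace_id},
        ) as resp:
            return await resp.json()

# asyncio.run(request_cancel("some-trace-id"))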

+ 2 - 0
app/api/v1/utils/__init__.py

@@ -3,6 +3,7 @@ from .deps import ApiDependencies
 from .schemas import (
     RunTaskRequest,
     TaskListRequest,
+    CancelTaskRequest,
     SaveTokenRequest,
     GetCoverRequest,
     LongArticlesMcpRequest,
@@ -16,6 +17,7 @@ __all__ = [
     "validation_error_response",
     "RunTaskRequest",
     "TaskListRequest",
+    "CancelTaskRequest",
     "SaveTokenRequest",
     "GetCoverRequest",
     "LongArticlesMcpRequest",

+ 4 - 0
app/api/v1/utils/schemas.py

@@ -28,6 +28,10 @@ class TaskListRequest(BaseRequest):
     task_status: Optional[int] = None
 
 
+class CancelTaskRequest(BaseRequest):
+    trace_id: str = Field(..., min_length=1)
+
+
 class GetCoverRequest(BaseRequest):
     """GetCoverService 的请求体字段不固定,先保持兼容。"""
 

+ 1 - 4
app/core/bootstrap/resource_manager.py

@@ -20,14 +20,11 @@ class AppContext:
         logger.info("aliyun log service init successfully")
 
     async def shutdown(self):
+        """这里只做资源关闭;日志与告警 flush 由 task_app.py 统一编排"""
         logger.info("关闭数据库连接池")
         mysql = self.container.mysql_manager()
         await mysql.close_pools()
         logger.info("应用资源已释放")
-        logger.info("关闭日志服务")
-        log_service = self.container.log_service()
-        await log_service.stop()
-        logger.info("aliyun log service stopped")
 
 
 __all__ = [

+ 1 - 0
app/core/config/global_settings.py

@@ -29,6 +29,7 @@ class GlobalConfigSettings(BaseSettings):
 
     # ============ External service config ============
     deepseek: DeepSeekConfig = Field(default_factory=DeepSeekConfig)
+    feishu: FeishuConfig = Field(default_factory=FeishuConfig)
 
     aliyun_log: AliyunLogConfig = Field(default_factory=AliyunLogConfig)
     aliyun_oss: AliyunOssConfig = Field(default_factory=AliyunOssConfig)

+ 2 - 0
app/core/config/settings/__init__.py

@@ -5,6 +5,7 @@ from .category import CategoryConfig
 from .cold_start import ColdStartConfig
 from .deepseek import DeepSeekConfig
 from .elasticsearch import ElasticsearchConfig
+from .feishu import FeishuConfig
 from .mysql import AigcDatabaseConfig
 from .mysql import GrowthDatabaseConfig
 from .mysql import LongArticlesDatabaseConfig
@@ -22,6 +23,7 @@ __ALL__ = [
     "ColdStartConfig",
     "DeepSeekConfig",
     "ElasticsearchConfig",
+    "FeishuConfig",
     "AigcDatabaseConfig",
     "GrowthDatabaseConfig",
     "LongArticlesDatabaseConfig",

+ 6 - 4
app/core/config/settings/aliyun.py

@@ -5,8 +5,8 @@ class AliyunLogConfig(BaseSettings):
     """阿里云日志配置"""
 
     endpoint: str = "cn-hangzhou.log.aliyuncs.com"
-    access_key_id: str = "LTAIP6x1l3DXfSxm"
-    access_key_secret: str = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
+    access_key_id: str = ""
+    access_key_secret: str = ""
     project: str = "changwen-alg"
     logstore: str = "long_articles_job"
 
@@ -25,10 +25,12 @@ class AliyunLogConfig(BaseSettings):
         }
 
 
-class AliyunOssConfig(AliyunLogConfig):
-    """Aliyun log configuration"""
+class AliyunOssConfig(BaseSettings):
+    """Aliyun OSS configuration"""
 
     endpoint: str = "oss-cn-hangzhou.aliyuncs.com"
+    access_key_id: str = ""
+    access_key_secret: str = ""
     bucket_name: str = "art-pubbucket"
 
     model_config = SettingsConfigDict(

+ 1 - 3
app/core/config/settings/deepseek.py

@@ -5,9 +5,7 @@ from pydantic_settings import BaseSettings, SettingsConfigDict
 class DeepSeekConfig(BaseSettings):
     """DeepSeek API 配置"""
 
-    api_key: str = Field(
-        default="sk-cfd2df92c8864ab999d66a615ee812c5", description="DeepSeek API Key"
-    )
+    api_key: str = Field(default="", description="DeepSeek API Key")
     reasoner_model: str = Field(
         default="deepseek-reasoner", description="DeepSeek 推理模型"
     )

File diff suppressed because it is too large
+ 2 - 7
app/core/config/settings/elasticsearch.py


+ 24 - 0
app/core/config/settings/feishu.py

@@ -0,0 +1,24 @@
+from pydantic_settings import BaseSettings, SettingsConfigDict
+
+
+class FeishuConfig(BaseSettings):
+    """飞书配置"""
+
+    app_id: str = ""
+    app_secret: str = ""
+
+    # Webhook URLs
+    long_articles_bot: str = ""
+    long_articles_bot_dev: str = ""
+    long_articles_task_bot: str = ""
+    outside_gzh_monitor_bot: str = ""
+    server_account_publish_monitor_bot: str = ""
+    cookie_monitor_bot: str = ""
+    rank_monitor_bot: str = ""
+
+    model_config = SettingsConfigDict(
+        env_prefix="FEISHU_",
+        env_file=".env",
+        case_sensitive=False,
+        extra="ignore",
+    )
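
A self-contained sketch of the env-prefix mechanism FeishuConfig relies on, using pydantic-settings directly (the demo class and values are placeholders, not the real config):

import os
from pydantic_settings import BaseSettings, SettingsConfigDict

class DemoFeishuConfig(BaseSettings):
    app_id: str = ""
    long_articles_task_bot: str = ""
    model_config = SettingsConfigDict(
        env_prefix="FEISHU_", case_sensitive=False, extra="ignore"
    )

# Variables with the FEISHU_ prefix (from the process env or a .env file)
# populate the matching fields at construction time:
os.environ["FEISHU_APP_ID"] = "cli_demo_app_id"  # placeholder value
print(DemoFeishuConfig().app_id)  # -> cli_demo_app_id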

+ 0 - 25
app/core/config/settings/mysql.py

@@ -34,11 +34,6 @@ class DatabaseConfig(BaseSettings):
 class AigcDatabaseConfig(DatabaseConfig):
     """AIGC 数据库配置"""
 
-    host: str = "rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com"
-    user: str = "crawler_admin"
-    password: str = "cyber#crawler_2023"
-    db: str = "aigc-admin-prod"
-
     model_config = SettingsConfigDict(
         env_prefix="AIGC_DB_", env_file=".env", case_sensitive=False, extra="ignore"
     )
@@ -47,11 +42,6 @@ class AigcDatabaseConfig(DatabaseConfig):
 class LongVideoDatabaseConfig(DatabaseConfig):
     """长视频数据库配置"""
 
-    host: str = "rr-bp1x9785e8h5452bi157.mysql.rds.aliyuncs.com"
-    user: str = "wx2016_longvideo"
-    password: str = "wx2016_longvideoP@assword1234"
-    db: str = "longvideo"
-
     model_config = SettingsConfigDict(
         env_prefix="LONG_VIDEO_DB_",
         env_file=".env",
@@ -63,11 +53,6 @@ class LongVideoDatabaseConfig(DatabaseConfig):
 class LongArticlesDatabaseConfig(DatabaseConfig):
     """长文数据库配置"""
 
-    host: str = "rm-bp14529nwwcw75yr1ko.mysql.rds.aliyuncs.com"
-    user: str = "changwen_admin"
-    password: str = "changwen@123456"
-    db: str = "long_articles"
-
     model_config = SettingsConfigDict(
         env_prefix="LONG_ARTICLES_DB_",
         env_file=".env",
@@ -79,11 +64,6 @@ class LongArticlesDatabaseConfig(DatabaseConfig):
 class PiaoquanCrawlerDatabaseConfig(DatabaseConfig):
     """票圈爬虫数据库配置"""
 
-    host: str = "rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com"
-    user: str = "crawler"
-    password: str = "crawler123456@"
-    db: str = "piaoquan-crawler"
-
     model_config = SettingsConfigDict(
         env_prefix="PIAOQUAN_CRAWLER_DB_",
         env_file=".env",
@@ -95,11 +75,6 @@ class PiaoquanCrawlerDatabaseConfig(DatabaseConfig):
 class GrowthDatabaseConfig(DatabaseConfig):
     """增长数据库配置"""
 
-    host: str = "rm-bp17q95335a99272b.mysql.rds.aliyuncs.com"
-    user: str = "crawler"
-    password: str = "crawler123456@"
-    db: str = "growth"
-
     model_config = SettingsConfigDict(
         env_prefix="GROWTH_DB_", env_file=".env", case_sensitive=False, extra="ignore"
     )

+ 55 - 16
app/core/database/mysql_pools.py

@@ -6,13 +6,12 @@ from aiomysql.cursors import DictCursor
 from app.core.config import GlobalConfigSettings
 from app.core.observability import LogService
 
+logger = logging.getLogger(__name__)
 
-logging.basicConfig(level=logging.INFO)
 
-
-class DatabaseManager(LogService):
-    def __init__(self, config: GlobalConfigSettings):
-        super().__init__(config.aliyun_log)
+class DatabaseManager:
+    def __init__(self, config: GlobalConfigSettings, log_service: LogService):
+        self.log_service = log_service
         self.database_mapper = {
             "aigc": config.aigc_db,
             "growth": config.growth_db,
@@ -22,6 +21,9 @@ class DatabaseManager(LogService):
         }
         self.pools = {}
 
+    async def _log(self, contents: dict):
+        await self.log_service.log(contents)
+
     async def init_pools(self):
         # Per-database settings come from config (they could also be defined inline here)
         for db_name, config in self.database_mapper.items():
@@ -38,10 +40,10 @@ class DatabaseManager(LogService):
                     autocommit=True,
                 )
                 self.pools[db_name] = pool
-                logging.info(f"{db_name} MySQL connection pool created successfully")
+                logger.info(f"{db_name} MySQL connection pool created successfully")
 
             except Exception as e:
-                await self.log(
+                await self._log(
                     contents={
                         "db_name": db_name,
                         "error": str(e),
@@ -55,24 +57,27 @@ class DatabaseManager(LogService):
             if pool:
                 pool.close()
                 await pool.wait_closed()
-                logging.info(f"{name} MySQL connection pool closed successfully")
+                logger.info(f"{name} MySQL connection pool closed successfully")
 
     async def async_fetch(
         self, query, db_name="long_articles", params=None, cursor_type=DictCursor
     ):
-        pool = self.pools[db_name]
+        pool = self.pools.get(db_name)
         if not pool:
             await self.init_pools()
-        # fetch from db
+            pool = self.pools.get(db_name)
+
+        if not pool:
+            raise RuntimeError(f"Database pool '{db_name}' not available after init")
+
         try:
             async with pool.acquire() as conn:
                 async with conn.cursor(cursor_type) as cursor:
                     await cursor.execute(query, params)
                     fetch_response = await cursor.fetchall()
-
             return fetch_response
         except Exception as e:
-            await self.log(
+            await self._log(
                 contents={
                     "task": "async_fetch",
                     "db_name": db_name,
@@ -82,14 +87,48 @@ class DatabaseManager(LogService):
                     "params": params,
                 }
             )
-            return None
+            raise
+
+    async def async_fetch_one(
+        self, query, db_name="long_articles", params=None, cursor_type=DictCursor
+    ):
+        """查询单条记录,不存在返回 None,出错抛异常"""
+        pool = self.pools.get(db_name)
+        if not pool:
+            await self.init_pools()
+            pool = self.pools.get(db_name)
+
+        if not pool:
+            raise RuntimeError(f"Database pool '{db_name}' not available after init")
+
+        try:
+            async with pool.acquire() as conn:
+                async with conn.cursor(cursor_type) as cursor:
+                    await cursor.execute(query, params)
+                    return await cursor.fetchone()
+        except Exception as e:
+            await self._log(
+                contents={
+                    "task": "async_fetch_one",
+                    "db_name": db_name,
+                    "error": str(e),
+                    "message": f"Failed to fetch one from {db_name}",
+                    "query": query,
+                    "params": params,
+                }
+            )
+            raise
 
     async def async_save(
         self, query, params, db_name="long_articles", batch: bool = False
     ):
-        pool = self.pools[db_name]
+        pool = self.pools.get(db_name)
         if not pool:
             await self.init_pools()
+            pool = self.pools.get(db_name)
+
+        if not pool:
+            raise RuntimeError(f"Database pool '{db_name}' not available after init")
 
         async with pool.acquire() as connection:
             async with connection.cursor() as cursor:
@@ -103,7 +142,7 @@ class DatabaseManager(LogService):
                     return affected_rows
                 except Exception as e:
                     await connection.rollback()
-                    await self.log(
+                    await self._log(
                         contents={
                             "task": "async_save",
                             "db_name": db_name,
@@ -113,7 +152,7 @@ class DatabaseManager(LogService):
                             "params": params,
                         }
                     )
-                    raise e
+                    raise
 
     def get_pool(self, db_name):
         return self.pools.get(db_name)
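
Because async_fetch and async_fetch_one now re-raise after logging instead of swallowing errors into None, call sites choose the failure policy explicitly. A hedged call-site sketch (the table name mirrors the lifecycle poller further down; degrading to an empty result is just one possible choice):

import logging
from typing import Any, List

logger = logging.getLogger(__name__)

async def fetch_cancel_requests(db_client: Any) -> List[dict]:
    try:
        return await db_client.async_fetch(
            "SELECT trace_id FROM long_articles_task_manager WHERE task_status = %s",
            params=(4,),  # TaskStatus.CANCEL_REQUESTED
        )
    except Exception:
        # The manager already logged the details; decide locally whether to degrade.
        logger.exception("cancel-request fetch failed; returning empty result")
        return []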

+ 4 - 2
app/core/dependency/dependencies.py

@@ -12,8 +12,10 @@ class ServerContainer(containers.DeclarativeContainer):
     # Aliyun log service
     log_service = providers.Singleton(LogService, log_config=config.provided.aliyun_log)
 
-    # MySQL
-    mysql_manager = providers.Singleton(DatabaseManager, config=config)
+    # MySQL (composition: log_service is injected rather than inherited)
+    mysql_manager = providers.Singleton(
+        DatabaseManager, config=config, log_service=log_service
+    )
 
 
 __all__ = [

+ 1 - 0
app/core/observability/__init__.py

@@ -1 +1,2 @@
 from .logging import LogService
+from .alert_service import AlertService

+ 134 - 0
app/core/observability/alert_service.py

@@ -0,0 +1,134 @@
+import asyncio
+import contextlib
+import logging
+import time
+from collections import deque
+from typing import Any, Dict, Optional
+
+logger = logging.getLogger(__name__)
+
+
+class AlertService:
+    """异步告警服务,避免告警发送阻塞主任务链路"""
+
+    _instance: Optional["AlertService"] = None
+
+    def __init__(self, feishu_client, max_queue_size: int = 1000):
+        self.feishu = feishu_client
+        self.queue: Optional[asyncio.Queue] = None
+        self._worker_task: Optional[asyncio.Task] = None
+        self._running = False
+        self._max_queue_size = max_queue_size
+        self._recent_alerts = deque(maxlen=200)
+        self._dropped_count = 0
+
+    @classmethod
+    def initialize(cls, feishu_client, max_queue_size: int = 1000) -> "AlertService":
+        if cls._instance is None:
+            cls._instance = cls(feishu_client, max_queue_size=max_queue_size)
+        return cls._instance
+
+    @classmethod
+    def get_instance(cls) -> Optional["AlertService"]:
+        return cls._instance
+
+    async def start(self):
+        if self._running:
+            return
+
+        self.queue = asyncio.Queue(maxsize=self._max_queue_size)
+        self._running = True
+        self._worker_task = asyncio.create_task(self._worker(), name="alert_service")
+        logger.info("AlertService started")
+
+    async def stop(self, drain_timeout: float = 5.0):
+        if not self._running:
+            return
+
+        self._running = False
+
+        if self.queue and self.queue.qsize() > 0:
+            logger.info(f"AlertService draining {self.queue.qsize()} alerts...")
+            try:
+                await asyncio.wait_for(self.queue.join(), timeout=drain_timeout)
+            except asyncio.TimeoutError:
+                logger.warning(
+                    f"AlertService drain timeout, {self.queue.qsize()} alerts remaining"
+                )
+
+        if self._worker_task:
+            self._worker_task.cancel()
+            with contextlib.suppress(asyncio.CancelledError):
+                await self._worker_task
+
+        if self._dropped_count > 0:
+            logger.warning(f"AlertService stopped, dropped alerts: {self._dropped_count}")
+
+        self._worker_task = None
+        self.queue = None
+
+    async def send_alert(
+        self,
+        title: str,
+        detail: Dict[str, Any],
+        mention: bool = True,
+        table: bool = False,
+        env: str = "long_articles_task",
+        mention_users=None,
+        dedup_key: Optional[str] = None,
+    ):
+        if not self._running or self.queue is None:
+            return
+
+        if dedup_key and self._is_duplicate(dedup_key):
+            return
+
+        item = {
+            "title": title,
+            "detail": detail,
+            "mention": mention,
+            "table": table,
+            "env": env,
+            "mention_users": mention_users,
+        }
+
+        try:
+            self.queue.put_nowait(item)
+        except asyncio.QueueFull:
+            self._dropped_count += 1
+            logger.warning(f"Alert queue full, dropped alert: {title}")
+
+    def _is_duplicate(self, dedup_key: str) -> bool:
+        now = time.time()
+
+        while self._recent_alerts and now - self._recent_alerts[0][1] > 60:
+            self._recent_alerts.popleft()
+
+        if any(key == dedup_key for key, _ in self._recent_alerts):
+            logger.debug(f"Alert deduplicated: {dedup_key}")
+            return True
+
+        self._recent_alerts.append((dedup_key, now))
+        return False
+
+    async def _worker(self):
+        while self._running or not self.queue.empty():  # keep draining after stop()
+            try:
+                item = await self.queue.get()
+                try:
+                    await self.feishu.bot(
+                        title=item["title"],
+                        detail=item["detail"],
+                        mention=item["mention"],
+                        table=item["table"],
+                        env=item["env"],
+                        mention_users=item["mention_users"],
+                    )
+                except Exception as e:
+                    logger.error(f"Failed to send alert: {e}")
+                finally:
+                    self.queue.task_done()
+            except asyncio.CancelledError:
+                break
+            except Exception as e:
+                logger.exception(f"AlertService worker error: {e}")
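
A usage sketch with a stub Feishu client (assuming alert_service.py is importable); it exercises the 60-second dedup window, so only the first alert reaches the bot:

import asyncio

class StubFeishu:
    async def bot(self, **kwargs):
        print("sent:", kwargs["title"])

async def demo():
    svc = AlertService.initialize(StubFeishu())
    await svc.start()
    # Same dedup_key within 60s: the second call is dropped by _is_duplicate.
    await svc.send_alert("Task Timeout Alert: demo", {"n": 1}, dedup_key="timeout_demo")
    await svc.send_alert("Task Timeout Alert: demo", {"n": 2}, dedup_key="timeout_demo")
    await svc.stop()  # drains the queue before cancelling the worker

# asyncio.run(demo())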

+ 72 - 8
app/core/observability/logging/log_service.py

@@ -1,13 +1,17 @@
 import asyncio
-import traceback
-import time, json
-import datetime
 import contextlib
+import json
+import logging
+import sys
+import time
+import traceback
 from typing import Optional
 
 from aliyun.log import LogClient, PutLogsRequest, LogItem
 from app.core.config.settings import AliyunLogConfig
 
+logger = logging.getLogger(__name__)
+
 
 class LogService:
     def __init__(self, log_config: AliyunLogConfig):
@@ -19,6 +23,10 @@ class LogService:
         self._worker_task: Optional[asyncio.Task] = None
         self._running = False
 
+        # metrics
+        self._dropped_count = 0
+        self._last_drop_warn_time = 0
+
     async def start(self):
         if self._running:
             return
@@ -33,21 +41,50 @@ class LogService:
         self._running = True
         self._worker_task = asyncio.create_task(self._worker())
 
-    async def stop(self):
+    async def stop(self, drain_timeout: float = 10.0):
         if not self._running:
             return
 
+        # 1. Stop accepting new logs
         self._running = False
 
+        # 2. Drain the queue (with a timeout)
+        if self.queue and self.queue.qsize() > 0:
+            remaining = self.queue.qsize()
+            logger.info(f"LogService draining {remaining} pending logs...")
+            try:
+                await asyncio.wait_for(self._drain_remaining(), timeout=drain_timeout)
+            except asyncio.TimeoutError:
+                logger.warning(
+                    f"LogService drain timeout, {self.queue.qsize()} logs lost"
+                )
+
+        # 3. Stop the worker
         if self._worker_task:
             self._worker_task.cancel()
             with contextlib.suppress(asyncio.CancelledError):
                 await self._worker_task
 
+        if self._dropped_count > 0:
+            logger.warning(f"LogService stopped, total dropped: {self._dropped_count}")
+
         self._worker_task = None
         self.queue = None
         self.client = None
 
+    async def _drain_remaining(self):
+        """把队列里剩余的日志全部发出去"""
+        while not self.queue.empty():
+            try:
+                contents = self.queue.get_nowait()
+                await asyncio.to_thread(self._put_log, contents)
+                self.queue.task_done()
+            except asyncio.QueueEmpty:
+                break
+            except Exception as e:
+                logger.error(f"LogService drain error: {e}")
+                self.queue.task_done()
+
     async def log(self, contents: dict):
         if not self._running or self.queue is None:
             return
@@ -55,8 +92,32 @@ class LogService:
         try:
             self.queue.put_nowait(contents)
         except asyncio.QueueFull:
-            # could write to stderr / count dropped logs
-            pass
+            self._dropped_count += 1
+
+            # Warn at most once every 60 seconds
+            now = time.time()
+            if now - self._last_drop_warn_time > 60:
+                self._last_drop_warn_time = now
+                logger.warning(
+                    f"LogService queue full, dropped {self._dropped_count} logs total"
+                )
+
+                # Critical events fall back to stderr
+                event_type = contents.get("event_type", "")
+                if event_type in ("task_failed", "task_error", "task_cancelled"):
+                    print(
+                        f"[CRITICAL LOG DROPPED] {json.dumps(contents, ensure_ascii=False, default=str)}",
+                        file=sys.stderr,
+                    )
+
+    def get_metrics(self) -> dict:
+        """获取日志服务运行指标"""
+        return {
+            "queue_size": self.queue.qsize() if self.queue else 0,
+            "queue_maxsize": self.queue.maxsize if self.queue else 0,
+            "dropped_count": self._dropped_count,
+            "is_running": self._running,
+        }
 
     async def _worker(self):
         try:
@@ -65,12 +126,15 @@ class LogService:
                 try:
                     await asyncio.to_thread(self._put_log, contents)
                 except Exception as e:
-                    print(f"[Log Error] {e}")
-                    print(traceback.format_exc())
+                    logger.error(f"LogService put_log failed: {e}")
+                finally:
+                    self.queue.task_done()
         except asyncio.CancelledError:
             pass
 
     def _put_log(self, contents: dict):
+        import datetime
+
         timestamp = int(time.time())
         contents["datetime"] = datetime.datetime.now().isoformat()
 

+ 4 - 2
app/infra/external/__init__.py

@@ -5,9 +5,11 @@ from .feishu import FeishuBotApi
 from .feishu import FeishuSheetApi
 from .elastic_search import AsyncElasticSearchClient
 from .odps_service import OdpsService
+from app.core.config.settings.feishu import FeishuConfig
 
-feishu_robot = FeishuBotApi()
-feishu_sheet = FeishuSheetApi()
+_feishu_config = FeishuConfig()
+feishu_robot = FeishuBotApi(config=_feishu_config)
+feishu_sheet = FeishuSheetApi(config=_feishu_config)
 
 __all__ = [
     "feishu_robot",

+ 20 - 42
app/infra/external/feishu.py

@@ -1,29 +1,10 @@
 import json
 
+from app.core.config.settings.feishu import FeishuConfig
 from app.infra.shared import AsyncHttpClient
 
 
 class Feishu:
-    # Service-account grouped-publish monitoring bot
-    server_account_publish_monitor_bot = "https://open.feishu.cn/open-apis/bot/v2/hook/380fdecf-402e-4426-85b6-7d9dbd2a9f59"
-
-    # External official-account ad-delivery monitoring bot
-    outside_gzh_monitor_bot = "https://open.feishu.cn/open-apis/bot/v2/hook/0899d43d-9f65-48ce-a419-f83ac935bf59"
-
-    # Long-articles daily alert bot
-    long_articles_bot = "https://open.feishu.cn/open-apis/bot/v2/hook/b44333f2-16c0-4cb1-af01-d135f8704410"
-
-    # Dev-environment alert bot
-    long_articles_bot_dev = "https://open.feishu.cn/open-apis/bot/v2/hook/f32c0456-847f-41f3-97db-33fcc1616bcd"
-
-    # Long-articles task alert group
-    long_articles_task_bot = "https://open.feishu.cn/open-apis/bot/v2/hook/223b3d72-f2e8-40e0-9b53-6956e0ae7158"
-
-    # Cookie monitoring bot
-    cookie_monitor_bot = "https://open.feishu.cn/open-apis/bot/v2/hook/51b9c83a-f50d-44dd-939f-bcd10ac6c55a"
-
-    # rank_bot
-    rank_monitor_bot = "https://open.feishu.cn/open-apis/bot/v2/hook/f9dae7ba-decf-436b-b438-41994c35af1e"
 
     # Username-to-phone-number mapping
     name_phone_dict = {
@@ -33,7 +14,8 @@ class Feishu:
         "林强": "15810236123",
     }
 
-    def __init__(self):
+    def __init__(self, config: FeishuConfig = None):
+        self.config = config or FeishuConfig()
         self.token = None
         self.headers = {"Content-Type": "application/json"}
         self.mention_all = {
@@ -42,11 +24,24 @@ class Feishu:
         }
         self.not_mention = {}
 
+    def _get_webhook_url(self, env: str) -> str:
+        """根据环境获取 webhook URL"""
+        webhook_map = {
+            "dev": self.config.long_articles_bot_dev,
+            "prod": self.config.long_articles_bot,
+            "outside_gzh_monitor": self.config.outside_gzh_monitor_bot,
+            "server_account_publish_monitor": self.config.server_account_publish_monitor_bot,
+            "long_articles_task": self.config.long_articles_task_bot,
+            "cookie_monitor": self.config.cookie_monitor_bot,
+            "rank_bot": self.config.rank_monitor_bot,
+        }
+        return webhook_map.get(env, self.config.long_articles_bot_dev)
+
     async def fetch_token(self):
         url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
         post_data = {
-            "app_id": "cli_a51114cf8bf8d00c",
-            "app_secret": "cNoTAqMpsAm7mPBcpCAXFfvOzCNL27fe",
+            "app_id": self.config.app_id,
+            "app_secret": self.config.app_secret,
         }
         async with AsyncHttpClient(default_headers=self.headers) as client:
             response = await client.post(url=url, json=post_data)
@@ -102,7 +97,7 @@ class FeishuSheetApi(Feishu):
                 url=insert_value_url, json=body, headers=headers
             )
 
-        print(response)
+        return response
 
     async def insert_value(self, sheet_token, sheet_id, ranges, values):
         insert_value_url = (
@@ -110,7 +105,6 @@ class FeishuSheetApi(Feishu):
                 sheet_token
             )
         )
-        # self.token = 't-g104bpfHNZN45BVJWFSQEM6WD45AAI4FNRWXCZVK'
         headers = {
             "Authorization": "Bearer " + self.token,
             "contentType": "application/json; charset=utf-8",
@@ -303,23 +297,7 @@ class FeishuBotApi(Feishu):
         :param env: environment; decides which bot the message goes to
         :param mention_users: specific users to @, e.g. ["luojunhui"]
         """
-        match env:
-            case "dev":
-                url = self.long_articles_bot_dev
-            case "prod":
-                url = self.long_articles_bot
-            case "outside_gzh_monitor":
-                url = self.outside_gzh_monitor_bot
-            case "server_account_publish_monitor":
-                url = self.server_account_publish_monitor_bot
-            case "long_articles_task":
-                url = self.long_articles_task_bot
-            case "cookie_monitor":
-                url = self.cookie_monitor_bot
-            case "rank_bot":
-                url = self.rank_monitor_bot
-            case _:
-                url = self.long_articles_bot_dev
+        url = self._get_webhook_url(env)
 
         headers = {"Content-Type": "application/json"}
         if table:

+ 6 - 2
app/infra/shared/http_client.py

@@ -1,6 +1,10 @@
+import asyncio
+import logging
 import aiohttp
 from typing import Optional, Union, Dict, Any
 
+logger = logging.getLogger(__name__)
+
 
 class AsyncHttpClient:
     def __init__(
@@ -59,10 +63,10 @@ class AsyncHttpClient:
                 return await response.text()
 
         except aiohttp.ClientResponseError as e:
-            print(f"HTTP error: {e.status} {e.message}")
+            logger.error(f"HTTP error: {e.status} {e.message} url={url}")
             raise
         except aiohttp.ClientError as e:
-            print(f"Network error: {str(e)}")
+            logger.error(f"Network error: {e} url={url}")
             raise
 
     async def get(

+ 2 - 0
app/jobs/task_config.py

@@ -18,6 +18,8 @@ class TaskStatus:
     INIT = 0
     PROCESSING = 1
     SUCCESS = 2
+    CANCELLED = 3
+    CANCEL_REQUESTED = 4
     FAILED = 99
 
 

+ 245 - 0
app/jobs/task_lifecycle.py

@@ -0,0 +1,245 @@
+"""
+任务生命周期管理器
+
+提供分布式环境下的协程生命周期管理,支持:
+- 进程内任务注册表
+- 基于 MySQL 的跨进程取消信号
+- 轮询机制检测取消请求
+- 优雅关闭时取消所有任务
+"""
+
+import asyncio
+import logging
+from typing import Dict, Optional
+
+from app.core.database import DatabaseManager
+from app.jobs.task_config import TaskStatus
+
+logger = logging.getLogger(__name__)
+
+
+class TaskLifecycleManager:
+    """任务生命周期管理器(单例)"""
+
+    _instance: Optional["TaskLifecycleManager"] = None
+
+    def __init__(
+        self,
+        db_client: DatabaseManager,
+        poll_interval: float = 5.0,
+        force_kill_timeout: float = 10.0,
+    ):
+        """
+        初始化生命周期管理器
+
+        Args:
+            db_client: 数据库客户端
+            poll_interval: 轮询间隔(秒)
+            force_kill_timeout: 强制终止超时(秒)
+        """
+        self._registry: Dict[str, asyncio.Task] = {}
+        self._lock = asyncio.Lock()
+        self._db = db_client
+        self._poll_interval = poll_interval
+        self._force_kill_timeout = force_kill_timeout
+        self._poll_task: Optional[asyncio.Task] = None
+        self._shutting_down = False
+
+    @classmethod
+    def initialize(
+        cls,
+        db_client: DatabaseManager,
+        poll_interval: float = 5.0,
+        force_kill_timeout: float = 10.0,
+    ) -> "TaskLifecycleManager":
+        """
+        初始化单例实例
+
+        Args:
+            db_client: 数据库客户端
+            poll_interval: 轮询间隔(秒)
+            force_kill_timeout: 强制终止超时(秒)
+
+        Returns:
+            TaskLifecycleManager 实例
+        """
+        if cls._instance is None:
+            cls._instance = cls(db_client, poll_interval, force_kill_timeout)
+            logger.info(
+                f"TaskLifecycleManager initialized with poll_interval={poll_interval}s"
+            )
+        return cls._instance
+
+    @classmethod
+    def get_instance(cls) -> Optional["TaskLifecycleManager"]:
+        """获取单例实例"""
+        return cls._instance
+
+    async def register(self, trace_id: str, task: asyncio.Task) -> None:
+        """
+        注册任务到生命周期管理器
+
+        Args:
+            trace_id: 任务追踪 ID
+            task: asyncio.Task 对象
+        """
+        async with self._lock:
+            self._registry[trace_id] = task
+            logger.debug(f"Task registered: {trace_id}, total={len(self._registry)}")
+
+    async def unregister(self, trace_id: str) -> None:
+        """
+        从生命周期管理器注销任务
+
+        Args:
+            trace_id: 任务追踪 ID
+        """
+        async with self._lock:
+            if trace_id in self._registry:
+                del self._registry[trace_id]
+                logger.debug(
+                    f"Task unregistered: {trace_id}, total={len(self._registry)}"
+                )
+
+    async def cancel_local(self, trace_id: str) -> bool:
+        """
+        取消本地协程
+
+        Args:
+            trace_id: 任务追踪 ID
+
+        Returns:
+            是否成功取消
+        """
+        async with self._lock:
+            task = self._registry.get(trace_id)
+            if not task:
+                logger.debug(f"Task not found in local registry: {trace_id}")
+                return False
+
+            if task.done():
+                logger.debug(f"Task already done: {trace_id}")
+                return False
+
+            logger.info(f"Cancelling task: {trace_id}")
+            task.cancel()
+
+        # Wait for the task to honor the cancellation (with a timeout)
+        try:
+            await asyncio.wait_for(task, timeout=self._force_kill_timeout)
+        except asyncio.CancelledError:
+            logger.info(f"Task cancelled successfully: {trace_id}")
+        except asyncio.TimeoutError:
+            logger.warning(
+                f"Task did not respond to cancellation within {self._force_kill_timeout}s: {trace_id}"
+            )
+        except Exception as e:
+            logger.error(f"Error while waiting for task cancellation: {trace_id}, {e}")
+
+        return True
+
+    async def _poll_loop(self) -> None:
+        """轮询循环:检查数据库中的取消请求"""
+        logger.info("Task lifecycle polling loop started")
+
+        while not self._shutting_down:
+            try:
+                # Look up tasks marked CANCEL_REQUESTED
+                rows = await self._db.async_fetch(
+                    "SELECT trace_id FROM long_articles_task_manager "
+                    "WHERE task_status = %s",
+                    params=(TaskStatus.CANCEL_REQUESTED,),
+                )
+
+                if rows:
+                    # Snapshot the local registry
+                    async with self._lock:
+                        local_trace_ids = set(self._registry.keys())
+
+                    # Cancel the ones running in this process
+                    for row in rows:
+                        trace_id = row["trace_id"]
+                        if trace_id in local_trace_ids:
+                            logger.info(
+                                f"Cancel signal detected for task: {trace_id}"
+                            )
+                            await self.cancel_local(trace_id)
+
+            except Exception as e:
+                logger.exception(f"Error in poll loop: {e}")
+
+            # Sleep until the next poll
+            await asyncio.sleep(self._poll_interval)
+
+        logger.info("Task lifecycle polling loop stopped")
+
+    async def start_polling(self) -> None:
+        """启动轮询协程"""
+        if self._poll_task is not None:
+            logger.warning("Polling already started")
+            return
+
+        self._poll_task = asyncio.create_task(
+            self._poll_loop(), name="task_lifecycle_poll"
+        )
+        logger.info("Task lifecycle polling started")
+
+    async def stop_polling(self) -> None:
+        """停止轮询协程"""
+        if self._poll_task is None:
+            return
+
+        self._shutting_down = True
+        self._poll_task.cancel()
+
+        try:
+            await self._poll_task
+        except asyncio.CancelledError:
+            pass
+
+        self._poll_task = None
+        logger.info("Task lifecycle polling stopped")
+
+    async def shutdown(self, timeout: float = 30.0) -> None:
+        """
+        优雅关闭:取消所有任务并等待完成
+
+        Args:
+            timeout: 等待任务完成的超时时间(秒)
+        """
+        logger.info("TaskLifecycleManager shutting down...")
+
+        # Snapshot all registered tasks
+        async with self._lock:
+            tasks = list(self._registry.values())
+            trace_ids = list(self._registry.keys())
+
+        if tasks:
+            logger.info(f"Cancelling {len(tasks)} running tasks: {trace_ids}")
+
+            # Cancel every task
+            for task in tasks:
+                if not task.done():
+                    task.cancel()
+
+            # Wait for all tasks to finish (with a timeout)
+            try:
+                await asyncio.wait_for(
+                    asyncio.gather(*tasks, return_exceptions=True),
+                    timeout=timeout,
+                )
+                logger.info("All tasks cancelled successfully")
+            except asyncio.TimeoutError:
+                logger.warning(
+                    f"Some tasks did not finish within {timeout}s timeout"
+                )
+        else:
+            logger.info("No running tasks to cancel")
+
+        # Stop the poller
+        await self.stop_polling()
+
+        logger.info("TaskLifecycleManager shutdown complete")
+
+
+__all__ = ["TaskLifecycleManager"]
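
An end-to-end sketch of the register → poll → cancel flow, with a fake DB client standing in for DatabaseManager (assumes the app package is importable; in production the poller finds CANCEL_REQUESTED rows written by cancel_task):

import asyncio

from app.jobs.task_lifecycle import TaskLifecycleManager

class FakeDB:
    """Stand-in for DatabaseManager: serves one pending cancel request, then none."""
    def __init__(self):
        self._pending = [{"trace_id": "demo-trace"}]
    async def async_fetch(self, query, params=None):
        rows, self._pending = self._pending, []
        return rows

async def long_job():
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print("job observed cancellation")
        raise

async def main():
    mgr = TaskLifecycleManager.initialize(FakeDB(), poll_interval=0.1)
    task = asyncio.create_task(long_job(), name="demo-trace")
    await mgr.register("demo-trace", task)
    await mgr.start_polling()
    await asyncio.sleep(0.5)   # poller sees the CANCEL_REQUESTED row, cancels the task
    await mgr.shutdown(timeout=5.0)

# asyncio.run(main())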

+ 8 - 0
app/jobs/task_utils.py

@@ -46,6 +46,13 @@ class TaskLockError(TaskError):
         super().__init__(message, retryable=False, task_name=task_name)
 
 
+class TaskCancelledError(TaskError):
+    """任务被取消(不可重试)"""
+
+    def __init__(self, message: str, task_name: Optional[str] = None):
+        super().__init__(message, retryable=False, task_name=task_name)
+
+
 class TaskUtils:
     """任务工具类"""
 
@@ -84,5 +91,6 @@ __all__ = [
     "TaskTimeoutError",
     "TaskConcurrencyError",
     "TaskLockError",
+    "TaskCancelledError",
     "TaskUtils",
 ]

+ 2 - 2
docker-compose.yaml

@@ -8,8 +8,8 @@ services:
     container_name: task-server-app
     ports:
       - "6060:6060"
-    volumes:
-      - .:/app
+    env_file:
+      - .env
     environment:
       - PYTHONUNBUFFERED=1
     restart: always

+ 35 - 0
task_app.py

@@ -5,11 +5,15 @@ from quart import Quart
 from app.core.bootstrap import AppContext
 from app.core.dependency import ServerContainer
 from app.api.v1.routes import server_routes
+from app.jobs.task_lifecycle import TaskLifecycleManager
+from app.core.observability import AlertService
+from app.infra.external import feishu_robot
 
 logging.basicConfig(level=logging.INFO)
 
 app = Quart(__name__)
 app = cors(app, allow_origin="*")
+app.config["ACCEPTING_TASKS"] = True
 
 server_container = ServerContainer()
 ctx = AppContext(server_container)
@@ -27,11 +31,42 @@ app.register_blueprint(routes)
 async def startup():
     logging.info("Starting application...")
     await ctx.start_up()
+
+    # Initialize the AlertService
+    alert_service = AlertService.initialize(feishu_robot)
+    await alert_service.start()
+
+    # Initialize the TaskLifecycleManager
+    lifecycle = TaskLifecycleManager.initialize(mysql_manager, poll_interval=5.0)
+    await lifecycle.start_polling()
+
     logging.info("Application started successfully")
 
 
 @app.after_serving
 async def shutdown():
     logging.info("Shutting down application...")
+
+    # Phase 1: stop accepting new tasks
+    app.config["ACCEPTING_TASKS"] = False
+    logging.info("Phase 1: Stopped accepting new tasks")
+
+    # Phase 2: cancel / wait out in-flight tasks
+    lifecycle = TaskLifecycleManager.get_instance()
+    if lifecycle:
+        await lifecycle.shutdown(timeout=30.0)
+    logging.info("Phase 2: All tasks cancelled/completed")
+
+    # Phase 3: flush alerts and logs
+    alert_service = AlertService.get_instance()
+    if alert_service:
+        await alert_service.stop(drain_timeout=5.0)
+
+    await log_service.stop(drain_timeout=10.0)
+    logging.info("Phase 3: Alerts and logs flushed")
+
+    # Phase 4: close database connections
     await ctx.shutdown()
+    logging.info("Phase 4: Database pools closed")
+
     logging.info("Application shutdown successfully")

Some files were not shown because too many files changed in this diff.