Просмотр исходного кода

feat: 增加小时级统计报警

jihuaqiang 1 неделя назад
Родитель
Commit
c2e2182d94
3 изменённых файла с 278 добавлениями и 1 удалением
  1. 8 1
      scheduler/bootstrap.py
  2. 253 0
      scheduler/decode_hourly_stats_job.py
  3. 17 0
      scheduler/odps_fetch.py

+ 8 - 1
scheduler/bootstrap.py

@@ -6,6 +6,7 @@ from apscheduler.schedulers.background import BackgroundScheduler
 from apscheduler.triggers.cron import CronTrigger
 
 from scheduler.decode_dispatch_job import run_decode_dispatch_job
+from scheduler.decode_hourly_stats_job import run_decode_hourly_stats_job
 from utils.scheduler_logger import get_scheduler_logger
 
 
@@ -27,8 +28,14 @@ def start_scheduler() -> None:
         replace_existing=True,
         next_run_time=datetime.now(ZoneInfo("Asia/Shanghai")),
     )
+    _scheduler.add_job(
+        run_decode_hourly_stats_job,
+        trigger=CronTrigger(minute=0),
+        id="decode_hourly_stats",
+        replace_existing=True,
+    )
     _scheduler.start()
-    logger.info("调度器已启动:立即触发一次,之后每3分钟执行一次")
+    logger.info("调度器已启动:解码任务每3分钟执行,统计任务每小时整点执行")
 
 
 def stop_scheduler() -> None:

+ 253 - 0
scheduler/decode_hourly_stats_job.py

@@ -0,0 +1,253 @@
+import base64
+import hashlib
+import hmac
+import sys
+import time
+from datetime import datetime, timedelta
+from pathlib import Path
+from typing import Dict
+from zoneinfo import ZoneInfo
+
+import requests
+
+# 支持直接以脚本方式运行:python scheduler/decode_hourly_stats_job.py
+_PROJECT_ROOT = Path(__file__).resolve().parent.parent
+if str(_PROJECT_ROOT) not in sys.path:
+    sys.path.insert(0, str(_PROJECT_ROOT))
+
+from utils.scheduler_logger import get_scheduler_logger
+from utils.sync_mysql_help import mysql
+from scheduler.odps_fetch import count_priority_posts
+
+
+logger = get_scheduler_logger()
+_TZ = ZoneInfo("Asia/Shanghai")
+_FEISHU_WEBHOOK = "https://open.feishu.cn/open-apis/bot/v2/hook/af94b535-ed47-47d8-87f4-d893e1077276"
+_FEISHU_SIGN_SECRET = "lebZtBVkKJrbaVFlss2Pcf"
+_ALERT_FAIL_RATE = 0.15
+
+
+def _previous_hour_window(now: datetime) -> tuple[datetime, datetime]:
+    current_hour = now.replace(minute=0, second=0, microsecond=0)
+    return current_hour - timedelta(hours=1), current_hour
+
+
def _fetch_hourly_stats(window_start: datetime, window_end: datetime) -> Dict[str, int]:
    """Count decode-task results created in [window_start, window_end), by status.

    Status buckets (per the column aliases below): 0 = pending, 1 = running,
    2 = success, 3 = failed.  Missing/NULL aggregates collapse to 0.
    """
    sql = """
        SELECT
            COUNT(1) AS total_count,
            SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) AS success_count,
            SUM(CASE WHEN status = 0 THEN 1 ELSE 0 END) AS pending_count,
            SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END) AS running_count,
            SUM(CASE WHEN status = 3 THEN 1 ELSE 0 END) AS fail_count
        FROM aigc_topic_decode_task_result
        WHERE create_time >= %s
          AND create_time < %s
    """
    params = (
        window_start.strftime("%Y-%m-%d %H:%M:%S"),
        window_end.strftime("%Y-%m-%d %H:%M:%S"),
    )
    row = mysql.fetchone(sql, params) or {}
    # Map result-set column names to the short keys the caller expects.
    column_to_key = {
        "total_count": "total",
        "success_count": "success",
        "pending_count": "pending",
        "running_count": "running",
        "fail_count": "fail",
    }
    return {key: int(row.get(column) or 0) for column, key in column_to_key.items()}
+
+
def _fetch_today_stats(now: datetime) -> Dict[str, int]:
    """Summarize today's progress: tasks already executed vs. still outstanding.

    "Executed" counts statuses 1/2/3.  The outstanding count is the ODPS
    demand total for today's partition minus the executed count, floored at 0.
    """
    sql = """
        SELECT
            SUM(CASE WHEN status IN (1, 2, 3) THEN 1 ELSE 0 END) AS executed_count
        FROM aigc_topic_decode_task_result
        WHERE create_time >= %s
          AND create_time < %s
    """
    day_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    params = (
        day_start.strftime("%Y-%m-%d %H:%M:%S"),
        now.strftime("%Y-%m-%d %H:%M:%S"),
    )
    row = mysql.fetchone(sql, params) or {}
    executed_total = int(row.get("executed_count") or 0)
    demand_total = int(count_priority_posts(now.strftime("%Y%m%d")) or 0)
    return {
        "executed_total": executed_total,
        "pending_total": max(demand_total - executed_total, 0),
    }
+
+
+def _gen_feishu_sign(timestamp: str, secret: str) -> str:
+    # 与飞书文档一致:把 timestamp + "\n" + 密钥 作为 key,对空串做 HmacSHA256 再 Base64
+    # https://open.feishu.cn/document/client-docs/bot-v3/add-custom-bot
+    string_to_sign = "{}\n{}".format(timestamp, secret)
+    hmac_code = hmac.new(
+        string_to_sign.encode("utf-8"),
+        b"",
+        digestmod=hashlib.sha256,
+    ).digest()
+    return base64.b64encode(hmac_code).decode("utf-8")
+
+
+def _build_feishu_card(
+    *,
+    is_alert: bool,
+    window_start: datetime,
+    window_end: datetime,
+    total: int,
+    success: int,
+    pending: int,
+    running: int,
+    fail: int,
+    fail_rate: float,
+    today_executed_total: int,
+    today_pending_total: int,
+) -> Dict:
+    header_title = "AIGC解构任务小时统计"
+    header_subtitle = (
+        f"{window_start.strftime('%Y-%m-%d %H:%M')} - {window_end.strftime('%H:%M')}"
+    )
+    level_tag = "异常告警" if is_alert else "运行正常"
+    template = "red" if is_alert else "blue"
+    status_line = "⚠️ 当前失败率已超过阈值,请尽快排查" if is_alert else "✅ 当前任务运行正常"
+
+    markdown_lines = [
+        status_line,
+        f"**发起数**:{total}",
+        f"**成功数**:{success}",
+        f"**执行中数**:{pending + running}",
+        f"**失败数**:{fail}",
+        f"**失败率**:{fail_rate:.2%}",
+        f"**当日已执行总数**:{today_executed_total}",
+        f"**当日待执行总数**:{today_pending_total}",
+    ]
+    body_elements = [
+        {
+            "tag": "markdown",
+            "content": "\n".join(markdown_lines),
+            "text_align": "left",
+        }
+    ]
+    if is_alert:
+        body_elements.insert(
+            0,
+            {
+                "tag": "markdown",
+                "content": "<at id=all></at>",
+                "text_align": "left",
+            },
+        )
+
+    return {
+        "schema": "2.0",
+        "config": {
+            "enable_forward": True,
+            "update_multi": True,
+            "width_mode": "compact",
+        },
+        "header": {
+            "title": {
+                "tag": "plain_text",
+                "content": header_title,
+            },
+            "subtitle": {
+                "tag": "plain_text",
+                "content": header_subtitle,
+            },
+            "text_tag_list": [
+                {
+                    "tag": "text_tag",
+                    "text": {
+                        "tag": "plain_text",
+                        "content": level_tag,
+                    },
+                    "color": "red" if is_alert else "green",
+                }
+            ],
+            "template": template,
+            "padding": "12px 12px 12px 12px",
+        },
+        "body": {
+            "direction": "vertical",
+            "padding": "12px 12px 12px 12px",
+            "horizontal_align": "left",
+            "vertical_spacing": "8px",
+            "elements": body_elements,
+        },
+    }
+
+
def _send_feishu_card(card: Dict) -> None:
    """POST *card* to the Feishu webhook with a signed timestamp.

    Raises:
        RuntimeError: on a non-200 HTTP status, or when the response body
            carries a non-zero business ``code``.
    """
    ts = str(int(time.time()))
    payload = {
        "timestamp": ts,
        "sign": _gen_feishu_sign(ts, _FEISHU_SIGN_SECRET),
        "msg_type": "interactive",
        "card": card,
    }
    resp = requests.post(_FEISHU_WEBHOOK, json=payload, timeout=10)
    if resp.status_code != 200:
        raise RuntimeError(f"send_feishu_http_status_{resp.status_code}")
    body = resp.json()
    if body.get("code") != 0:
        raise RuntimeError(f"send_feishu_failed code={body.get('code')} msg={body.get('msg')}")
+
+
def run_decode_hourly_stats_job() -> None:
    """Hourly job: push the previous hour's decode-task stats to Feishu.

    Any failure is caught and logged so the scheduler thread keeps running.
    """
    now = datetime.now(_TZ)
    window_start, window_end = _previous_hour_window(now)
    logger.info(
        "解构小时统计任务开始,统计窗口=[{}, {})",
        window_start.strftime("%Y-%m-%d %H:%M:%S"),
        window_end.strftime("%Y-%m-%d %H:%M:%S"),
    )
    try:
        hourly = _fetch_hourly_stats(window_start, window_end)
        total, success = hourly["total"], hourly["success"]
        pending, running, fail = hourly["pending"], hourly["running"], hourly["fail"]
        fail_rate = fail / total if total > 0 else 0.0
        daily = _fetch_today_stats(now)
        # Alert only when there was traffic AND the failure rate hit the threshold.
        is_alert = total > 0 and fail_rate >= _ALERT_FAIL_RATE
        card = _build_feishu_card(
            is_alert=is_alert,
            window_start=window_start,
            window_end=window_end,
            total=total,
            success=success,
            pending=pending,
            running=running,
            fail=fail,
            fail_rate=fail_rate,
            today_executed_total=daily["executed_total"],
            today_pending_total=daily["pending_total"],
        )
        _send_feishu_card(card)
        logger.info(
            "解构小时统计推送完成 total={} success={} pending={} running={} fail={} fail_rate={:.2%} is_alert={}",
            total,
            success,
            pending,
            running,
            fail,
            fail_rate,
            is_alert,
        )
    except Exception as exc:
        logger.exception("解构小时统计任务失败: {}", exc)


if __name__ == "__main__":
    run_decode_hourly_stats_job()

+ 17 - 0
scheduler/odps_fetch.py

@@ -83,6 +83,23 @@ def fetch_top_priority_posts(limit: int = 10) -> List[Dict[str, Any]]:
     return fetch_priority_posts(limit=limit, offset=0)
 
 
def count_priority_posts(dt: Optional[str] = None) -> int:
    """Count priority posts (level 0/1/2) in one ODPS partition.

    Args:
        dt: Partition date as ``YYYYMMDD``; defaults to today's partition.

    Returns:
        The row count for the partition, or 0 when it is empty.

    Raises:
        ValueError: if *dt* is not a plain 8-digit string.  The value is
            interpolated directly into the SQL text below, so anything else
            is rejected up front to rule out injection / malformed filters.
    """
    target_dt = dt or _today_dt()
    if not (len(target_dt) == 8 and target_dt.isdigit()):
        raise ValueError(f"invalid dt: {target_dt!r}")
    sql = f"""
        SELECT COUNT(1) AS total_count
        FROM {ODPS_TABLE}
        WHERE dt = '{target_dt}'
          AND level IN (0, 1, 2)
    """
    logger.info("开始执行ODPS计数 dt={}", target_dt)
    odps = _build_odps_client()
    instance = odps.execute_sql(sql)
    with instance.open_reader() as reader:
        # The aggregate query yields at most one row; return its first column.
        for row in reader:
            return int(row[0] or 0)
    # No rows at all (e.g. empty reader) — treat as zero demand.
    return 0
+
+
 if __name__ == "__main__":
     try:
         test_limit = int(os.getenv("ODPS_TEST_LIMIT", "10"))