"""Hourly statistics job for AIGC topic decode tasks.

Counts rows of ``aigc_topic_decode_task_result`` created during the previous
full hour, adds day-to-date totals, and pushes a summary card to a Feishu
custom bot. The card is flagged as an alert when the failure rate exceeds
``_ALERT_FAIL_RATE``, or when the success rate is below ``_ALERT_SUCCESS_RATE``
while some task has been pending/running longer than ``_OVERDUE_MINUTES``.
"""

import base64
import hashlib
import hmac
import os
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Mapping, Optional
from zoneinfo import ZoneInfo

import requests

# Allow running directly as a script: python scheduler/decode_hourly_stats_job.py
_PROJECT_ROOT = Path(__file__).resolve().parent.parent
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

from utils.scheduler_logger import get_scheduler_logger  # noqa: E402
from utils.sync_mysql_help import mysql  # noqa: E402

logger = get_scheduler_logger()

_TZ = ZoneInfo("Asia/Shanghai")

# NOTE(review): these credentials are committed to source control and should be
# rotated and moved to configuration. Environment variables now take precedence;
# the literals remain only as a backward-compatible fallback.
_FEISHU_WEBHOOK = (
    os.environ.get("DECODE_STATS_FEISHU_WEBHOOK")
    or "https://open.feishu.cn/open-apis/bot/v2/hook/af94b535-ed47-47d8-87f4-d893e1077276"
)
_FEISHU_SIGN_SECRET = (
    os.environ.get("DECODE_STATS_FEISHU_SIGN_SECRET") or "lebZtBVkKJrbaVFlss2Pcf"
)

# Alert when fail/total exceeds this ratio.
_ALERT_FAIL_RATE = 0.15
# Alert when success/total is below this ratio AND an overdue task exists.
_ALERT_SUCCESS_RATE = 0.20
# A task still in status 0/1 this long after creation counts as overdue.
_OVERDUE_MINUTES = 30
# Timestamps are sent to MySQL as naive local (Asia/Shanghai) strings.
_DB_TIME_FMT = "%Y-%m-%d %H:%M:%S"


def _db_time(value: datetime) -> str:
    """Format *value* as the ``YYYY-mm-dd HH:MM:SS`` string used in SQL params."""
    return value.strftime(_DB_TIME_FMT)


def _row_int(row: Optional[Mapping[str, Any]], key: str) -> int:
    """Return ``row[key]`` as an int, treating a missing row, missing key or NULL as 0.

    ``SUM(...)`` yields NULL when no rows match, hence the ``or 0`` fallback.
    Assumes ``mysql.fetchone`` returns a dict-like row or ``None``.
    """
    return int((row or {}).get(key) or 0)


def _previous_hour_window(now: datetime) -> tuple[datetime, datetime]:
    """Return the half-open ``[start, end)`` window of the last full hour before *now*."""
    current_hour = now.replace(minute=0, second=0, microsecond=0)
    return current_hour - timedelta(hours=1), current_hour


def _fetch_hourly_stats(window_start: datetime, window_end: datetime) -> Dict[str, int]:
    """Count tasks created in ``[window_start, window_end)`` grouped by status.

    Status codes as used by the query: 0 pending, 1 running, 2 success, 3 fail.

    Returns:
        Dict with keys ``total``, ``success``, ``pending``, ``running``, ``fail``.
    """
    sql = """
        SELECT
            COUNT(1) AS total_count,
            SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) AS success_count,
            SUM(CASE WHEN status = 0 THEN 1 ELSE 0 END) AS pending_count,
            SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END) AS running_count,
            SUM(CASE WHEN status = 3 THEN 1 ELSE 0 END) AS fail_count
        FROM aigc_topic_decode_task_result
        WHERE create_time >= %s
          AND create_time < %s
    """
    row = mysql.fetchone(sql, (_db_time(window_start), _db_time(window_end)))
    return {
        "total": _row_int(row, "total_count"),
        "success": _row_int(row, "success_count"),
        "pending": _row_int(row, "pending_count"),
        "running": _row_int(row, "running_count"),
        "fail": _row_int(row, "fail_count"),
    }


def _fetch_today_stats(now: datetime) -> Dict[str, int]:
    """Return day-to-date executed count and remaining quota.

    "Executed" means status 1, 2 or 3 for tasks created between local midnight
    and *now*. The remaining count is the configured daily limit minus the
    executed count, floored at 0.

    NOTE(review): the day boundary comes from *now*, not from the reported hour
    window, so the run just after midnight reports the previous day's 23:00 hour
    next to near-empty "today" totals — confirm this is intended.
    """
    day_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    sql = """
        SELECT
            SUM(CASE WHEN status IN (1, 2, 3) THEN 1 ELSE 0 END) AS executed_count
        FROM aigc_topic_decode_task_result
        WHERE create_time >= %s
          AND create_time < %s
    """
    row = mysql.fetchone(sql, (_db_time(day_start), _db_time(now)))
    executed_total = _row_int(row, "executed_count")
    demand_total = _fetch_decode_daily_limit()
    return {
        "executed_total": executed_total,
        "pending_total": max(demand_total - executed_total, 0),
    }


def _fetch_decode_daily_limit() -> int:
    """Return the ``max`` column of the newest ``aigc_topic_decode_task_oprate`` row (0 if none)."""
    sql = """
        SELECT `max` AS daily_limit
        FROM aigc_topic_decode_task_oprate
        ORDER BY id DESC
        LIMIT 1
    """
    row = mysql.fetchone(sql)
    return _row_int(row, "daily_limit")


def _has_overdue_pending_task(now: datetime) -> bool:
    """Return True if any task in status 0/1 was created ``_OVERDUE_MINUTES`` or more ago.

    The check is not limited to the reported hour window.
    """
    overdue_before = now - timedelta(minutes=_OVERDUE_MINUTES)
    sql = """
        SELECT COUNT(1) AS total_count
        FROM aigc_topic_decode_task_result
        WHERE status IN (0, 1)
          AND create_time <= %s
    """
    row = mysql.fetchone(sql, (_db_time(overdue_before),))
    return _row_int(row, "total_count") > 0


def _gen_feishu_sign(timestamp: str, secret: str) -> str:
    """Compute the Feishu custom-bot signature for *timestamp*.

    Per the Feishu docs, ``timestamp + "\\n" + secret`` is used as the HMAC-SHA256
    *key* over an empty message, and the digest is Base64-encoded.
    https://open.feishu.cn/document/client-docs/bot-v3/add-custom-bot
    """
    string_to_sign = "{}\n{}".format(timestamp, secret)
    hmac_code = hmac.new(
        string_to_sign.encode("utf-8"),
        b"",
        digestmod=hashlib.sha256,
    ).digest()
    return base64.b64encode(hmac_code).decode("utf-8")


def _build_feishu_card(
    *,
    is_alert: bool,
    window_start: datetime,
    window_end: datetime,
    total: int,
    success: int,
    pending: int,
    running: int,
    fail: int,
    fail_rate: float,
    today_executed_total: int,
    today_pending_total: int,
) -> Dict:
    """Build the Feishu interactive card (schema 2.0) summarising one hour.

    Alert cards use a red header and tag; normal cards use a blue header with
    a green tag. Pending and running counts are shown combined.
    """
    header_title = "票圈供给-AIGC选题解构任务小时统计"
    header_subtitle = (
        f"{window_start.strftime('%Y-%m-%d %H:%M')} - {window_end.strftime('%H:%M')}"
    )
    level_tag = "异常告警" if is_alert else "运行正常"
    template = "red" if is_alert else "blue"

    if not is_alert:
        status_line = "✅ 当前任务运行正常"
    elif fail_rate > _ALERT_FAIL_RATE:
        status_line = "⚠️ 当前失败率已超过阈值,请尽快排查"
    else:
        # The only other alert trigger: low success rate plus an overdue task.
        status_line = "⚠️ 当前成功率偏低且存在超时任务,请尽快排查"

    markdown_lines = [
        status_line,
        f"**发起数**:{total}",
        f"**成功数**:{success}",
        f"**执行中数**:{pending + running}",
        f"**失败数**:{fail}",
        f"**失败率**:{fail_rate:.2%}",
        f"**当日已执行总数**:{today_executed_total}",
        f"**当日待执行总数**:{today_pending_total}",
    ]

    body_elements = [
        {
            "tag": "markdown",
            "content": "\n".join(markdown_lines),
            "text_align": "left",
        }
    ]
    if is_alert:
        # NOTE(review): an empty markdown element is prepended for alerts —
        # presumably a placeholder (e.g. for an @-mention); confirm Feishu
        # accepts empty content and that this is still wanted.
        body_elements.insert(
            0,
            {
                "tag": "markdown",
                "content": "",
                "text_align": "left",
            },
        )

    return {
        "schema": "2.0",
        "config": {
            "enable_forward": True,
            "update_multi": True,
            "width_mode": "compact",
        },
        "header": {
            "title": {
                "tag": "plain_text",
                "content": header_title,
            },
            "subtitle": {
                "tag": "plain_text",
                "content": header_subtitle,
            },
            "text_tag_list": [
                {
                    "tag": "text_tag",
                    "text": {
                        "tag": "plain_text",
                        "content": level_tag,
                    },
                    "color": "red" if is_alert else "green",
                }
            ],
            "template": template,
            "padding": "12px 12px 12px 12px",
        },
        "body": {
            "direction": "vertical",
            "padding": "12px 12px 12px 12px",
            "horizontal_align": "left",
            "vertical_spacing": "8px",
            "elements": body_elements,
        },
    }


def _send_feishu_card(card: Dict) -> None:
    """POST *card* to the configured Feishu webhook with a fresh signature.

    Raises:
        RuntimeError: on a non-200 HTTP status, a non-JSON body, or a non-zero
            ``code`` in the response body.
    """
    timestamp = str(int(time.time()))
    payload = {
        "timestamp": timestamp,
        "sign": _gen_feishu_sign(timestamp, _FEISHU_SIGN_SECRET),
        "msg_type": "interactive",
        "card": card,
    }
    resp = requests.post(_FEISHU_WEBHOOK, json=payload, timeout=10)
    if resp.status_code != 200:
        raise RuntimeError(f"send_feishu_http_status_{resp.status_code}")
    try:
        body = resp.json()
    except ValueError as err:
        raise RuntimeError(f"send_feishu_invalid_json body={resp.text[:200]!r}") from err
    if body.get("code") != 0:
        raise RuntimeError(f"send_feishu_failed code={body.get('code')} msg={body.get('msg')}")


def run_decode_hourly_stats_job() -> None:
    """Compute last hour's decode-task stats and push them to Feishu.

    The push is skipped when no task in the window reached status 1/2/3. Any
    exception is logged (with traceback) and swallowed so the scheduler keeps
    running.
    """
    now = datetime.now(_TZ)
    window_start, window_end = _previous_hour_window(now)
    logger.info(
        "解构小时统计任务开始,统计窗口=[{}, {})",
        _db_time(window_start),
        _db_time(window_end),
    )
    try:
        stats = _fetch_hourly_stats(window_start, window_end)
        total = stats["total"]
        success = stats["success"]
        pending = stats["pending"]
        running = stats["running"]
        fail = stats["fail"]

        executed_in_window = success + running + fail
        if executed_in_window <= 0:
            logger.info(
                "当前统计窗口无已执行任务,跳过推送 window=[{}, {}) total={} pending={}",
                _db_time(window_start),
                _db_time(window_end),
                total,
                pending,
            )
            return

        # executed_in_window is a subset of total, so total > 0 from here on;
        # the guards below are purely defensive.
        fail_rate = (fail / total) if total > 0 else 0.0
        success_rate = (success / total) if total > 0 else 0.0

        today_stats = _fetch_today_stats(now)
        today_executed_total = today_stats["executed_total"]
        today_pending_total = today_stats["pending_total"]

        has_overdue_pending_task = _has_overdue_pending_task(now)
        is_alert = fail_rate > _ALERT_FAIL_RATE or (
            success_rate < _ALERT_SUCCESS_RATE and has_overdue_pending_task
        )

        card = _build_feishu_card(
            is_alert=is_alert,
            window_start=window_start,
            window_end=window_end,
            total=total,
            success=success,
            pending=pending,
            running=running,
            fail=fail,
            fail_rate=fail_rate,
            today_executed_total=today_executed_total,
            today_pending_total=today_pending_total,
        )
        _send_feishu_card(card)
        logger.info(
            "解构小时统计推送完成 total={} success={} pending={} running={} fail={} fail_rate={:.2%} success_rate={:.2%} has_overdue_pending_task={} is_alert={}",
            total,
            success,
            pending,
            running,
            fail,
            fail_rate,
            success_rate,
            has_overdue_pending_task,
            is_alert,
        )
    except Exception as exc:
        logger.exception("解构小时统计任务失败: {}", exc)


if __name__ == "__main__":
    run_decode_hourly_stats_job()