| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299 |
- import base64
- import hashlib
- import hmac
- import sys
- import time
- from datetime import datetime, timedelta
- from pathlib import Path
- from typing import Dict
- from zoneinfo import ZoneInfo
- import requests
import os

# Allow running this file directly as a script, e.g.
#   python scheduler/decode_hourly_stats_job.py
# The project root (the parent of this file's directory) is put on sys.path so
# that the `utils` package imports below resolve.
_PROJECT_ROOT = Path(__file__).resolve().parent.parent
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

from utils.scheduler_logger import get_scheduler_logger  # noqa: E402
from utils.sync_mysql_help import mysql  # noqa: E402

logger = get_scheduler_logger()

# All window arithmetic in this job is done in this timezone.
_TZ = ZoneInfo("Asia/Shanghai")

# SECURITY NOTE: the webhook URL and signing secret are credentials. They can
# be supplied through the environment so that they need not live in source
# control; the literals below are kept only as backward-compatible fallbacks
# (used when the variable is unset or empty) and should be rotated/removed.
_FEISHU_WEBHOOK = (
    os.environ.get("DECODE_STATS_FEISHU_WEBHOOK")
    or "https://open.feishu.cn/open-apis/bot/v2/hook/af94b535-ed47-47d8-87f4-d893e1077276"
)
_FEISHU_SIGN_SECRET = (
    os.environ.get("DECODE_STATS_FEISHU_SIGN_SECRET")
    or "lebZtBVkKJrbaVFlss2Pcf"
)

# Alert when the hourly fail rate is strictly above this fraction.
_ALERT_FAIL_RATE = 0.15
# Alert when the hourly success rate is strictly below this fraction AND an
# overdue pending/running task exists.
_ALERT_SUCCESS_RATE = 0.20
def _previous_hour_window(now: datetime) -> tuple[datetime, datetime]:
    """Return ``(start, end)`` of the whole hour that precedes ``now``'s hour.

    ``end`` is ``now`` truncated to the top of its hour (minutes, seconds and
    microseconds zeroed) and ``start`` is exactly one hour before ``end``.
    Any ``tzinfo`` on ``now`` is carried over unchanged.
    """
    window_end = now.replace(minute=0, second=0, microsecond=0)
    window_start = window_end - timedelta(hours=1)
    return window_start, window_end
def _fetch_hourly_stats(window_start: datetime, window_end: datetime) -> Dict[str, int]:
    """Count decode-task rows created within the window, broken down by status.

    Rows of ``aigc_topic_decode_task_result`` are selected where
    ``window_start <= create_time < window_end``. Per the column aliases in the
    query, status 0 is counted as pending, 1 as running, 2 as success and 3 as
    fail.

    Returns:
        A dict with integer values under the keys ``total``, ``success``,
        ``pending``, ``running`` and ``fail``. A missing row or a NULL
        aggregate is reported as 0.
    """
    time_format = "%Y-%m-%d %H:%M:%S"
    query = """
        SELECT
            COUNT(1) AS total_count,
            SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) AS success_count,
            SUM(CASE WHEN status = 0 THEN 1 ELSE 0 END) AS pending_count,
            SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END) AS running_count,
            SUM(CASE WHEN status = 3 THEN 1 ELSE 0 END) AS fail_count
        FROM aigc_topic_decode_task_result
        WHERE create_time >= %s
          AND create_time < %s
    """
    bounds = (
        window_start.strftime(time_format),
        window_end.strftime(time_format),
    )
    record = mysql.fetchone(query, bounds) or {}

    # Result key -> SQL column alias; SUM() is NULL on an empty set, hence `or 0`.
    column_for = {
        "total": "total_count",
        "success": "success_count",
        "pending": "pending_count",
        "running": "running_count",
        "fail": "fail_count",
    }
    return {key: int(record.get(column) or 0) for key, column in column_for.items()}
def _fetch_today_stats(now: datetime) -> Dict[str, int]:
    """Summarise today's progress against the configured daily limit.

    Counts rows of ``aigc_topic_decode_task_result`` with status 1, 2 or 3
    whose ``create_time`` lies in ``[midnight of now's date, now)``, then
    subtracts that count from the value returned by
    ``_fetch_decode_daily_limit()``.

    Returns:
        ``{"executed_total": <count>, "pending_total": <limit - count>}`` where
        ``pending_total`` is clamped so it never goes below 0.
    """
    time_format = "%Y-%m-%d %H:%M:%S"
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    query = """
        SELECT
            SUM(CASE WHEN status IN (1, 2, 3) THEN 1 ELSE 0 END) AS executed_count
        FROM aigc_topic_decode_task_result
        WHERE create_time >= %s
          AND create_time < %s
    """
    record = mysql.fetchone(
        query,
        (midnight.strftime(time_format), now.strftime(time_format)),
    ) or {}
    executed = int(record.get("executed_count") or 0)

    remaining = _fetch_decode_daily_limit() - executed
    if remaining < 0:
        # More tasks executed than the configured limit: report nothing left.
        remaining = 0

    return {"executed_total": executed, "pending_total": remaining}
def _fetch_decode_daily_limit() -> int:
    """Return the ``max`` column of the newest ``aigc_topic_decode_task_oprate`` row.

    "Newest" means the row with the highest ``id``. Returns 0 when the table
    yields no row or the value is NULL/falsy.
    """
    query = """
        SELECT `max` AS daily_limit
        FROM aigc_topic_decode_task_oprate
        ORDER BY id DESC
        LIMIT 1
    """
    record = mysql.fetchone(query)
    if not record:
        return 0
    return int(record.get("daily_limit") or 0)
def _has_overdue_pending_task(now: datetime) -> bool:
    """Tell whether any status-0 or status-1 task is at least 30 minutes old.

    A task qualifies when its ``create_time`` is at or before
    ``now - 30 minutes`` and its status is still 0 or 1.
    """
    cutoff = (now - timedelta(minutes=30)).strftime("%Y-%m-%d %H:%M:%S")
    query = """
        SELECT COUNT(1) AS total_count
        FROM aigc_topic_decode_task_result
        WHERE status IN (0, 1)
          AND create_time <= %s
    """
    record = mysql.fetchone(query, (cutoff,)) or {}
    overdue_count = int(record.get("total_count") or 0)
    return overdue_count > 0
def _gen_feishu_sign(timestamp: str, secret: str) -> str:
    """Compute the request signature for a signed Feishu custom-bot webhook.

    Following the Feishu documentation, the HMAC *key* is the timestamp, a
    newline, and the secret joined together; the signed message is the empty
    byte string; the SHA-256 digest is returned Base64-encoded.
    https://open.feishu.cn/document/client-docs/bot-v3/add-custom-bot

    Args:
        timestamp: The timestamp string that is also sent in the request.
        secret: The bot's signing secret.

    Returns:
        The Base64 text of the HMAC-SHA256 digest.
    """
    key = f"{timestamp}\n{secret}".encode("utf-8")
    # Note the unusual argument roles: the composed string is the key and the
    # message is empty.
    signer = hmac.new(key, msg=b"", digestmod=hashlib.sha256)
    encoded = base64.b64encode(signer.digest())
    return encoded.decode("utf-8")
def _build_feishu_card(
    *,
    is_alert: bool,
    window_start: datetime,
    window_end: datetime,
    total: int,
    success: int,
    pending: int,
    running: int,
    fail: int,
    fail_rate: float,
    today_executed_total: int,
    today_pending_total: int,
) -> Dict:
    """Assemble the card payload (``"schema": "2.0"``) for the hourly report.

    Args:
        is_alert: When true the card uses the red template and tag, gets an
            extra leading markdown element mentioning everyone, and shows a
            warning status line; otherwise it is blue/green with an OK line.
        window_start: Start of the reported window (date and time shown).
        window_end: End of the reported window (only the time is shown).
        total: Number of tasks created in the window.
        success: Number of successful tasks.
        pending: Number of pending tasks; displayed summed with ``running``.
        running: Number of running tasks; displayed summed with ``pending``.
        fail: Number of failed tasks.
        fail_rate: Failure fraction, rendered as a percentage with 2 decimals.
            In alert mode it also picks which warning line is shown.
        today_executed_total: Tasks executed so far today.
        today_pending_total: Tasks still to execute today.

    Returns:
        A dict with the ``schema``, ``config``, ``header`` and ``body`` keys.
    """
    if not is_alert:
        level_tag, template, tag_color = "运行正常", "blue", "green"
        status_line = "✅ 当前任务运行正常"
    else:
        level_tag, template, tag_color = "异常告警", "red", "red"
        # An alert is either a fail-rate breach or the low-success/overdue case.
        if fail_rate > _ALERT_FAIL_RATE:
            status_line = "⚠️ 当前失败率已超过阈值,请尽快排查"
        else:
            status_line = "⚠️ 当前成功率偏低且存在超时任务,请尽快排查"

    summary = "\n".join(
        [
            status_line,
            f"**发起数**:{total}",
            f"**成功数**:{success}",
            f"**执行中数**:{pending + running}",
            f"**失败数**:{fail}",
            f"**失败率**:{fail_rate:.2%}",
            f"**当日已执行总数**:{today_executed_total}",
            f"**当日待执行总数**:{today_pending_total}",
        ]
    )

    elements = []
    if is_alert:
        # The @all mention goes first so it sits above the summary.
        elements.append(
            {
                "tag": "markdown",
                "content": "<at id=all></at>",
                "text_align": "left",
            }
        )
    elements.append(
        {
            "tag": "markdown",
            "content": summary,
            "text_align": "left",
        }
    )

    subtitle = (
        window_start.strftime("%Y-%m-%d %H:%M") + " - " + window_end.strftime("%H:%M")
    )
    header = {
        "title": {
            "tag": "plain_text",
            "content": "票圈供给-AIGC选题解构任务小时统计",
        },
        "subtitle": {
            "tag": "plain_text",
            "content": subtitle,
        },
        "text_tag_list": [
            {
                "tag": "text_tag",
                "text": {
                    "tag": "plain_text",
                    "content": level_tag,
                },
                "color": tag_color,
            }
        ],
        "template": template,
        "padding": "12px 12px 12px 12px",
    }
    body = {
        "direction": "vertical",
        "padding": "12px 12px 12px 12px",
        "horizontal_align": "left",
        "vertical_spacing": "8px",
        "elements": elements,
    }
    return {
        "schema": "2.0",
        "config": {
            "enable_forward": True,
            "update_multi": True,
            "width_mode": "compact",
        },
        "header": header,
        "body": body,
    }
def _send_feishu_card(card: Dict) -> None:
    """POST ``card`` as a signed interactive message to the Feishu webhook.

    The payload carries the current Unix timestamp (whole seconds, as a
    string) together with the signature computed from that timestamp and the
    module's signing secret.

    Raises:
        RuntimeError: If the HTTP status is not 200, or if the JSON response
            body's ``code`` field is not 0.
    """
    ts = str(int(time.time()))
    message = {
        "timestamp": ts,
        "sign": _gen_feishu_sign(ts, _FEISHU_SIGN_SECRET),
        "msg_type": "interactive",
        "card": card,
    }
    response = requests.post(_FEISHU_WEBHOOK, json=message, timeout=10)

    http_status = response.status_code
    if http_status != 200:
        raise RuntimeError(f"send_feishu_http_status_{http_status}")

    # A 200 response can still report an application-level failure in its body.
    result = response.json()
    result_code = result.get("code")
    if result_code != 0:
        raise RuntimeError(f"send_feishu_failed code={result_code} msg={result.get('msg')}")
def run_decode_hourly_stats_job() -> None:
    """Report the previous hour's decode-task statistics to Feishu.

    Steps:
      1. Work out the previous whole-hour window in the module timezone.
      2. Load per-status counts for that window; if no task is in the
         success, running or fail state, log and return without sending.
      3. Compute fail/success rates over the window's total, load today's
         totals and the overdue-task flag.
      4. Flag an alert when the fail rate is above ``_ALERT_FAIL_RATE``, or
         when the success rate is below ``_ALERT_SUCCESS_RATE`` and an overdue
         pending/running task exists.
      5. Build and send the card, then log the outcome.

    Any exception raised after the start log line is logged with its traceback
    and not propagated to the caller.
    """
    time_format = "%Y-%m-%d %H:%M:%S"
    now = datetime.now(_TZ)
    window_start, window_end = _previous_hour_window(now)
    start_text = window_start.strftime(time_format)
    end_text = window_end.strftime(time_format)
    logger.info("解构小时统计任务开始,统计窗口=[{}, {})", start_text, end_text)

    try:
        stats = _fetch_hourly_stats(window_start, window_end)
        total, success, pending, running, fail = (
            stats[key] for key in ("total", "success", "pending", "running", "fail")
        )

        # Nothing has left the pending state in this window: stay quiet.
        if success + running + fail <= 0:
            logger.info(
                "当前统计窗口无已执行任务,跳过推送 window=[{}, {}) total={} pending={}",
                start_text,
                end_text,
                total,
                pending,
            )
            return

        if total > 0:
            fail_rate = fail / total
            success_rate = success / total
        else:
            fail_rate = success_rate = 0.0

        today_stats = _fetch_today_stats(now)
        has_overdue_pending_task = _has_overdue_pending_task(now)

        fail_rate_breached = fail_rate > _ALERT_FAIL_RATE
        stalled = success_rate < _ALERT_SUCCESS_RATE and has_overdue_pending_task
        is_alert = total > 0 and (fail_rate_breached or stalled)

        _send_feishu_card(
            _build_feishu_card(
                is_alert=is_alert,
                window_start=window_start,
                window_end=window_end,
                total=total,
                success=success,
                pending=pending,
                running=running,
                fail=fail,
                fail_rate=fail_rate,
                today_executed_total=today_stats["executed_total"],
                today_pending_total=today_stats["pending_total"],
            )
        )
        logger.info(
            "解构小时统计推送完成 total={} success={} pending={} running={} fail={} fail_rate={:.2%} success_rate={:.2%} has_overdue_pending_task={} is_alert={}",
            total,
            success,
            pending,
            running,
            fail,
            fail_rate,
            success_rate,
            has_overdue_pending_task,
            is_alert,
        )
    except Exception as exc:
        # Top-level job boundary: record the failure instead of raising.
        logger.exception("解构小时统计任务失败: {}", exc)
# Script entry point: when executed directly (not imported), run one
# statistics/reporting cycle immediately.
if __name__ == "__main__":
    run_decode_hourly_stats_job()
|