# decode_hourly_stats_job.py
  1. import base64
  2. import hashlib
  3. import hmac
  4. import sys
  5. import time
  6. from datetime import datetime, timedelta
  7. from pathlib import Path
  8. from typing import Dict
  9. from zoneinfo import ZoneInfo
  10. import requests
# Allow running this file directly as a script:
#   python scheduler/decode_hourly_stats_job.py
# The parent of this file's directory is put at the front of sys.path before the
# project-local imports below, so the order of these statements matters.
_PROJECT_ROOT = Path(__file__).resolve().parent.parent
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

from utils.scheduler_logger import get_scheduler_logger
from utils.sync_mysql_help import mysql
from scheduler.odps_fetch import count_priority_posts

logger = get_scheduler_logger()

# Time zone used for "now" and, through it, every statistics window in this job.
_TZ = ZoneInfo("Asia/Shanghai")

# Feishu custom-bot webhook URL and the secret used to sign each request.
# NOTE(review): both values are hard-coded in source; consider loading them from
# configuration or the environment and rotating the secret.
_FEISHU_WEBHOOK = "https://open.feishu.cn/open-apis/bot/v2/hook/af94b535-ed47-47d8-87f4-d893e1077276"
_FEISHU_SIGN_SECRET = "lebZtBVkKJrbaVFlss2Pcf"

# Alert thresholds: an alert is raised when the window's failure rate is above
# _ALERT_FAIL_RATE, or when its success rate is below _ALERT_SUCCESS_RATE while
# overdue pending/running tasks exist.
_ALERT_FAIL_RATE = 0.15
_ALERT_SUCCESS_RATE = 0.20
  24. def _previous_hour_window(now: datetime) -> tuple[datetime, datetime]:
  25. current_hour = now.replace(minute=0, second=0, microsecond=0)
  26. return current_hour - timedelta(hours=1), current_hour
  27. def _fetch_hourly_stats(window_start: datetime, window_end: datetime) -> Dict[str, int]:
  28. sql = """
  29. SELECT
  30. COUNT(1) AS total_count,
  31. SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) AS success_count,
  32. SUM(CASE WHEN status = 0 THEN 1 ELSE 0 END) AS pending_count,
  33. SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END) AS running_count,
  34. SUM(CASE WHEN status = 3 THEN 1 ELSE 0 END) AS fail_count
  35. FROM aigc_topic_decode_task_result
  36. WHERE create_time >= %s
  37. AND create_time < %s
  38. """
  39. row = mysql.fetchone(
  40. sql,
  41. (
  42. window_start.strftime("%Y-%m-%d %H:%M:%S"),
  43. window_end.strftime("%Y-%m-%d %H:%M:%S"),
  44. ),
  45. )
  46. return {
  47. "total": int((row or {}).get("total_count") or 0),
  48. "success": int((row or {}).get("success_count") or 0),
  49. "pending": int((row or {}).get("pending_count") or 0),
  50. "running": int((row or {}).get("running_count") or 0),
  51. "fail": int((row or {}).get("fail_count") or 0),
  52. }
  53. def _fetch_today_stats(now: datetime) -> Dict[str, int]:
  54. day_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
  55. sql = """
  56. SELECT
  57. SUM(CASE WHEN status IN (1, 2, 3) THEN 1 ELSE 0 END) AS executed_count
  58. FROM aigc_topic_decode_task_result
  59. WHERE create_time >= %s
  60. AND create_time < %s
  61. """
  62. row = mysql.fetchone(
  63. sql,
  64. (
  65. day_start.strftime("%Y-%m-%d %H:%M:%S"),
  66. now.strftime("%Y-%m-%d %H:%M:%S"),
  67. ),
  68. )
  69. executed_total = int((row or {}).get("executed_count") or 0)
  70. demand_total = int(count_priority_posts(now.strftime("%Y%m%d")) or 0)
  71. pending_total = max(demand_total - executed_total, 0)
  72. return {
  73. "executed_total": executed_total,
  74. "pending_total": pending_total,
  75. }
  76. def _has_overdue_pending_task(now: datetime) -> bool:
  77. overdue_before = now - timedelta(minutes=30)
  78. sql = """
  79. SELECT COUNT(1) AS total_count
  80. FROM aigc_topic_decode_task_result
  81. WHERE status IN (0, 1)
  82. AND create_time <= %s
  83. """
  84. row = mysql.fetchone(sql, (overdue_before.strftime("%Y-%m-%d %H:%M:%S"),))
  85. return int((row or {}).get("total_count") or 0) > 0
  86. def _gen_feishu_sign(timestamp: str, secret: str) -> str:
  87. # 与飞书文档一致:把 timestamp + "\n" + 密钥 作为 key,对空串做 HmacSHA256 再 Base64
  88. # https://open.feishu.cn/document/client-docs/bot-v3/add-custom-bot
  89. string_to_sign = "{}\n{}".format(timestamp, secret)
  90. hmac_code = hmac.new(
  91. string_to_sign.encode("utf-8"),
  92. b"",
  93. digestmod=hashlib.sha256,
  94. ).digest()
  95. return base64.b64encode(hmac_code).decode("utf-8")
  96. def _build_feishu_card(
  97. *,
  98. is_alert: bool,
  99. window_start: datetime,
  100. window_end: datetime,
  101. total: int,
  102. success: int,
  103. pending: int,
  104. running: int,
  105. fail: int,
  106. fail_rate: float,
  107. today_executed_total: int,
  108. today_pending_total: int,
  109. ) -> Dict:
  110. header_title = "票圈供给-AIGC选题解构任务小时统计"
  111. header_subtitle = (
  112. f"{window_start.strftime('%Y-%m-%d %H:%M')} - {window_end.strftime('%H:%M')}"
  113. )
  114. level_tag = "异常告警" if is_alert else "运行正常"
  115. template = "red" if is_alert else "blue"
  116. status_line = "⚠️ 当前失败率已超过阈值,请尽快排查" if is_alert else "✅ 当前任务运行正常"
  117. markdown_lines = [
  118. status_line,
  119. f"**发起数**:{total}",
  120. f"**成功数**:{success}",
  121. f"**执行中数**:{pending + running}",
  122. f"**失败数**:{fail}",
  123. f"**失败率**:{fail_rate:.2%}",
  124. f"**当日已执行总数**:{today_executed_total}",
  125. f"**当日待执行总数**:{today_pending_total}",
  126. ]
  127. body_elements = [
  128. {
  129. "tag": "markdown",
  130. "content": "\n".join(markdown_lines),
  131. "text_align": "left",
  132. }
  133. ]
  134. if is_alert:
  135. body_elements.insert(
  136. 0,
  137. {
  138. "tag": "markdown",
  139. "content": "<at id=all></at>",
  140. "text_align": "left",
  141. },
  142. )
  143. return {
  144. "schema": "2.0",
  145. "config": {
  146. "enable_forward": True,
  147. "update_multi": True,
  148. "width_mode": "compact",
  149. },
  150. "header": {
  151. "title": {
  152. "tag": "plain_text",
  153. "content": header_title,
  154. },
  155. "subtitle": {
  156. "tag": "plain_text",
  157. "content": header_subtitle,
  158. },
  159. "text_tag_list": [
  160. {
  161. "tag": "text_tag",
  162. "text": {
  163. "tag": "plain_text",
  164. "content": level_tag,
  165. },
  166. "color": "red" if is_alert else "green",
  167. }
  168. ],
  169. "template": template,
  170. "padding": "12px 12px 12px 12px",
  171. },
  172. "body": {
  173. "direction": "vertical",
  174. "padding": "12px 12px 12px 12px",
  175. "horizontal_align": "left",
  176. "vertical_spacing": "8px",
  177. "elements": body_elements,
  178. },
  179. }
  180. def _send_feishu_card(card: Dict) -> None:
  181. timestamp = str(int(time.time()))
  182. sign = _gen_feishu_sign(timestamp, _FEISHU_SIGN_SECRET)
  183. payload = {
  184. "timestamp": timestamp,
  185. "sign": sign,
  186. "msg_type": "interactive",
  187. "card": card,
  188. }
  189. resp = requests.post(_FEISHU_WEBHOOK, json=payload, timeout=10)
  190. if resp.status_code != 200:
  191. raise RuntimeError(f"send_feishu_http_status_{resp.status_code}")
  192. body = resp.json()
  193. if body.get("code") != 0:
  194. raise RuntimeError(f"send_feishu_failed code={body.get('code')} msg={body.get('msg')}")
  195. def run_decode_hourly_stats_job() -> None:
  196. now = datetime.now(_TZ)
  197. window_start, window_end = _previous_hour_window(now)
  198. logger.info(
  199. "解构小时统计任务开始,统计窗口=[{}, {})",
  200. window_start.strftime("%Y-%m-%d %H:%M:%S"),
  201. window_end.strftime("%Y-%m-%d %H:%M:%S"),
  202. )
  203. try:
  204. stats = _fetch_hourly_stats(window_start, window_end)
  205. total = stats["total"]
  206. success = stats["success"]
  207. pending = stats["pending"]
  208. running = stats["running"]
  209. fail = stats["fail"]
  210. executed_in_window = success + running + fail
  211. if executed_in_window <= 0:
  212. logger.info(
  213. "当前统计窗口无已执行任务,跳过推送 window=[{}, {}) total={} pending={}",
  214. window_start.strftime("%Y-%m-%d %H:%M:%S"),
  215. window_end.strftime("%Y-%m-%d %H:%M:%S"),
  216. total,
  217. pending,
  218. )
  219. return
  220. fail_rate = (fail / total) if total > 0 else 0.0
  221. success_rate = (success / total) if total > 0 else 0.0
  222. today_stats = _fetch_today_stats(now)
  223. today_executed_total = today_stats["executed_total"]
  224. today_pending_total = today_stats["pending_total"]
  225. has_overdue_pending_task = _has_overdue_pending_task(now)
  226. is_alert = total > 0 and (
  227. fail_rate > _ALERT_FAIL_RATE
  228. or (success_rate < _ALERT_SUCCESS_RATE and has_overdue_pending_task)
  229. )
  230. card = _build_feishu_card(
  231. is_alert=is_alert,
  232. window_start=window_start,
  233. window_end=window_end,
  234. total=total,
  235. success=success,
  236. pending=pending,
  237. running=running,
  238. fail=fail,
  239. fail_rate=fail_rate,
  240. today_executed_total=today_executed_total,
  241. today_pending_total=today_pending_total,
  242. )
  243. _send_feishu_card(card)
  244. logger.info(
  245. "解构小时统计推送完成 total={} success={} pending={} running={} fail={} fail_rate={:.2%} success_rate={:.2%} has_overdue_pending_task={} is_alert={}",
  246. total,
  247. success,
  248. pending,
  249. running,
  250. fail,
  251. fail_rate,
  252. success_rate,
  253. has_overdue_pending_task,
  254. is_alert,
  255. )
  256. except Exception as exc:
  257. logger.exception("解构小时统计任务失败: {}", exc)
# Script entry point: run the job once when this file is executed directly.
if __name__ == "__main__":
    run_decode_hourly_stats_job()