# demand_pool_strategy_daily_alert.py
  1. import json
  2. import re
  3. import ssl
  4. import time
  5. import urllib.error
  6. import urllib.request
  7. from datetime import datetime
  8. from zoneinfo import ZoneInfo
  9. from app.core.config import settings
  10. from app.odps.client import get_odps_client
  11. IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
  12. DATE_PARTITION_RE = re.compile(r"^\d{8}$")
  13. SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
  14. # 飞书开放平台对 bot / 卡片等接口有频控;返回 code=11232 时可稍后重试
  15. _FEISHU_API_FREQUENCY_LIMIT = 11232
  16. _FEISHU_WEBHOOK_MAX_ATTEMPTS = 5
  17. _FEISHU_WEBHOOK_RETRY_BASE_SECONDS = 15.0
  18. def _safe_table_identifier(name: str) -> str:
  19. if not IDENTIFIER_RE.match(name):
  20. raise ValueError(f"invalid sql identifier: {name}")
  21. return name
  22. def _today_partition_dt() -> str:
  23. return datetime.now(SHANGHAI_TZ).strftime("%Y%m%d")
  24. def fetch_strategy_counts(partition_dt: str) -> list[tuple[str, int]]:
  25. if not DATE_PARTITION_RE.match(partition_dt):
  26. raise ValueError("partition_dt must be yyyymmdd")
  27. table = _safe_table_identifier(settings.demand_pool_source_table)
  28. sql = f"""
  29. SELECT strategy, COUNT(1) AS cnt
  30. FROM {table}
  31. WHERE dt = '{partition_dt}'
  32. GROUP BY strategy
  33. ORDER BY strategy ASC
  34. """
  35. odps_client = get_odps_client()
  36. instance = odps_client.execute_sql(sql)
  37. rows: list[tuple[str, int]] = []
  38. with instance.open_reader(tunnel=True) as reader:
  39. for record in reader:
  40. strategy_label = record["strategy"]
  41. strategy_text = (
  42. "(null)" if strategy_label is None else str(strategy_label).strip() or "(empty)"
  43. )
  44. cnt_raw = record["cnt"]
  45. cnt = int(cnt_raw) if cnt_raw is not None else 0
  46. rows.append((strategy_text, cnt))
  47. return rows
  48. def _feishu_https_context() -> ssl.SSLContext | None:
  49. if settings.feishu_webhook_verify_ssl:
  50. return None
  51. ctx = ssl.create_default_context()
  52. ctx.check_hostname = False
  53. ctx.verify_mode = ssl.CERT_NONE
  54. return ctx
  55. def _sanitize_markdown_table_cell(value: str) -> str:
  56. """避免单元格里的 |、换行把 Markdown 表格弄坏。"""
  57. cleaned = value.replace("\r", " ").replace("\n", " ").strip()
  58. cleaned = cleaned.replace("|", "|")
  59. return cleaned or " "
  60. def _markdown_alert_body(rows: list[tuple[str, int]]) -> str:
  61. lines = [
  62. f"**表**:`{settings.odps_project}.{settings.demand_pool_source_table}`",
  63. "",
  64. ]
  65. total = sum(count for _, count in rows)
  66. if not rows:
  67. lines.append("当前分区无数据。")
  68. else:
  69. lines.extend(
  70. [
  71. "| 策略名称 | 数量 |",
  72. "| :--- | :--- |",
  73. ]
  74. )
  75. for strategy_label, count in rows:
  76. cell = _sanitize_markdown_table_cell(strategy_label)
  77. lines.append(f"| {cell} | {count} |")
  78. lines.extend(["", f"**合计行数**:{total}"])
  79. return "\n".join(lines)
  80. def _feishu_interactive_card_payload(partition_dt: str, rows: list[tuple[str, int]]) -> dict[str, object]:
  81. md = _markdown_alert_body(rows)
  82. return {
  83. "msg_type": "interactive",
  84. "card": {
  85. "schema": "2.0",
  86. "config": {"update_multi": True},
  87. "header": {
  88. "title": {"tag": "plain_text", "content": "需求池策略统计"},
  89. "subtitle": {
  90. "tag": "plain_text",
  91. "content": f"分区 {partition_dt}",
  92. },
  93. "template": "blue",
  94. "padding": "12px 12px 12px 12px",
  95. },
  96. "body": {
  97. "direction": "vertical",
  98. "padding": "12px 12px 12px 12px",
  99. "elements": [
  100. {
  101. "tag": "markdown",
  102. "content": md,
  103. "text_align": "left",
  104. "text_size": "normal_v2",
  105. "margin": "0px 0px 0px 0px",
  106. }
  107. ],
  108. },
  109. },
  110. }
  111. def _send_feishu_webhook(webhook_url: str, payload: dict[str, object]) -> None:
  112. body_bytes = json.dumps(payload, ensure_ascii=False).encode("utf-8")
  113. ssl_context = _feishu_https_context()
  114. for attempt in range(_FEISHU_WEBHOOK_MAX_ATTEMPTS):
  115. request_obj = urllib.request.Request(
  116. webhook_url,
  117. data=body_bytes,
  118. headers={"Content-Type": "application/json"},
  119. method="POST",
  120. )
  121. try:
  122. with urllib.request.urlopen(
  123. request_obj,
  124. timeout=settings.feishu_webhook_timeout_seconds,
  125. context=ssl_context,
  126. ) as resp:
  127. raw = resp.read().decode("utf-8")
  128. except urllib.error.HTTPError as exc:
  129. detail = exc.read().decode("utf-8", errors="replace")
  130. raise RuntimeError(f"feishu webhook http error: {exc.code} {detail}") from exc
  131. except urllib.error.URLError as exc:
  132. raise RuntimeError(f"feishu webhook url error: {exc}") from exc
  133. try:
  134. body = json.loads(raw) if raw else {}
  135. except json.JSONDecodeError as exc:
  136. raise RuntimeError(f"feishu webhook invalid json: {raw!r}") from exc
  137. code = body.get("code")
  138. if code is not None:
  139. code_int = int(code)
  140. if (
  141. code_int == _FEISHU_API_FREQUENCY_LIMIT
  142. and attempt < _FEISHU_WEBHOOK_MAX_ATTEMPTS - 1
  143. ):
  144. delay = _FEISHU_WEBHOOK_RETRY_BASE_SECONDS * (2**attempt)
  145. print(
  146. "[demand_pool_daily_alert] feishu frequency limited (11232), "
  147. f"sleep {delay:.0f}s then retry ({attempt + 1}/{_FEISHU_WEBHOOK_MAX_ATTEMPTS})"
  148. )
  149. time.sleep(delay)
  150. continue
  151. if code_int != 0:
  152. raise RuntimeError(f"feishu webhook api error: {body}")
  153. status_code = body.get("StatusCode")
  154. if status_code is not None and int(status_code) != 0:
  155. raise RuntimeError(f"feishu webhook status error: {body}")
  156. return
  157. def run_daily_strategy_alert(
  158. partition_dt: str | None = None,
  159. *,
  160. dry_run: bool = False,
  161. ) -> dict[str, object]:
  162. dt_value = partition_dt or _today_partition_dt()
  163. rows = fetch_strategy_counts(dt_value)
  164. markdown_preview = _markdown_alert_body(rows)
  165. if dry_run:
  166. print(markdown_preview)
  167. return {
  168. "partition_dt": dt_value,
  169. "strategy_buckets": len(rows),
  170. "dry_run": True,
  171. }
  172. if not settings.demand_pool_daily_strategy_alert_enabled:
  173. return {"skipped": True, "reason": "disabled"}
  174. webhook = (settings.feishu_webhook_url or "").strip()
  175. if not webhook:
  176. print("[demand_pool_daily_alert] feishu_webhook_url empty, skip")
  177. return {"skipped": True, "reason": "no_webhook"}
  178. payload = _feishu_interactive_card_payload(dt_value, rows)
  179. _send_feishu_webhook(webhook, payload)
  180. return {"partition_dt": dt_value, "strategy_buckets": len(rows), "sent": True}