demand_pool_strategy_daily_alert.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. import json
  2. import re
  3. import ssl
  4. import urllib.error
  5. import urllib.request
  6. from datetime import datetime
  7. from zoneinfo import ZoneInfo
  8. from app.core.config import settings
  9. from app.odps.client import get_odps_client
  10. IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
  11. DATE_PARTITION_RE = re.compile(r"^\d{8}$")
  12. SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
  13. def _safe_table_identifier(name: str) -> str:
  14. if not IDENTIFIER_RE.match(name):
  15. raise ValueError(f"invalid sql identifier: {name}")
  16. return name
  17. def _today_partition_dt() -> str:
  18. return datetime.now(SHANGHAI_TZ).strftime("%Y%m%d")
  19. def fetch_strategy_counts(partition_dt: str) -> list[tuple[str, int]]:
  20. if not DATE_PARTITION_RE.match(partition_dt):
  21. raise ValueError("partition_dt must be yyyymmdd")
  22. table = _safe_table_identifier(settings.demand_pool_source_table)
  23. sql = f"""
  24. SELECT strategy, COUNT(1) AS cnt
  25. FROM {table}
  26. WHERE dt = '{partition_dt}'
  27. GROUP BY strategy
  28. ORDER BY strategy ASC
  29. """
  30. odps_client = get_odps_client()
  31. instance = odps_client.execute_sql(sql)
  32. rows: list[tuple[str, int]] = []
  33. with instance.open_reader(tunnel=True) as reader:
  34. for record in reader:
  35. strategy_label = record["strategy"]
  36. strategy_text = (
  37. "(null)" if strategy_label is None else str(strategy_label).strip() or "(empty)"
  38. )
  39. cnt_raw = record["cnt"]
  40. cnt = int(cnt_raw) if cnt_raw is not None else 0
  41. rows.append((strategy_text, cnt))
  42. return rows
  43. def _feishu_https_context() -> ssl.SSLContext | None:
  44. if settings.feishu_webhook_verify_ssl:
  45. return None
  46. ctx = ssl.create_default_context()
  47. ctx.check_hostname = False
  48. ctx.verify_mode = ssl.CERT_NONE
  49. return ctx
  50. def _sanitize_markdown_table_cell(value: str) -> str:
  51. """避免单元格里的 |、换行把 Markdown 表格弄坏。"""
  52. cleaned = value.replace("\r", " ").replace("\n", " ").strip()
  53. cleaned = cleaned.replace("|", "|")
  54. return cleaned or " "
  55. def _markdown_alert_body(rows: list[tuple[str, int]]) -> str:
  56. lines = [
  57. f"**表**:`{settings.odps_project}.{settings.demand_pool_source_table}`",
  58. "",
  59. ]
  60. total = sum(count for _, count in rows)
  61. if not rows:
  62. lines.append("当前分区无数据。")
  63. else:
  64. lines.extend(
  65. [
  66. "| 策略名称 | 数量 |",
  67. "| :--- | :--- |",
  68. ]
  69. )
  70. for strategy_label, count in rows:
  71. cell = _sanitize_markdown_table_cell(strategy_label)
  72. lines.append(f"| {cell} | {count} |")
  73. lines.extend(["", f"**合计行数**:{total}"])
  74. return "\n".join(lines)
  75. def _feishu_interactive_card_payload(partition_dt: str, rows: list[tuple[str, int]]) -> dict[str, object]:
  76. md = _markdown_alert_body(rows)
  77. return {
  78. "msg_type": "interactive",
  79. "card": {
  80. "schema": "2.0",
  81. "config": {"update_multi": True},
  82. "header": {
  83. "title": {"tag": "plain_text", "content": "需求池策略统计"},
  84. "subtitle": {
  85. "tag": "plain_text",
  86. "content": f"分区 {partition_dt}",
  87. },
  88. "template": "blue",
  89. "padding": "12px 12px 12px 12px",
  90. },
  91. "body": {
  92. "direction": "vertical",
  93. "padding": "12px 12px 12px 12px",
  94. "elements": [
  95. {
  96. "tag": "markdown",
  97. "content": md,
  98. "text_align": "left",
  99. "text_size": "normal_v2",
  100. "margin": "0px 0px 0px 0px",
  101. }
  102. ],
  103. },
  104. },
  105. }
  106. def _send_feishu_webhook(webhook_url: str, payload: dict[str, object]) -> None:
  107. body_bytes = json.dumps(payload, ensure_ascii=False).encode("utf-8")
  108. request_obj = urllib.request.Request(
  109. webhook_url,
  110. data=body_bytes,
  111. headers={"Content-Type": "application/json"},
  112. method="POST",
  113. )
  114. ssl_context = _feishu_https_context()
  115. try:
  116. with urllib.request.urlopen(
  117. request_obj,
  118. timeout=settings.feishu_webhook_timeout_seconds,
  119. context=ssl_context,
  120. ) as resp:
  121. raw = resp.read().decode("utf-8")
  122. except urllib.error.HTTPError as exc:
  123. detail = exc.read().decode("utf-8", errors="replace")
  124. raise RuntimeError(f"feishu webhook http error: {exc.code} {detail}") from exc
  125. except urllib.error.URLError as exc:
  126. raise RuntimeError(f"feishu webhook url error: {exc}") from exc
  127. try:
  128. body = json.loads(raw) if raw else {}
  129. except json.JSONDecodeError as exc:
  130. raise RuntimeError(f"feishu webhook invalid json: {raw!r}") from exc
  131. code = body.get("code")
  132. if code is not None and int(code) != 0:
  133. raise RuntimeError(f"feishu webhook api error: {body}")
  134. status_code = body.get("StatusCode")
  135. if status_code is not None and int(status_code) != 0:
  136. raise RuntimeError(f"feishu webhook status error: {body}")
  137. def run_daily_strategy_alert(
  138. partition_dt: str | None = None,
  139. *,
  140. dry_run: bool = False,
  141. ) -> dict[str, object]:
  142. dt_value = partition_dt or _today_partition_dt()
  143. rows = fetch_strategy_counts(dt_value)
  144. markdown_preview = _markdown_alert_body(rows)
  145. if dry_run:
  146. print(markdown_preview)
  147. return {
  148. "partition_dt": dt_value,
  149. "strategy_buckets": len(rows),
  150. "dry_run": True,
  151. }
  152. if not settings.demand_pool_daily_strategy_alert_enabled:
  153. return {"skipped": True, "reason": "disabled"}
  154. webhook = (settings.feishu_webhook_url or "").strip()
  155. if not webhook:
  156. print("[demand_pool_daily_alert] feishu_webhook_url empty, skip")
  157. return {"skipped": True, "reason": "no_webhook"}
  158. payload = _feishu_interactive_card_payload(dt_value, rows)
  159. _send_feishu_webhook(webhook, payload)
  160. return {"partition_dt": dt_value, "strategy_buckets": len(rows), "sent": True}