|
|
@@ -0,0 +1,186 @@
|
|
|
+import json
|
|
|
+import re
|
|
|
+import ssl
|
|
|
+import urllib.error
|
|
|
+import urllib.request
|
|
|
+from datetime import datetime
|
|
|
+from zoneinfo import ZoneInfo
|
|
|
+
|
|
|
+from app.core.config import settings
|
|
|
+from app.odps.client import get_odps_client
|
|
|
+
|
|
|
+IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
|
|
|
+DATE_PARTITION_RE = re.compile(r"^\d{8}$")
|
|
|
+SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
|
|
|
+
|
|
|
+
|
|
|
+def _safe_table_identifier(name: str) -> str:
|
|
|
+ if not IDENTIFIER_RE.match(name):
|
|
|
+ raise ValueError(f"invalid sql identifier: {name}")
|
|
|
+ return name
|
|
|
+
|
|
|
+
|
|
|
+def _today_partition_dt() -> str:
|
|
|
+ return datetime.now(SHANGHAI_TZ).strftime("%Y%m%d")
|
|
|
+
|
|
|
+
|
|
|
+def fetch_strategy_counts(partition_dt: str) -> list[tuple[str, int]]:
|
|
|
+ if not DATE_PARTITION_RE.match(partition_dt):
|
|
|
+ raise ValueError("partition_dt must be yyyymmdd")
|
|
|
+ table = _safe_table_identifier(settings.demand_pool_source_table)
|
|
|
+ sql = f"""
|
|
|
+ SELECT strategy, COUNT(1) AS cnt
|
|
|
+ FROM {table}
|
|
|
+ WHERE dt = '{partition_dt}'
|
|
|
+ GROUP BY strategy
|
|
|
+ ORDER BY strategy ASC
|
|
|
+ """
|
|
|
+ odps_client = get_odps_client()
|
|
|
+ instance = odps_client.execute_sql(sql)
|
|
|
+ rows: list[tuple[str, int]] = []
|
|
|
+ with instance.open_reader(tunnel=True) as reader:
|
|
|
+ for record in reader:
|
|
|
+ strategy_label = record["strategy"]
|
|
|
+ strategy_text = (
|
|
|
+ "(null)" if strategy_label is None else str(strategy_label).strip() or "(empty)"
|
|
|
+ )
|
|
|
+ cnt_raw = record["cnt"]
|
|
|
+ cnt = int(cnt_raw) if cnt_raw is not None else 0
|
|
|
+ rows.append((strategy_text, cnt))
|
|
|
+ return rows
|
|
|
+
|
|
|
+
|
|
|
+def _feishu_https_context() -> ssl.SSLContext | None:
|
|
|
+ if settings.feishu_webhook_verify_ssl:
|
|
|
+ return None
|
|
|
+ ctx = ssl.create_default_context()
|
|
|
+ ctx.check_hostname = False
|
|
|
+ ctx.verify_mode = ssl.CERT_NONE
|
|
|
+ return ctx
|
|
|
+
|
|
|
+
|
|
|
+def _sanitize_markdown_table_cell(value: str) -> str:
|
|
|
+ """避免单元格里的 |、换行把 Markdown 表格弄坏。"""
|
|
|
+ cleaned = value.replace("\r", " ").replace("\n", " ").strip()
|
|
|
+ cleaned = cleaned.replace("|", "|")
|
|
|
+ return cleaned or " "
|
|
|
+
|
|
|
+
|
|
|
+def _markdown_alert_body(rows: list[tuple[str, int]]) -> str:
|
|
|
+ lines = [
|
|
|
+ f"**表**:`{settings.odps_project}.{settings.demand_pool_source_table}`",
|
|
|
+ "",
|
|
|
+ ]
|
|
|
+ total = sum(count for _, count in rows)
|
|
|
+ if not rows:
|
|
|
+ lines.append("当前分区无数据。")
|
|
|
+ else:
|
|
|
+ lines.extend(
|
|
|
+ [
|
|
|
+ "| 策略名称 | 数量 |",
|
|
|
+ "| :--- | :--- |",
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ for strategy_label, count in rows:
|
|
|
+ cell = _sanitize_markdown_table_cell(strategy_label)
|
|
|
+ lines.append(f"| {cell} | {count} |")
|
|
|
+ lines.extend(["", f"**合计行数**:{total}"])
|
|
|
+ return "\n".join(lines)
|
|
|
+
|
|
|
+
|
|
|
+def _feishu_interactive_card_payload(partition_dt: str, rows: list[tuple[str, int]]) -> dict[str, object]:
|
|
|
+ md = _markdown_alert_body(rows)
|
|
|
+ return {
|
|
|
+ "msg_type": "interactive",
|
|
|
+ "card": {
|
|
|
+ "schema": "2.0",
|
|
|
+ "config": {"update_multi": True},
|
|
|
+ "header": {
|
|
|
+ "title": {"tag": "plain_text", "content": "需求池策略统计"},
|
|
|
+ "subtitle": {
|
|
|
+ "tag": "plain_text",
|
|
|
+ "content": f"分区 {partition_dt}",
|
|
|
+ },
|
|
|
+ "template": "blue",
|
|
|
+ "padding": "12px 12px 12px 12px",
|
|
|
+ },
|
|
|
+ "body": {
|
|
|
+ "direction": "vertical",
|
|
|
+ "padding": "12px 12px 12px 12px",
|
|
|
+ "elements": [
|
|
|
+ {
|
|
|
+ "tag": "markdown",
|
|
|
+ "content": md,
|
|
|
+ "text_align": "left",
|
|
|
+ "text_size": "normal_v2",
|
|
|
+ "margin": "0px 0px 0px 0px",
|
|
|
+ }
|
|
|
+ ],
|
|
|
+ },
|
|
|
+ },
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+def _send_feishu_webhook(webhook_url: str, payload: dict[str, object]) -> None:
|
|
|
+ body_bytes = json.dumps(payload, ensure_ascii=False).encode("utf-8")
|
|
|
+ request_obj = urllib.request.Request(
|
|
|
+ webhook_url,
|
|
|
+ data=body_bytes,
|
|
|
+ headers={"Content-Type": "application/json"},
|
|
|
+ method="POST",
|
|
|
+ )
|
|
|
+ ssl_context = _feishu_https_context()
|
|
|
+ try:
|
|
|
+ with urllib.request.urlopen(
|
|
|
+ request_obj,
|
|
|
+ timeout=settings.feishu_webhook_timeout_seconds,
|
|
|
+ context=ssl_context,
|
|
|
+ ) as resp:
|
|
|
+ raw = resp.read().decode("utf-8")
|
|
|
+ except urllib.error.HTTPError as exc:
|
|
|
+ detail = exc.read().decode("utf-8", errors="replace")
|
|
|
+ raise RuntimeError(f"feishu webhook http error: {exc.code} {detail}") from exc
|
|
|
+ except urllib.error.URLError as exc:
|
|
|
+ raise RuntimeError(f"feishu webhook url error: {exc}") from exc
|
|
|
+
|
|
|
+ try:
|
|
|
+ body = json.loads(raw) if raw else {}
|
|
|
+ except json.JSONDecodeError as exc:
|
|
|
+ raise RuntimeError(f"feishu webhook invalid json: {raw!r}") from exc
|
|
|
+
|
|
|
+ code = body.get("code")
|
|
|
+ if code is not None and int(code) != 0:
|
|
|
+ raise RuntimeError(f"feishu webhook api error: {body}")
|
|
|
+ status_code = body.get("StatusCode")
|
|
|
+ if status_code is not None and int(status_code) != 0:
|
|
|
+ raise RuntimeError(f"feishu webhook status error: {body}")
|
|
|
+
|
|
|
+
|
|
|
+def run_daily_strategy_alert(
|
|
|
+ partition_dt: str | None = None,
|
|
|
+ *,
|
|
|
+ dry_run: bool = False,
|
|
|
+) -> dict[str, object]:
|
|
|
+ dt_value = partition_dt or _today_partition_dt()
|
|
|
+ rows = fetch_strategy_counts(dt_value)
|
|
|
+ markdown_preview = _markdown_alert_body(rows)
|
|
|
+
|
|
|
+ if dry_run:
|
|
|
+ print(markdown_preview)
|
|
|
+ return {
|
|
|
+ "partition_dt": dt_value,
|
|
|
+ "strategy_buckets": len(rows),
|
|
|
+ "dry_run": True,
|
|
|
+ }
|
|
|
+
|
|
|
+ if not settings.demand_pool_daily_strategy_alert_enabled:
|
|
|
+ return {"skipped": True, "reason": "disabled"}
|
|
|
+
|
|
|
+ webhook = (settings.feishu_webhook_url or "").strip()
|
|
|
+ if not webhook:
|
|
|
+ print("[demand_pool_daily_alert] feishu_webhook_url empty, skip")
|
|
|
+ return {"skipped": True, "reason": "no_webhook"}
|
|
|
+
|
|
|
+ payload = _feishu_interactive_card_payload(dt_value, rows)
|
|
|
+ _send_feishu_webhook(webhook, payload)
|
|
|
+ return {"partition_dt": dt_value, "strategy_buckets": len(rows), "sent": True}
|