import json
import re
import ssl
import time
import urllib.error
import urllib.request
from datetime import datetime
from zoneinfo import ZoneInfo

from app.core.config import settings
from app.odps.client import get_odps_client

# These patterns are applied with ``fullmatch``: with ``match``, ``$`` also
# matches just before a trailing "\n", which would let a newline slip through
# validation and into the generated SQL.
IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
# ``[0-9]`` rather than ``\d``: in Python 3 ``\d`` also accepts non-ASCII
# Unicode digits, which are not valid partition values.
DATE_PARTITION_RE = re.compile(r"^[0-9]{8}$")
SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")

# The Feishu Open Platform rate-limits bot / card APIs; a response carrying
# code=11232 may be retried after a delay.
_FEISHU_API_FREQUENCY_LIMIT = 11232
_FEISHU_WEBHOOK_MAX_ATTEMPTS = 5
_FEISHU_WEBHOOK_RETRY_BASE_SECONDS = 15.0

# Full-width vertical bar (U+FF5C) used in place of ASCII "|" inside table
# cells. Written as an escape so editors / encoding round-trips cannot
# silently turn it back into an ASCII pipe.
_TABLE_CELL_PIPE_REPLACEMENT = "\uff5c"


def _safe_table_identifier(name: str) -> str:
    """Return *name* unchanged if the whole string is a plain SQL identifier.

    Raises:
        ValueError: if *name* is not entirely ``[A-Za-z_][A-Za-z0-9_]*``.
    """
    if not IDENTIFIER_RE.fullmatch(name):
        raise ValueError(f"invalid sql identifier: {name}")
    return name


def _today_partition_dt() -> str:
    """Return today's date in Asia/Shanghai formatted as ``yyyymmdd``."""
    return datetime.now(SHANGHAI_TZ).strftime("%Y%m%d")


def fetch_strategy_counts(partition_dt: str) -> list[tuple[str, int]]:
    """Count rows per ``strategy`` in one ``dt`` partition of the source table.

    Args:
        partition_dt: partition value; must be exactly eight ASCII digits.

    Returns:
        ``(strategy_label, count)`` pairs in query order
        (``ORDER BY strategy ASC``). A NULL strategy is labelled ``"(null)"``,
        a blank/whitespace-only one ``"(empty)"``; other labels are stripped.
        A NULL count is reported as 0.

    Raises:
        ValueError: if *partition_dt* or ``settings.demand_pool_source_table``
            fails validation.
    """
    if not DATE_PARTITION_RE.fullmatch(partition_dt):
        raise ValueError("partition_dt must be yyyymmdd")
    table = _safe_table_identifier(settings.demand_pool_source_table)
    # Both interpolated values were validated above (identifier chars only,
    # eight ASCII digits only), so neither can inject SQL syntax.
    sql = f"""
        SELECT strategy, COUNT(1) AS cnt
        FROM {table}
        WHERE dt = '{partition_dt}'
        GROUP BY strategy
        ORDER BY strategy ASC
    """
    odps_client = get_odps_client()
    instance = odps_client.execute_sql(sql)
    rows: list[tuple[str, int]] = []
    with instance.open_reader(tunnel=True) as reader:
        for record in reader:
            strategy_label = record["strategy"]
            strategy_text = (
                "(null)"
                if strategy_label is None
                else str(strategy_label).strip() or "(empty)"
            )
            cnt_raw = record["cnt"]
            cnt = int(cnt_raw) if cnt_raw is not None else 0
            rows.append((strategy_text, cnt))
    return rows


def _feishu_https_context() -> ssl.SSLContext | None:
    """Return the SSL context to pass to ``urlopen`` for webhook calls.

    ``None`` (the default when ``settings.feishu_webhook_verify_ssl`` is
    truthy) leaves ``urlopen`` on its standard certificate verification.
    Otherwise a context with hostname checking and certificate verification
    disabled is returned.
    """
    if settings.feishu_webhook_verify_ssl:
        return None
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    return ctx


def _sanitize_markdown_table_cell(value: str) -> str:
    """Make *value* safe to place inside a single Markdown table cell.

    CR/LF become spaces and ASCII ``|`` becomes a full-width bar, so neither
    can break the table row. An empty result is replaced with a single space
    so the cell is still emitted.
    """
    cleaned = value.replace("\r", " ").replace("\n", " ").strip()
    cleaned = cleaned.replace("|", _TABLE_CELL_PIPE_REPLACEMENT)
    return cleaned or " "


def _markdown_alert_body(rows: list[tuple[str, int]]) -> str:
    """Render the alert's Markdown: table name, per-strategy table, total.

    Args:
        rows: ``(strategy_label, count)`` pairs as returned by
            ``fetch_strategy_counts``.

    Returns:
        Newline-joined Markdown. When *rows* is empty a "no data" line is
        written instead of the table; the total line is always present.
    """
    lines = [
        f"**表**:`{settings.odps_project}.{settings.demand_pool_source_table}`",
        "",
    ]
    total = sum(count for _, count in rows)
    if not rows:
        lines.append("当前分区无数据。")
    else:
        lines.extend(
            [
                "| 策略名称 | 数量 |",
                "| :--- | :--- |",
            ]
        )
        for strategy_label, count in rows:
            cell = _sanitize_markdown_table_cell(strategy_label)
            lines.append(f"| {cell} | {count} |")
    lines.extend(["", f"**合计行数**:{total}"])
    return "\n".join(lines)


def _feishu_interactive_card_payload(
    partition_dt: str, rows: list[tuple[str, int]]
) -> dict[str, object]:
    """Build the ``msg_type="interactive"`` webhook payload (card schema 2.0).

    The card has a blue header whose subtitle names *partition_dt*, and a
    single markdown element containing ``_markdown_alert_body(rows)``.
    """
    md = _markdown_alert_body(rows)
    return {
        "msg_type": "interactive",
        "card": {
            "schema": "2.0",
            "config": {"update_multi": True},
            "header": {
                "title": {"tag": "plain_text", "content": "需求池策略统计"},
                "subtitle": {
                    "tag": "plain_text",
                    "content": f"分区 {partition_dt}",
                },
                "template": "blue",
                "padding": "12px 12px 12px 12px",
            },
            "body": {
                "direction": "vertical",
                "padding": "12px 12px 12px 12px",
                "elements": [
                    {
                        "tag": "markdown",
                        "content": md,
                        "text_align": "left",
                        "text_size": "normal_v2",
                        "margin": "0px 0px 0px 0px",
                    }
                ],
            },
        },
    }


def _send_feishu_webhook(webhook_url: str, payload: dict[str, object]) -> None:
    """POST *payload* as UTF-8 JSON to a Feishu bot webhook.

    A response whose ``code`` equals ``_FEISHU_API_FREQUENCY_LIMIT`` is
    retried after ``_FEISHU_WEBHOOK_RETRY_BASE_SECONDS * 2**attempt`` seconds,
    for at most ``_FEISHU_WEBHOOK_MAX_ATTEMPTS`` attempts in total.

    Raises:
        RuntimeError: on an HTTP or URL error; on a response body that is not
            valid JSON or not a JSON object; on a non-zero ``code`` (including
            a rate limit on the final attempt); or on a non-zero
            ``StatusCode``.
    """
    body_bytes = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    ssl_context = _feishu_https_context()
    for attempt in range(_FEISHU_WEBHOOK_MAX_ATTEMPTS):
        request_obj = urllib.request.Request(
            webhook_url,
            data=body_bytes,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        try:
            with urllib.request.urlopen(
                request_obj,
                timeout=settings.feishu_webhook_timeout_seconds,
                context=ssl_context,
            ) as resp:
                raw = resp.read().decode("utf-8")
        # HTTPError subclasses URLError, so it must be caught first.
        except urllib.error.HTTPError as exc:
            detail = exc.read().decode("utf-8", errors="replace")
            raise RuntimeError(f"feishu webhook http error: {exc.code} {detail}") from exc
        except urllib.error.URLError as exc:
            raise RuntimeError(f"feishu webhook url error: {exc}") from exc

        try:
            body = json.loads(raw) if raw else {}
        except json.JSONDecodeError as exc:
            raise RuntimeError(f"feishu webhook invalid json: {raw!r}") from exc
        # Valid JSON that is not an object (e.g. a list or string) has no
        # ``.get``; report it with the same error type as other bad responses.
        if not isinstance(body, dict):
            raise RuntimeError(f"feishu webhook unexpected response: {raw!r}")

        code = body.get("code")
        if code is not None:
            code_int = int(code)
            if (
                code_int == _FEISHU_API_FREQUENCY_LIMIT
                and attempt < _FEISHU_WEBHOOK_MAX_ATTEMPTS - 1
            ):
                # Exponential backoff: base, 2*base, 4*base, ...
                delay = _FEISHU_WEBHOOK_RETRY_BASE_SECONDS * (2**attempt)
                print(
                    "[demand_pool_daily_alert] feishu frequency limited (11232), "
                    f"sleep {delay:.0f}s then retry ({attempt + 1}/{_FEISHU_WEBHOOK_MAX_ATTEMPTS})"
                )
                time.sleep(delay)
                continue
            if code_int != 0:
                raise RuntimeError(f"feishu webhook api error: {body}")

        # The response may report status via ``StatusCode`` instead of
        # ``code``; both are checked.
        status_code = body.get("StatusCode")
        if status_code is not None and int(status_code) != 0:
            raise RuntimeError(f"feishu webhook status error: {body}")
        return


def run_daily_strategy_alert(
    partition_dt: str | None = None,
    *,
    dry_run: bool = False,
) -> dict[str, object]:
    """Query per-strategy counts for one partition and push them to Feishu.

    Args:
        partition_dt: ``yyyymmdd`` partition; a falsy value means today
            (Asia/Shanghai).
        dry_run: print the Markdown body instead of sending. A dry run runs
            even when the alert is disabled or no webhook is configured.

    Returns:
        For a dry run: ``{"partition_dt", "strategy_buckets", "dry_run": True}``.
        When sending is disabled or no webhook is set:
        ``{"skipped": True, "reason": "disabled" | "no_webhook"}``.
        Otherwise: ``{"partition_dt", "strategy_buckets", "sent": True}``.

    Raises:
        ValueError: from ``fetch_strategy_counts`` on an invalid partition or
            table name (only when the query actually runs).
        RuntimeError: from ``_send_feishu_webhook`` when delivery fails.
    """
    dt_value = partition_dt or _today_partition_dt()

    if dry_run:
        rows = fetch_strategy_counts(dt_value)
        print(_markdown_alert_body(rows))
        return {
            "partition_dt": dt_value,
            "strategy_buckets": len(rows),
            "dry_run": True,
        }

    # Check the send preconditions before querying, so a disabled alert or a
    # missing webhook does not launch an ODPS job whose result is discarded.
    if not settings.demand_pool_daily_strategy_alert_enabled:
        return {"skipped": True, "reason": "disabled"}

    webhook = (settings.feishu_webhook_url or "").strip()
    if not webhook:
        print("[demand_pool_daily_alert] feishu_webhook_url empty, skip")
        return {"skipped": True, "reason": "no_webhook"}

    rows = fetch_strategy_counts(dt_value)
    payload = _feishu_interactive_card_payload(dt_value, rows)
    _send_feishu_webhook(webhook, payload)
    return {"partition_dt": dt_value, "strategy_buckets": len(rows), "sent": True}