| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- import json
- import re
- import ssl
- import time
- import urllib.error
- import urllib.request
- from datetime import datetime
- from zoneinfo import ZoneInfo
- from app.core.config import settings
- from app.odps.client import get_odps_client
# Validation patterns for values that end up interpolated into SQL text.
# They are anchored with \Z rather than $: "$" also matches just before a
# trailing newline, so "20240101\n" would otherwise pass validation.
IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*\Z")
# [0-9] rather than \d: on str patterns \d also matches non-ASCII Unicode
# digits, which can never equal a yyyymmdd partition value.
DATE_PARTITION_RE = re.compile(r"^[0-9]{8}\Z")
SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
# The Feishu open platform rate-limits bot / card endpoints; a response with
# code=11232 may be retried after a short wait.
_FEISHU_API_FREQUENCY_LIMIT = 11232
_FEISHU_WEBHOOK_MAX_ATTEMPTS = 5
_FEISHU_WEBHOOK_RETRY_BASE_SECONDS = 15.0
def _safe_table_identifier(name: str) -> str:
    """Return ``name`` unchanged when it is a plain SQL identifier.

    Args:
        name: Candidate table name.

    Returns:
        The same string, once it has been checked against ``IDENTIFIER_RE``.

    Raises:
        ValueError: If ``name`` does not match ``IDENTIFIER_RE``.
    """
    if IDENTIFIER_RE.match(name):
        return name
    raise ValueError(f"invalid sql identifier: {name}")
def _today_partition_dt() -> str:
    """Return the current date in ``SHANGHAI_TZ`` formatted as ``yyyymmdd``."""
    now_local = datetime.now(tz=SHANGHAI_TZ)
    return now_local.strftime("%Y%m%d")
def fetch_strategy_counts(partition_dt: str) -> list[tuple[str, int]]:
    """Count rows per ``strategy`` in one ``dt`` partition of the source table.

    Args:
        partition_dt: Partition value; must match ``DATE_PARTITION_RE``.

    Returns:
        ``(strategy_label, count)`` tuples in the order the query returns them
        (``ORDER BY strategy ASC``). A NULL strategy is reported as
        ``"(null)"`` and a blank one as ``"(empty)"``; a NULL count becomes 0.

    Raises:
        ValueError: If ``partition_dt`` fails validation, or the configured
            table name is not a plain identifier.
    """
    if DATE_PARTITION_RE.match(partition_dt) is None:
        raise ValueError("partition_dt must be yyyymmdd")
    source_table = _safe_table_identifier(settings.demand_pool_source_table)

    # Both interpolated values were checked against strict patterns above;
    # that validation is what keeps this string-built statement safe.
    sql = f"""
    SELECT strategy, COUNT(1) AS cnt
    FROM {source_table}
    WHERE dt = '{partition_dt}'
    GROUP BY strategy
    ORDER BY strategy ASC
    """

    query_instance = get_odps_client().execute_sql(sql)
    counts: list[tuple[str, int]] = []
    with query_instance.open_reader(tunnel=True) as reader:
        for record in reader:
            raw_label = record["strategy"]
            if raw_label is None:
                label = "(null)"
            else:
                # Whitespace-only labels collapse to "" and get a placeholder.
                label = str(raw_label).strip() or "(empty)"
            raw_count = record["cnt"]
            counts.append((label, 0 if raw_count is None else int(raw_count)))
    return counts
def _feishu_https_context() -> ssl.SSLContext | None:
    """Build the TLS context used for Feishu webhook requests.

    Returns:
        ``None`` when ``settings.feishu_webhook_verify_ssl`` is truthy, so the
        caller passes no custom context. Otherwise a context with hostname
        checking and certificate verification turned off.
    """
    if not settings.feishu_webhook_verify_ssl:
        insecure_ctx = ssl.create_default_context()
        # check_hostname must be cleared before verify_mode can be CERT_NONE.
        insecure_ctx.check_hostname = False
        insecure_ctx.verify_mode = ssl.CERT_NONE
        return insecure_ctx
    return None
- def _sanitize_markdown_table_cell(value: str) -> str:
- """避免单元格里的 |、换行把 Markdown 表格弄坏。"""
- cleaned = value.replace("\r", " ").replace("\n", " ").strip()
- cleaned = cleaned.replace("|", "|")
- return cleaned or " "
def _markdown_alert_body(rows: list[tuple[str, int]]) -> str:
    """Render the alert text: table name, per-strategy table, grand total.

    Args:
        rows: ``(strategy_label, count)`` tuples.

    Returns:
        Newline-joined Markdown. When ``rows`` is empty a "no data" line
        replaces the table; the total line is emitted in both cases.
    """
    title_line = f"**表**:`{settings.odps_project}.{settings.demand_pool_source_table}`"
    grand_total = sum(cnt for _, cnt in rows)

    if rows:
        middle = ["| 策略名称 | 数量 |", "| :--- | :--- |"]
        middle += [
            f"| {_sanitize_markdown_table_cell(label)} | {cnt} |"
            for label, cnt in rows
        ]
    else:
        middle = ["当前分区无数据。"]

    return "\n".join([title_line, "", *middle, "", f"**合计行数**:{grand_total}"])
def _feishu_interactive_card_payload(partition_dt: str, rows: list[tuple[str, int]]) -> dict[str, object]:
    """Assemble the ``interactive`` message payload carrying the alert card.

    Args:
        partition_dt: Partition value shown in the card subtitle.
        rows: ``(strategy_label, count)`` tuples rendered into the card body.

    Returns:
        A dict with ``msg_type`` and ``card`` (schema ``"2.0"``): a blue header
        and a body holding a single markdown element.
    """
    markdown_element = {
        "tag": "markdown",
        "content": _markdown_alert_body(rows),
        "text_align": "left",
        "text_size": "normal_v2",
        "margin": "0px 0px 0px 0px",
    }
    card_header = {
        "title": {"tag": "plain_text", "content": "需求池策略统计"},
        "subtitle": {"tag": "plain_text", "content": f"分区 {partition_dt}"},
        "template": "blue",
        "padding": "12px 12px 12px 12px",
    }
    card_body = {
        "direction": "vertical",
        "padding": "12px 12px 12px 12px",
        "elements": [markdown_element],
    }
    card = {
        "schema": "2.0",
        "config": {"update_multi": True},
        "header": card_header,
        "body": card_body,
    }
    return {"msg_type": "interactive", "card": card}
def _send_feishu_webhook(webhook_url: str, payload: dict[str, object]) -> None:
    """POST ``payload`` as JSON to a Feishu webhook, retrying when rate-limited.

    A response whose ``code`` equals ``_FEISHU_API_FREQUENCY_LIMIT`` is retried
    up to ``_FEISHU_WEBHOOK_MAX_ATTEMPTS`` attempts in total. The sleep before
    each retry doubles, starting at ``_FEISHU_WEBHOOK_RETRY_BASE_SECONDS``.
    Every other failure is raised immediately.

    Args:
        webhook_url: Target webhook URL.
        payload: JSON-serializable message body.

    Raises:
        RuntimeError: On an HTTP error status, a URL/connection error, a
            timeout, a response that is not a JSON object, a non-zero ``code``
            (including a frequency limit on the final attempt), or a non-zero
            ``StatusCode``.
    """
    body_bytes = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    ssl_context = _feishu_https_context()
    for attempt in range(_FEISHU_WEBHOOK_MAX_ATTEMPTS):
        request_obj = urllib.request.Request(
            webhook_url,
            data=body_bytes,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        try:
            with urllib.request.urlopen(
                request_obj,
                timeout=settings.feishu_webhook_timeout_seconds,
                context=ssl_context,
            ) as resp:
                # Decode leniently: malformed bytes should surface as the
                # "invalid json" error below, not a bare UnicodeDecodeError.
                raw = resp.read().decode("utf-8", errors="replace")
        except urllib.error.HTTPError as exc:
            detail = exc.read().decode("utf-8", errors="replace")
            raise RuntimeError(f"feishu webhook http error: {exc.code} {detail}") from exc
        except urllib.error.URLError as exc:
            raise RuntimeError(f"feishu webhook url error: {exc}") from exc
        except TimeoutError as exc:
            # A timeout while reading the body is not wrapped in URLError.
            raise RuntimeError(f"feishu webhook timeout: {exc}") from exc

        try:
            body = json.loads(raw) if raw else {}
        except json.JSONDecodeError as exc:
            raise RuntimeError(f"feishu webhook invalid json: {raw!r}") from exc
        if not isinstance(body, dict):
            # Valid JSON that is not an object has no code/StatusCode fields.
            raise RuntimeError(f"feishu webhook invalid json: {raw!r}")

        code = body.get("code")
        if code is not None:
            code_int = int(code)
            if (
                code_int == _FEISHU_API_FREQUENCY_LIMIT
                and attempt < _FEISHU_WEBHOOK_MAX_ATTEMPTS - 1
            ):
                delay = _FEISHU_WEBHOOK_RETRY_BASE_SECONDS * (2**attempt)
                print(
                    "[demand_pool_daily_alert] feishu frequency limited (11232), "
                    f"sleep {delay:.0f}s then retry ({attempt + 1}/{_FEISHU_WEBHOOK_MAX_ATTEMPTS})"
                )
                time.sleep(delay)
                continue
            if code_int != 0:
                raise RuntimeError(f"feishu webhook api error: {body}")

        status_code = body.get("StatusCode")
        if status_code is not None and int(status_code) != 0:
            raise RuntimeError(f"feishu webhook status error: {body}")
        return
def run_daily_strategy_alert(
    partition_dt: str | None = None,
    *,
    dry_run: bool = False,
) -> dict[str, object]:
    """Query per-strategy counts for a partition and post them to Feishu.

    Args:
        partition_dt: ``yyyymmdd`` partition to report; defaults to today's
            date from ``_today_partition_dt()`` when omitted or empty.
        dry_run: When true, print the Markdown body instead of sending. This
            takes precedence over the enabled flag and the webhook setting.

    Returns:
        A summary dict, one of:
        ``{"partition_dt", "strategy_buckets", "dry_run": True}``,
        ``{"skipped": True, "reason": "disabled" | "no_webhook"}``, or
        ``{"partition_dt", "strategy_buckets", "sent": True}``.

    Raises:
        ValueError: If the partition value or configured table name is
            invalid (only raised on paths that actually run the query).
        RuntimeError: If the webhook call fails.
    """
    dt_value = partition_dt or _today_partition_dt()

    if dry_run:
        rows = fetch_strategy_counts(dt_value)
        print(_markdown_alert_body(rows))
        return {
            "partition_dt": dt_value,
            "strategy_buckets": len(rows),
            "dry_run": True,
        }

    # Check whether anything can be sent before running the ODPS query, so a
    # disabled or unconfigured alert costs nothing.
    if not settings.demand_pool_daily_strategy_alert_enabled:
        return {"skipped": True, "reason": "disabled"}
    webhook = (settings.feishu_webhook_url or "").strip()
    if not webhook:
        print("[demand_pool_daily_alert] feishu_webhook_url empty, skip")
        return {"skipped": True, "reason": "no_webhook"}

    rows = fetch_strategy_counts(dt_value)
    payload = _feishu_interactive_card_payload(dt_value, rows)
    _send_feishu_webhook(webhook, payload)
    return {"partition_dt": dt_value, "strategy_buckets": len(rows), "sent": True}
|