"""Sync demand-pool rows from ODPS source tables into a MySQL target table.

For each ``dt`` partition, two ODPS sources are read: a primary table that
already carries ``demand_id`` values, and a secondary table whose ids are
derived from an MD5 of strategy + demand name + partition. Rows are merged by
``demand_id`` and upserted into the target table in batches.
"""

import json
import hashlib
import re
from datetime import datetime
from decimal import Decimal, ROUND_HALF_UP
from zoneinfo import ZoneInfo

from sqlalchemy import text

from app.core.config import settings
from app.db.mysql import SessionLocal
from app.odps.client import get_odps_client

# Table names are interpolated into SQL, so they must be plain identifiers.
IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
# Partition values are interpolated into ODPS SQL as a single-quoted literal;
# restrict them to characters that cannot terminate or escape the quotes.
PARTITION_DT_RE = re.compile(r"^[A-Za-z0-9_-]+$")
# Rows per executemany() call during the MySQL upsert.
BATCH_SIZE = 500
SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")


def _safe_identifier(name: str) -> str:
    """Return ``name`` unchanged if it is a plain SQL identifier.

    ``fullmatch`` is used because ``$`` in ``match`` would also accept a
    trailing newline.

    Raises:
        ValueError: if ``name`` is not letters/digits/underscores starting
            with a letter or underscore.
    """
    if not IDENTIFIER_RE.fullmatch(name):
        raise ValueError(f"invalid sql identifier: {name}")
    return name


def _safe_partition_dt(partition_dt: str) -> str:
    """Return ``partition_dt`` unchanged if it is safe to embed in SQL quotes.

    Raises:
        ValueError: if the value is not a string made only of
            ``[A-Za-z0-9_-]`` characters (e.g. it contains a quote).
    """
    if not isinstance(partition_dt, str) or not PARTITION_DT_RE.fullmatch(partition_dt):
        raise ValueError(f"invalid partition dt: {partition_dt!r}")
    return partition_dt


def _serialize_video_list(value: object) -> str | None:
    """Convert an ODPS ``video_list`` value into text for a TEXT column.

    Lists are JSON-encoded (non-ASCII kept as-is), ``None`` stays ``None``,
    and anything else is passed through ``str()``.
    """
    if value is None:
        return None
    if isinstance(value, list):
        return json.dumps(value, ensure_ascii=False)
    return str(value)


def _normalize_secondary_weight(value: object) -> float | None:
    """Round a secondary-source score to 6 decimal places (half-up).

    Going through ``Decimal(str(value))`` rounds the printed decimal form
    rather than the binary float. ``None`` is returned unchanged.
    """
    if value is None:
        return None
    decimal_value = Decimal(str(value)).quantize(
        Decimal("0.000001"),
        rounding=ROUND_HALF_UP,
    )
    return float(decimal_value)


def _fetch_partition_rows_from_primary_source(partition_dt: str) -> list[dict[str, object]]:
    """Read one ``dt`` partition of the primary ODPS source table.

    Records with a blank ``demand_id`` are skipped; when a ``demand_id``
    repeats, the last record read wins.

    Returns:
        One dict per distinct ``demand_id``, keyed by target-table column name.

    Raises:
        ValueError: if the configured table name or ``partition_dt`` is unsafe.
    """
    source_table = _safe_identifier(settings.demand_pool_source_table)
    safe_dt = _safe_partition_dt(partition_dt)
    sql = f"""
        SELECT strategy, demand_id, demand_name, weight, video_count, video_list, ext_info
        FROM {source_table}
        WHERE dt = '{safe_dt}'
    """
    odps_client = get_odps_client()
    instance = odps_client.execute_sql(sql)

    dedup_rows: dict[str, dict[str, object]] = {}
    with instance.open_reader(tunnel=True) as reader:
        for record in reader:
            demand_id = str(record["demand_id"] or "").strip()
            if not demand_id:
                continue
            dedup_rows[demand_id] = {
                "strategy": record["strategy"],
                "demand_id": demand_id,
                "demand_name": record["demand_name"],
                "weight": record["weight"],
                "video_count": record["video_count"],
                "video_list": _serialize_video_list(record["video_list"]),
                "ext_info": record["ext_info"],
                "dt": partition_dt,
            }
    return list(dedup_rows.values())


def _build_secondary_demand_id(demand_name: str, partition_dt: str) -> str:
    """Derive a deterministic ``demand_id`` for a secondary-source row.

    The id is the MD5 hex digest of strategy + demand name + partition,
    concatenated with no separator. NOTE: ids are the upsert key, so changing
    this formula would orphan rows already written with the old ids.
    """
    raw_value = f"{settings.demand_pool_secondary_strategy}{demand_name}{partition_dt}"
    return hashlib.md5(raw_value.encode("utf-8")).hexdigest()


def _fetch_partition_rows_from_secondary_source(partition_dt: str) -> list[dict[str, object]]:
    """Read one ``dt`` partition of the secondary ODPS source table.

    Each non-blank ``demand`` becomes a row under the configured secondary
    strategy with a derived ``demand_id``, a rounded ``weight`` taken from
    ``score``, no video data, and the configured default ``ext_info``.
    Duplicate demands within the partition collapse to the last one read.

    Raises:
        ValueError: if the configured table name or ``partition_dt`` is unsafe.
    """
    source_table = _safe_identifier(settings.demand_pool_secondary_source_table)
    safe_dt = _safe_partition_dt(partition_dt)
    sql = f"""
        SELECT demand, score
        FROM {source_table}
        WHERE dt = '{safe_dt}'
    """
    odps_client = get_odps_client()
    instance = odps_client.execute_sql(sql)

    dedup_rows: dict[str, dict[str, object]] = {}
    with instance.open_reader(tunnel=True) as reader:
        for record in reader:
            demand_name = str(record["demand"] or "").strip()
            if not demand_name:
                continue
            demand_id = _build_secondary_demand_id(demand_name, partition_dt)
            dedup_rows[demand_id] = {
                "strategy": settings.demand_pool_secondary_strategy,
                "demand_id": demand_id,
                "demand_name": demand_name,
                "weight": _normalize_secondary_weight(record["score"]),
                "video_count": None,
                "video_list": None,
                "ext_info": settings.demand_pool_secondary_default_ext_info,
                "dt": partition_dt,
            }
    return list(dedup_rows.values())


def _ensure_target_table() -> None:
    """Create the MySQL target table if it does not already exist.

    ``CREATE TABLE IF NOT EXISTS`` does not alter an existing table, so schema
    changes made here will not reach a table that was created earlier.
    """
    target_table = _safe_identifier(settings.demand_pool_target_table)
    create_sql = f"""
        CREATE TABLE IF NOT EXISTS {target_table} (
            id BIGINT AUTO_INCREMENT COMMENT '自增id' PRIMARY KEY,
            strategy VARCHAR(64) NULL COMMENT '策略',
            demand_id VARCHAR(64) NULL COMMENT '需求id',
            demand_name VARCHAR(64) NULL COMMENT '需求',
            weight DOUBLE NULL COMMENT '权重',
            video_count BIGINT NULL COMMENT '视频数量',
            video_list TEXT NULL COMMENT '视频列表',
            ext_info TEXT NULL COMMENT '扩展字段',
            dt VARCHAR(32) NULL COMMENT '分区日期',
            create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
            update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
            UNIQUE KEY uniq_demand_id (demand_id)
        )
    """
    with SessionLocal() as session:
        session.execute(text(create_sql))
        session.commit()


def _upsert_rows_by_demand_id(rows: list[dict[str, object]]) -> int:
    """Insert or update ``rows`` in the target table, keyed on ``demand_id``.

    ``update_time`` is bumped only when at least one data column actually
    changes. All batches are committed in a single transaction at the end.

    Returns:
        ``len(rows)`` (0 for an empty list, without touching the database).
    """
    if not rows:
        return 0

    target_table = _safe_identifier(settings.demand_pool_target_table)
    # MySQL evaluates ON DUPLICATE KEY UPDATE assignments left to right, and a
    # column referenced after its own assignment already holds the NEW value.
    # The update_time comparison must therefore come FIRST so it compares the
    # stored (old) values against VALUES(...). Assigning update_time explicitly
    # also keeps MySQL's ON UPDATE auto-bump from overriding this decision.
    upsert_sql = text(
        f"""
        INSERT INTO {target_table} (
            strategy, demand_id, demand_name, weight, video_count, video_list, ext_info, dt
        ) VALUES (
            :strategy, :demand_id, :demand_name, :weight, :video_count, :video_list, :ext_info, :dt
        )
        ON DUPLICATE KEY UPDATE
            update_time = IF(
                NOT (
                    strategy <=> VALUES(strategy)
                    AND demand_name <=> VALUES(demand_name)
                    AND weight <=> VALUES(weight)
                    AND video_count <=> VALUES(video_count)
                    AND video_list <=> VALUES(video_list)
                    AND ext_info <=> VALUES(ext_info)
                    AND dt <=> VALUES(dt)
                ),
                CURRENT_TIMESTAMP,
                update_time
            ),
            strategy = VALUES(strategy),
            demand_name = VALUES(demand_name),
            weight = VALUES(weight),
            video_count = VALUES(video_count),
            video_list = VALUES(video_list),
            ext_info = VALUES(ext_info),
            dt = VALUES(dt)
        """
    )
    with SessionLocal() as session:
        for start in range(0, len(rows), BATCH_SIZE):
            session.execute(upsert_sql, rows[start : start + BATCH_SIZE])
        session.commit()
    return len(rows)


def sync_partition(partition_dt: str) -> int:
    """Sync one ``dt`` partition from both ODPS sources into MySQL.

    Rows are merged by ``demand_id``. A secondary row replaces a primary row
    with the same id because it is merged second.

    Returns:
        Number of rows sent to the upsert.
    """
    merged_rows: dict[str, dict[str, object]] = {}
    for row in _fetch_partition_rows_from_primary_source(partition_dt):
        merged_rows[str(row["demand_id"])] = row
    for row in _fetch_partition_rows_from_secondary_source(partition_dt):
        merged_rows[str(row["demand_id"])] = row
    return _upsert_rows_by_demand_id(list(merged_rows.values()))


def run_full_sync(partitions: list[str] | None = None) -> dict[str, int]:
    """Ensure the target table exists, then sync each requested partition.

    Args:
        partitions: Partition values to sync. When ``None`` *or empty*, falls
            back to ``settings.demand_pool_initial_partition_list``.

    Returns:
        Mapping of partition value to number of rows upserted.
    """
    _ensure_target_table()
    partition_list = partitions or settings.demand_pool_initial_partition_list
    result: dict[str, int] = {}
    for partition in partition_list:
        result[partition] = sync_partition(partition)
    return result


def run_today_incremental_sync() -> dict[str, int]:
    """Sync today's partition (``YYYYMMDD`` in Asia/Shanghai time).

    Returns:
        Single-entry mapping of today's partition value to rows upserted.
    """
    _ensure_target_table()
    partition_dt = datetime.now(SHANGHAI_TZ).strftime("%Y%m%d")
    return {partition_dt: sync_partition(partition_dt)}