"""垂直领域实质元素:从 ODPS 同步基本元素(增量)与效果数据(按日全量)。""" import json import re from datetime import datetime, timedelta from decimal import Decimal from zoneinfo import ZoneInfo from sqlalchemy import bindparam, text from app.core.config import settings from app.db.mysql import SessionLocal from app.odps.client import get_odps_client IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") BATCH_SIZE = 500 SHANGHAI_TZ = ZoneInfo("Asia/Shanghai") _T_ELEMENT_CTE = """ WITH t_element AS ( SELECT t1.post_id AS vid ,t1.global_element_id AS element_id ,t1.element_name AS 元素名称 ,t1.element_type AS 维度 ,t4.classified_as AS 归类 ,t4.name AS 分类名称 ,t4.stable_id AS stable_id ,t4.description AS 分类说明 ,t4.level AS 分类层级 ,t4.path AS 分类路径 ,t1.element_type AS 元素维度 ,t6.topic_point_type AS 点类型 ,t2.contribution AS 贡献度 ,t2.share_will AS 分享意愿度 ,t2.consume_will AS 消费意愿度 ,t2.click_will AS 点击意愿度 FROM loghubods.element_classification_mapping t1 LEFT JOIN loghubods.dwd_post_word_contribution_di t2 ON t1.post_id = t2.post_id AND t1.element_name = t2.word AND t2.dt = '{bizdate}' LEFT JOIN loghubods.global_element t3 ON t1.global_element_id = t3.id AND t3.dt = '{bizdate}' LEFT JOIN loghubods.global_category t4 ON t1.global_category_stable_id = t4.stable_id AND t4.dt = '{bizdate}' LEFT JOIN loghubods.post_decode_topic_point_element t5 ON t1.source_element_id = t5.id AND t1.post_id = t5.post_id AND t5.dt = '{bizdate}' LEFT JOIN loghubods.post_decode_topic_point t6 ON t5.topic_point_id = t6.id AND t6.dt = '{bizdate}' WHERE t1.dt = '{bizdate}' AND LENGTH(CAST(t1.post_id AS STRING)) <= 10 AND t3.retired_at_execution_id IS NULL AND t4.retired_at_execution_id IS NULL AND t5.id IS NOT NULL AND t1.element_type = '实质' AND t4.classified_as = '垂直领域细分' ) """ _T_ELEMENT_BASE_CTE = """ WITH t_element AS ( SELECT t1.global_element_id AS element_id ,t1.element_name AS 元素名称 ,t1.element_type AS 维度 ,t4.classified_as AS 归类 ,t4.stable_id AS stable_id FROM loghubods.element_classification_mapping t1 LEFT JOIN loghubods.global_element t3 ON t1.global_element_id = t3.id AND t3.dt = '{bizdate}' LEFT JOIN loghubods.global_category t4 ON t1.global_category_stable_id = t4.stable_id AND t4.dt = '{bizdate}' LEFT JOIN loghubods.post_decode_topic_point_element t5 ON t1.source_element_id = t5.id AND t1.post_id = t5.post_id AND t5.dt = '{bizdate}' WHERE t1.dt = '{bizdate}' AND LENGTH(CAST(t1.post_id AS STRING)) <= 10 AND t3.retired_at_execution_id IS NULL AND t4.retired_at_execution_id IS NULL AND t5.id IS NOT NULL AND t1.element_type = '实质' AND t4.classified_as = '垂直领域细分' ) """ def _safe_identifier(name: str) -> str: if not IDENTIFIER_RE.match(name): raise ValueError(f"invalid sql identifier: {name}") return name def _serialize_vid_list(value: object) -> str | None: if value is None: return None if isinstance(value, list): return json.dumps(value, ensure_ascii=False) return str(value) def _normalize_scalar(value: object) -> object: if isinstance(value, Decimal): return float(value) return value def _normalize_element_id(value: object) -> str: text_value = str(value or "").strip() if not text_value: raise ValueError("element_id 不能为空") return text_value def _yesterday_partition_dt(reference: datetime | None = None) -> str: now = reference or datetime.now(SHANGHAI_TZ) return (now.date() - timedelta(days=1)).strftime("%Y%m%d") def _build_sync_sql(bizdate: str) -> str: return ( _T_ELEMENT_CTE.format(bizdate=bizdate) + """ SELECT CAST(t1.element_id AS STRING) AS element_id ,t1.元素名称 AS element_name ,t1.维度 AS dimension ,t1.归类 AS classified_as ,CAST(t1.stable_id AS STRING) AS stable_id ,COUNT(DISTINCT t1.vid) AS vid_count ,COLLECT_SET(t1.vid) AS vid_list ,COALESCE(ROUND( SUM(t2.分发回流uv * CASE WHEN t1.贡献度 >= 0.8 THEN t1.贡献度 ELSE 0 END) / NULLIF(SUM(t2.分发曝光pv), 0) , 4), 0) AS rov_score ,COALESCE(ROUND( SUM(t2.分发分享pv * CASE WHEN t1.分享意愿度 >= 0.8 THEN t1.分享意愿度 ELSE 0 END) / NULLIF(SUM(t2.分发曝光pv), 0) , 4), 0) AS str_score ,COALESCE(ROUND( SUM(t2.分发回流uv * CASE WHEN t1.点击意愿度 >= 0.8 THEN t1.点击意愿度 ELSE 0 END) / NULLIF(SUM(t2.分发分享pv), 0) , 4), 0) AS ros_score FROM t_element t1 LEFT JOIN loghubods.video_multi_strategy_effect t2 ON t1.vid = t2.vid AND t2.dt = '{bizdate}' AND t2.strategy = '当天' GROUP BY t1.element_id ,t1.元素名称 ,t1.维度 ,t1.归类 ,t1.stable_id """ ).format(bizdate=bizdate) def _fetch_from_odps(bizdate: str) -> tuple[list[dict[str, object]], list[dict[str, object]]]: sql = _build_sync_sql(bizdate) odps_client = get_odps_client() instance = odps_client.execute_sql(sql) base_rows: dict[str, dict[str, object]] = {} effect_rows: list[dict[str, object]] = [] with instance.open_reader(tunnel=True) as reader: for record in reader: element_id = _normalize_element_id(record["element_id"]) base_rows[element_id] = { "element_id": element_id, "stable_id": str(record["stable_id"] or "").strip() or None, "element_name": record["element_name"], "dimension": record["dimension"], "classified_as": record["classified_as"], } effect_rows.append( { "element_id": element_id, "vid_count": record["vid_count"], "vid_list": _serialize_vid_list(record["vid_list"]), "rov_score": _normalize_scalar(record["rov_score"]), "str_score": _normalize_scalar(record["str_score"]), "ros_score": _normalize_scalar(record["ros_score"]), "dt": bizdate, } ) return list(base_rows.values()), effect_rows def _build_base_fields_sql(bizdate: str) -> str: return ( _T_ELEMENT_BASE_CTE.format(bizdate=bizdate) + """ SELECT DISTINCT CAST(t1.element_id AS STRING) AS element_id ,CAST(t1.stable_id AS STRING) AS stable_id ,t1.元素名称 AS element_name ,t1.维度 AS dimension ,t1.归类 AS classified_as FROM t_element t1 """ ) def _fetch_base_fields_from_odps(bizdate: str) -> list[dict[str, object]]: print(f"[update] querying ODPS base fields, dt={bizdate} ...") sql = _build_base_fields_sql(bizdate) odps_client = get_odps_client() instance = odps_client.execute_sql(sql) rows: dict[str, dict[str, object]] = {} with instance.open_reader(tunnel=True) as reader: for record in reader: element_id = _normalize_element_id(record["element_id"]) rows[element_id] = { "element_id": element_id, "stable_id": str(record["stable_id"] or "").strip() or None, "element_name": record["element_name"], "dimension": record["dimension"], "classified_as": record["classified_as"], } print(f"[update] ODPS done, fetched {len(rows)} rows") return list(rows.values()) def _update_base_stable_id(rows: list[dict[str, object]]) -> int: if not rows: return 0 base_table = _safe_identifier(settings.substance_element_base_table) update_sql = text( f""" UPDATE {base_table} SET stable_id = :stable_id WHERE element_id = :element_id AND NOT (stable_id <=> :stable_id) """ ) updated = 0 with SessionLocal() as session: for start in range(0, len(rows), BATCH_SIZE): batch = rows[start : start + BATCH_SIZE] result = session.execute(update_sql, batch) updated += int(result.rowcount or 0) session.commit() return updated def truncate_substance_element_base() -> None: base_table = _safe_identifier(settings.substance_element_base_table) with SessionLocal() as session: session.execute(text(f"TRUNCATE TABLE {base_table}")) session.commit() print(f"[sync] truncated {base_table}") def sync_substance_element_base_only(partition_dt: str | None = None) -> dict[str, int | str]: """仅从 ODPS 全量写入 substance_element_base,不重写 effect_di 表。""" _ensure_tables() bizdate = partition_dt or _yesterday_partition_dt() base_rows = _fetch_base_fields_from_odps(bizdate) print(f"[sync] inserting {len(base_rows)} rows into base table ...") inserted_count = _insert_base_elements(base_rows) return { "partition_dt": bizdate, "base_fetched": len(base_rows), "base_inserted": inserted_count, } def update_substance_element_base_fields(partition_dt: str | None = None) -> dict[str, int | str]: """仅回填 substance_element_base.stable_id,不重写 effect_di 表。""" bizdate = partition_dt or _yesterday_partition_dt() base_rows = _fetch_base_fields_from_odps(bizdate) print(f"[update] matching existing rows in MySQL ...") existing_ids = _load_existing_element_ids([str(row["element_id"]) for row in base_rows]) update_rows = [row for row in base_rows if str(row["element_id"]) in existing_ids] print(f"[update] updating stable_id for {len(update_rows)} rows ...") updated_count = _update_base_stable_id(update_rows) return { "partition_dt": bizdate, "base_fetched": len(base_rows), "base_matched": len(update_rows), "base_updated": updated_count, } def _ensure_tables() -> None: base_table = _safe_identifier(settings.substance_element_base_table) effect_table = _safe_identifier(settings.substance_element_effect_table) create_base_sql = f""" CREATE TABLE IF NOT EXISTS {base_table} ( id BIGINT AUTO_INCREMENT COMMENT '自增id' PRIMARY KEY, element_id VARCHAR(64) NOT NULL COMMENT '元素id', stable_id VARCHAR(64) NULL COMMENT '分类stable_id', element_name VARCHAR(256) NULL COMMENT '元素名称', dimension VARCHAR(32) NULL COMMENT '元素维度', classified_as VARCHAR(64) NULL COMMENT '归类', create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', UNIQUE KEY uniq_element_id (element_id) ) """ create_effect_sql = f""" CREATE TABLE IF NOT EXISTS {effect_table} ( id BIGINT AUTO_INCREMENT COMMENT '自增id' PRIMARY KEY, element_id VARCHAR(64) NOT NULL COMMENT '元素id', vid_count BIGINT NULL COMMENT '视频数量', vid_list LONGTEXT NULL COMMENT '视频列表', rov_score DOUBLE NULL COMMENT 'ROV得分', str_score DOUBLE NULL COMMENT 'STR得分', ros_score DOUBLE NULL COMMENT 'ROS得分', dt VARCHAR(32) NOT NULL COMMENT '分区日期', create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', UNIQUE KEY uniq_element_dt (element_id, dt) ) """ alter_base_sql = f""" ALTER TABLE {base_table} ADD COLUMN stable_id VARCHAR(64) NULL COMMENT '分类stable_id' AFTER element_id """ alter_effect_sql = f""" ALTER TABLE {effect_table} MODIFY COLUMN vid_list LONGTEXT NULL COMMENT '视频列表' """ check_column_sql = text( """ SELECT COUNT(*) FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = :table_name AND COLUMN_NAME = :column_name """ ) with SessionLocal() as session: session.execute(text(create_base_sql)) session.execute(text(create_effect_sql)) if session.execute( check_column_sql, {"table_name": settings.substance_element_base_table, "column_name": "stable_id"}, ).scalar() == 0: session.execute(text(alter_base_sql)) session.execute(text(alter_effect_sql)) session.commit() def _load_existing_element_ids(element_ids: list[str]) -> set[str]: if not element_ids: return set() base_table = _safe_identifier(settings.substance_element_base_table) existing: set[str] = set() query_sql = ( text( f""" SELECT element_id FROM {base_table} WHERE element_id IN :element_ids """ ) .bindparams(bindparam("element_ids", expanding=True)) ) with SessionLocal() as session: for start in range(0, len(element_ids), BATCH_SIZE): batch = element_ids[start : start + BATCH_SIZE] rows = session.execute(query_sql, {"element_ids": tuple(batch)}).all() existing.update(str(row[0]) for row in rows) return existing def _insert_base_elements(rows: list[dict[str, object]]) -> int: if not rows: return 0 base_table = _safe_identifier(settings.substance_element_base_table) insert_sql = text( f""" INSERT INTO {base_table} ( element_id, stable_id, element_name, dimension, classified_as ) VALUES ( :element_id, :stable_id, :element_name, :dimension, :classified_as ) """ ) with SessionLocal() as session: for start in range(0, len(rows), BATCH_SIZE): session.execute(insert_sql, rows[start : start + BATCH_SIZE]) session.commit() return len(rows) def _replace_effect_partition(partition_dt: str, rows: list[dict[str, object]]) -> int: effect_table = _safe_identifier(settings.substance_element_effect_table) delete_sql = text(f"DELETE FROM {effect_table} WHERE dt = :dt") insert_sql = text( f""" INSERT INTO {effect_table} ( element_id, vid_count, vid_list, rov_score, str_score, ros_score, dt ) VALUES ( :element_id, :vid_count, :vid_list, :rov_score, :str_score, :ros_score, :dt ) """ ) with SessionLocal() as session: session.execute(delete_sql, {"dt": partition_dt}) for start in range(0, len(rows), BATCH_SIZE): session.execute(insert_sql, rows[start : start + BATCH_SIZE]) session.commit() return len(rows) def sync_substance_elements(partition_dt: str | None = None) -> dict[str, int | str]: """同步指定分区(默认上海时区昨天)的基本元素增量与效果全量。""" _ensure_tables() bizdate = partition_dt or _yesterday_partition_dt() base_rows, effect_rows = _fetch_from_odps(bizdate) existing_ids = _load_existing_element_ids([str(row["element_id"]) for row in base_rows]) incremental_rows = [row for row in base_rows if str(row["element_id"]) not in existing_ids] inserted_base_count = _insert_base_elements(incremental_rows) inserted_effect_count = _replace_effect_partition(bizdate, effect_rows) return { "partition_dt": bizdate, "base_fetched": len(base_rows), "base_inserted": inserted_base_count, "effect_inserted": inserted_effect_count, }