| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467 |
- """垂直领域实质元素:从 ODPS 同步基本元素(增量)与效果数据(按日全量)。"""
- import json
- import re
- from datetime import datetime, timedelta
- from decimal import Decimal
- from zoneinfo import ZoneInfo
- from sqlalchemy import bindparam, text
- from app.core.config import settings
- from app.db.mysql import SessionLocal
- from app.odps.client import get_odps_client
# Number of rows sent to / looked up in MySQL per statement execution.
BATCH_SIZE: int = 500
# Default partition dates are computed on Asia/Shanghai wall-clock time.
SHANGHAI_TZ: ZoneInfo = ZoneInfo("Asia/Shanghai")
# Whitelist for table names that get interpolated into SQL text (see _safe_identifier).
IDENTIFIER_RE: re.Pattern[str] = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
# ODPS CTE ``t_element`` used by the combined sync query (_build_sync_sql).
# Rows come from loghubods.element_classification_mapping (t1) for partition
# ``{bizdate}``, left-joined to:
#   t2 dwd_post_word_contribution_di  -> contribution / share / consume / click "will" scores
#   t3 global_element                 -> only used for its retired_at_execution_id filter
#   t4 global_category                -> category attributes (classified_as, stable_id, ...)
#   t5 post_decode_topic_point_element, t6 post_decode_topic_point -> topic point type
# Joins that match several rows multiply the corresponding t1 rows.
# Filters: post ids of at most 10 characters, non-retired element/category,
# a matching t5 row (``t5.id IS NOT NULL`` makes that join effectively inner),
# element_type '实质' and category classified_as '垂直领域细分' (the t4 filter
# likewise makes that join effectively inner).
# Several selected columns (分类名称, 分类说明, 分类层级, 分类路径, 元素维度,
# 点类型, 消费意愿度) are not referenced by _build_sync_sql's final SELECT.
# ``{bizdate}`` is a str.format placeholder interpolated directly into the SQL
# text, so it must be a trusted partition string.
_T_ELEMENT_CTE = """
WITH t_element AS
(
SELECT t1.post_id AS vid
,t1.global_element_id AS element_id
,t1.element_name AS 元素名称
,t1.element_type AS 维度
,t4.classified_as AS 归类
,t4.name AS 分类名称
,t4.stable_id AS stable_id
,t4.description AS 分类说明
,t4.level AS 分类层级
,t4.path AS 分类路径
,t1.element_type AS 元素维度
,t6.topic_point_type AS 点类型
,t2.contribution AS 贡献度
,t2.share_will AS 分享意愿度
,t2.consume_will AS 消费意愿度
,t2.click_will AS 点击意愿度
FROM loghubods.element_classification_mapping t1
LEFT JOIN loghubods.dwd_post_word_contribution_di t2
ON t1.post_id = t2.post_id
AND t1.element_name = t2.word
AND t2.dt = '{bizdate}'
LEFT JOIN loghubods.global_element t3
ON t1.global_element_id = t3.id
AND t3.dt = '{bizdate}'
LEFT JOIN loghubods.global_category t4
ON t1.global_category_stable_id = t4.stable_id
AND t4.dt = '{bizdate}'
LEFT JOIN loghubods.post_decode_topic_point_element t5
ON t1.source_element_id = t5.id
AND t1.post_id = t5.post_id
AND t5.dt = '{bizdate}'
LEFT JOIN loghubods.post_decode_topic_point t6
ON t5.topic_point_id = t6.id
AND t6.dt = '{bizdate}'
WHERE t1.dt = '{bizdate}'
AND LENGTH(CAST(t1.post_id AS STRING)) <= 10
AND t3.retired_at_execution_id IS NULL
AND t4.retired_at_execution_id IS NULL
AND t5.id IS NOT NULL
AND t1.element_type = '实质'
AND t4.classified_as = '垂直领域细分'
)
"""
# Reduced ODPS CTE ``t_element`` used by _build_base_fields_sql.
# It applies the same t1/t3/t4/t5 joins and filters as _T_ELEMENT_CTE but
# selects only the element's descriptive fields (element_id, name, dimension,
# classified_as, category stable_id). It drops the contribution (t2) and topic
# point (t6) joins, which the base-field query does not need.
# ``{bizdate}`` is a str.format placeholder interpolated directly into the SQL
# text, so it must be a trusted partition string.
_T_ELEMENT_BASE_CTE = """
WITH t_element AS
(
SELECT t1.global_element_id AS element_id
,t1.element_name AS 元素名称
,t1.element_type AS 维度
,t4.classified_as AS 归类
,t4.stable_id AS stable_id
FROM loghubods.element_classification_mapping t1
LEFT JOIN loghubods.global_element t3
ON t1.global_element_id = t3.id
AND t3.dt = '{bizdate}'
LEFT JOIN loghubods.global_category t4
ON t1.global_category_stable_id = t4.stable_id
AND t4.dt = '{bizdate}'
LEFT JOIN loghubods.post_decode_topic_point_element t5
ON t1.source_element_id = t5.id
AND t1.post_id = t5.post_id
AND t5.dt = '{bizdate}'
WHERE t1.dt = '{bizdate}'
AND LENGTH(CAST(t1.post_id AS STRING)) <= 10
AND t3.retired_at_execution_id IS NULL
AND t4.retired_at_execution_id IS NULL
AND t5.id IS NOT NULL
AND t1.element_type = '实质'
AND t4.classified_as = '垂直领域细分'
)
"""
def _safe_identifier(name: str) -> str:
    """Validate a SQL identifier (e.g. a table name) before it is put into SQL text.

    Table names cannot be bound as query parameters, so they are checked
    against ``IDENTIFIER_RE`` instead.

    Args:
        name: Candidate identifier, typically a table name from settings.

    Returns:
        ``name`` unchanged when it is a plain ASCII identifier.

    Raises:
        ValueError: If ``name`` is not a string or is not a valid identifier.
    """
    # fullmatch, not match: the pattern ends in ``$``, which also matches just
    # before a trailing "\n", so ``match`` would accept e.g. "table\n".
    if not isinstance(name, str) or not IDENTIFIER_RE.fullmatch(name):
        raise ValueError(f"invalid sql identifier: {name}")
    return name
- def _serialize_vid_list(value: object) -> str | None:
- if value is None:
- return None
- if isinstance(value, list):
- return json.dumps(value, ensure_ascii=False)
- return str(value)
def _normalize_scalar(value: object) -> object:
    """Return ``float(value)`` for ``Decimal`` inputs and ``value`` unchanged otherwise.

    Used on the score fields before they are bound to the DOUBLE score
    columns. NOTE(review): presumably ODPS numeric results can arrive as
    ``decimal.Decimal`` — confirm with the ODPS client.
    """
    return float(value) if isinstance(value, Decimal) else value
def _normalize_element_id(value: object) -> str:
    """Convert an element id into a stripped, non-empty string.

    Args:
        value: Raw element id (``None``, a string, or any value with a
            meaningful ``str()``).

    Returns:
        ``str(value)`` with surrounding whitespace removed.

    Raises:
        ValueError: If ``value`` is ``None`` or blank after stripping.
    """
    # Check for None explicitly: ``value or ""`` would also discard a
    # legitimate falsy id such as 0 and then reject it as empty.
    text_value = "" if value is None else str(value).strip()
    if not text_value:
        raise ValueError("element_id 不能为空")
    return text_value
- def _yesterday_partition_dt(reference: datetime | None = None) -> str:
- now = reference or datetime.now(SHANGHAI_TZ)
- return (now.date() - timedelta(days=1)).strftime("%Y%m%d")
def _build_sync_sql(bizdate: str) -> str:
    """Build the ODPS query returning per-element base fields and effect scores.

    Groups ``t_element`` (``_T_ELEMENT_CTE``) by element id plus its
    name/dimension/classified_as/stable_id, and left-joins same-day ('当天')
    rows of ``loghubods.video_multi_strategy_effect`` to compute:

    * ``rov_score`` = SUM(分发回流uv * 贡献度) / SUM(分发曝光pv)
    * ``str_score`` = SUM(分发分享pv * 分享意愿度) / SUM(分发曝光pv)
    * ``ros_score`` = SUM(分发回流uv * 点击意愿度) / SUM(分发分享pv)

    Each weight only counts when it is >= 0.8 (otherwise 0). Scores are
    rounded to 4 decimals and fall back to 0 when the denominator is 0/NULL
    or no effect rows matched.

    Args:
        bizdate: ODPS partition date, ``YYYYMMDD``.

    Returns:
        The complete SQL text.

    Raises:
        ValueError: If ``bizdate`` is not an 8-digit date string. The value is
            interpolated directly into the SQL, so it is validated first.
    """
    bizdate = str(bizdate)
    if not re.fullmatch(r"[0-9]{8}", bizdate):
        raise ValueError(f"invalid partition dt: {bizdate!r}")
    select_sql = """
SELECT CAST(t1.element_id AS STRING) AS element_id
,t1.元素名称 AS element_name
,t1.维度 AS dimension
,t1.归类 AS classified_as
,CAST(t1.stable_id AS STRING) AS stable_id
,COUNT(DISTINCT t1.vid) AS vid_count
,COLLECT_SET(t1.vid) AS vid_list
,COALESCE(ROUND(
SUM(t2.分发回流uv * CASE WHEN t1.贡献度 >= 0.8 THEN t1.贡献度 ELSE 0 END)
/ NULLIF(SUM(t2.分发曝光pv), 0)
, 4), 0) AS rov_score
,COALESCE(ROUND(
SUM(t2.分发分享pv * CASE WHEN t1.分享意愿度 >= 0.8 THEN t1.分享意愿度 ELSE 0 END)
/ NULLIF(SUM(t2.分发曝光pv), 0)
, 4), 0) AS str_score
,COALESCE(ROUND(
SUM(t2.分发回流uv * CASE WHEN t1.点击意愿度 >= 0.8 THEN t1.点击意愿度 ELSE 0 END)
/ NULLIF(SUM(t2.分发分享pv), 0)
, 4), 0) AS ros_score
FROM t_element t1
LEFT JOIN loghubods.video_multi_strategy_effect t2
ON t1.vid = t2.vid
AND t2.dt = '{bizdate}'
AND t2.strategy = '当天'
GROUP BY t1.element_id
,t1.元素名称
,t1.维度
,t1.归类
,t1.stable_id
"""
    # Substitute the placeholder exactly once over the whole text. Formatting
    # the CTE first and then the concatenation again would re-expand any brace
    # that ended up in the already-substituted part.
    return (_T_ELEMENT_CTE + select_sql).format(bizdate=bizdate)
def _fetch_from_odps(bizdate: str) -> tuple[list[dict[str, object]], list[dict[str, object]]]:
    """Run the combined ODPS sync query for ``bizdate`` and split every record.

    Each record produced by ``_build_sync_sql`` yields two rows:

    * a base row: element_id, stable_id (blank -> ``None``), element_name,
      dimension, classified_as;
    * an effect row: vid_count, JSON-serialized vid_list, the three scores
      (``Decimal`` -> ``float``) and ``dt=bizdate``.

    Results are read via ``open_reader(tunnel=True)``.

    Returns:
        ``(base_rows, effect_rows)``. Base rows are keyed by element_id while
        collecting, so a later record for the same element_id replaces the
        earlier base row. Effect rows keep one entry per record.

    NOTE(review): the query groups by element_id together with name,
    dimension, classified_as and stable_id, so one element_id may appear in
    several records. That would yield several effect rows, while the effect
    table has UNIQUE (element_id, dt). Confirm that upstream guarantees one
    group per element_id.
    """
    query = _build_sync_sql(bizdate)
    odps_instance = get_odps_client().execute_sql(query)
    base_by_id: dict[str, dict[str, object]] = {}
    effects: list[dict[str, object]] = []
    with odps_instance.open_reader(tunnel=True) as odps_reader:
        for rec in odps_reader:
            eid = _normalize_element_id(rec["element_id"])
            stable = str(rec["stable_id"] or "").strip()
            base_by_id[eid] = {
                "element_id": eid,
                "stable_id": stable or None,
                "element_name": rec["element_name"],
                "dimension": rec["dimension"],
                "classified_as": rec["classified_as"],
            }
            effects.append(
                {
                    "element_id": eid,
                    "vid_count": rec["vid_count"],
                    "vid_list": _serialize_vid_list(rec["vid_list"]),
                    "rov_score": _normalize_scalar(rec["rov_score"]),
                    "str_score": _normalize_scalar(rec["str_score"]),
                    "ros_score": _normalize_scalar(rec["ros_score"]),
                    "dt": bizdate,
                }
            )
    return list(base_by_id.values()), effects
def _build_base_fields_sql(bizdate: str) -> str:
    """Build the ODPS query returning distinct base fields of substance elements.

    Selects DISTINCT (element_id, stable_id, element_name, dimension,
    classified_as) from ``t_element`` (``_T_ELEMENT_BASE_CTE``).

    Args:
        bizdate: ODPS partition date, ``YYYYMMDD``.

    Returns:
        The complete SQL text.

    Raises:
        ValueError: If ``bizdate`` is not an 8-digit date string. The value is
            interpolated directly into the SQL, so it is validated first.
    """
    bizdate = str(bizdate)
    if not re.fullmatch(r"[0-9]{8}", bizdate):
        raise ValueError(f"invalid partition dt: {bizdate!r}")
    return (
        _T_ELEMENT_BASE_CTE.format(bizdate=bizdate)
        + """
SELECT DISTINCT CAST(t1.element_id AS STRING) AS element_id
,CAST(t1.stable_id AS STRING) AS stable_id
,t1.元素名称 AS element_name
,t1.维度 AS dimension
,t1.归类 AS classified_as
FROM t_element t1
"""
    )
def _fetch_base_fields_from_odps(bizdate: str) -> list[dict[str, object]]:
    """Query ODPS for the base fields of every element in partition ``bizdate``.

    Rows are keyed by normalized element_id while reading, so a later record
    with the same element_id replaces an earlier one. A blank stable_id is
    stored as ``None``. Progress lines are printed before the query and after
    reading.

    Returns:
        One dict per element_id with element_id, stable_id, element_name,
        dimension and classified_as.
    """
    print(f"[update] querying ODPS base fields, dt={bizdate} ...")
    query = _build_base_fields_sql(bizdate)
    odps_instance = get_odps_client().execute_sql(query)
    by_element: dict[str, dict[str, object]] = {}
    with odps_instance.open_reader(tunnel=True) as odps_reader:
        for rec in odps_reader:
            eid = _normalize_element_id(rec["element_id"])
            stable = str(rec["stable_id"] or "").strip()
            by_element[eid] = {
                "element_id": eid,
                "stable_id": stable or None,
                "element_name": rec["element_name"],
                "dimension": rec["dimension"],
                "classified_as": rec["classified_as"],
            }
    print(f"[update] ODPS done, fetched {len(by_element)} rows")
    return list(by_element.values())
def _update_base_stable_id(rows: list[dict[str, object]]) -> int:
    """Write ``stable_id`` onto existing base-table rows, matched by ``element_id``.

    Rows are sent in chunks of BATCH_SIZE within one session and committed
    after the last chunk. The WHERE clause skips rows whose stored stable_id
    already equals the new one (NULL-safe ``<=>``).
    NOTE(review): that guard presumably keeps the count equal to actually
    changed rows even if the driver reports matched rows. Confirm the
    driver's executemany rowcount semantics.

    Args:
        rows: Dicts providing at least ``element_id`` and ``stable_id``.

    Returns:
        Sum of the reported rowcounts (``None`` counted as 0), or 0 for empty input.
    """
    if not rows:
        return 0
    table_name = _safe_identifier(settings.substance_element_base_table)
    stmt = text(
        f"""
UPDATE {table_name}
SET stable_id = :stable_id
WHERE element_id = :element_id
AND NOT (stable_id <=> :stable_id)
"""
    )
    total = 0
    with SessionLocal() as session:
        offset = 0
        while offset < len(rows):
            outcome = session.execute(stmt, rows[offset : offset + BATCH_SIZE])
            total += int(outcome.rowcount or 0)
            offset += BATCH_SIZE
        session.commit()
    return total
def truncate_substance_element_base() -> None:
    """Remove every row from the configured base table with ``TRUNCATE TABLE``.

    The table name comes from settings and is validated by ``_safe_identifier``
    before being placed in the statement. A confirmation line is printed afterwards.
    """
    table_name = _safe_identifier(settings.substance_element_base_table)
    statement = text(f"TRUNCATE TABLE {table_name}")
    with SessionLocal() as session:
        session.execute(statement)
        session.commit()
    print(f"[sync] truncated {table_name}")
def sync_substance_element_base_only(partition_dt: str | None = None) -> dict[str, int | str]:
    """Load every base row for one partition from ODPS into substance_element_base.

    The effect table is not rewritten. Unlike sync_substance_elements, every
    fetched row is inserted without checking which element_ids already exist.
    NOTE(review): the base table has UNIQUE(element_id), so this presumably
    expects an empty table (e.g. after truncate_substance_element_base()).
    Confirm with callers.

    Args:
        partition_dt: Partition date string. Defaults to yesterday in
            Shanghai time, formatted ``YYYYMMDD``.

    Returns:
        ``partition_dt``, ``base_fetched`` and ``base_inserted``.
    """
    _ensure_tables()
    dt = partition_dt or _yesterday_partition_dt()
    fetched = _fetch_base_fields_from_odps(dt)
    print(f"[sync] inserting {len(fetched)} rows into base table ...")
    return {
        "partition_dt": dt,
        "base_fetched": len(fetched),
        "base_inserted": _insert_base_elements(fetched),
    }
def update_substance_element_base_fields(partition_dt: str | None = None) -> dict[str, int | str]:
    """Backfill ``stable_id`` on base rows that already exist in MySQL.

    Fetches base fields for the partition from ODPS and keeps only
    element_ids already present in substance_element_base (nothing is
    inserted). It then updates their stable_id. The effect table is not
    rewritten.

    Args:
        partition_dt: Partition date string. Defaults to yesterday in
            Shanghai time, formatted ``YYYYMMDD``.

    Returns:
        ``partition_dt``, ``base_fetched``, ``base_matched`` (rows found in
        MySQL) and ``base_updated`` (rows whose stable_id changed).
    """
    # Ensure the tables and the stable_id column exist before updating.
    # _ensure_tables adds stable_id to tables created before that column
    # existed, which is exactly the situation this backfill targets. The other
    # entry points already run it first.
    _ensure_tables()
    bizdate = partition_dt or _yesterday_partition_dt()
    base_rows = _fetch_base_fields_from_odps(bizdate)
    print("[update] matching existing rows in MySQL ...")
    existing_ids = _load_existing_element_ids([str(row["element_id"]) for row in base_rows])
    update_rows = [row for row in base_rows if str(row["element_id"]) in existing_ids]
    print(f"[update] updating stable_id for {len(update_rows)} rows ...")
    updated_count = _update_base_stable_id(update_rows)
    return {
        "partition_dt": bizdate,
        "base_fetched": len(base_rows),
        "base_matched": len(update_rows),
        "base_updated": updated_count,
    }
def _column_data_type(session, table_name: str, column_name: str) -> str | None:
    """Return the lower-cased information_schema DATA_TYPE of a column, or None if absent.

    ``session`` is an open session as produced by ``SessionLocal``. The lookup
    is scoped to the current database (``DATABASE()``).
    """
    value = session.execute(
        text(
            """
SELECT DATA_TYPE
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = :table_name
AND COLUMN_NAME = :column_name
"""
        ),
        {"table_name": table_name, "column_name": column_name},
    ).scalar()
    if value is None:
        return None
    # Some MySQL drivers return information_schema text columns as bytes.
    if isinstance(value, (bytes, bytearray)):
        value = value.decode()
    return str(value).lower()


def _ensure_tables() -> None:
    """Create the base/effect tables if missing and apply pending column migrations.

    * Creates both tables with ``CREATE TABLE IF NOT EXISTS``.
    * Adds ``stable_id`` to the base table when that column is absent.
    * Converts the effect table's ``vid_list`` to LONGTEXT when it is not
      LONGTEXT yet.

    Both migrations are gated on ``information_schema.COLUMNS``, so routine
    runs do not re-issue ALTER TABLE statements on up-to-date tables.
    """
    base_table = _safe_identifier(settings.substance_element_base_table)
    effect_table = _safe_identifier(settings.substance_element_effect_table)
    create_base_sql = f"""
CREATE TABLE IF NOT EXISTS {base_table}
(
id BIGINT AUTO_INCREMENT COMMENT '自增id' PRIMARY KEY,
element_id VARCHAR(64) NOT NULL COMMENT '元素id',
stable_id VARCHAR(64) NULL COMMENT '分类stable_id',
element_name VARCHAR(256) NULL COMMENT '元素名称',
dimension VARCHAR(32) NULL COMMENT '元素维度',
classified_as VARCHAR(64) NULL COMMENT '归类',
create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
UNIQUE KEY uniq_element_id (element_id)
)
"""
    create_effect_sql = f"""
CREATE TABLE IF NOT EXISTS {effect_table}
(
id BIGINT AUTO_INCREMENT COMMENT '自增id' PRIMARY KEY,
element_id VARCHAR(64) NOT NULL COMMENT '元素id',
vid_count BIGINT NULL COMMENT '视频数量',
vid_list LONGTEXT NULL COMMENT '视频列表',
rov_score DOUBLE NULL COMMENT 'ROV得分',
str_score DOUBLE NULL COMMENT 'STR得分',
ros_score DOUBLE NULL COMMENT 'ROS得分',
dt VARCHAR(32) NOT NULL COMMENT '分区日期',
create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
UNIQUE KEY uniq_element_dt (element_id, dt)
)
"""
    alter_base_sql = f"""
ALTER TABLE {base_table}
ADD COLUMN stable_id VARCHAR(64) NULL COMMENT '分类stable_id' AFTER element_id
"""
    alter_effect_sql = f"""
ALTER TABLE {effect_table}
MODIFY COLUMN vid_list LONGTEXT NULL COMMENT '视频列表'
"""
    with SessionLocal() as session:
        session.execute(text(create_base_sql))
        session.execute(text(create_effect_sql))
        # Tables created before stable_id was introduced need the column added.
        if _column_data_type(session, base_table, "stable_id") is None:
            session.execute(text(alter_base_sql))
        # Widen vid_list only when needed, instead of re-running the ALTER
        # (and taking its table metadata lock) on every sync.
        if _column_data_type(session, effect_table, "vid_list") != "longtext":
            session.execute(text(alter_effect_sql))
        session.commit()
def _load_existing_element_ids(element_ids: list[str]) -> set[str]:
    """Return which of ``element_ids`` are already stored in the base table.

    Lookups use an expanding ``IN`` parameter with at most BATCH_SIZE ids per
    query, and matched ids are returned as ``str``. An empty input returns an
    empty set without opening a session.
    """
    if not element_ids:
        return set()
    table_name = _safe_identifier(settings.substance_element_base_table)
    lookup = text(
        f"""
SELECT element_id
FROM {table_name}
WHERE element_id IN :element_ids
"""
    ).bindparams(bindparam("element_ids", expanding=True))
    found: set[str] = set()
    with SessionLocal() as session:
        for offset in range(0, len(element_ids), BATCH_SIZE):
            chunk = tuple(element_ids[offset : offset + BATCH_SIZE])
            result_rows = session.execute(lookup, {"element_ids": chunk}).all()
            found |= {str(r[0]) for r in result_rows}
    return found
def _insert_base_elements(rows: list[dict[str, object]]) -> int:
    """Insert base-element rows in chunks of BATCH_SIZE and commit once at the end.

    Each row must provide element_id, stable_id, element_name, dimension and
    classified_as. No duplicate filtering happens here. An element_id that
    already exists would violate the table's ``uniq_element_id`` key and
    presumably make the insert raise.

    Returns:
        ``len(rows)`` (rows submitted), or 0 for empty input.
    """
    if not rows:
        return 0
    table_name = _safe_identifier(settings.substance_element_base_table)
    statement = text(
        f"""
INSERT INTO {table_name}
(
element_id,
stable_id,
element_name,
dimension,
classified_as
)
VALUES
(
:element_id,
:stable_id,
:element_name,
:dimension,
:classified_as
)
"""
    )
    chunks = [rows[i : i + BATCH_SIZE] for i in range(0, len(rows), BATCH_SIZE)]
    with SessionLocal() as session:
        for chunk in chunks:
            session.execute(statement, chunk)
        session.commit()
    return len(rows)
def _replace_effect_partition(partition_dt: str, rows: list[dict[str, object]]) -> int:
    """Replace every effect row of one ``dt`` partition in a single transaction.

    Deletes all rows with ``dt = partition_dt`` and then inserts ``rows`` in
    chunks of BATCH_SIZE. Both steps share one session and are committed
    together at the end. An empty ``rows`` list therefore just clears the
    partition.

    Returns:
        ``len(rows)``.
    """
    table_name = _safe_identifier(settings.substance_element_effect_table)
    delete_stmt = text(f"DELETE FROM {table_name} WHERE dt = :dt")
    insert_stmt = text(
        f"""
INSERT INTO {table_name}
(
element_id,
vid_count,
vid_list,
rov_score,
str_score,
ros_score,
dt
)
VALUES
(
:element_id,
:vid_count,
:vid_list,
:rov_score,
:str_score,
:ros_score,
:dt
)
"""
    )
    total = len(rows)
    with SessionLocal() as session:
        session.execute(delete_stmt, {"dt": partition_dt})
        offset = 0
        while offset < total:
            session.execute(insert_stmt, rows[offset : offset + BATCH_SIZE])
            offset += BATCH_SIZE
        session.commit()
    return total
def sync_substance_elements(partition_dt: str | None = None) -> dict[str, int | str]:
    """Sync one partition: add new base elements, then rewrite its effect data.

    The partition defaults to yesterday in Shanghai time. Base rows whose
    element_id already exists in MySQL are skipped (incremental). Effect rows
    for the partition are replaced wholesale.

    Returns:
        ``partition_dt``, ``base_fetched``, ``base_inserted`` and ``effect_inserted``.
    """
    _ensure_tables()
    dt = partition_dt or _yesterday_partition_dt()
    fetched_base, fetched_effect = _fetch_from_odps(dt)
    known_ids = _load_existing_element_ids([str(r["element_id"]) for r in fetched_base])
    new_base = [r for r in fetched_base if str(r["element_id"]) not in known_ids]
    summary: dict[str, int | str] = {"partition_dt": dt, "base_fetched": len(fetched_base)}
    summary["base_inserted"] = _insert_base_elements(new_base)
    summary["effect_inserted"] = _replace_effect_partition(dt, fetched_effect)
    return summary
|