| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353 |
- """垂直领域分类:从 ODPS 同步基本分类(增量)与效果数据(按日全量)。"""
- import re
- from datetime import datetime, timedelta
- from decimal import Decimal
- from zoneinfo import ZoneInfo
- from sqlalchemy import bindparam, text
- from app.core.config import settings
- from app.db.mysql import SessionLocal
- from app.odps.client import get_odps_client
# Rows per batched IN-query / bulk INSERT round-trip against MySQL.
BATCH_SIZE = 500
# Whitelist for table names that are interpolated directly into SQL text.
IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
# Timezone used to decide which calendar day "yesterday" is.
SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
# ODPS SQL fragment that defines two CTEs: t_element and t_element_all_category.
# It contains a `{bizdate}` placeholder and is meant to be completed with
# str.format(bizdate=...). Any literal `{` or `}` added to it must therefore
# be doubled.
#
# t_element: element rows from loghubods.element_classification_mapping for the
#   partition. They are joined to:
#   - the post's word-level contribution and willingness scores;
#   - the element (global_element);
#   - the category (global_category);
#   - topic-point tables.
#   The rows are restricted to:
#   - element_type '实质';
#   - categories classified_as '垂直领域细分';
#   - post_ids of at most 10 characters;
#   - rows with a matching post_decode_topic_point_element row.
#   Rows whose element or category is retired are dropped.
# t_element_all_category: expands every t_element row to every strict ancestor
#   category (first UNION ALL branch) plus its own category (second branch).
#   Ancestors are found by path prefix:
#   INSTR(self.path, CONCAT(ancestor.path, '/')) = 1. This assumes
#   '/'-separated category paths — TODO confirm.
#   NOTE(review): the ancestor branch does not filter on classified_as, so
#   ancestors outside '垂直领域细分' are included; confirm this is intended.
#   NOTE(review): `gc_ancestor.dt = gc_self.dt` duplicates the literal dt
#   filters. It presumably gives MaxCompute the equality condition it requires
#   in a JOIN whose real condition (the INSTR prefix test) is non-equi.
#   Confirm before removing it.
_T_ELEMENT_ALL_CATEGORY_CTE = """
WITH t_element AS
(
SELECT t1.post_id AS vid
,t1.global_element_id AS element_id
,t1.element_name AS 元素名称
,t4.stable_id AS stable_id
,t4.parent_stable_id AS parent_stable_id
,t4.name AS 分类名称
,t4.classified_as AS 归类
,t4.source_type AS 维度
,t4.description AS 分类说明
,t4.level AS 分类层级
,t4.path AS 分类路径
,t1.element_type AS 元素维度
,t6.topic_point_type AS 点类型
,t2.contribution AS 贡献度
,t2.share_will AS 分享意愿度
,t2.consume_will AS 消费意愿度
,t2.click_will AS 点击意愿度
FROM loghubods.element_classification_mapping t1
LEFT JOIN loghubods.dwd_post_word_contribution_di t2
ON t1.post_id = t2.post_id
AND t1.element_name = t2.word
AND t2.dt = '{bizdate}'
LEFT JOIN loghubods.global_element t3
ON t1.global_element_id = t3.id
AND t3.dt = '{bizdate}'
LEFT JOIN loghubods.global_category t4
ON t1.global_category_stable_id = t4.stable_id
AND t4.dt = '{bizdate}'
LEFT JOIN loghubods.post_decode_topic_point_element t5
ON t1.source_element_id = t5.id
AND t1.post_id = t5.post_id
AND t5.dt = '{bizdate}'
LEFT JOIN loghubods.post_decode_topic_point t6
ON t5.topic_point_id = t6.id
AND t6.dt = '{bizdate}'
WHERE t1.dt = '{bizdate}'
AND LENGTH(CAST(t1.post_id AS STRING)) <= 10
AND t3.retired_at_execution_id IS NULL
AND t4.retired_at_execution_id IS NULL
AND t5.id IS NOT NULL
AND t1.element_type = '实质'
AND t4.classified_as = '垂直领域细分'
)
,t_element_all_category AS
(
SELECT e.vid
,e.贡献度
,e.分享意愿度
,e.消费意愿度
,e.点击意愿度
,gc_ancestor.stable_id
,gc_ancestor.parent_stable_id
,gc_ancestor.level
,gc_ancestor.name
,gc_ancestor.classified_as
,gc_ancestor.source_type
,gc_ancestor.description
FROM t_element e
JOIN loghubods.global_category gc_self
ON e.stable_id = gc_self.stable_id
AND gc_self.dt = '{bizdate}'
AND gc_self.retired_at_execution_id IS NULL
JOIN loghubods.global_category gc_ancestor
ON gc_ancestor.dt = '{bizdate}'
AND gc_ancestor.retired_at_execution_id IS NULL
AND gc_ancestor.dt = gc_self.dt
WHERE INSTR(gc_self.path, CONCAT(gc_ancestor.path, '/')) = 1
UNION ALL
SELECT vid
,贡献度
,分享意愿度
,消费意愿度
,点击意愿度
,stable_id
,parent_stable_id
,分类层级 AS level
,分类名称 AS name
,归类 AS classified_as
,维度 AS source_type
,分类说明 AS description
FROM t_element
WHERE stable_id IS NOT NULL
)
"""
def _safe_identifier(name: str) -> str:
    """Return *name* unchanged if it is a plain SQL identifier, otherwise raise.

    Table names are interpolated into SQL text with f-strings, so they are
    whitelisted against IDENTIFIER_RE. ``fullmatch`` is used instead of
    ``match`` because a pattern ending in ``$`` also matches just before a
    trailing newline. With ``match``, a name such as "users" followed by a
    newline character would be accepted.

    Raises:
        ValueError: if *name* is not a valid identifier.
    """
    if IDENTIFIER_RE.fullmatch(name) is None:
        raise ValueError(f"invalid sql identifier: {name}")
    return name
- def _normalize_scalar(value: object) -> object:
- if isinstance(value, Decimal):
- return float(value)
- return value
- def _normalize_category_id(value: object) -> str:
- text_value = str(value or "").strip()
- if not text_value:
- raise ValueError("category_id 不能为空")
- return text_value
- def _yesterday_partition_dt(reference: datetime | None = None) -> str:
- now = reference or datetime.now(SHANGHAI_TZ)
- return (now.date() - timedelta(days=1)).strftime("%Y%m%d")
- def _build_sync_sql(bizdate: str) -> str:
- return (
- _T_ELEMENT_ALL_CATEGORY_CTE.format(bizdate=bizdate)
- + """
- SELECT CAST(t1.stable_id AS STRING) AS category_id
- ,CAST(t1.parent_stable_id AS STRING) AS parent_stable_id
- ,t1.name AS category_name
- ,t1.level AS category_level
- ,t1.source_type AS dimension
- ,t1.classified_as AS classified_as
- ,COUNT(DISTINCT t1.vid) AS vid_count
- ,COALESCE(ROUND(
- SUM(t2.分发回流uv * CASE WHEN t1.贡献度 >= 0.8 THEN t1.贡献度 ELSE 0 END)
- / NULLIF(SUM(t2.分发曝光pv), 0)
- , 4), 0) AS rov_score
- ,COALESCE(ROUND(
- SUM(t2.分发分享pv * CASE WHEN t1.分享意愿度 >= 0.8 THEN t1.分享意愿度 ELSE 0 END)
- / NULLIF(SUM(t2.分发曝光pv), 0)
- , 4), 0) AS str_score
- ,COALESCE(ROUND(
- SUM(t2.分发回流uv * CASE WHEN t1.点击意愿度 >= 0.8 THEN t1.点击意愿度 ELSE 0 END)
- / NULLIF(SUM(t2.分发分享pv), 0)
- , 4), 0) AS ros_score
- FROM t_element_all_category t1
- LEFT JOIN loghubods.video_multi_strategy_effect t2
- ON t1.vid = t2.vid
- AND t2.dt = '{bizdate}'
- AND t2.strategy = '当天'
- LEFT JOIN loghubods.global_category t3
- ON t1.stable_id = t3.stable_id
- AND t3.dt = '{bizdate}'
- AND t3.retired_at_execution_id IS NULL
- GROUP BY t1.stable_id
- ,t1.parent_stable_id
- ,t1.name
- ,t1.level
- ,t1.source_type
- ,t1.classified_as
- """
- ).format(bizdate=bizdate)
def _fetch_from_odps(bizdate: str) -> tuple[list[dict[str, object]], list[dict[str, object]]]:
    """Run the sync query on ODPS for *bizdate* and split each record in two.

    Returns:
        A ``(base_rows, effect_rows)`` pair.

        ``base_rows`` holds category attributes, unique by ``category_id``.
        When an id repeats, the later record's attributes win and the first
        occurrence's position is kept.

        ``effect_rows`` has one entry per record, tagged with ``dt=bizdate``.
        Its Decimal scores are converted to float.
    """
    query = _build_sync_sql(bizdate)
    client = get_odps_client()
    instance = client.execute_sql(query)

    categories_by_id: dict[str, dict[str, object]] = {}
    effects: list[dict[str, object]] = []
    with instance.open_reader(tunnel=True) as reader:
        for rec in reader:
            cid = _normalize_category_id(rec["category_id"])
            parent_id = str(rec["parent_stable_id"] or "").strip()
            categories_by_id[cid] = {
                "category_id": cid,
                "parent_stable_id": parent_id if parent_id else None,
                "category_name": rec["category_name"],
                "category_level": rec["category_level"],
                "dimension": rec["dimension"],
                "classified_as": rec["classified_as"],
            }
            effect = {
                "category_id": cid,
                "vid_count": rec["vid_count"],
                "rov_score": _normalize_scalar(rec["rov_score"]),
                "str_score": _normalize_scalar(rec["str_score"]),
                "ros_score": _normalize_scalar(rec["ros_score"]),
                "dt": bizdate,
            }
            effects.append(effect)

    return list(categories_by_id.values()), effects
def _ensure_tables() -> None:
    """Create the base-category and daily-effect MySQL tables when they are missing.

    Both table names come from settings and are validated with
    _safe_identifier before being interpolated into the DDL. The base table
    is unique on category_id. The effect table is unique on
    (category_id, dt).
    """
    base_table = _safe_identifier(settings.vertical_category_base_table)
    effect_table = _safe_identifier(settings.vertical_category_effect_table)
    ddl_statements = (
        f"""
CREATE TABLE IF NOT EXISTS {base_table}
(
id BIGINT AUTO_INCREMENT COMMENT '自增id' PRIMARY KEY,
category_id VARCHAR(64) NOT NULL COMMENT '分类stable_id',
parent_stable_id VARCHAR(64) NULL COMMENT '父分类stable_id',
category_name VARCHAR(256) NULL COMMENT '分类名称',
category_level INT NULL COMMENT '分类层级',
dimension VARCHAR(32) NULL COMMENT '维度',
classified_as VARCHAR(64) NULL COMMENT '归类',
create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
UNIQUE KEY uniq_category_id (category_id)
)
""",
        f"""
CREATE TABLE IF NOT EXISTS {effect_table}
(
id BIGINT AUTO_INCREMENT COMMENT '自增id' PRIMARY KEY,
category_id VARCHAR(64) NOT NULL COMMENT '分类stable_id',
vid_count BIGINT NULL COMMENT '视频数量',
rov_score DOUBLE NULL COMMENT 'ROV得分',
str_score DOUBLE NULL COMMENT 'STR得分',
ros_score DOUBLE NULL COMMENT 'ROS得分',
dt VARCHAR(32) NOT NULL COMMENT '分区日期',
create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
UNIQUE KEY uniq_category_dt (category_id, dt)
)
""",
    )
    with SessionLocal() as session:
        for ddl in ddl_statements:
            session.execute(text(ddl))
        session.commit()
- def _load_existing_category_ids(category_ids: list[str]) -> set[str]:
- if not category_ids:
- return set()
- base_table = _safe_identifier(settings.vertical_category_base_table)
- existing: set[str] = set()
- query_sql = (
- text(
- f"""
- SELECT category_id
- FROM {base_table}
- WHERE category_id IN :category_ids
- """
- )
- .bindparams(bindparam("category_ids", expanding=True))
- )
- with SessionLocal() as session:
- for start in range(0, len(category_ids), BATCH_SIZE):
- batch = category_ids[start : start + BATCH_SIZE]
- rows = session.execute(query_sql, {"category_ids": tuple(batch)}).all()
- existing.update(str(row[0]) for row in rows)
- return existing
- def _insert_base_categories(rows: list[dict[str, object]]) -> int:
- if not rows:
- return 0
- base_table = _safe_identifier(settings.vertical_category_base_table)
- insert_sql = text(
- f"""
- INSERT INTO {base_table}
- (
- category_id,
- parent_stable_id,
- category_name,
- category_level,
- dimension,
- classified_as
- )
- VALUES
- (
- :category_id,
- :parent_stable_id,
- :category_name,
- :category_level,
- :dimension,
- :classified_as
- )
- """
- )
- with SessionLocal() as session:
- for start in range(0, len(rows), BATCH_SIZE):
- session.execute(insert_sql, rows[start : start + BATCH_SIZE])
- session.commit()
- return len(rows)
def _replace_effect_partition(partition_dt: str, rows: list[dict[str, object]]) -> int:
    """Overwrite the effect table's rows for one partition date.

    The DELETE of the existing rows for *partition_dt* and the batched
    INSERTs share one session and a single commit at the end. This
    presumably makes the replacement atomic, assuming SessionLocal does not
    autocommit — confirm. The DELETE runs even when *rows* is empty, which
    leaves the partition empty.

    Returns:
        The number of rows handed in for insertion.
    """
    effect_table = _safe_identifier(settings.vertical_category_effect_table)
    delete_sql = text(f"DELETE FROM {effect_table} WHERE dt = :dt")
    insert_sql = text(
        f"""
INSERT INTO {effect_table}
(
category_id,
vid_count,
rov_score,
str_score,
ros_score,
dt
)
VALUES
(
:category_id,
:vid_count,
:rov_score,
:str_score,
:ros_score,
:dt
)
"""
    )
    with SessionLocal() as session:
        session.execute(delete_sql, {"dt": partition_dt})
        offset = 0
        while offset < len(rows):
            session.execute(insert_sql, rows[offset : offset + BATCH_SIZE])
            offset += BATCH_SIZE
        session.commit()
    return len(rows)
def sync_vertical_categories(partition_dt: str | None = None) -> dict[str, int | str]:
    """Sync one ODPS partition into MySQL.

    Base categories are synced incrementally: only category_ids not already
    in the base table are inserted, and existing rows are left untouched.
    Effect rows for the partition are replaced wholesale.

    Args:
        partition_dt: partition to sync, in the YYYYMMDD form produced by
            _yesterday_partition_dt. When falsy, yesterday in Asia/Shanghai
            is used.

    Returns:
        A summary with keys ``partition_dt``, ``base_fetched``,
        ``base_inserted`` and ``effect_inserted``.
    """
    _ensure_tables()
    bizdate = partition_dt if partition_dt else _yesterday_partition_dt()
    base_rows, effect_rows = _fetch_from_odps(bizdate)

    known_ids = _load_existing_category_ids([str(base_row["category_id"]) for base_row in base_rows])
    new_rows = [base_row for base_row in base_rows if str(base_row["category_id"]) not in known_ids]

    summary: dict[str, int | str] = {"partition_dt": bizdate, "base_fetched": len(base_rows)}
    summary["base_inserted"] = _insert_base_categories(new_rows)
    summary["effect_inserted"] = _replace_effect_partition(bizdate, effect_rows)
    return summary
|