"""垂直领域分类:从 ODPS 同步基本分类(增量)与效果数据(按日全量)。""" import re from datetime import datetime, timedelta from decimal import Decimal from zoneinfo import ZoneInfo from sqlalchemy import bindparam, text from app.core.config import settings from app.db.mysql import SessionLocal from app.odps.client import get_odps_client IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") BATCH_SIZE = 500 SHANGHAI_TZ = ZoneInfo("Asia/Shanghai") _T_ELEMENT_ALL_CATEGORY_CTE = """ WITH t_element AS ( SELECT t1.post_id AS vid ,t1.global_element_id AS element_id ,t1.element_name AS 元素名称 ,t4.stable_id AS stable_id ,t4.parent_stable_id AS parent_stable_id ,t4.name AS 分类名称 ,t4.classified_as AS 归类 ,t4.source_type AS 维度 ,t4.description AS 分类说明 ,t4.level AS 分类层级 ,t4.path AS 分类路径 ,t1.element_type AS 元素维度 ,t6.topic_point_type AS 点类型 ,t2.contribution AS 贡献度 ,t2.share_will AS 分享意愿度 ,t2.consume_will AS 消费意愿度 ,t2.click_will AS 点击意愿度 FROM loghubods.element_classification_mapping t1 LEFT JOIN loghubods.dwd_post_word_contribution_di t2 ON t1.post_id = t2.post_id AND t1.element_name = t2.word AND t2.dt = '{bizdate}' LEFT JOIN loghubods.global_element t3 ON t1.global_element_id = t3.id AND t3.dt = '{bizdate}' LEFT JOIN loghubods.global_category t4 ON t1.global_category_stable_id = t4.stable_id AND t4.dt = '{bizdate}' LEFT JOIN loghubods.post_decode_topic_point_element t5 ON t1.source_element_id = t5.id AND t1.post_id = t5.post_id AND t5.dt = '{bizdate}' LEFT JOIN loghubods.post_decode_topic_point t6 ON t5.topic_point_id = t6.id AND t6.dt = '{bizdate}' WHERE t1.dt = '{bizdate}' AND LENGTH(CAST(t1.post_id AS STRING)) <= 10 AND t3.retired_at_execution_id IS NULL AND t4.retired_at_execution_id IS NULL AND t5.id IS NOT NULL AND t1.element_type = '实质' AND t4.classified_as = '垂直领域细分' ) ,t_element_all_category AS ( SELECT e.vid ,e.贡献度 ,e.分享意愿度 ,e.消费意愿度 ,e.点击意愿度 ,gc_ancestor.stable_id ,gc_ancestor.parent_stable_id ,gc_ancestor.level ,gc_ancestor.name ,gc_ancestor.classified_as ,gc_ancestor.source_type ,gc_ancestor.description FROM t_element e JOIN loghubods.global_category gc_self ON e.stable_id = gc_self.stable_id AND gc_self.dt = '{bizdate}' AND gc_self.retired_at_execution_id IS NULL JOIN loghubods.global_category gc_ancestor ON gc_ancestor.dt = '{bizdate}' AND gc_ancestor.retired_at_execution_id IS NULL AND gc_ancestor.dt = gc_self.dt WHERE INSTR(gc_self.path, CONCAT(gc_ancestor.path, '/')) = 1 UNION ALL SELECT vid ,贡献度 ,分享意愿度 ,消费意愿度 ,点击意愿度 ,stable_id ,parent_stable_id ,分类层级 AS level ,分类名称 AS name ,归类 AS classified_as ,维度 AS source_type ,分类说明 AS description FROM t_element WHERE stable_id IS NOT NULL ) """ def _safe_identifier(name: str) -> str: if not IDENTIFIER_RE.match(name): raise ValueError(f"invalid sql identifier: {name}") return name def _normalize_scalar(value: object) -> object: if isinstance(value, Decimal): return float(value) return value def _normalize_category_id(value: object) -> str: text_value = str(value or "").strip() if not text_value: raise ValueError("category_id 不能为空") return text_value def _yesterday_partition_dt(reference: datetime | None = None) -> str: now = reference or datetime.now(SHANGHAI_TZ) return (now.date() - timedelta(days=1)).strftime("%Y%m%d") def _build_sync_sql(bizdate: str) -> str: return ( _T_ELEMENT_ALL_CATEGORY_CTE.format(bizdate=bizdate) + """ SELECT CAST(t1.stable_id AS STRING) AS category_id ,CAST(t1.parent_stable_id AS STRING) AS parent_stable_id ,t1.name AS category_name ,t1.level AS category_level ,t1.source_type AS dimension ,t1.classified_as AS classified_as ,COUNT(DISTINCT t1.vid) AS vid_count ,COALESCE(ROUND( SUM(t2.分发回流uv * CASE WHEN t1.贡献度 >= 0.8 THEN t1.贡献度 ELSE 0 END) / NULLIF(SUM(t2.分发曝光pv), 0) , 4), 0) AS rov_score ,COALESCE(ROUND( SUM(t2.分发分享pv * CASE WHEN t1.分享意愿度 >= 0.8 THEN t1.分享意愿度 ELSE 0 END) / NULLIF(SUM(t2.分发曝光pv), 0) , 4), 0) AS str_score ,COALESCE(ROUND( SUM(t2.分发回流uv * CASE WHEN t1.点击意愿度 >= 0.8 THEN t1.点击意愿度 ELSE 0 END) / NULLIF(SUM(t2.分发分享pv), 0) , 4), 0) AS ros_score FROM t_element_all_category t1 LEFT JOIN loghubods.video_multi_strategy_effect t2 ON t1.vid = t2.vid AND t2.dt = '{bizdate}' AND t2.strategy = '当天' LEFT JOIN loghubods.global_category t3 ON t1.stable_id = t3.stable_id AND t3.dt = '{bizdate}' AND t3.retired_at_execution_id IS NULL GROUP BY t1.stable_id ,t1.parent_stable_id ,t1.name ,t1.level ,t1.source_type ,t1.classified_as """ ).format(bizdate=bizdate) def _fetch_from_odps(bizdate: str) -> tuple[list[dict[str, object]], list[dict[str, object]]]: sql = _build_sync_sql(bizdate) odps_client = get_odps_client() instance = odps_client.execute_sql(sql) base_rows: dict[str, dict[str, object]] = {} effect_rows: list[dict[str, object]] = [] with instance.open_reader(tunnel=True) as reader: for record in reader: category_id = _normalize_category_id(record["category_id"]) base_rows[category_id] = { "category_id": category_id, "parent_stable_id": str(record["parent_stable_id"] or "").strip() or None, "category_name": record["category_name"], "category_level": record["category_level"], "dimension": record["dimension"], "classified_as": record["classified_as"], } effect_rows.append( { "category_id": category_id, "vid_count": record["vid_count"], "rov_score": _normalize_scalar(record["rov_score"]), "str_score": _normalize_scalar(record["str_score"]), "ros_score": _normalize_scalar(record["ros_score"]), "dt": bizdate, } ) return list(base_rows.values()), effect_rows def _ensure_tables() -> None: base_table = _safe_identifier(settings.vertical_category_base_table) effect_table = _safe_identifier(settings.vertical_category_effect_table) create_base_sql = f""" CREATE TABLE IF NOT EXISTS {base_table} ( id BIGINT AUTO_INCREMENT COMMENT '自增id' PRIMARY KEY, category_id VARCHAR(64) NOT NULL COMMENT '分类stable_id', parent_stable_id VARCHAR(64) NULL COMMENT '父分类stable_id', category_name VARCHAR(256) NULL COMMENT '分类名称', category_level INT NULL COMMENT '分类层级', dimension VARCHAR(32) NULL COMMENT '维度', classified_as VARCHAR(64) NULL COMMENT '归类', create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', UNIQUE KEY uniq_category_id (category_id) ) """ create_effect_sql = f""" CREATE TABLE IF NOT EXISTS {effect_table} ( id BIGINT AUTO_INCREMENT COMMENT '自增id' PRIMARY KEY, category_id VARCHAR(64) NOT NULL COMMENT '分类stable_id', vid_count BIGINT NULL COMMENT '视频数量', rov_score DOUBLE NULL COMMENT 'ROV得分', str_score DOUBLE NULL COMMENT 'STR得分', ros_score DOUBLE NULL COMMENT 'ROS得分', dt VARCHAR(32) NOT NULL COMMENT '分区日期', create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', UNIQUE KEY uniq_category_dt (category_id, dt) ) """ with SessionLocal() as session: session.execute(text(create_base_sql)) session.execute(text(create_effect_sql)) session.commit() def _load_existing_category_ids(category_ids: list[str]) -> set[str]: if not category_ids: return set() base_table = _safe_identifier(settings.vertical_category_base_table) existing: set[str] = set() query_sql = ( text( f""" SELECT category_id FROM {base_table} WHERE category_id IN :category_ids """ ) .bindparams(bindparam("category_ids", expanding=True)) ) with SessionLocal() as session: for start in range(0, len(category_ids), BATCH_SIZE): batch = category_ids[start : start + BATCH_SIZE] rows = session.execute(query_sql, {"category_ids": tuple(batch)}).all() existing.update(str(row[0]) for row in rows) return existing def _insert_base_categories(rows: list[dict[str, object]]) -> int: if not rows: return 0 base_table = _safe_identifier(settings.vertical_category_base_table) insert_sql = text( f""" INSERT INTO {base_table} ( category_id, parent_stable_id, category_name, category_level, dimension, classified_as ) VALUES ( :category_id, :parent_stable_id, :category_name, :category_level, :dimension, :classified_as ) """ ) with SessionLocal() as session: for start in range(0, len(rows), BATCH_SIZE): session.execute(insert_sql, rows[start : start + BATCH_SIZE]) session.commit() return len(rows) def _replace_effect_partition(partition_dt: str, rows: list[dict[str, object]]) -> int: effect_table = _safe_identifier(settings.vertical_category_effect_table) delete_sql = text(f"DELETE FROM {effect_table} WHERE dt = :dt") insert_sql = text( f""" INSERT INTO {effect_table} ( category_id, vid_count, rov_score, str_score, ros_score, dt ) VALUES ( :category_id, :vid_count, :rov_score, :str_score, :ros_score, :dt ) """ ) with SessionLocal() as session: session.execute(delete_sql, {"dt": partition_dt}) for start in range(0, len(rows), BATCH_SIZE): session.execute(insert_sql, rows[start : start + BATCH_SIZE]) session.commit() return len(rows) def sync_vertical_categories(partition_dt: str | None = None) -> dict[str, int | str]: """同步指定分区(默认上海时区昨天)的基本分类增量与效果全量。""" _ensure_tables() bizdate = partition_dt or _yesterday_partition_dt() base_rows, effect_rows = _fetch_from_odps(bizdate) existing_ids = _load_existing_category_ids([str(row["category_id"]) for row in base_rows]) incremental_rows = [row for row in base_rows if str(row["category_id"]) not in existing_ids] inserted_base_count = _insert_base_categories(incremental_rows) inserted_effect_count = _replace_effect_partition(bizdate, effect_rows) return { "partition_dt": bizdate, "base_fetched": len(base_rows), "base_inserted": inserted_base_count, "effect_inserted": inserted_effect_count, }