substance_element_sync.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467
  1. """垂直领域实质元素:从 ODPS 同步基本元素(增量)与效果数据(按日全量)。"""
  2. import json
  3. import re
  4. from datetime import datetime, timedelta
  5. from decimal import Decimal
  6. from zoneinfo import ZoneInfo
  7. from sqlalchemy import bindparam, text
  8. from app.core.config import settings
  9. from app.db.mysql import SessionLocal
  10. from app.odps.client import get_odps_client
  11. IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
  12. BATCH_SIZE = 500
  13. SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
# ODPS SQL template defining the CTE ``t_element``; every ``{bizdate}``
# placeholder is filled with ``str.format`` by the caller.
# Rows come from the element-classification mapping table t1 for partition
# ``{bizdate}``. They are enriched with the category (t4), the per-post word
# contribution / willingness scores (t2) and the topic-point type (t6).
# Filters:
# - post_id is at most 10 characters long;
# - the global element (t3) and category (t4) are not retired;
# - the mapping resolves to a topic-point element (t5);
# - element_type is '实质' and classified_as is '垂直领域细分'.
# The WHERE conditions on t4.classified_as and t5.id discard unmatched rows,
# so those two LEFT JOINs behave like inner joins. An unmatched t3 still
# passes, because its NULL retired_at_execution_id satisfies IS NULL.
# NOTE(review): if t2 or t6 can match several rows per t1 record, rows fan
# out and the SUM-based scores built on this CTE would count them more than
# once. Verify the uniqueness of those join keys.
_T_ELEMENT_CTE = """
WITH t_element AS
(
SELECT t1.post_id AS vid
,t1.global_element_id AS element_id
,t1.element_name AS 元素名称
,t1.element_type AS 维度
,t4.classified_as AS 归类
,t4.name AS 分类名称
,t4.stable_id AS stable_id
,t4.description AS 分类说明
,t4.level AS 分类层级
,t4.path AS 分类路径
,t1.element_type AS 元素维度
,t6.topic_point_type AS 点类型
,t2.contribution AS 贡献度
,t2.share_will AS 分享意愿度
,t2.consume_will AS 消费意愿度
,t2.click_will AS 点击意愿度
FROM loghubods.element_classification_mapping t1
LEFT JOIN loghubods.dwd_post_word_contribution_di t2
ON t1.post_id = t2.post_id
AND t1.element_name = t2.word
AND t2.dt = '{bizdate}'
LEFT JOIN loghubods.global_element t3
ON t1.global_element_id = t3.id
AND t3.dt = '{bizdate}'
LEFT JOIN loghubods.global_category t4
ON t1.global_category_stable_id = t4.stable_id
AND t4.dt = '{bizdate}'
LEFT JOIN loghubods.post_decode_topic_point_element t5
ON t1.source_element_id = t5.id
AND t1.post_id = t5.post_id
AND t5.dt = '{bizdate}'
LEFT JOIN loghubods.post_decode_topic_point t6
ON t5.topic_point_id = t6.id
AND t6.dt = '{bizdate}'
WHERE t1.dt = '{bizdate}'
AND LENGTH(CAST(t1.post_id AS STRING)) <= 10
AND t3.retired_at_execution_id IS NULL
AND t4.retired_at_execution_id IS NULL
AND t5.id IS NOT NULL
AND t1.element_type = '实质'
AND t4.classified_as = '垂直领域细分'
)
"""
# Slim ODPS SQL template defining the CTE ``t_element`` for base fields only.
# It keeps the t1/t3/t4/t5 joins and the same WHERE filters as the full
# effect CTE. It drops the contribution (t2) and topic-point (t6) joins and
# exposes only element_id, name, dimension, classification and stable_id.
# ``{bizdate}`` placeholders are filled with ``str.format`` by the caller.
_T_ELEMENT_BASE_CTE = """
WITH t_element AS
(
SELECT t1.global_element_id AS element_id
,t1.element_name AS 元素名称
,t1.element_type AS 维度
,t4.classified_as AS 归类
,t4.stable_id AS stable_id
FROM loghubods.element_classification_mapping t1
LEFT JOIN loghubods.global_element t3
ON t1.global_element_id = t3.id
AND t3.dt = '{bizdate}'
LEFT JOIN loghubods.global_category t4
ON t1.global_category_stable_id = t4.stable_id
AND t4.dt = '{bizdate}'
LEFT JOIN loghubods.post_decode_topic_point_element t5
ON t1.source_element_id = t5.id
AND t1.post_id = t5.post_id
AND t5.dt = '{bizdate}'
WHERE t1.dt = '{bizdate}'
AND LENGTH(CAST(t1.post_id AS STRING)) <= 10
AND t3.retired_at_execution_id IS NULL
AND t4.retired_at_execution_id IS NULL
AND t5.id IS NOT NULL
AND t1.element_type = '实质'
AND t4.classified_as = '垂直领域细分'
)
"""
  88. def _safe_identifier(name: str) -> str:
  89. if not IDENTIFIER_RE.match(name):
  90. raise ValueError(f"invalid sql identifier: {name}")
  91. return name
  92. def _serialize_vid_list(value: object) -> str | None:
  93. if value is None:
  94. return None
  95. if isinstance(value, list):
  96. return json.dumps(value, ensure_ascii=False)
  97. return str(value)
  98. def _normalize_scalar(value: object) -> object:
  99. if isinstance(value, Decimal):
  100. return float(value)
  101. return value
  102. def _normalize_element_id(value: object) -> str:
  103. text_value = str(value or "").strip()
  104. if not text_value:
  105. raise ValueError("element_id 不能为空")
  106. return text_value
  107. def _yesterday_partition_dt(reference: datetime | None = None) -> str:
  108. now = reference or datetime.now(SHANGHAI_TZ)
  109. return (now.date() - timedelta(days=1)).strftime("%Y%m%d")
  110. def _build_sync_sql(bizdate: str) -> str:
  111. return (
  112. _T_ELEMENT_CTE.format(bizdate=bizdate)
  113. + """
  114. SELECT CAST(t1.element_id AS STRING) AS element_id
  115. ,t1.元素名称 AS element_name
  116. ,t1.维度 AS dimension
  117. ,t1.归类 AS classified_as
  118. ,CAST(t1.stable_id AS STRING) AS stable_id
  119. ,COUNT(DISTINCT t1.vid) AS vid_count
  120. ,COLLECT_SET(t1.vid) AS vid_list
  121. ,COALESCE(ROUND(
  122. SUM(t2.分发回流uv * CASE WHEN t1.贡献度 >= 0.8 THEN t1.贡献度 ELSE 0 END)
  123. / NULLIF(SUM(t2.分发曝光pv), 0)
  124. , 4), 0) AS rov_score
  125. ,COALESCE(ROUND(
  126. SUM(t2.分发分享pv * CASE WHEN t1.分享意愿度 >= 0.8 THEN t1.分享意愿度 ELSE 0 END)
  127. / NULLIF(SUM(t2.分发曝光pv), 0)
  128. , 4), 0) AS str_score
  129. ,COALESCE(ROUND(
  130. SUM(t2.分发回流uv * CASE WHEN t1.点击意愿度 >= 0.8 THEN t1.点击意愿度 ELSE 0 END)
  131. / NULLIF(SUM(t2.分发分享pv), 0)
  132. , 4), 0) AS ros_score
  133. FROM t_element t1
  134. LEFT JOIN loghubods.video_multi_strategy_effect t2
  135. ON t1.vid = t2.vid
  136. AND t2.dt = '{bizdate}'
  137. AND t2.strategy = '当天'
  138. GROUP BY t1.element_id
  139. ,t1.元素名称
  140. ,t1.维度
  141. ,t1.归类
  142. ,t1.stable_id
  143. """
  144. ).format(bizdate=bizdate)
  145. def _fetch_from_odps(bizdate: str) -> tuple[list[dict[str, object]], list[dict[str, object]]]:
  146. sql = _build_sync_sql(bizdate)
  147. odps_client = get_odps_client()
  148. instance = odps_client.execute_sql(sql)
  149. base_rows: dict[str, dict[str, object]] = {}
  150. effect_rows: list[dict[str, object]] = []
  151. with instance.open_reader(tunnel=True) as reader:
  152. for record in reader:
  153. element_id = _normalize_element_id(record["element_id"])
  154. base_rows[element_id] = {
  155. "element_id": element_id,
  156. "stable_id": str(record["stable_id"] or "").strip() or None,
  157. "element_name": record["element_name"],
  158. "dimension": record["dimension"],
  159. "classified_as": record["classified_as"],
  160. }
  161. effect_rows.append(
  162. {
  163. "element_id": element_id,
  164. "vid_count": record["vid_count"],
  165. "vid_list": _serialize_vid_list(record["vid_list"]),
  166. "rov_score": _normalize_scalar(record["rov_score"]),
  167. "str_score": _normalize_scalar(record["str_score"]),
  168. "ros_score": _normalize_scalar(record["ros_score"]),
  169. "dt": bizdate,
  170. }
  171. )
  172. return list(base_rows.values()), effect_rows
  173. def _build_base_fields_sql(bizdate: str) -> str:
  174. return (
  175. _T_ELEMENT_BASE_CTE.format(bizdate=bizdate)
  176. + """
  177. SELECT DISTINCT CAST(t1.element_id AS STRING) AS element_id
  178. ,CAST(t1.stable_id AS STRING) AS stable_id
  179. ,t1.元素名称 AS element_name
  180. ,t1.维度 AS dimension
  181. ,t1.归类 AS classified_as
  182. FROM t_element t1
  183. """
  184. )
  185. def _fetch_base_fields_from_odps(bizdate: str) -> list[dict[str, object]]:
  186. print(f"[update] querying ODPS base fields, dt={bizdate} ...")
  187. sql = _build_base_fields_sql(bizdate)
  188. odps_client = get_odps_client()
  189. instance = odps_client.execute_sql(sql)
  190. rows: dict[str, dict[str, object]] = {}
  191. with instance.open_reader(tunnel=True) as reader:
  192. for record in reader:
  193. element_id = _normalize_element_id(record["element_id"])
  194. rows[element_id] = {
  195. "element_id": element_id,
  196. "stable_id": str(record["stable_id"] or "").strip() or None,
  197. "element_name": record["element_name"],
  198. "dimension": record["dimension"],
  199. "classified_as": record["classified_as"],
  200. }
  201. print(f"[update] ODPS done, fetched {len(rows)} rows")
  202. return list(rows.values())
  203. def _update_base_stable_id(rows: list[dict[str, object]]) -> int:
  204. if not rows:
  205. return 0
  206. base_table = _safe_identifier(settings.substance_element_base_table)
  207. update_sql = text(
  208. f"""
  209. UPDATE {base_table}
  210. SET stable_id = :stable_id
  211. WHERE element_id = :element_id
  212. AND NOT (stable_id <=> :stable_id)
  213. """
  214. )
  215. updated = 0
  216. with SessionLocal() as session:
  217. for start in range(0, len(rows), BATCH_SIZE):
  218. batch = rows[start : start + BATCH_SIZE]
  219. result = session.execute(update_sql, batch)
  220. updated += int(result.rowcount or 0)
  221. session.commit()
  222. return updated
  223. def truncate_substance_element_base() -> None:
  224. base_table = _safe_identifier(settings.substance_element_base_table)
  225. with SessionLocal() as session:
  226. session.execute(text(f"TRUNCATE TABLE {base_table}"))
  227. session.commit()
  228. print(f"[sync] truncated {base_table}")
  229. def sync_substance_element_base_only(partition_dt: str | None = None) -> dict[str, int | str]:
  230. """仅从 ODPS 全量写入 substance_element_base,不重写 effect_di 表。"""
  231. _ensure_tables()
  232. bizdate = partition_dt or _yesterday_partition_dt()
  233. base_rows = _fetch_base_fields_from_odps(bizdate)
  234. print(f"[sync] inserting {len(base_rows)} rows into base table ...")
  235. inserted_count = _insert_base_elements(base_rows)
  236. return {
  237. "partition_dt": bizdate,
  238. "base_fetched": len(base_rows),
  239. "base_inserted": inserted_count,
  240. }
  241. def update_substance_element_base_fields(partition_dt: str | None = None) -> dict[str, int | str]:
  242. """仅回填 substance_element_base.stable_id,不重写 effect_di 表。"""
  243. bizdate = partition_dt or _yesterday_partition_dt()
  244. base_rows = _fetch_base_fields_from_odps(bizdate)
  245. print(f"[update] matching existing rows in MySQL ...")
  246. existing_ids = _load_existing_element_ids([str(row["element_id"]) for row in base_rows])
  247. update_rows = [row for row in base_rows if str(row["element_id"]) in existing_ids]
  248. print(f"[update] updating stable_id for {len(update_rows)} rows ...")
  249. updated_count = _update_base_stable_id(update_rows)
  250. return {
  251. "partition_dt": bizdate,
  252. "base_fetched": len(base_rows),
  253. "base_matched": len(update_rows),
  254. "base_updated": updated_count,
  255. }
  256. def _ensure_tables() -> None:
  257. base_table = _safe_identifier(settings.substance_element_base_table)
  258. effect_table = _safe_identifier(settings.substance_element_effect_table)
  259. create_base_sql = f"""
  260. CREATE TABLE IF NOT EXISTS {base_table}
  261. (
  262. id BIGINT AUTO_INCREMENT COMMENT '自增id' PRIMARY KEY,
  263. element_id VARCHAR(64) NOT NULL COMMENT '元素id',
  264. stable_id VARCHAR(64) NULL COMMENT '分类stable_id',
  265. element_name VARCHAR(256) NULL COMMENT '元素名称',
  266. dimension VARCHAR(32) NULL COMMENT '元素维度',
  267. classified_as VARCHAR(64) NULL COMMENT '归类',
  268. create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  269. update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  270. UNIQUE KEY uniq_element_id (element_id)
  271. )
  272. """
  273. create_effect_sql = f"""
  274. CREATE TABLE IF NOT EXISTS {effect_table}
  275. (
  276. id BIGINT AUTO_INCREMENT COMMENT '自增id' PRIMARY KEY,
  277. element_id VARCHAR(64) NOT NULL COMMENT '元素id',
  278. vid_count BIGINT NULL COMMENT '视频数量',
  279. vid_list LONGTEXT NULL COMMENT '视频列表',
  280. rov_score DOUBLE NULL COMMENT 'ROV得分',
  281. str_score DOUBLE NULL COMMENT 'STR得分',
  282. ros_score DOUBLE NULL COMMENT 'ROS得分',
  283. dt VARCHAR(32) NOT NULL COMMENT '分区日期',
  284. create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  285. update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  286. UNIQUE KEY uniq_element_dt (element_id, dt)
  287. )
  288. """
  289. alter_base_sql = f"""
  290. ALTER TABLE {base_table}
  291. ADD COLUMN stable_id VARCHAR(64) NULL COMMENT '分类stable_id' AFTER element_id
  292. """
  293. alter_effect_sql = f"""
  294. ALTER TABLE {effect_table}
  295. MODIFY COLUMN vid_list LONGTEXT NULL COMMENT '视频列表'
  296. """
  297. check_column_sql = text(
  298. """
  299. SELECT COUNT(*)
  300. FROM information_schema.COLUMNS
  301. WHERE TABLE_SCHEMA = DATABASE()
  302. AND TABLE_NAME = :table_name
  303. AND COLUMN_NAME = :column_name
  304. """
  305. )
  306. with SessionLocal() as session:
  307. session.execute(text(create_base_sql))
  308. session.execute(text(create_effect_sql))
  309. if session.execute(
  310. check_column_sql,
  311. {"table_name": settings.substance_element_base_table, "column_name": "stable_id"},
  312. ).scalar() == 0:
  313. session.execute(text(alter_base_sql))
  314. session.execute(text(alter_effect_sql))
  315. session.commit()
  316. def _load_existing_element_ids(element_ids: list[str]) -> set[str]:
  317. if not element_ids:
  318. return set()
  319. base_table = _safe_identifier(settings.substance_element_base_table)
  320. existing: set[str] = set()
  321. query_sql = (
  322. text(
  323. f"""
  324. SELECT element_id
  325. FROM {base_table}
  326. WHERE element_id IN :element_ids
  327. """
  328. )
  329. .bindparams(bindparam("element_ids", expanding=True))
  330. )
  331. with SessionLocal() as session:
  332. for start in range(0, len(element_ids), BATCH_SIZE):
  333. batch = element_ids[start : start + BATCH_SIZE]
  334. rows = session.execute(query_sql, {"element_ids": tuple(batch)}).all()
  335. existing.update(str(row[0]) for row in rows)
  336. return existing
  337. def _insert_base_elements(rows: list[dict[str, object]]) -> int:
  338. if not rows:
  339. return 0
  340. base_table = _safe_identifier(settings.substance_element_base_table)
  341. insert_sql = text(
  342. f"""
  343. INSERT INTO {base_table}
  344. (
  345. element_id,
  346. stable_id,
  347. element_name,
  348. dimension,
  349. classified_as
  350. )
  351. VALUES
  352. (
  353. :element_id,
  354. :stable_id,
  355. :element_name,
  356. :dimension,
  357. :classified_as
  358. )
  359. """
  360. )
  361. with SessionLocal() as session:
  362. for start in range(0, len(rows), BATCH_SIZE):
  363. session.execute(insert_sql, rows[start : start + BATCH_SIZE])
  364. session.commit()
  365. return len(rows)
  366. def _replace_effect_partition(partition_dt: str, rows: list[dict[str, object]]) -> int:
  367. effect_table = _safe_identifier(settings.substance_element_effect_table)
  368. delete_sql = text(f"DELETE FROM {effect_table} WHERE dt = :dt")
  369. insert_sql = text(
  370. f"""
  371. INSERT INTO {effect_table}
  372. (
  373. element_id,
  374. vid_count,
  375. vid_list,
  376. rov_score,
  377. str_score,
  378. ros_score,
  379. dt
  380. )
  381. VALUES
  382. (
  383. :element_id,
  384. :vid_count,
  385. :vid_list,
  386. :rov_score,
  387. :str_score,
  388. :ros_score,
  389. :dt
  390. )
  391. """
  392. )
  393. with SessionLocal() as session:
  394. session.execute(delete_sql, {"dt": partition_dt})
  395. for start in range(0, len(rows), BATCH_SIZE):
  396. session.execute(insert_sql, rows[start : start + BATCH_SIZE])
  397. session.commit()
  398. return len(rows)
  399. def sync_substance_elements(partition_dt: str | None = None) -> dict[str, int | str]:
  400. """同步指定分区(默认上海时区昨天)的基本元素增量与效果全量。"""
  401. _ensure_tables()
  402. bizdate = partition_dt or _yesterday_partition_dt()
  403. base_rows, effect_rows = _fetch_from_odps(bizdate)
  404. existing_ids = _load_existing_element_ids([str(row["element_id"]) for row in base_rows])
  405. incremental_rows = [row for row in base_rows if str(row["element_id"]) not in existing_ids]
  406. inserted_base_count = _insert_base_elements(incremental_rows)
  407. inserted_effect_count = _replace_effect_partition(bizdate, effect_rows)
  408. return {
  409. "partition_dt": bizdate,
  410. "base_fetched": len(base_rows),
  411. "base_inserted": inserted_base_count,
  412. "effect_inserted": inserted_effect_count,
  413. }