vertical_category_sync.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. """垂直领域分类:从 ODPS 同步基本分类(增量)与效果数据(按日全量)。"""
  2. import re
  3. from datetime import datetime, timedelta
  4. from decimal import Decimal
  5. from zoneinfo import ZoneInfo
  6. from sqlalchemy import bindparam, text
  7. from app.core.config import settings
  8. from app.db.mysql import SessionLocal
  9. from app.odps.client import get_odps_client
# Shape a table name must have before it is interpolated into MySQL statements
# (checked by _safe_identifier).
IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
# Number of ids / rows sent to MySQL per execute() call when reading or writing.
BATCH_SIZE = 500
# Time zone used to decide which calendar day "yesterday" is for the default partition.
SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
# ODPS CTE template shared by the sync query; every "{bizdate}" placeholder is
# filled with str.format.
#
# t_element: rows of element_classification_mapping (t1) for the partition, kept
#   only for element_type '实质' whose category (global_category t4) is classified_as
#   '垂直领域细分' and that have a matching post_decode_topic_point_element row
#   (t5.id IS NOT NULL). Contribution and share/consume/click willingness come from
#   dwd_post_word_contribution_di (t2), matched on post_id and element_name = word.
#   NOTE(review): t3 is a LEFT JOIN, so `t3.retired_at_execution_id IS NULL` also
#   keeps elements with no global_element match at all — confirm that is intended.
#
# t_element_all_category: every t_element row emitted once for its own category
#   (second UNION ALL branch) and once for every global_category row whose
#   path + '/' is a prefix of that category's path (first branch, i.e. ancestors by
#   path). Both category lookups require retired_at_execution_id IS NULL.
#   `gc_ancestor.dt = gc_self.dt` is the only column-to-column equality in that ON
#   clause; presumably it exists because ODPS joins need an equi-join key — TODO
#   confirm. The ancestor match is otherwise evaluated pairwise within the
#   partition by the INSTR filter, so this join is likely the expensive part.
_T_ELEMENT_ALL_CATEGORY_CTE = """
WITH t_element AS
(
SELECT t1.post_id AS vid
,t1.global_element_id AS element_id
,t1.element_name AS 元素名称
,t4.stable_id AS stable_id
,t4.parent_stable_id AS parent_stable_id
,t4.name AS 分类名称
,t4.classified_as AS 归类
,t4.source_type AS 维度
,t4.description AS 分类说明
,t4.level AS 分类层级
,t4.path AS 分类路径
,t1.element_type AS 元素维度
,t6.topic_point_type AS 点类型
,t2.contribution AS 贡献度
,t2.share_will AS 分享意愿度
,t2.consume_will AS 消费意愿度
,t2.click_will AS 点击意愿度
FROM loghubods.element_classification_mapping t1
LEFT JOIN loghubods.dwd_post_word_contribution_di t2
ON t1.post_id = t2.post_id
AND t1.element_name = t2.word
AND t2.dt = '{bizdate}'
LEFT JOIN loghubods.global_element t3
ON t1.global_element_id = t3.id
AND t3.dt = '{bizdate}'
LEFT JOIN loghubods.global_category t4
ON t1.global_category_stable_id = t4.stable_id
AND t4.dt = '{bizdate}'
LEFT JOIN loghubods.post_decode_topic_point_element t5
ON t1.source_element_id = t5.id
AND t1.post_id = t5.post_id
AND t5.dt = '{bizdate}'
LEFT JOIN loghubods.post_decode_topic_point t6
ON t5.topic_point_id = t6.id
AND t6.dt = '{bizdate}'
WHERE t1.dt = '{bizdate}'
AND LENGTH(CAST(t1.post_id AS STRING)) <= 10
AND t3.retired_at_execution_id IS NULL
AND t4.retired_at_execution_id IS NULL
AND t5.id IS NOT NULL
AND t1.element_type = '实质'
AND t4.classified_as = '垂直领域细分'
)
,t_element_all_category AS
(
SELECT e.vid
,e.贡献度
,e.分享意愿度
,e.消费意愿度
,e.点击意愿度
,gc_ancestor.stable_id
,gc_ancestor.parent_stable_id
,gc_ancestor.level
,gc_ancestor.name
,gc_ancestor.classified_as
,gc_ancestor.source_type
,gc_ancestor.description
FROM t_element e
JOIN loghubods.global_category gc_self
ON e.stable_id = gc_self.stable_id
AND gc_self.dt = '{bizdate}'
AND gc_self.retired_at_execution_id IS NULL
JOIN loghubods.global_category gc_ancestor
ON gc_ancestor.dt = '{bizdate}'
AND gc_ancestor.retired_at_execution_id IS NULL
AND gc_ancestor.dt = gc_self.dt
WHERE INSTR(gc_self.path, CONCAT(gc_ancestor.path, '/')) = 1
UNION ALL
SELECT vid
,贡献度
,分享意愿度
,消费意愿度
,点击意愿度
,stable_id
,parent_stable_id
,分类层级 AS level
,分类名称 AS name
,归类 AS classified_as
,维度 AS source_type
,分类说明 AS description
FROM t_element
WHERE stable_id IS NOT NULL
)
"""
  100. def _safe_identifier(name: str) -> str:
  101. if not IDENTIFIER_RE.match(name):
  102. raise ValueError(f"invalid sql identifier: {name}")
  103. return name
  104. def _normalize_scalar(value: object) -> object:
  105. if isinstance(value, Decimal):
  106. return float(value)
  107. return value
  108. def _normalize_category_id(value: object) -> str:
  109. text_value = str(value or "").strip()
  110. if not text_value:
  111. raise ValueError("category_id 不能为空")
  112. return text_value
  113. def _yesterday_partition_dt(reference: datetime | None = None) -> str:
  114. now = reference or datetime.now(SHANGHAI_TZ)
  115. return (now.date() - timedelta(days=1)).strftime("%Y%m%d")
  116. def _build_sync_sql(bizdate: str) -> str:
  117. return (
  118. _T_ELEMENT_ALL_CATEGORY_CTE.format(bizdate=bizdate)
  119. + """
  120. SELECT CAST(t1.stable_id AS STRING) AS category_id
  121. ,CAST(t1.parent_stable_id AS STRING) AS parent_stable_id
  122. ,t1.name AS category_name
  123. ,t1.level AS category_level
  124. ,t1.source_type AS dimension
  125. ,t1.classified_as AS classified_as
  126. ,COUNT(DISTINCT t1.vid) AS vid_count
  127. ,COALESCE(ROUND(
  128. SUM(t2.分发回流uv * CASE WHEN t1.贡献度 >= 0.8 THEN t1.贡献度 ELSE 0 END)
  129. / NULLIF(SUM(t2.分发曝光pv), 0)
  130. , 4), 0) AS rov_score
  131. ,COALESCE(ROUND(
  132. SUM(t2.分发分享pv * CASE WHEN t1.分享意愿度 >= 0.8 THEN t1.分享意愿度 ELSE 0 END)
  133. / NULLIF(SUM(t2.分发曝光pv), 0)
  134. , 4), 0) AS str_score
  135. ,COALESCE(ROUND(
  136. SUM(t2.分发回流uv * CASE WHEN t1.点击意愿度 >= 0.8 THEN t1.点击意愿度 ELSE 0 END)
  137. / NULLIF(SUM(t2.分发分享pv), 0)
  138. , 4), 0) AS ros_score
  139. FROM t_element_all_category t1
  140. LEFT JOIN loghubods.video_multi_strategy_effect t2
  141. ON t1.vid = t2.vid
  142. AND t2.dt = '{bizdate}'
  143. AND t2.strategy = '当天'
  144. LEFT JOIN loghubods.global_category t3
  145. ON t1.stable_id = t3.stable_id
  146. AND t3.dt = '{bizdate}'
  147. AND t3.retired_at_execution_id IS NULL
  148. GROUP BY t1.stable_id
  149. ,t1.parent_stable_id
  150. ,t1.name
  151. ,t1.level
  152. ,t1.source_type
  153. ,t1.classified_as
  154. """
  155. ).format(bizdate=bizdate)
  156. def _fetch_from_odps(bizdate: str) -> tuple[list[dict[str, object]], list[dict[str, object]]]:
  157. sql = _build_sync_sql(bizdate)
  158. odps_client = get_odps_client()
  159. instance = odps_client.execute_sql(sql)
  160. base_rows: dict[str, dict[str, object]] = {}
  161. effect_rows: list[dict[str, object]] = []
  162. with instance.open_reader(tunnel=True) as reader:
  163. for record in reader:
  164. category_id = _normalize_category_id(record["category_id"])
  165. base_rows[category_id] = {
  166. "category_id": category_id,
  167. "parent_stable_id": str(record["parent_stable_id"] or "").strip() or None,
  168. "category_name": record["category_name"],
  169. "category_level": record["category_level"],
  170. "dimension": record["dimension"],
  171. "classified_as": record["classified_as"],
  172. }
  173. effect_rows.append(
  174. {
  175. "category_id": category_id,
  176. "vid_count": record["vid_count"],
  177. "rov_score": _normalize_scalar(record["rov_score"]),
  178. "str_score": _normalize_scalar(record["str_score"]),
  179. "ros_score": _normalize_scalar(record["ros_score"]),
  180. "dt": bizdate,
  181. }
  182. )
  183. return list(base_rows.values()), effect_rows
  184. def _ensure_tables() -> None:
  185. base_table = _safe_identifier(settings.vertical_category_base_table)
  186. effect_table = _safe_identifier(settings.vertical_category_effect_table)
  187. create_base_sql = f"""
  188. CREATE TABLE IF NOT EXISTS {base_table}
  189. (
  190. id BIGINT AUTO_INCREMENT COMMENT '自增id' PRIMARY KEY,
  191. category_id VARCHAR(64) NOT NULL COMMENT '分类stable_id',
  192. parent_stable_id VARCHAR(64) NULL COMMENT '父分类stable_id',
  193. category_name VARCHAR(256) NULL COMMENT '分类名称',
  194. category_level INT NULL COMMENT '分类层级',
  195. dimension VARCHAR(32) NULL COMMENT '维度',
  196. classified_as VARCHAR(64) NULL COMMENT '归类',
  197. create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  198. update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  199. UNIQUE KEY uniq_category_id (category_id)
  200. )
  201. """
  202. create_effect_sql = f"""
  203. CREATE TABLE IF NOT EXISTS {effect_table}
  204. (
  205. id BIGINT AUTO_INCREMENT COMMENT '自增id' PRIMARY KEY,
  206. category_id VARCHAR(64) NOT NULL COMMENT '分类stable_id',
  207. vid_count BIGINT NULL COMMENT '视频数量',
  208. rov_score DOUBLE NULL COMMENT 'ROV得分',
  209. str_score DOUBLE NULL COMMENT 'STR得分',
  210. ros_score DOUBLE NULL COMMENT 'ROS得分',
  211. dt VARCHAR(32) NOT NULL COMMENT '分区日期',
  212. create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  213. update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  214. UNIQUE KEY uniq_category_dt (category_id, dt)
  215. )
  216. """
  217. with SessionLocal() as session:
  218. session.execute(text(create_base_sql))
  219. session.execute(text(create_effect_sql))
  220. session.commit()
  221. def _load_existing_category_ids(category_ids: list[str]) -> set[str]:
  222. if not category_ids:
  223. return set()
  224. base_table = _safe_identifier(settings.vertical_category_base_table)
  225. existing: set[str] = set()
  226. query_sql = (
  227. text(
  228. f"""
  229. SELECT category_id
  230. FROM {base_table}
  231. WHERE category_id IN :category_ids
  232. """
  233. )
  234. .bindparams(bindparam("category_ids", expanding=True))
  235. )
  236. with SessionLocal() as session:
  237. for start in range(0, len(category_ids), BATCH_SIZE):
  238. batch = category_ids[start : start + BATCH_SIZE]
  239. rows = session.execute(query_sql, {"category_ids": tuple(batch)}).all()
  240. existing.update(str(row[0]) for row in rows)
  241. return existing
  242. def _insert_base_categories(rows: list[dict[str, object]]) -> int:
  243. if not rows:
  244. return 0
  245. base_table = _safe_identifier(settings.vertical_category_base_table)
  246. insert_sql = text(
  247. f"""
  248. INSERT INTO {base_table}
  249. (
  250. category_id,
  251. parent_stable_id,
  252. category_name,
  253. category_level,
  254. dimension,
  255. classified_as
  256. )
  257. VALUES
  258. (
  259. :category_id,
  260. :parent_stable_id,
  261. :category_name,
  262. :category_level,
  263. :dimension,
  264. :classified_as
  265. )
  266. """
  267. )
  268. with SessionLocal() as session:
  269. for start in range(0, len(rows), BATCH_SIZE):
  270. session.execute(insert_sql, rows[start : start + BATCH_SIZE])
  271. session.commit()
  272. return len(rows)
  273. def _replace_effect_partition(partition_dt: str, rows: list[dict[str, object]]) -> int:
  274. effect_table = _safe_identifier(settings.vertical_category_effect_table)
  275. delete_sql = text(f"DELETE FROM {effect_table} WHERE dt = :dt")
  276. insert_sql = text(
  277. f"""
  278. INSERT INTO {effect_table}
  279. (
  280. category_id,
  281. vid_count,
  282. rov_score,
  283. str_score,
  284. ros_score,
  285. dt
  286. )
  287. VALUES
  288. (
  289. :category_id,
  290. :vid_count,
  291. :rov_score,
  292. :str_score,
  293. :ros_score,
  294. :dt
  295. )
  296. """
  297. )
  298. with SessionLocal() as session:
  299. session.execute(delete_sql, {"dt": partition_dt})
  300. for start in range(0, len(rows), BATCH_SIZE):
  301. session.execute(insert_sql, rows[start : start + BATCH_SIZE])
  302. session.commit()
  303. return len(rows)
  304. def sync_vertical_categories(partition_dt: str | None = None) -> dict[str, int | str]:
  305. """同步指定分区(默认上海时区昨天)的基本分类增量与效果全量。"""
  306. _ensure_tables()
  307. bizdate = partition_dt or _yesterday_partition_dt()
  308. base_rows, effect_rows = _fetch_from_odps(bizdate)
  309. existing_ids = _load_existing_category_ids([str(row["category_id"]) for row in base_rows])
  310. incremental_rows = [row for row in base_rows if str(row["category_id"]) not in existing_ids]
  311. inserted_base_count = _insert_base_categories(incremental_rows)
  312. inserted_effect_count = _replace_effect_partition(bizdate, effect_rows)
  313. return {
  314. "partition_dt": bizdate,
  315. "base_fetched": len(base_rows),
  316. "base_inserted": inserted_base_count,
  317. "effect_inserted": inserted_effect_count,
  318. }