demand_pool_sync.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. import json
  2. import hashlib
  3. import re
  4. from datetime import datetime
  5. from decimal import Decimal, ROUND_HALF_UP
  6. from zoneinfo import ZoneInfo
  7. from sqlalchemy import text
  8. from app.core.config import settings
  9. from app.db.mysql import SessionLocal
  10. from app.odps.client import get_odps_client
  11. IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
  12. BATCH_SIZE = 500
  13. SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
  14. # 与 MySQL `multi_demand_pool_di`.`type` VARCHAR(32) 对齐
  15. _SECONDARY_TYPE_MAX_LEN = 32
  16. # 与 MySQL `multi_demand_pool_di`.`demand_name` VARCHAR(256) 对齐(次源为 merge_leve2:demand)
  17. _SECONDARY_DEMAND_NAME_MAX_LEN = 256
  18. def _safe_identifier(name: str) -> str:
  19. if not IDENTIFIER_RE.match(name):
  20. raise ValueError(f"invalid sql identifier: {name}")
  21. return name
  22. def _serialize_video_list(value: object) -> str | None:
  23. if value is None:
  24. return None
  25. if isinstance(value, list):
  26. return json.dumps(value, ensure_ascii=False)
  27. return str(value)
  28. def _normalize_secondary_weight(value: object) -> float | None:
  29. if value is None:
  30. return None
  31. decimal_value = Decimal(str(value)).quantize(
  32. Decimal("0.000001"),
  33. rounding=ROUND_HALF_UP,
  34. )
  35. return float(decimal_value)
  36. def _type_from_extend(value: object) -> str | None:
  37. """从 dwd_demand_pool_di.extend JSON 中解析 type 字段。"""
  38. if value is None:
  39. return None
  40. if isinstance(value, dict):
  41. parsed: object = value
  42. else:
  43. raw = str(value).strip()
  44. if not raw:
  45. return None
  46. try:
  47. parsed = json.loads(raw)
  48. except json.JSONDecodeError:
  49. return None
  50. if not isinstance(parsed, dict):
  51. return None
  52. nested = parsed.get("type")
  53. if nested is None:
  54. return None
  55. text_value = str(nested).strip()
  56. if not text_value:
  57. return None
  58. if len(text_value) > _SECONDARY_TYPE_MAX_LEN:
  59. return text_value[:_SECONDARY_TYPE_MAX_LEN]
  60. return text_value
  61. def _fetch_partition_rows_from_primary_source(partition_dt: str) -> list[dict[str, object]]:
  62. source_table = _safe_identifier(settings.demand_pool_source_table)
  63. sql = f"""
  64. SELECT
  65. strategy,
  66. demand_id,
  67. demand_name,
  68. weight,
  69. `type`,
  70. video_count,
  71. video_list,
  72. ext_info
  73. FROM {source_table}
  74. WHERE dt = '{partition_dt}'
  75. """
  76. odps_client = get_odps_client()
  77. instance = odps_client.execute_sql(sql)
  78. dedup_rows: dict[str, dict[str, object]] = {}
  79. with instance.open_reader(tunnel=True) as reader:
  80. for record in reader:
  81. demand_id = str(record["demand_id"] or "").strip()
  82. if not demand_id:
  83. continue
  84. dedup_rows[demand_id] = {
  85. "strategy": record["strategy"],
  86. "demand_id": demand_id,
  87. "demand_name": record["demand_name"],
  88. "weight": record["weight"],
  89. "demand_type": record["type"],
  90. "video_count": record["video_count"],
  91. "video_list": _serialize_video_list(record["video_list"]),
  92. "ext_info": record["ext_info"],
  93. "dt": partition_dt,
  94. }
  95. return list(dedup_rows.values())
  96. def _build_secondary_demand_id(demand_name: str, partition_dt: str) -> str:
  97. raw_value = f"{settings.demand_pool_secondary_strategy}{demand_name}{partition_dt}"
  98. return hashlib.md5(raw_value.encode("utf-8")).hexdigest()
  99. def _secondary_demand_display_name(merge_leve2: object, demand: str) -> str:
  100. """次源 demand_name:`merge_leve2:demand`;merge 为空则退化为仅 demand。"""
  101. part = demand.strip()
  102. if not part:
  103. return ""
  104. merge_s = str(merge_leve2 or "").strip()
  105. if merge_s:
  106. combined = f"{merge_s}:{part}"
  107. else:
  108. combined = part
  109. if len(combined) > _SECONDARY_DEMAND_NAME_MAX_LEN:
  110. return combined[:_SECONDARY_DEMAND_NAME_MAX_LEN]
  111. return combined
  112. def _fetch_partition_rows_from_secondary_source(partition_dt: str) -> list[dict[str, object]]:
  113. source_table = _safe_identifier(settings.demand_pool_secondary_source_table)
  114. sql = f"""
  115. SELECT
  116. `merge_leve2`,
  117. demand,
  118. score,
  119. `extend`
  120. FROM {source_table}
  121. WHERE dt = '{partition_dt}'
  122. """
  123. odps_client = get_odps_client()
  124. instance = odps_client.execute_sql(sql)
  125. dedup_rows: dict[str, dict[str, object]] = {}
  126. with instance.open_reader(tunnel=True) as reader:
  127. for record in reader:
  128. demand_raw = str(record["demand"] or "").strip()
  129. if not demand_raw:
  130. continue
  131. demand_name = _secondary_demand_display_name(
  132. record["merge_leve2"],
  133. demand_raw,
  134. )
  135. if not demand_name:
  136. continue
  137. demand_id = _build_secondary_demand_id(demand_name, partition_dt)
  138. dedup_rows[demand_id] = {
  139. "strategy": settings.demand_pool_secondary_strategy,
  140. "demand_id": demand_id,
  141. "demand_name": demand_name,
  142. "weight": _normalize_secondary_weight(record["score"]),
  143. "demand_type": _type_from_extend(record["extend"]),
  144. "video_count": None,
  145. "video_list": None,
  146. "ext_info": settings.demand_pool_secondary_default_ext_info,
  147. "dt": partition_dt,
  148. }
  149. return list(dedup_rows.values())
  150. def _ensure_target_table() -> None:
  151. target_table = _safe_identifier(settings.demand_pool_target_table)
  152. create_sql = f"""
  153. CREATE TABLE IF NOT EXISTS {target_table}
  154. (
  155. id BIGINT AUTO_INCREMENT COMMENT '自增id' PRIMARY KEY,
  156. strategy VARCHAR(64) NULL COMMENT '策略',
  157. demand_id VARCHAR(64) NULL COMMENT '需求id',
  158. demand_name VARCHAR(256) NULL COMMENT '需求',
  159. weight DOUBLE NULL COMMENT '权重',
  160. `type` VARCHAR(32) NULL COMMENT '需求类型',
  161. video_count BIGINT NULL COMMENT '视频数量',
  162. video_list TEXT NULL COMMENT '视频列表',
  163. ext_info TEXT NULL COMMENT '扩展字段',
  164. dt VARCHAR(32) NULL COMMENT '分区日期',
  165. create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  166. update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  167. UNIQUE KEY uniq_demand_id (demand_id)
  168. )
  169. """
  170. with SessionLocal() as session:
  171. session.execute(text(create_sql))
  172. session.commit()
  173. def _upsert_rows_by_demand_id(rows: list[dict[str, object]]) -> int:
  174. if not rows:
  175. return 0
  176. target_table = _safe_identifier(settings.demand_pool_target_table)
  177. upsert_sql = text(
  178. f"""
  179. INSERT INTO {target_table}
  180. (
  181. strategy,
  182. demand_id,
  183. demand_name,
  184. weight,
  185. `type`,
  186. video_count,
  187. video_list,
  188. ext_info,
  189. dt
  190. )
  191. VALUES
  192. (
  193. :strategy,
  194. :demand_id,
  195. :demand_name,
  196. :weight,
  197. :demand_type,
  198. :video_count,
  199. :video_list,
  200. :ext_info,
  201. :dt
  202. )
  203. ON DUPLICATE KEY UPDATE
  204. strategy = VALUES(strategy),
  205. demand_name = VALUES(demand_name),
  206. weight = VALUES(weight),
  207. `type` = VALUES(`type`),
  208. video_count = VALUES(video_count),
  209. video_list = VALUES(video_list),
  210. ext_info = VALUES(ext_info),
  211. dt = VALUES(dt),
  212. update_time = IF(
  213. NOT (
  214. strategy <=> VALUES(strategy)
  215. AND demand_name <=> VALUES(demand_name)
  216. AND weight <=> VALUES(weight)
  217. AND `type` <=> VALUES(`type`)
  218. AND video_count <=> VALUES(video_count)
  219. AND video_list <=> VALUES(video_list)
  220. AND ext_info <=> VALUES(ext_info)
  221. AND dt <=> VALUES(dt)
  222. ),
  223. CURRENT_TIMESTAMP,
  224. update_time
  225. )
  226. """
  227. )
  228. with SessionLocal() as session:
  229. for start in range(0, len(rows), BATCH_SIZE):
  230. session.execute(upsert_sql, rows[start : start + BATCH_SIZE])
  231. session.commit()
  232. return len(rows)
  233. def sync_partition(partition_dt: str) -> int:
  234. merged_rows: dict[str, dict[str, object]] = {}
  235. for row in _fetch_partition_rows_from_primary_source(partition_dt):
  236. merged_rows[str(row["demand_id"])] = row
  237. for row in _fetch_partition_rows_from_secondary_source(partition_dt):
  238. merged_rows[str(row["demand_id"])] = row
  239. return _upsert_rows_by_demand_id(list(merged_rows.values()))
  240. def run_full_sync(partitions: list[str] | None = None) -> dict[str, int]:
  241. _ensure_target_table()
  242. partition_list = partitions or settings.demand_pool_initial_partition_list
  243. result: dict[str, int] = {}
  244. for partition in partition_list:
  245. result[partition] = sync_partition(partition)
  246. return result
  247. def run_today_incremental_sync() -> dict[str, int]:
  248. _ensure_target_table()
  249. partition_dt = datetime.now(SHANGHAI_TZ).strftime("%Y%m%d")
  250. return {partition_dt: sync_partition(partition_dt)}