repository.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744
  1. """热点内容 MySQL 仓储。"""
  2. from __future__ import annotations
  3. import hashlib
  4. import json
  5. from datetime import datetime, timedelta
  6. from typing import Any
  7. try:
  8. import pymysql
  9. from pymysql.cursors import DictCursor
  10. except ImportError: # pragma: no cover - runtime dependency check
  11. pymysql = None
  12. DictCursor = None
  13. from app.hot_content.exceptions import HotContentFlowError
  14. from app.hot_content.status import ExecutionStatus, PostprocessStatus
  15. from app.hot_content.timezone import SHANGHAI_TZ
  16. from app.hot_content.types import MysqlConfig
  17. def _json_dumps(data: Any) -> str:
  18. return json.dumps(data, ensure_ascii=False, separators=(",", ":"))
  19. def _json_loads(value: Any) -> Any:
  20. if value is None:
  21. return None
  22. if isinstance(value, (dict, list)):
  23. return value
  24. if isinstance(value, (bytes, bytearray)):
  25. value = value.decode("utf-8")
  26. if isinstance(value, str):
  27. return json.loads(value)
  28. return value
  29. def _normalize_demand_names(demand_name_set: list[str]) -> list[str]:
  30. names: list[str] = []
  31. seen: set[str] = set()
  32. for item in demand_name_set:
  33. name = str(item).strip()
  34. if not name or name in seen:
  35. continue
  36. seen.add(name)
  37. names.append(name)
  38. return names
  39. def unique_title_key(source: str, title: str) -> str:
  40. return hashlib.sha256(f"{source}\n{title}".encode("utf-8")).hexdigest()
  41. class HotContentRepository:
  42. def __init__(self, config: MysqlConfig):
  43. if pymysql is None or DictCursor is None:
  44. raise HotContentFlowError("missing dependency: pip install pymysql")
  45. self.conn = pymysql.connect(
  46. host=config.host,
  47. port=config.port,
  48. user=config.user,
  49. password=config.password,
  50. database=config.database,
  51. charset=config.charset,
  52. autocommit=True,
  53. cursorclass=DictCursor,
  54. )
  55. def close(self) -> None:
  56. self.conn.close()
  57. def upsert_record(self, *, source: str, title: str, rank: int | None) -> dict[str, Any]:
  58. key = unique_title_key(source, title)
  59. sql = """
  60. INSERT INTO hot_content_records (
  61. unique_key, source, title, hot_rank, execution_status, created_at, updated_at
  62. )
  63. VALUES (%s, %s, %s, %s, %s, NOW(), NOW())
  64. ON DUPLICATE KEY UPDATE
  65. id=LAST_INSERT_ID(id),
  66. hot_rank=VALUES(hot_rank),
  67. updated_at=NOW()
  68. """
  69. with self.conn.cursor() as cursor:
  70. cursor.execute(
  71. sql,
  72. (
  73. key,
  74. source,
  75. title,
  76. rank,
  77. ExecutionStatus.HOT_SAVED,
  78. ),
  79. )
  80. record_id = int(cursor.lastrowid)
  81. cursor.execute(
  82. """
  83. SELECT
  84. id,
  85. unique_key,
  86. execution_status,
  87. article_title,
  88. article_body,
  89. article_url,
  90. decode_request_result IS NOT NULL AS has_decode_request,
  91. contribution_points_json IS NOT NULL AS has_contribution_points
  92. FROM hot_content_records
  93. WHERE id = %s
  94. """,
  95. (record_id,),
  96. )
  97. row = cursor.fetchone()
  98. if not row:
  99. raise HotContentFlowError(f"missing hot_content_records id={record_id}")
  100. return {
  101. "id": int(row["id"]),
  102. "unique_key": str(row["unique_key"]),
  103. "execution_status": int(row["execution_status"]),
  104. "article_title": row.get("article_title"),
  105. "article_body": row.get("article_body"),
  106. "article_url": row.get("article_url"),
  107. "has_decode_request": bool(row.get("has_decode_request")),
  108. "has_contribution_points": bool(row.get("has_contribution_points")),
  109. }
  110. def update_status(
  111. self,
  112. *,
  113. record_id: int,
  114. status: int,
  115. error_message: str | None = None,
  116. ) -> None:
  117. sql = """
  118. UPDATE hot_content_records
  119. SET execution_status=%s, error_reason=%s, updated_at=NOW()
  120. WHERE id=%s
  121. """
  122. with self.conn.cursor() as cursor:
  123. cursor.execute(sql, (status, error_message, record_id))
  124. def update_article(
  125. self,
  126. *,
  127. record_id: int,
  128. article_title: str,
  129. article_body: str,
  130. url: str,
  131. ) -> None:
  132. sql = """
  133. UPDATE hot_content_records
  134. SET article_title=%s,
  135. article_body=%s,
  136. article_url=%s,
  137. execution_status=%s,
  138. error_reason=NULL,
  139. updated_at=NOW()
  140. WHERE id=%s
  141. """
  142. with self.conn.cursor() as cursor:
  143. cursor.execute(
  144. sql,
  145. (
  146. article_title,
  147. article_body,
  148. url,
  149. ExecutionStatus.CONTENT_OK,
  150. record_id,
  151. ),
  152. )
  153. def update_decode_result(
  154. self,
  155. *,
  156. record_id: int,
  157. status: int,
  158. request_json: dict[str, Any],
  159. response_json: dict[str, Any] | None,
  160. error_message: str | None = None,
  161. ) -> None:
  162. decode_request_result = {
  163. "request": request_json,
  164. "response": response_json,
  165. }
  166. sql = """
  167. UPDATE hot_content_records
  168. SET decode_request_result=%s,
  169. execution_status=%s,
  170. error_reason=%s,
  171. updated_at=NOW()
  172. WHERE id=%s
  173. """
  174. with self.conn.cursor() as cursor:
  175. cursor.execute(
  176. sql,
  177. (
  178. _json_dumps(decode_request_result),
  179. status,
  180. error_message,
  181. record_id,
  182. ),
  183. )
  184. def list_decode_result_candidates(self, *, limit: int) -> list[dict[str, Any]]:
  185. sql = """
  186. SELECT id, unique_key
  187. FROM hot_content_records
  188. WHERE execution_status IN (%s, %s, %s)
  189. AND contribution_points_json IS NULL
  190. ORDER BY updated_at ASC, id ASC
  191. LIMIT %s
  192. """
  193. with self.conn.cursor() as cursor:
  194. cursor.execute(
  195. sql,
  196. (
  197. ExecutionStatus.DECODE_SUBMITTED,
  198. ExecutionStatus.DECODE_SUCCESS,
  199. ExecutionStatus.DECODE_PENDING,
  200. limit,
  201. ),
  202. )
  203. rows = cursor.fetchall()
  204. return [
  205. {
  206. "id": int(row["id"]),
  207. "unique_key": str(row["unique_key"]),
  208. }
  209. for row in rows
  210. ]
  211. def save_decode_result_export(
  212. self,
  213. *,
  214. record_id: int,
  215. decode_result_json: dict[str, Any],
  216. contribution_points_json: dict[str, Any],
  217. ) -> None:
  218. sql = """
  219. UPDATE hot_content_records
  220. SET decode_result_json=%s,
  221. contribution_points_json=%s,
  222. execution_status=%s,
  223. error_reason=NULL,
  224. updated_at=NOW()
  225. WHERE id=%s
  226. """
  227. with self.conn.cursor() as cursor:
  228. cursor.execute(
  229. sql,
  230. (
  231. _json_dumps(decode_result_json),
  232. _json_dumps(contribution_points_json),
  233. ExecutionStatus.CONTRIBUTION_EXTRACTED,
  234. record_id,
  235. ),
  236. )
  237. def get_demand_cache_by_hour(self, *, cache_hour: datetime) -> dict[str, Any] | None:
  238. sql = """
  239. SELECT
  240. id,
  241. cache_hour,
  242. source_table,
  243. partition_dt,
  244. demand_name_set_json,
  245. item_count,
  246. updated_at
  247. FROM demand_pool_hourly_cache
  248. WHERE cache_hour=%s
  249. LIMIT 1
  250. """
  251. with self.conn.cursor() as cursor:
  252. cursor.execute(sql, (cache_hour,))
  253. row = cursor.fetchone()
  254. if not row:
  255. return None
  256. demand_name_set = _json_loads(row.get("demand_name_set_json")) or []
  257. if not isinstance(demand_name_set, list):
  258. demand_name_set = []
  259. return {
  260. "id": int(row["id"]),
  261. "cache_hour": row.get("cache_hour"),
  262. "source_table": str(row["source_table"]),
  263. "partition_dt": row.get("partition_dt"),
  264. "demand_name_set": [
  265. str(name).strip()
  266. for name in demand_name_set
  267. if str(name).strip()
  268. ],
  269. "item_count": int(row.get("item_count") or 0),
  270. "updated_at": row.get("updated_at"),
  271. }
  272. def save_demand_cache_set(
  273. self,
  274. *,
  275. cache_hour: datetime,
  276. source_table: str,
  277. partition_dt: str | None,
  278. excluded_strategy: str,
  279. top_n: int,
  280. demand_name_set: list[str],
  281. ) -> int:
  282. sql = """
  283. INSERT INTO demand_pool_hourly_cache (
  284. cache_hour,
  285. source_table,
  286. partition_dt,
  287. excluded_strategy,
  288. top_n,
  289. demand_name_set_json,
  290. item_count,
  291. created_at,
  292. updated_at
  293. )
  294. VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
  295. ON DUPLICATE KEY UPDATE
  296. id=LAST_INSERT_ID(id),
  297. source_table=VALUES(source_table),
  298. partition_dt=VALUES(partition_dt),
  299. excluded_strategy=VALUES(excluded_strategy),
  300. top_n=VALUES(top_n),
  301. demand_name_set_json=VALUES(demand_name_set_json),
  302. item_count=VALUES(item_count),
  303. updated_at=NOW()
  304. """
  305. normalized_names = _normalize_demand_names(demand_name_set)
  306. with self.conn.cursor() as cursor:
  307. cursor.execute(
  308. sql,
  309. (
  310. cache_hour,
  311. source_table,
  312. partition_dt,
  313. excluded_strategy,
  314. top_n,
  315. _json_dumps(normalized_names),
  316. len(normalized_names),
  317. ),
  318. )
  319. return int(cursor.lastrowid)
  320. def list_postprocess_candidates(self, *, limit: int) -> list[dict[str, Any]]:
  321. sql = """
  322. SELECT
  323. id,
  324. unique_key,
  325. source,
  326. title,
  327. article_title,
  328. article_body,
  329. demand_cache_run_id,
  330. decode_result_json,
  331. contribution_points_json,
  332. contribution_demand_match_json
  333. FROM hot_content_records
  334. WHERE contribution_points_json IS NOT NULL
  335. AND postprocess_status IN (%s, %s, %s)
  336. ORDER BY updated_at ASC, id ASC
  337. LIMIT %s
  338. """
  339. with self.conn.cursor() as cursor:
  340. cursor.execute(
  341. sql,
  342. (
  343. PostprocessStatus.PENDING,
  344. PostprocessStatus.DEMAND_MATCHED,
  345. PostprocessStatus.FAILED,
  346. limit,
  347. ),
  348. )
  349. rows = cursor.fetchall()
  350. return [
  351. {
  352. "id": int(row["id"]),
  353. "unique_key": str(row["unique_key"]),
  354. "source": str(row.get("source") or ""),
  355. "title": str(row.get("title") or ""),
  356. "article_title": row.get("article_title"),
  357. "article_body": row.get("article_body"),
  358. "demand_cache_run_id": row.get("demand_cache_run_id"),
  359. "decode_result_json": _json_loads(row.get("decode_result_json")),
  360. "contribution_points_json": _json_loads(row.get("contribution_points_json")),
  361. "contribution_demand_match_json": _json_loads(
  362. row.get("contribution_demand_match_json")
  363. ),
  364. }
  365. for row in rows
  366. ]
  367. def save_contribution_demand_match(
  368. self,
  369. *,
  370. record_id: int,
  371. demand_cache_run_id: int,
  372. match_json: dict[str, Any],
  373. ) -> None:
  374. sql = """
  375. UPDATE hot_content_records
  376. SET demand_cache_run_id=%s,
  377. contribution_demand_match_json=%s,
  378. postprocess_status=%s,
  379. postprocess_error_reason=NULL,
  380. updated_at=NOW()
  381. WHERE id=%s
  382. """
  383. with self.conn.cursor() as cursor:
  384. cursor.execute(
  385. sql,
  386. (
  387. demand_cache_run_id,
  388. _json_dumps(match_json),
  389. PostprocessStatus.DEMAND_MATCHED,
  390. record_id,
  391. ),
  392. )
  393. def save_wxindex_trend(
  394. self,
  395. *,
  396. record_id: int,
  397. trend_json: dict[str, Any],
  398. ) -> None:
  399. sql = """
  400. UPDATE hot_content_records
  401. SET wxindex_trend_json=%s,
  402. postprocess_status=%s,
  403. postprocess_error_reason=NULL,
  404. updated_at=NOW()
  405. WHERE id=%s
  406. """
  407. with self.conn.cursor() as cursor:
  408. cursor.execute(
  409. sql,
  410. (
  411. _json_dumps(trend_json),
  412. PostprocessStatus.WXINDEX_DONE,
  413. record_id,
  414. ),
  415. )
  416. def update_postprocess_status(
  417. self,
  418. *,
  419. record_id: int,
  420. status: int,
  421. error_message: str | None = None,
  422. ) -> None:
  423. sql = """
  424. UPDATE hot_content_records
  425. SET postprocess_status=%s,
  426. postprocess_error_reason=%s,
  427. updated_at=NOW()
  428. WHERE id=%s
  429. """
  430. with self.conn.cursor() as cursor:
  431. cursor.execute(sql, (status, error_message, record_id))
  432. def replace_demand_export_rows(
  433. self,
  434. *,
  435. record_id: int,
  436. source: str,
  437. hot_title: str,
  438. article_title: str,
  439. rows: list[dict[str, Any]],
  440. ) -> None:
  441. self._ensure_demand_export_table()
  442. delete_sql = "DELETE FROM hot_content_demand_exports WHERE record_id=%s"
  443. insert_sql = """
  444. INSERT INTO hot_content_demand_exports (
  445. record_id,
  446. source,
  447. hot_title,
  448. article_title,
  449. item_type,
  450. item_text,
  451. point_category,
  452. matched_demand,
  453. contribution_score,
  454. wxindex_keyword,
  455. all_hot_keywords,
  456. wxindex_latest_score,
  457. wxindex_trend,
  458. created_at,
  459. updated_at
  460. )
  461. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
  462. """
  463. with self.conn.cursor() as cursor:
  464. cursor.execute(delete_sql, (record_id,))
  465. insert_rows = [
  466. (
  467. record_id,
  468. source,
  469. hot_title,
  470. article_title,
  471. str(item.get("item_type") or ""),
  472. str(item.get("item_text") or ""),
  473. str(item.get("point_category") or ""),
  474. str(item.get("matched_demand") or ""),
  475. item.get("contribution_score"),
  476. str(item.get("wxindex_keyword") or ""),
  477. str(item.get("all_hot_keywords") or ""),
  478. float(item.get("wxindex_latest_score") or 0),
  479. str(item.get("wxindex_trend") or ""),
  480. )
  481. for item in rows
  482. if str(item.get("item_type") or "").strip()
  483. and str(item.get("item_text") or "").strip()
  484. ]
  485. if insert_rows:
  486. cursor.executemany(insert_sql, insert_rows)
  487. def list_demand_export_groups(self) -> list[dict[str, Any]]:
  488. """读取主表当天创建的 record 对应导出分组,仅供 ODPS 当天分区同步(不跨天)。"""
  489. self._ensure_demand_export_table()
  490. today_start = datetime.now(SHANGHAI_TZ).replace(
  491. hour=0,
  492. minute=0,
  493. second=0,
  494. microsecond=0,
  495. tzinfo=None,
  496. )
  497. today_end = today_start + timedelta(days=1)
  498. sql = """
  499. SELECT
  500. e.record_id,
  501. e.item_type,
  502. e.item_text,
  503. e.point_category,
  504. e.matched_demand,
  505. e.wxindex_latest_score
  506. FROM hot_content_demand_exports e
  507. INNER JOIN hot_content_records r ON r.id = e.record_id
  508. WHERE r.created_at >= %s
  509. AND r.created_at < %s
  510. ORDER BY e.record_id ASC, e.id ASC
  511. """
  512. with self.conn.cursor() as cursor:
  513. cursor.execute(sql, (today_start, today_end))
  514. rows = cursor.fetchall()
  515. grouped: dict[int, list[dict[str, Any]]] = {}
  516. for row in rows:
  517. record_id = int(row["record_id"])
  518. grouped.setdefault(record_id, []).append(
  519. {
  520. "item_type": str(row.get("item_type") or ""),
  521. "item_text": str(row.get("item_text") or ""),
  522. "point_category": str(row.get("point_category") or ""),
  523. "matched_demand": str(row.get("matched_demand") or ""),
  524. "wxindex_latest_score": float(row.get("wxindex_latest_score") or 0),
  525. }
  526. )
  527. return [
  528. {"record_id": record_id, "export_rows": export_rows}
  529. for record_id, export_rows in grouped.items()
  530. ]
  531. def _ensure_demand_export_table(self) -> None:
  532. sql = """
  533. CREATE TABLE IF NOT EXISTS hot_content_demand_exports (
  534. id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  535. record_id BIGINT UNSIGNED NOT NULL,
  536. source VARCHAR(64) NOT NULL DEFAULT '',
  537. hot_title VARCHAR(1024) NOT NULL DEFAULT '',
  538. article_title VARCHAR(1024) NOT NULL DEFAULT '',
  539. item_type VARCHAR(32) NOT NULL COMMENT '元素/短语',
  540. item_text VARCHAR(1024) NOT NULL,
  541. point_category VARCHAR(32) NOT NULL DEFAULT '' COMMENT '点类型:灵感点/目的点/关键点',
  542. matched_demand VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '匹配到的需求',
  543. contribution_score DOUBLE NULL COMMENT '贡献分,仅元素有值',
  544. wxindex_keyword VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '获取微信指数的元素',
  545. all_hot_keywords VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '全部热点词',
  546. wxindex_latest_score DOUBLE NOT NULL DEFAULT 0,
  547. wxindex_trend VARCHAR(32) NOT NULL DEFAULT '' COMMENT '微信指数趋势',
  548. created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
  549. updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  550. PRIMARY KEY (id),
  551. KEY idx_record_id (record_id),
  552. KEY idx_source_type (source, item_type)
  553. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
  554. """
  555. with self.conn.cursor() as cursor:
  556. cursor.execute(sql)
  557. self._ensure_demand_export_column(
  558. cursor,
  559. "matched_demand",
  560. """
  561. ALTER TABLE hot_content_demand_exports
  562. ADD COLUMN matched_demand VARCHAR(1024) NOT NULL DEFAULT ''
  563. COMMENT '匹配到的需求'
  564. AFTER item_text
  565. """,
  566. )
  567. self._ensure_demand_export_column(
  568. cursor,
  569. "point_category",
  570. """
  571. ALTER TABLE hot_content_demand_exports
  572. ADD COLUMN point_category VARCHAR(32) NOT NULL DEFAULT ''
  573. COMMENT '点类型:灵感点/目的点/关键点'
  574. AFTER item_text
  575. """,
  576. )
  577. self._ensure_demand_export_column(
  578. cursor,
  579. "contribution_score",
  580. """
  581. ALTER TABLE hot_content_demand_exports
  582. ADD COLUMN contribution_score DOUBLE NULL
  583. COMMENT '贡献分,仅词有值'
  584. AFTER matched_demand
  585. """,
  586. )
  587. self._ensure_demand_export_column(
  588. cursor,
  589. "wxindex_trend",
  590. """
  591. ALTER TABLE hot_content_demand_exports
  592. ADD COLUMN wxindex_trend VARCHAR(32) NOT NULL DEFAULT ''
  593. COMMENT '微信指数趋势'
  594. AFTER wxindex_latest_score
  595. """,
  596. )
  597. self._ensure_demand_export_column(
  598. cursor,
  599. "wxindex_keyword",
  600. """
  601. ALTER TABLE hot_content_demand_exports
  602. ADD COLUMN wxindex_keyword VARCHAR(1024) NOT NULL DEFAULT ''
  603. COMMENT '获取微信指数的词'
  604. AFTER contribution_score
  605. """,
  606. )
  607. self._ensure_demand_export_column(
  608. cursor,
  609. "all_hot_keywords",
  610. """
  611. ALTER TABLE hot_content_demand_exports
  612. ADD COLUMN all_hot_keywords VARCHAR(1024) NOT NULL DEFAULT ''
  613. COMMENT '全部热点词'
  614. AFTER wxindex_keyword
  615. """,
  616. )
  617. def _ensure_demand_export_column(
  618. self,
  619. cursor: Any,
  620. column_name: str,
  621. alter_sql: str,
  622. ) -> None:
  623. cursor.execute(
  624. """
  625. SELECT COUNT(*) AS cnt
  626. FROM information_schema.COLUMNS
  627. WHERE TABLE_SCHEMA = DATABASE()
  628. AND TABLE_NAME = 'hot_content_demand_exports'
  629. AND COLUMN_NAME = %s
  630. """,
  631. (column_name,),
  632. )
  633. if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
  634. cursor.execute(alter_sql)
  635. def list_synced_odps_demand_ids(
  636. self,
  637. *,
  638. partition_dt: str,
  639. strategy: str,
  640. ) -> set[str]:
  641. self._ensure_odps_sync_log_table()
  642. sql = """
  643. SELECT demand_id
  644. FROM hot_content_odps_sync_log
  645. WHERE partition_dt = %s
  646. AND strategy = %s
  647. """
  648. with self.conn.cursor() as cursor:
  649. cursor.execute(sql, (partition_dt, strategy))
  650. rows = cursor.fetchall()
  651. return {
  652. str(row.get("demand_id") or "").strip()
  653. for row in rows
  654. if str(row.get("demand_id") or "").strip()
  655. }
  656. def save_odps_sync_logs(self, rows: list[dict[str, Any]]) -> int:
  657. if not rows:
  658. return 0
  659. self._ensure_odps_sync_log_table()
  660. sql = """
  661. INSERT INTO hot_content_odps_sync_log (
  662. partition_dt,
  663. strategy,
  664. demand_id,
  665. demand_name,
  666. demand_type,
  667. record_id
  668. )
  669. VALUES (%s, %s, %s, %s, %s, %s)
  670. ON DUPLICATE KEY UPDATE
  671. demand_name = VALUES(demand_name),
  672. demand_type = VALUES(demand_type),
  673. record_id = VALUES(record_id),
  674. synced_at = CURRENT_TIMESTAMP
  675. """
  676. insert_rows = [
  677. (
  678. str(item.get("partition_dt") or ""),
  679. str(item.get("strategy") or ""),
  680. str(item.get("demand_id") or ""),
  681. str(item.get("demand_name") or ""),
  682. str(item.get("demand_type") or ""),
  683. int(item.get("record_id") or 0),
  684. )
  685. for item in rows
  686. if str(item.get("demand_id") or "").strip()
  687. ]
  688. with self.conn.cursor() as cursor:
  689. cursor.executemany(sql, insert_rows)
  690. return len(insert_rows)
  691. def _ensure_odps_sync_log_table(self) -> None:
  692. sql = """
  693. CREATE TABLE IF NOT EXISTS hot_content_odps_sync_log (
  694. id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  695. partition_dt VARCHAR(8) NOT NULL COMMENT 'ODPS 分区 dt',
  696. strategy VARCHAR(128) NOT NULL COMMENT '需求 strategy',
  697. demand_id CHAR(32) NOT NULL COMMENT 'ODPS demand_id',
  698. demand_name VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '需求名',
  699. demand_type VARCHAR(32) NOT NULL DEFAULT '' COMMENT '特征点/短语',
  700. record_id BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '来源 hot_content_records.id',
  701. synced_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '写入 ODPS 时间',
  702. PRIMARY KEY (id),
  703. UNIQUE KEY uk_odps_sync (partition_dt, strategy, demand_id),
  704. KEY idx_record_partition (record_id, partition_dt),
  705. KEY idx_partition_strategy (partition_dt, strategy)
  706. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
  707. """
  708. with self.conn.cursor() as cursor:
  709. cursor.execute(sql)