repository.py 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329
  1. """热点内容 MySQL 仓储。"""
  2. from __future__ import annotations
  3. import hashlib
  4. import json
  5. from datetime import datetime, timedelta
  6. from typing import Any
  7. try:
  8. import pymysql
  9. from pymysql.cursors import DictCursor
  10. except ImportError: # pragma: no cover - runtime dependency check
  11. pymysql = None
  12. DictCursor = None
  13. from app.hot_content.exceptions import HotContentFlowError
  14. from app.hot_content.status import ExecutionStatus, PostprocessStatus
  15. from app.hot_content.timezone import SHANGHAI_TZ
  16. from app.hot_content.types import MysqlConfig
  17. def _json_dumps(data: Any) -> str:
  18. return json.dumps(data, ensure_ascii=False, separators=(",", ":"))
  19. def _json_loads(value: Any) -> Any:
  20. if value is None:
  21. return None
  22. if isinstance(value, (dict, list)):
  23. return value
  24. if isinstance(value, (bytes, bytearray)):
  25. value = value.decode("utf-8")
  26. if isinstance(value, str):
  27. return json.loads(value)
  28. return value
  29. def _normalize_demand_names(demand_name_set: list[str]) -> list[str]:
  30. names: list[str] = []
  31. seen: set[str] = set()
  32. for item in demand_name_set:
  33. name = str(item).strip()
  34. if not name or name in seen:
  35. continue
  36. seen.add(name)
  37. names.append(name)
  38. return names
  def unique_title_key(source: str, title: str) -> str:
      """Return the SHA-256 hex digest of ``source``, a newline, and ``title``.

      The joined text is encoded as UTF-8 before hashing.
      """
      digest = hashlib.sha256()
      digest.update(f"{source}\n{title}".encode("utf-8"))
      return digest.hexdigest()
  41. class HotContentRepository:
  def __init__(self, config: MysqlConfig):
      """Open the MySQL connection described by *config*.

      The connection is created in autocommit mode and returns rows as
      dicts (``DictCursor``).

      Raises:
          HotContentFlowError: if ``pymysql`` could not be imported.
      """
      driver_missing = pymysql is None or DictCursor is None
      if driver_missing:
          raise HotContentFlowError("missing dependency: pip install pymysql")
      connect_kwargs = {
          "host": config.host,
          "port": config.port,
          "user": config.user,
          "password": config.password,
          "database": config.database,
          "charset": config.charset,
          "autocommit": True,
          "cursorclass": DictCursor,
      }
      self.conn = pymysql.connect(**connect_kwargs)
  def close(self) -> None:
      """Close the underlying MySQL connection."""
      connection = self.conn
      connection.close()
  def upsert_record(self, *, source: str, title: str, rank: int | None) -> dict[str, Any]:
      """Insert a hot-content row, or refresh the rank of the existing one.

      The row is keyed by ``unique_title_key(source, title)``. On a duplicate
      key only ``hot_rank`` and ``updated_at`` change; the
      ``id=LAST_INSERT_ID(id)`` assignment makes ``lastrowid`` report the
      existing row's id in that case.

      Returns:
          A dict with ``id``, ``unique_key``, ``execution_status``, the stored
          article fields, and two booleans telling whether
          ``decode_request_result`` / ``contribution_points_json`` are set.

      Raises:
          HotContentFlowError: if the row cannot be read back afterwards.
      """
      upsert_sql = """
          INSERT INTO hot_content_records (
              unique_key, source, title, hot_rank, execution_status, created_at, updated_at
          )
          VALUES (%s, %s, %s, %s, %s, NOW(), NOW())
          ON DUPLICATE KEY UPDATE
              id=LAST_INSERT_ID(id),
              hot_rank=VALUES(hot_rank),
              updated_at=NOW()
      """
      readback_sql = """
          SELECT
              id,
              unique_key,
              execution_status,
              article_title,
              article_body,
              article_url,
              decode_request_result IS NOT NULL AS has_decode_request,
              contribution_points_json IS NOT NULL AS has_contribution_points
          FROM hot_content_records
          WHERE id = %s
      """
      upsert_params = (
          unique_title_key(source, title),
          source,
          title,
          rank,
          ExecutionStatus.HOT_SAVED,
      )
      with self.conn.cursor() as cur:
          cur.execute(upsert_sql, upsert_params)
          record_id = int(cur.lastrowid)
          cur.execute(readback_sql, (record_id,))
          found = cur.fetchone()
      if not found:
          raise HotContentFlowError(f"missing hot_content_records id={record_id}")

      summary: dict[str, Any] = {
          "id": int(found["id"]),
          "unique_key": str(found["unique_key"]),
          "execution_status": int(found["execution_status"]),
      }
      for column in ("article_title", "article_body", "article_url"):
          summary[column] = found.get(column)
      for flag in ("has_decode_request", "has_contribution_points"):
          summary[flag] = bool(found.get(flag))
      return summary
  def update_status(
      self,
      *,
      record_id: int,
      status: int,
      error_message: str | None = None,
  ) -> None:
      """Set ``execution_status`` and ``error_reason`` for one record.

      ``error_reason`` is overwritten with *error_message*, so the default
      ``None`` clears any previous reason.
      """
      params = (status, error_message, record_id)
      with self.conn.cursor() as cur:
          cur.execute(
              """
              UPDATE hot_content_records
              SET execution_status=%s, error_reason=%s, updated_at=NOW()
              WHERE id=%s
              """,
              params,
          )
  def update_article(
      self,
      *,
      record_id: int,
      article_title: str,
      article_body: str,
      url: str,
  ) -> None:
      """Store the fetched article for a record and mark its content as OK.

      Writes the title, body and URL, sets ``execution_status`` to
      ``ExecutionStatus.CONTENT_OK`` and clears ``error_reason``.
      """
      params = (
          article_title,
          article_body,
          url,
          ExecutionStatus.CONTENT_OK,
          record_id,
      )
      with self.conn.cursor() as cur:
          cur.execute(
              """
              UPDATE hot_content_records
              SET article_title=%s,
                  article_body=%s,
                  article_url=%s,
                  execution_status=%s,
                  error_reason=NULL,
                  updated_at=NOW()
              WHERE id=%s
              """,
              params,
          )
  def mark_no_valid_content(
      self,
      *,
      record_id: int,
      reason: str,
  ) -> None:
      """Mark a record as having no usable article content.

      Used when no article is found or the title/body is missing. Only
      ``execution_status`` and ``error_reason`` are updated; the category
      filter columns are left untouched.

      Args:
          record_id: Primary key of the ``hot_content_records`` row.
          reason: Explanation to store. A missing or whitespace-only value is
              replaced by ``"no valid content"``.
      """
      # Strip first, then fall back, so a whitespace-only reason does not
      # end up stored as an empty error_reason.
      cleaned_reason = str(reason or "").strip() or "no valid content"
      sql = """
          UPDATE hot_content_records
          SET execution_status=%s,
              error_reason=%s,
              updated_at=NOW()
          WHERE id=%s
      """
      with self.conn.cursor() as cursor:
          cursor.execute(
              sql,
              (
                  ExecutionStatus.NO_VALID_CONTENT,
                  cleaned_reason,
                  record_id,
              ),
          )
  def update_category_filter_result(
      self,
      *,
      record_id: int,
      passed: bool,
      result_json: dict[str, Any],
  ) -> None:
      """Persist the outcome of the category filter for one record.

      Stores the pass flag, the stripped ``reason`` from *result_json* (NULL
      when empty) and the whole *result_json* as JSON text. On rejection
      ``error_reason`` is set to the reason, or to
      ``"category filter rejected"`` when no reason is given; on success it
      is cleared.
      """
      self._ensure_category_filter_columns()
      reason = str(result_json.get("reason") or "").strip()
      if passed:
          status = ExecutionStatus.CATEGORY_FILTER_PASSED
          passed_flag = 1
          error_message = None
      else:
          status = ExecutionStatus.CATEGORY_FILTER_REJECTED
          passed_flag = 0
          error_message = reason or "category filter rejected"
      params = (
          status,
          passed_flag,
          reason or None,
          _json_dumps(result_json),
          error_message,
          record_id,
      )
      with self.conn.cursor() as cur:
          cur.execute(
              """
              UPDATE hot_content_records
              SET execution_status=%s,
                  category_filter_passed=%s,
                  category_filter_reason=%s,
                  category_filter_json=%s,
                  error_reason=%s,
                  updated_at=NOW()
              WHERE id=%s
              """,
              params,
          )
  def get_record_for_category_filter(self, record_id: int) -> dict[str, Any] | None:
      """Load the fields the category filter works with for one record.

      Returns:
          ``None`` when the id does not exist. Otherwise a dict in which text
          columns are normalised to ``str`` (NULL becomes ``""``),
          ``category_filter_passed`` is ``True``/``False``/``None`` (NULL), and
          ``category_filter_json`` is a dict (``{}`` when the stored value is
          missing or not a JSON object).
      """
      self._ensure_category_filter_columns()
      query = """
          SELECT
              id,
              source,
              title,
              article_title,
              article_body,
              article_url,
              execution_status,
              category_filter_passed,
              category_filter_reason,
              category_filter_json
          FROM hot_content_records
          WHERE id = %s
          LIMIT 1
      """
      with self.conn.cursor() as cur:
          cur.execute(query, (record_id,))
          found = cur.fetchone()
      if not found:
          return None

      decoded_json = _json_loads(found.get("category_filter_json"))
      raw_passed = found.get("category_filter_passed")
      passed: bool | None = None if raw_passed is None else bool(int(raw_passed))

      def text(column: str) -> str:
          # NULL / empty columns collapse to an empty string.
          return str(found.get(column) or "")

      return {
          "id": int(found["id"]),
          "source": text("source"),
          "title": text("title"),
          "article_title": text("article_title"),
          "article_body": text("article_body"),
          "article_url": text("article_url"),
          "execution_status": int(found.get("execution_status") or 0),
          "category_filter_passed": passed,
          "category_filter_reason": text("category_filter_reason"),
          "category_filter_json": decoded_json if isinstance(decoded_json, dict) else {},
      }
  def get_category_filter_status(self, record_id: int) -> dict[str, Any] | None:
      """Summarise the category-filter outcome of one record.

      Returns:
          ``None`` when the record is not found, otherwise a dict with ``id``,
          ``passed``, ``reason``, ``matched_category`` (taken from the stored
          filter JSON when that is a dict, else ``None``) and
          ``execution_status``.
      """
      record = self.get_record_for_category_filter(record_id)
      if not record:
          return None
      filter_payload = record.get("category_filter_json") or {}
      if isinstance(filter_payload, dict):
          matched = filter_payload.get("matched_category")
      else:
          matched = None
      summary = {
          "id": record["id"],
          "passed": record["category_filter_passed"],
          "reason": record["category_filter_reason"],
      }
      summary["matched_category"] = matched
      summary["execution_status"] = record["execution_status"]
      return summary
  def update_decode_result(
      self,
      *,
      record_id: int,
      status: int,
      request_json: dict[str, Any],
      response_json: dict[str, Any] | None,
      error_message: str | None = None,
  ) -> None:
      """Record a decode request/response pair and the resulting status.

      The request and response are stored together as one JSON document
      (``{"request": ..., "response": ...}``) in ``decode_request_result``;
      ``error_reason`` is overwritten with *error_message*.
      """
      exchange = {"request": request_json, "response": response_json}
      params = (_json_dumps(exchange), status, error_message, record_id)
      with self.conn.cursor() as cur:
          cur.execute(
              """
              UPDATE hot_content_records
              SET decode_request_result=%s,
                  execution_status=%s,
                  error_reason=%s,
                  updated_at=NOW()
              WHERE id=%s
              """,
              params,
          )
  def list_decode_result_candidates(self, *, limit: int) -> list[dict[str, Any]]:
      """List records whose decode result still has to be collected.

      Selects rows in status DECODE_SUBMITTED, DECODE_SUCCESS or
      DECODE_PENDING that have no ``contribution_points_json`` yet, least
      recently updated first, at most *limit* rows.

      Returns:
          A list of ``{"id": int, "unique_key": str}`` dicts.
      """
      query = """
          SELECT id, unique_key
          FROM hot_content_records
          WHERE execution_status IN (%s, %s, %s)
            AND contribution_points_json IS NULL
          ORDER BY updated_at ASC, id ASC
          LIMIT %s
      """
      params = (
          ExecutionStatus.DECODE_SUBMITTED,
          ExecutionStatus.DECODE_SUCCESS,
          ExecutionStatus.DECODE_PENDING,
          limit,
      )
      with self.conn.cursor() as cur:
          cur.execute(query, params)
          fetched = cur.fetchall()

      candidates: list[dict[str, Any]] = []
      for entry in fetched:
          candidates.append(
              {"id": int(entry["id"]), "unique_key": str(entry["unique_key"])}
          )
      return candidates
  def save_decode_result_export(
      self,
      *,
      record_id: int,
      decode_result_json: dict[str, Any],
      contribution_points_json: dict[str, Any],
  ) -> None:
      """Store the decode result and the extracted contribution points.

      Both payloads are written as JSON text, ``execution_status`` becomes
      ``ExecutionStatus.CONTRIBUTION_EXTRACTED`` and ``error_reason`` is
      cleared.
      """
      params = (
          _json_dumps(decode_result_json),
          _json_dumps(contribution_points_json),
          ExecutionStatus.CONTRIBUTION_EXTRACTED,
          record_id,
      )
      with self.conn.cursor() as cur:
          cur.execute(
              """
              UPDATE hot_content_records
              SET decode_result_json=%s,
                  contribution_points_json=%s,
                  execution_status=%s,
                  error_reason=NULL,
                  updated_at=NOW()
              WHERE id=%s
              """,
              params,
          )
  def get_demand_cache_by_hour(self, *, cache_hour: datetime) -> dict[str, Any] | None:
      """Fetch the cached demand-name set stored for *cache_hour*.

      Returns:
          ``None`` when no row exists for that hour. Otherwise a dict with the
          row's ``id``, ``cache_hour``, ``source_table``, ``partition_dt``,
          ``item_count``, ``updated_at`` and ``demand_name_set`` — the stored
          JSON list with every entry converted to a stripped string and empty
          entries removed (``[]`` when the stored value is not a list).
      """
      query = """
          SELECT
              id,
              cache_hour,
              source_table,
              partition_dt,
              demand_name_set_json,
              item_count,
              updated_at
          FROM demand_pool_hourly_cache
          WHERE cache_hour=%s
          LIMIT 1
      """
      with self.conn.cursor() as cur:
          cur.execute(query, (cache_hour,))
          found = cur.fetchone()
      if not found:
          return None

      decoded = _json_loads(found.get("demand_name_set_json"))
      raw_names = decoded if isinstance(decoded, list) else []
      stripped = (str(entry).strip() for entry in raw_names)
      cleaned_names = [text for text in stripped if text]
      return {
          "id": int(found["id"]),
          "cache_hour": found.get("cache_hour"),
          "source_table": str(found["source_table"]),
          "partition_dt": found.get("partition_dt"),
          "demand_name_set": cleaned_names,
          "item_count": int(found.get("item_count") or 0),
          "updated_at": found.get("updated_at"),
      }
  def save_demand_cache_set(
      self,
      *,
      cache_hour: datetime,
      source_table: str,
      partition_dt: str | None,
      excluded_strategy: str,
      top_n: int,
      demand_name_set: list[str],
  ) -> int:
      """Insert or overwrite the demand-name cache row for *cache_hour*.

      The names are normalised with ``_normalize_demand_names`` before being
      stored as JSON; ``item_count`` is the number of normalised names.

      Returns:
          The row id reported by ``cursor.lastrowid``; on a duplicate key the
          ``id=LAST_INSERT_ID(id)`` assignment makes that the existing row.
      """
      upsert_sql = """
          INSERT INTO demand_pool_hourly_cache (
              cache_hour,
              source_table,
              partition_dt,
              excluded_strategy,
              top_n,
              demand_name_set_json,
              item_count,
              created_at,
              updated_at
          )
          VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
          ON DUPLICATE KEY UPDATE
              id=LAST_INSERT_ID(id),
              source_table=VALUES(source_table),
              partition_dt=VALUES(partition_dt),
              excluded_strategy=VALUES(excluded_strategy),
              top_n=VALUES(top_n),
              demand_name_set_json=VALUES(demand_name_set_json),
              item_count=VALUES(item_count),
              updated_at=NOW()
      """
      names = _normalize_demand_names(demand_name_set)
      params = (
          cache_hour,
          source_table,
          partition_dt,
          excluded_strategy,
          top_n,
          _json_dumps(names),
          len(names),
      )
      with self.conn.cursor() as cur:
          cur.execute(upsert_sql, params)
          row_id = int(cur.lastrowid)
      return row_id
  def list_postprocess_candidates(self, *, limit: int) -> list[dict[str, Any]]:
      """List records that still need post-processing.

      Selects rows that already have ``contribution_points_json`` and whose
      ``postprocess_status`` is PENDING, DEMAND_MATCHED or FAILED, least
      recently updated first, at most *limit* rows.

      Returns:
          One dict per row; the three JSON columns are decoded with
          ``_json_loads``, ``source``/``title`` are normalised to ``str``.
      """
      query = """
          SELECT
              id,
              unique_key,
              source,
              title,
              article_title,
              article_body,
              demand_cache_run_id,
              decode_result_json,
              contribution_points_json,
              contribution_demand_match_json
          FROM hot_content_records
          WHERE contribution_points_json IS NOT NULL
            AND postprocess_status IN (%s, %s, %s)
          ORDER BY updated_at ASC, id ASC
          LIMIT %s
      """
      params = (
          PostprocessStatus.PENDING,
          PostprocessStatus.DEMAND_MATCHED,
          PostprocessStatus.FAILED,
          limit,
      )
      with self.conn.cursor() as cur:
          cur.execute(query, params)
          fetched = cur.fetchall()

      candidates: list[dict[str, Any]] = []
      for entry in fetched:
          candidate: dict[str, Any] = {
              "id": int(entry["id"]),
              "unique_key": str(entry["unique_key"]),
              "source": str(entry.get("source") or ""),
              "title": str(entry.get("title") or ""),
              "article_title": entry.get("article_title"),
              "article_body": entry.get("article_body"),
              "demand_cache_run_id": entry.get("demand_cache_run_id"),
          }
          for json_column in (
              "decode_result_json",
              "contribution_points_json",
              "contribution_demand_match_json",
          ):
              candidate[json_column] = _json_loads(entry.get(json_column))
          candidates.append(candidate)
      return candidates
  def save_contribution_demand_match(
      self,
      *,
      record_id: int,
      demand_cache_run_id: int,
      match_json: dict[str, Any],
  ) -> None:
      """Store the contribution/demand match result for one record.

      Writes the cache run id and the match JSON, sets ``postprocess_status``
      to ``PostprocessStatus.DEMAND_MATCHED`` and clears
      ``postprocess_error_reason``.
      """
      params = (
          demand_cache_run_id,
          _json_dumps(match_json),
          PostprocessStatus.DEMAND_MATCHED,
          record_id,
      )
      with self.conn.cursor() as cur:
          cur.execute(
              """
              UPDATE hot_content_records
              SET demand_cache_run_id=%s,
                  contribution_demand_match_json=%s,
                  postprocess_status=%s,
                  postprocess_error_reason=NULL,
                  updated_at=NOW()
              WHERE id=%s
              """,
              params,
          )
  def save_wxindex_trend(
      self,
      *,
      record_id: int,
      trend_json: dict[str, Any],
  ) -> None:
      """Store the wxindex trend payload for one record.

      Sets ``postprocess_status`` to ``PostprocessStatus.WXINDEX_DONE`` and
      clears ``postprocess_error_reason``.
      """
      params = (
          _json_dumps(trend_json),
          PostprocessStatus.WXINDEX_DONE,
          record_id,
      )
      with self.conn.cursor() as cur:
          cur.execute(
              """
              UPDATE hot_content_records
              SET wxindex_trend_json=%s,
                  postprocess_status=%s,
                  postprocess_error_reason=NULL,
                  updated_at=NOW()
              WHERE id=%s
              """,
              params,
          )
  def save_demand_quality(
      self,
      *,
      record_id: int,
      event_sense_json: dict[str, Any],
      senior_fit_json: dict[str, Any],
      update_status: bool = True,
  ) -> None:
      """Store the two demand-quality scoring payloads for one record.

      Args:
          record_id: Primary key of the ``hot_content_records`` row.
          event_sense_json: Payload written to ``demand_event_sense_json``.
          senior_fit_json: Payload written to ``demand_senior_fit_json``.
          update_status: When true (default) also set ``postprocess_status``
              to ``PostprocessStatus.QUALITY_DONE`` and clear
              ``postprocess_error_reason``; when false only the two JSON
              columns and ``updated_at`` change.
      """
      self._ensure_record_quality_columns()
      event_text = _json_dumps(event_sense_json)
      senior_text = _json_dumps(senior_fit_json)
      if not update_status:
          statement = """
              UPDATE hot_content_records
              SET demand_event_sense_json=%s,
                  demand_senior_fit_json=%s,
                  updated_at=NOW()
              WHERE id=%s
          """
          arguments: tuple[Any, ...] = (event_text, senior_text, record_id)
      else:
          statement = """
              UPDATE hot_content_records
              SET demand_event_sense_json=%s,
                  demand_senior_fit_json=%s,
                  postprocess_status=%s,
                  postprocess_error_reason=NULL,
                  updated_at=NOW()
              WHERE id=%s
          """
          arguments = (
              event_text,
              senior_text,
              PostprocessStatus.QUALITY_DONE,
              record_id,
          )
      with self.conn.cursor() as cur:
          cur.execute(statement, arguments)
  def update_postprocess_status(
      self,
      *,
      record_id: int,
      status: int,
      error_message: str | None = None,
  ) -> None:
      """Set ``postprocess_status`` and ``postprocess_error_reason`` for a record.

      The error reason is overwritten with *error_message*, so the default
      ``None`` clears any previous reason.
      """
      params = (status, error_message, record_id)
      with self.conn.cursor() as cur:
          cur.execute(
              """
              UPDATE hot_content_records
              SET postprocess_status=%s,
                  postprocess_error_reason=%s,
                  updated_at=NOW()
              WHERE id=%s
              """,
              params,
          )
  def replace_demand_export_rows(
      self,
      *,
      record_id: int,
      source: str,
      hot_title: str,
      article_title: str,
      rows: list[dict[str, Any]],
  ) -> None:
      """Atomically replace all export rows belonging to *record_id*.

      Existing rows for the record are deleted and the given *rows* are
      inserted in one transaction, so a failed insert leaves the previous
      rows in place instead of an empty export.

      Args:
          record_id: Id of the owning ``hot_content_records`` row.
          source: Value stored in the ``source`` column of every row.
          hot_title: Value stored in the ``hot_title`` column of every row.
          article_title: Value stored in ``article_title`` of every row.
          rows: Export items. Items whose ``item_type`` or ``item_text`` is
              missing or blank are skipped.

      Raises:
          Exception: whatever row conversion or the database driver raises;
              on a database error the transaction is rolled back first.
      """
      self._ensure_demand_export_table()
      delete_sql = "DELETE FROM hot_content_demand_exports WHERE record_id=%s"
      insert_sql = """
          INSERT INTO hot_content_demand_exports (
              record_id,
              source,
              hot_title,
              article_title,
              item_type,
              item_text,
              point_category,
              matched_demand,
              contribution_score,
              wxindex_keyword,
              all_hot_keywords,
              wxindex_latest_score,
              wxindex_trend,
              is_as_demand,
              event_sense_score,
              senior_fit_score,
              created_at,
              updated_at
          )
          VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
      """
      # Convert the input before touching the database so that malformed
      # items (e.g. a non-numeric score) fail before anything is deleted.
      insert_rows = [
          (
              record_id,
              source,
              hot_title,
              article_title,
              str(item.get("item_type") or ""),
              str(item.get("item_text") or ""),
              str(item.get("point_category") or ""),
              str(item.get("matched_demand") or ""),
              item.get("contribution_score"),
              str(item.get("wxindex_keyword") or ""),
              str(item.get("all_hot_keywords") or ""),
              float(item.get("wxindex_latest_score") or 0),
              str(item.get("wxindex_trend") or ""),
              int(item.get("is_as_demand") or 0),
              item.get("event_sense_score"),
              item.get("senior_fit_score"),
          )
          for item in rows
          if str(item.get("item_type") or "").strip()
          and str(item.get("item_text") or "").strip()
      ]
      # The connection runs in autocommit mode, so an explicit transaction
      # is needed to make DELETE + INSERT all-or-nothing.
      self.conn.begin()
      try:
          with self.conn.cursor() as cursor:
              cursor.execute(delete_sql, (record_id,))
              if insert_rows:
                  cursor.executemany(insert_sql, insert_rows)
      except Exception:
          self.conn.rollback()
          raise
      else:
          self.conn.commit()
  def list_odps_sync_records(self) -> list[dict[str, Any]]:
      """Read today's quality-checked records for the ODPS sync.

      "Today" is the current calendar day in ``SHANGHAI_TZ``, compared as
      naive datetimes against ``created_at``; historical rows are not
      returned. Only rows with ``postprocess_status`` equal to
      ``PostprocessStatus.QUALITY_DONE`` and a non-empty
      ``contribution_demand_match_json`` qualify.

      Returns:
          One dict per row, ordered by id, with ``id`` and the five JSON
          columns decoded by ``_json_loads``.
      """
      self._ensure_record_quality_columns()
      midnight = {"hour": 0, "minute": 0, "second": 0, "microsecond": 0}
      # Drop the tzinfo so the bounds are naive, like the stored timestamps
      # are presumably — TODO confirm created_at is Shanghai local time.
      day_start = datetime.now(SHANGHAI_TZ).replace(tzinfo=None, **midnight)
      day_end = day_start + timedelta(days=1)
      query = """
          SELECT
              id,
              contribution_points_json,
              contribution_demand_match_json,
              wxindex_trend_json,
              demand_event_sense_json,
              demand_senior_fit_json
          FROM hot_content_records
          WHERE created_at >= %s
            AND created_at < %s
            AND postprocess_status = %s
            AND contribution_demand_match_json IS NOT NULL
            AND TRIM(CAST(contribution_demand_match_json AS CHAR)) <> ''
          ORDER BY id ASC
      """
      with self.conn.cursor() as cur:
          cur.execute(query, (day_start, day_end, PostprocessStatus.QUALITY_DONE))
          fetched = cur.fetchall()

      json_columns = (
          "contribution_points_json",
          "contribution_demand_match_json",
          "wxindex_trend_json",
          "demand_event_sense_json",
          "demand_senior_fit_json",
      )
      return [
          {
              "id": int(entry["id"]),
              **{column: _json_loads(entry.get(column)) for column in json_columns},
          }
          for entry in fetched
      ]
  def list_demand_export_groups(self) -> list[dict[str, Any]]:
      """Read export rows of records created today, grouped per record.

      "Today" is the current calendar day in ``SHANGHAI_TZ`` applied to the
      main table's ``created_at``; rows of earlier days are not returned.

      Returns:
          A list of ``{"record_id": int, "export_rows": [...]}`` dicts in
          ascending record id order, each export row holding ``item_type``,
          ``item_text``, ``point_category``, ``matched_demand`` and
          ``wxindex_latest_score``.
      """
      self._ensure_demand_export_table()
      midnight = {"hour": 0, "minute": 0, "second": 0, "microsecond": 0}
      day_start = datetime.now(SHANGHAI_TZ).replace(tzinfo=None, **midnight)
      day_end = day_start + timedelta(days=1)
      query = """
          SELECT
              e.record_id,
              e.item_type,
              e.item_text,
              e.point_category,
              e.matched_demand,
              e.wxindex_latest_score
          FROM hot_content_demand_exports e
          INNER JOIN hot_content_records r ON r.id = e.record_id
          WHERE r.created_at >= %s
            AND r.created_at < %s
          ORDER BY e.record_id ASC, e.id ASC
      """
      with self.conn.cursor() as cur:
          cur.execute(query, (day_start, day_end))
          fetched = cur.fetchall()

      text_columns = ("item_type", "item_text", "point_category", "matched_demand")
      by_record: dict[int, list[dict[str, Any]]] = {}
      for entry in fetched:
          owner_id = int(entry["record_id"])
          export_row: dict[str, Any] = {
              column: str(entry.get(column) or "") for column in text_columns
          }
          export_row["wxindex_latest_score"] = float(
              entry.get("wxindex_latest_score") or 0
          )
          if owner_id not in by_record:
              by_record[owner_id] = []
          by_record[owner_id].append(export_row)

      groups: list[dict[str, Any]] = []
      for owner_id, export_rows in by_record.items():
          groups.append({"record_id": owner_id, "export_rows": export_rows})
      return groups
  750. def _ensure_demand_export_table(self) -> None:
  751. sql = """
  752. CREATE TABLE IF NOT EXISTS hot_content_demand_exports (
  753. id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  754. record_id BIGINT UNSIGNED NOT NULL,
  755. source VARCHAR(64) NOT NULL DEFAULT '',
  756. hot_title VARCHAR(1024) NOT NULL DEFAULT '',
  757. article_title VARCHAR(1024) NOT NULL DEFAULT '',
  758. item_type VARCHAR(32) NOT NULL COMMENT '元素/短语',
  759. item_text VARCHAR(1024) NOT NULL,
  760. point_category VARCHAR(32) NOT NULL DEFAULT '' COMMENT '点类型:灵感点/目的点/关键点',
  761. matched_demand VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '匹配到的需求',
  762. contribution_score DOUBLE NULL COMMENT '贡献分,仅元素有值',
  763. wxindex_keyword VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '获取微信指数的元素',
  764. all_hot_keywords VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '全部热点词',
  765. wxindex_latest_score DOUBLE NOT NULL DEFAULT 0,
  766. wxindex_trend VARCHAR(32) NOT NULL DEFAULT '' COMMENT '微信指数趋势',
  767. is_as_demand TINYINT NOT NULL DEFAULT 0 COMMENT '是否作为需求:0否 1是',
  768. created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
  769. updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  770. PRIMARY KEY (id),
  771. KEY idx_record_id (record_id),
  772. KEY idx_source_type (source, item_type)
  773. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
  774. """
  775. with self.conn.cursor() as cursor:
  776. cursor.execute(sql)
  777. self._ensure_demand_export_column(
  778. cursor,
  779. "matched_demand",
  780. """
  781. ALTER TABLE hot_content_demand_exports
  782. ADD COLUMN matched_demand VARCHAR(1024) NOT NULL DEFAULT ''
  783. COMMENT '匹配到的需求'
  784. AFTER item_text
  785. """,
  786. )
  787. self._ensure_demand_export_column(
  788. cursor,
  789. "point_category",
  790. """
  791. ALTER TABLE hot_content_demand_exports
  792. ADD COLUMN point_category VARCHAR(32) NOT NULL DEFAULT ''
  793. COMMENT '点类型:灵感点/目的点/关键点'
  794. AFTER item_text
  795. """,
  796. )
  797. self._ensure_demand_export_column(
  798. cursor,
  799. "contribution_score",
  800. """
  801. ALTER TABLE hot_content_demand_exports
  802. ADD COLUMN contribution_score DOUBLE NULL
  803. COMMENT '贡献分,仅词有值'
  804. AFTER matched_demand
  805. """,
  806. )
  807. self._ensure_demand_export_column(
  808. cursor,
  809. "wxindex_trend",
  810. """
  811. ALTER TABLE hot_content_demand_exports
  812. ADD COLUMN wxindex_trend VARCHAR(32) NOT NULL DEFAULT ''
  813. COMMENT '微信指数趋势'
  814. AFTER wxindex_latest_score
  815. """,
  816. )
  817. self._ensure_demand_export_column(
  818. cursor,
  819. "wxindex_keyword",
  820. """
  821. ALTER TABLE hot_content_demand_exports
  822. ADD COLUMN wxindex_keyword VARCHAR(1024) NOT NULL DEFAULT ''
  823. COMMENT '获取微信指数的词'
  824. AFTER contribution_score
  825. """,
  826. )
  827. self._ensure_demand_export_column(
  828. cursor,
  829. "all_hot_keywords",
  830. """
  831. ALTER TABLE hot_content_demand_exports
  832. ADD COLUMN all_hot_keywords VARCHAR(1024) NOT NULL DEFAULT ''
  833. COMMENT '全部热点词'
  834. AFTER wxindex_keyword
  835. """,
  836. )
  837. self._ensure_demand_export_column(
  838. cursor,
  839. "is_as_demand",
  840. """
  841. ALTER TABLE hot_content_demand_exports
  842. ADD COLUMN is_as_demand TINYINT NOT NULL DEFAULT 0
  843. COMMENT '是否作为需求:0否 1是'
  844. AFTER wxindex_trend
  845. """,
  846. )
  847. self._ensure_demand_export_column(
  848. cursor,
  849. "event_sense_score",
  850. """
  851. ALTER TABLE hot_content_demand_exports
  852. ADD COLUMN event_sense_score DOUBLE NULL
  853. COMMENT '事件性得分 0-10'
  854. AFTER is_as_demand
  855. """,
  856. )
  857. self._ensure_demand_export_column(
  858. cursor,
  859. "senior_fit_score",
  860. """
  861. ALTER TABLE hot_content_demand_exports
  862. ADD COLUMN senior_fit_score DOUBLE NULL
  863. COMMENT '老年性得分 0-10'
  864. AFTER event_sense_score
  865. """,
  866. )
  867. def _ensure_record_quality_columns(self) -> None:
  868. with self.conn.cursor() as cursor:
  869. for column_name, alter_sql in (
  870. (
  871. "demand_event_sense_json",
  872. """
  873. ALTER TABLE hot_content_records
  874. ADD COLUMN demand_event_sense_json JSON NULL
  875. COMMENT '需求事件性 LLM 评分结果'
  876. AFTER wxindex_trend_json
  877. """,
  878. ),
  879. (
  880. "demand_senior_fit_json",
  881. """
  882. ALTER TABLE hot_content_records
  883. ADD COLUMN demand_senior_fit_json JSON NULL
  884. COMMENT '需求老年性 LLM 评分结果'
  885. AFTER demand_event_sense_json
  886. """,
  887. ),
  888. ):
  889. cursor.execute(
  890. """
  891. SELECT COUNT(*) AS cnt
  892. FROM information_schema.COLUMNS
  893. WHERE TABLE_SCHEMA = DATABASE()
  894. AND TABLE_NAME = 'hot_content_records'
  895. AND COLUMN_NAME = %s
  896. """,
  897. (column_name,),
  898. )
  899. if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
  900. cursor.execute(alter_sql)
  def _ensure_category_filter_columns(self) -> None:
      """Add the category-filter columns to ``hot_content_records`` when absent.

      For every column the method first asks ``information_schema.COLUMNS``
      whether it already exists in the current database and issues the
      matching ``ALTER TABLE`` only when the count is zero. Result rows are
      read as mappings (``row.get("cnt")``).
      """
      probe_sql = """
          SELECT COUNT(*) AS cnt
          FROM information_schema.COLUMNS
          WHERE TABLE_SCHEMA = DATABASE()
            AND TABLE_NAME = 'hot_content_records'
            AND COLUMN_NAME = %s
      """
      # Insertion order matters: each column is positioned AFTER the previous one.
      migrations = {
          "category_filter_json": """
              ALTER TABLE hot_content_records
              ADD COLUMN category_filter_json JSON NULL
              COMMENT '老年人兴趣分类筛选 LLM 结果'
              AFTER article_url
          """,
          "category_filter_passed": """
              ALTER TABLE hot_content_records
              ADD COLUMN category_filter_passed TINYINT NULL
              COMMENT '分类筛选是否通过:1通过 0不通过 NULL未筛选'
              AFTER category_filter_json
          """,
          "category_filter_reason": """
              ALTER TABLE hot_content_records
              ADD COLUMN category_filter_reason TEXT NULL
              COMMENT '分类筛选原因'
              AFTER category_filter_passed
          """,
      }
      with self.conn.cursor() as cur:
          for column, ddl in migrations.items():
              cur.execute(probe_sql, (column,))
              found = cur.fetchone() or {}
              if int(found.get("cnt") or 0) != 0:
                  continue  # column already exists
              cur.execute(ddl)
  def _ensure_demand_export_column(
      self,
      cursor: Any,
      column_name: str,
      alter_sql: str,
  ) -> None:
      """Run ``alter_sql`` unless ``hot_content_demand_exports`` already has the column.

      Args:
          cursor: Open DB cursor; its rows are read as mappings (``.get("cnt")``).
          column_name: Column to look for in ``information_schema.COLUMNS``.
          alter_sql: Statement executed when the column is missing.
      """
      lookup_sql = """
          SELECT COUNT(*) AS cnt
          FROM information_schema.COLUMNS
          WHERE TABLE_SCHEMA = DATABASE()
            AND TABLE_NAME = 'hot_content_demand_exports'
            AND COLUMN_NAME = %s
      """
      cursor.execute(lookup_sql, (column_name,))
      match = cursor.fetchone() or {}
      already_present = int(match.get("cnt") or 0) != 0
      if not already_present:
          cursor.execute(alter_sql)
  def list_synced_odps_demand_ids(
      self,
      *,
      partition_dt: str,
      strategy: str,
  ) -> set[str]:
      """Return the demand ids already logged for one partition and strategy.

      Args:
          partition_dt: Value matched against ``partition_dt``.
          strategy: Value matched against ``strategy``.

      Returns:
          The distinct, whitespace-stripped ``demand_id`` values; blank or
          NULL ids are left out.
      """
      self._ensure_odps_sync_log_table()
      query = """
          SELECT demand_id
          FROM hot_content_odps_sync_log
          WHERE partition_dt = %s
            AND strategy = %s
      """
      with self.conn.cursor() as cur:
          cur.execute(query, (partition_dt, strategy))
          fetched = cur.fetchall()

      demand_ids: set[str] = set()
      for record in fetched:
          demand_id = str(record.get("demand_id") or "").strip()
          if demand_id:
              demand_ids.add(demand_id)
      return demand_ids
  def save_odps_sync_logs(self, rows: list[dict[str, Any]]) -> int:
      """Upsert ODPS sync-log rows and return how many were submitted.

      Entries whose ``demand_id`` is blank after stripping are dropped. The
      id is stored in its stripped form so it matches what
      ``list_synced_odps_demand_ids`` returns and is not padded with stray
      whitespace in the ``CHAR(32)`` column. An existing
      ``(partition_dt, strategy, demand_id)`` row has its name, type,
      record id and weight overwritten and ``synced_at`` refreshed.

      Args:
          rows: Mappings that may carry ``partition_dt``, ``strategy``,
              ``demand_id``, ``demand_name``, ``demand_type``, ``record_id``
              and ``weight``. Missing text fields become ``""``, a missing
              ``record_id`` becomes ``0`` and a missing ``weight`` stays NULL.

      Returns:
          The number of rows handed to the database. This is not the
          driver's affected-row count.

      Raises:
          ValueError, TypeError: If ``record_id`` or ``weight`` cannot be
              converted to ``int`` / ``float``.
      """
      if not rows:
          return 0

      payload: list[tuple[str, str, str, str, str, int, float | None]] = []
      for item in rows:
          demand_id = str(item.get("demand_id") or "").strip()
          if not demand_id:
              continue
          weight = item.get("weight")
          payload.append(
              (
                  str(item.get("partition_dt") or ""),
                  str(item.get("strategy") or ""),
                  demand_id,
                  str(item.get("demand_name") or ""),
                  str(item.get("demand_type") or ""),
                  int(item.get("record_id") or 0),
                  float(weight) if weight is not None else None,
              )
          )
      # Every row was filtered out: nothing to write, so skip the DB round-trips.
      if not payload:
          return 0

      self._ensure_odps_sync_log_table()
      sql = """
          INSERT INTO hot_content_odps_sync_log (
              partition_dt,
              strategy,
              demand_id,
              demand_name,
              demand_type,
              record_id,
              weight
          )
          VALUES (%s, %s, %s, %s, %s, %s, %s)
          ON DUPLICATE KEY UPDATE
              demand_name = VALUES(demand_name),
              demand_type = VALUES(demand_type),
              record_id = VALUES(record_id),
              weight = VALUES(weight),
              synced_at = CURRENT_TIMESTAMP
      """
      with self.conn.cursor() as cursor:
          cursor.executemany(sql, payload)
      return len(payload)
  def _ensure_odps_sync_log_weight_column(self, cursor: Any) -> None:
      """Add the ``weight`` column to ``hot_content_odps_sync_log`` when it is missing.

      Args:
          cursor: Open DB cursor; its rows are read as mappings (``.get("cnt")``).
      """
      probe_sql = """
          SELECT COUNT(*) AS cnt
          FROM information_schema.COLUMNS
          WHERE TABLE_SCHEMA = DATABASE()
            AND TABLE_NAME = 'hot_content_odps_sync_log'
            AND COLUMN_NAME = 'weight'
      """
      cursor.execute(probe_sql)
      probe = cursor.fetchone() or {}
      if int(probe.get("cnt") or 0) != 0:
          return  # column already exists
      cursor.execute(
          """
          ALTER TABLE hot_content_odps_sync_log
          ADD COLUMN weight DOUBLE NULL DEFAULT NULL
          COMMENT 'ODPS 需求权重(记录 wxindex 最高分 / 1000000)'
          AFTER record_id
          """
      )
  def list_wxindex_word_scores(self, name: str) -> list[dict[str, Any]]:
      """Return the stored daily scores for one word, ordered by date ascending.

      Args:
          name: The word to look up; surrounding whitespace is ignored.

      Returns:
          A list of ``{"ymd": <dt>, "total_score": <float>}`` dicts. Rows with
          a blank ``dt`` or a score that cannot be converted to ``float`` are
          skipped. A blank ``name`` yields ``[]`` without querying.
      """
      word = str(name or "").strip()
      if not word:
          return []

      self._ensure_wxindex_words_table()
      query = """
          SELECT dt, total_score
          FROM hot_content_wxindex_words
          WHERE name = %s
          ORDER BY dt ASC
      """
      with self.conn.cursor() as cur:
          cur.execute(query, (word,))
          fetched = cur.fetchall()

      result: list[dict[str, Any]] = []
      for record in fetched:
          day = str(record.get("dt") or "").strip()
          if not day:
              continue
          try:
              value = float(record["total_score"])
          except (TypeError, ValueError, KeyError):
              continue
          else:
              result.append({"ymd": day, "total_score": value})
      return result
  def list_stale_wxindex_words(
      self,
      *,
      end_ymd: str,
      start_ymd: str = "20260601",
  ) -> list[dict[str, Any]]:
      """Return stored words that lack the latest date or are not back-filled to ``start_ymd``.

      A word is reported when its newest ``dt`` is below ``end_ymd`` or its
      oldest ``dt`` is above ``start_ymd``.

      Args:
          end_ymd: Date the newest row of a word is compared against.
          start_ymd: Date the oldest row of a word is compared against.

      Returns:
          ``{"name", "earliest_dt", "latest_dt"}`` dicts ordered by name; rows
          with any blank field are dropped. Returns ``[]`` without querying
          when either bound is blank.
      """
      end_bound = str(end_ymd or "").strip()
      start_bound = str(start_ymd or "").strip()
      if not (end_bound and start_bound):
          return []

      self._ensure_wxindex_words_table()
      query = """
          SELECT name, MIN(dt) AS earliest_dt, MAX(dt) AS latest_dt
          FROM hot_content_wxindex_words
          GROUP BY name
          HAVING MAX(dt) < %s OR MIN(dt) > %s
          ORDER BY name ASC
      """
      with self.conn.cursor() as cur:
          cur.execute(query, (end_bound, start_bound))
          fetched = cur.fetchall()

      stale: list[dict[str, Any]] = []
      for record in fetched:
          entry = {
              field: str(record.get(field) or "").strip()
              for field in ("name", "earliest_dt", "latest_dt")
          }
          if all(entry.values()):
              stale.append(entry)
      return stale
  def list_low_max_wxindex_words(
      self,
      *,
      min_max_score: float,
  ) -> list[dict[str, Any]]:
      """Aggregate by word and return those whose highest score is below a threshold.

      Args:
          min_max_score: Words are returned when ``MAX(total_score)`` is
              strictly less than this value.

      Returns:
          ``{"name", "max_score", "row_count"}`` dicts ordered by name. Rows
          with a blank name, or whose aggregates cannot be converted to
          ``float`` / ``int``, are skipped.
      """
      self._ensure_wxindex_words_table()
      query = """
          SELECT
              name,
              MAX(total_score) AS max_score,
              COUNT(*) AS row_count
          FROM hot_content_wxindex_words
          GROUP BY name
          HAVING MAX(total_score) < %s
          ORDER BY name ASC
      """
      with self.conn.cursor() as cur:
          cur.execute(query, (min_max_score,))
          fetched = cur.fetchall()

      matches: list[dict[str, Any]] = []
      for record in fetched:
          word = str(record.get("name") or "").strip()
          if not word:
              continue
          try:
              peak = float(record["max_score"])
              occurrences = int(record["row_count"])
          except (TypeError, ValueError, KeyError):
              continue
          matches.append({"name": word, "max_score": peak, "row_count": occurrences})
      return matches
  def delete_wxindex_words_by_names(self, names: list[str]) -> int:
      """Delete every stored row for the given words.

      Args:
          names: Words to delete; entries are stripped and blank ones ignored.

      Returns:
          The cursor's ``rowcount`` after the delete (``0`` when it is falsy),
          or ``0`` without querying when no usable name remains.
      """
      stripped = (str(raw or "").strip() for raw in names)
      targets = [text for text in stripped if text]
      if not targets:
          return 0

      self._ensure_wxindex_words_table()
      # One positional placeholder per name; the values are bound by the driver.
      markers = ", ".join("%s" for _ in targets)
      statement = f"""
          DELETE FROM hot_content_wxindex_words
          WHERE name IN ({markers})
      """
      with self.conn.cursor() as cur:
          cur.execute(statement, tuple(targets))
          return int(cur.rowcount or 0)
  1152. def get_wxindex_word_latest_dt(self, name: str) -> str | None:
  1153. word = str(name or "").strip()
  1154. if not word:
  1155. return None
  1156. self._ensure_wxindex_words_table()
  1157. sql = """
  1158. SELECT MAX(dt) AS latest_dt
  1159. FROM hot_content_wxindex_words
  1160. WHERE name = %s
  1161. """
  1162. with self.conn.cursor() as cursor:
  1163. cursor.execute(sql, (word,))
  1164. row = cursor.fetchone() or {}
  1165. latest_dt = str(row.get("latest_dt") or "").strip()
  1166. return latest_dt or None
  def save_wxindex_daily_scores(
      self,
      *,
      name: str,
      scores: list[dict[str, Any]],
  ) -> tuple[int, int]:
      """Insert daily index scores for one word, skipping rows that already exist.

      Args:
          name: The word the scores belong to; surrounding whitespace is ignored.
          scores: Dicts carrying a date under ``"ymd"`` (or ``"dt"``) and a
              ``"total_score"``. Non-dict entries, blank dates, unconvertible
              scores and repeated dates (first one wins) are ignored.

      Returns:
          ``(inserted, skipped)``: ``inserted`` sums the cursor's ``rowcount``
          over the ``INSERT IGNORE`` statements and ``skipped`` is the number
          of submitted rows minus ``inserted``. Returns ``(0, 0)`` when the
          word is blank or no usable score remains.
      """
      word = str(name or "").strip()
      if not (word and scores):
          return 0, 0

      self._ensure_wxindex_words_table()
      insert_sql = """
          INSERT IGNORE INTO hot_content_wxindex_words (
              name,
              dt,
              total_score
          )
          VALUES (%s, %s, %s)
      """

      # dict keeps insertion order, so the first valid score per date is retained.
      by_day: dict[str, float] = {}
      for entry in scores:
          if not isinstance(entry, dict):
              continue
          day = str(entry.get("ymd") or entry.get("dt") or "").strip()
          if not day:
              continue
          try:
              value = float(entry["total_score"])
          except (TypeError, ValueError, KeyError):
              continue
          by_day.setdefault(day, value)

      if not by_day:
          return 0, 0

      batch = [(word, day, value) for day, value in by_day.items()]
      written = 0
      with self.conn.cursor() as cur:
          for params in batch:
              cur.execute(insert_sql, params)
              written += int(cur.rowcount or 0)
      return written, len(batch) - written
  def list_records_with_wxindex_trend(
      self,
      *,
      since_dt: datetime,
  ) -> list[dict[str, Any]]:
      """Return records created at or after ``since_dt`` that carry trend JSON.

      Args:
          since_dt: Lower bound (inclusive) applied to ``created_at``.

      Returns:
          ``{"id": int, "wxindex_trend_json": <decoded>}`` dicts ordered by id,
          where the JSON column has been passed through ``_json_loads``. Rows
          whose trend column is NULL or blank are excluded by the query.
      """
      query = """
          SELECT id, wxindex_trend_json
          FROM hot_content_records
          WHERE created_at >= %s
            AND wxindex_trend_json IS NOT NULL
            AND TRIM(CAST(wxindex_trend_json AS CHAR)) <> ''
          ORDER BY id ASC
      """
      with self.conn.cursor() as cur:
          cur.execute(query, (since_dt,))
          fetched = cur.fetchall()

      return [
          {
              "id": int(record["id"]),
              "wxindex_trend_json": _json_loads(record.get("wxindex_trend_json")),
          }
          for record in fetched
      ]
  def _ensure_wxindex_words_table(self) -> None:
      """Create ``hot_content_wxindex_words`` if it does not exist yet.

      The table keeps one row per ``(name, dt)`` pair, enforced by the
      ``uk_name_dt`` unique key.
      """
      ddl = """
          CREATE TABLE IF NOT EXISTS hot_content_wxindex_words (
              id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
              name VARCHAR(256) NOT NULL COMMENT '词',
              dt VARCHAR(8) NOT NULL COMMENT '日期 yyyymmdd',
              total_score DOUBLE NOT NULL COMMENT '微信指数',
              created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
              PRIMARY KEY (id),
              UNIQUE KEY uk_name_dt (name, dt),
              KEY idx_name (name),
              KEY idx_dt (dt)
          ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
      """
      with self.conn.cursor() as cur:
          cur.execute(ddl)
  def _ensure_odps_sync_log_table(self) -> None:
      """Create ``hot_content_odps_sync_log`` if needed and make sure it has ``weight``.

      After the ``CREATE TABLE IF NOT EXISTS`` the same cursor is handed to
      ``_ensure_odps_sync_log_weight_column``, so a table created before the
      ``weight`` column was introduced still gets it.
      """
      ddl = """
          CREATE TABLE IF NOT EXISTS hot_content_odps_sync_log (
              id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
              partition_dt VARCHAR(8) NOT NULL COMMENT 'ODPS 分区 dt',
              strategy VARCHAR(128) NOT NULL COMMENT '需求 strategy',
              demand_id CHAR(32) NOT NULL COMMENT 'ODPS demand_id',
              demand_name VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '需求名',
              demand_type VARCHAR(32) NOT NULL DEFAULT '' COMMENT '特征点/短语',
              record_id BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '来源 hot_content_records.id',
              weight DOUBLE NULL DEFAULT NULL COMMENT 'ODPS 需求权重(记录 wxindex 最高分 / 1000000)',
              synced_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '写入 ODPS 时间',
              PRIMARY KEY (id),
              UNIQUE KEY uk_odps_sync (partition_dt, strategy, demand_id),
              KEY idx_record_partition (record_id, partition_dt),
              KEY idx_partition_strategy (partition_dt, strategy)
          ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
      """
      with self.conn.cursor() as cur:
          cur.execute(ddl)
          self._ensure_odps_sync_log_weight_column(cur)