repository.py 38 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105
  1. """热点内容 MySQL 仓储。"""
  2. from __future__ import annotations
  3. import hashlib
  4. import json
  5. from datetime import datetime, timedelta
  6. from typing import Any
  7. try:
  8. import pymysql
  9. from pymysql.cursors import DictCursor
  10. except ImportError: # pragma: no cover - runtime dependency check
  11. pymysql = None
  12. DictCursor = None
  13. from app.hot_content.exceptions import HotContentFlowError
  14. from app.hot_content.status import ExecutionStatus, PostprocessStatus
  15. from app.hot_content.timezone import SHANGHAI_TZ
  16. from app.hot_content.types import MysqlConfig
  17. def _json_dumps(data: Any) -> str:
  18. return json.dumps(data, ensure_ascii=False, separators=(",", ":"))
  19. def _json_loads(value: Any) -> Any:
  20. if value is None:
  21. return None
  22. if isinstance(value, (dict, list)):
  23. return value
  24. if isinstance(value, (bytes, bytearray)):
  25. value = value.decode("utf-8")
  26. if isinstance(value, str):
  27. return json.loads(value)
  28. return value
  29. def _normalize_demand_names(demand_name_set: list[str]) -> list[str]:
  30. names: list[str] = []
  31. seen: set[str] = set()
  32. for item in demand_name_set:
  33. name = str(item).strip()
  34. if not name or name in seen:
  35. continue
  36. seen.add(name)
  37. names.append(name)
  38. return names
  39. def unique_title_key(source: str, title: str) -> str:
  40. return hashlib.sha256(f"{source}\n{title}".encode("utf-8")).hexdigest()
  41. class HotContentRepository:
  42. def __init__(self, config: MysqlConfig):
  43. if pymysql is None or DictCursor is None:
  44. raise HotContentFlowError("missing dependency: pip install pymysql")
  45. self.conn = pymysql.connect(
  46. host=config.host,
  47. port=config.port,
  48. user=config.user,
  49. password=config.password,
  50. database=config.database,
  51. charset=config.charset,
  52. autocommit=True,
  53. cursorclass=DictCursor,
  54. )
  55. def close(self) -> None:
  56. self.conn.close()
  57. def upsert_record(self, *, source: str, title: str, rank: int | None) -> dict[str, Any]:
  58. key = unique_title_key(source, title)
  59. sql = """
  60. INSERT INTO hot_content_records (
  61. unique_key, source, title, hot_rank, execution_status, created_at, updated_at
  62. )
  63. VALUES (%s, %s, %s, %s, %s, NOW(), NOW())
  64. ON DUPLICATE KEY UPDATE
  65. id=LAST_INSERT_ID(id),
  66. hot_rank=VALUES(hot_rank),
  67. updated_at=NOW()
  68. """
  69. with self.conn.cursor() as cursor:
  70. cursor.execute(
  71. sql,
  72. (
  73. key,
  74. source,
  75. title,
  76. rank,
  77. ExecutionStatus.HOT_SAVED,
  78. ),
  79. )
  80. record_id = int(cursor.lastrowid)
  81. cursor.execute(
  82. """
  83. SELECT
  84. id,
  85. unique_key,
  86. execution_status,
  87. article_title,
  88. article_body,
  89. article_url,
  90. decode_request_result IS NOT NULL AS has_decode_request,
  91. contribution_points_json IS NOT NULL AS has_contribution_points
  92. FROM hot_content_records
  93. WHERE id = %s
  94. """,
  95. (record_id,),
  96. )
  97. row = cursor.fetchone()
  98. if not row:
  99. raise HotContentFlowError(f"missing hot_content_records id={record_id}")
  100. return {
  101. "id": int(row["id"]),
  102. "unique_key": str(row["unique_key"]),
  103. "execution_status": int(row["execution_status"]),
  104. "article_title": row.get("article_title"),
  105. "article_body": row.get("article_body"),
  106. "article_url": row.get("article_url"),
  107. "has_decode_request": bool(row.get("has_decode_request")),
  108. "has_contribution_points": bool(row.get("has_contribution_points")),
  109. }
  110. def update_status(
  111. self,
  112. *,
  113. record_id: int,
  114. status: int,
  115. error_message: str | None = None,
  116. ) -> None:
  117. sql = """
  118. UPDATE hot_content_records
  119. SET execution_status=%s, error_reason=%s, updated_at=NOW()
  120. WHERE id=%s
  121. """
  122. with self.conn.cursor() as cursor:
  123. cursor.execute(sql, (status, error_message, record_id))
  124. def update_article(
  125. self,
  126. *,
  127. record_id: int,
  128. article_title: str,
  129. article_body: str,
  130. url: str,
  131. ) -> None:
  132. sql = """
  133. UPDATE hot_content_records
  134. SET article_title=%s,
  135. article_body=%s,
  136. article_url=%s,
  137. execution_status=%s,
  138. error_reason=NULL,
  139. updated_at=NOW()
  140. WHERE id=%s
  141. """
  142. with self.conn.cursor() as cursor:
  143. cursor.execute(
  144. sql,
  145. (
  146. article_title,
  147. article_body,
  148. url,
  149. ExecutionStatus.CONTENT_OK,
  150. record_id,
  151. ),
  152. )
  153. def mark_no_valid_content(
  154. self,
  155. *,
  156. record_id: int,
  157. reason: str,
  158. ) -> None:
  159. """搜不到文章或缺标题/正文:仅更新 execution_status + error_reason,不动分类字段。"""
  160. sql = """
  161. UPDATE hot_content_records
  162. SET execution_status=%s,
  163. error_reason=%s,
  164. updated_at=NOW()
  165. WHERE id=%s
  166. """
  167. with self.conn.cursor() as cursor:
  168. cursor.execute(
  169. sql,
  170. (
  171. ExecutionStatus.NO_VALID_CONTENT,
  172. str(reason or "no valid content").strip(),
  173. record_id,
  174. ),
  175. )
  176. def update_category_filter_result(
  177. self,
  178. *,
  179. record_id: int,
  180. passed: bool,
  181. result_json: dict[str, Any],
  182. ) -> None:
  183. self._ensure_category_filter_columns()
  184. status = (
  185. ExecutionStatus.CATEGORY_FILTER_PASSED
  186. if passed
  187. else ExecutionStatus.CATEGORY_FILTER_REJECTED
  188. )
  189. reason = str(result_json.get("reason") or "").strip()
  190. error_message = None if passed else (reason or "category filter rejected")
  191. sql = """
  192. UPDATE hot_content_records
  193. SET execution_status=%s,
  194. category_filter_passed=%s,
  195. category_filter_reason=%s,
  196. category_filter_json=%s,
  197. error_reason=%s,
  198. updated_at=NOW()
  199. WHERE id=%s
  200. """
  201. with self.conn.cursor() as cursor:
  202. cursor.execute(
  203. sql,
  204. (
  205. status,
  206. 1 if passed else 0,
  207. reason or None,
  208. _json_dumps(result_json),
  209. error_message,
  210. record_id,
  211. ),
  212. )
  213. def get_record_for_category_filter(self, record_id: int) -> dict[str, Any] | None:
  214. self._ensure_category_filter_columns()
  215. sql = """
  216. SELECT
  217. id,
  218. source,
  219. title,
  220. article_title,
  221. article_body,
  222. article_url,
  223. execution_status,
  224. category_filter_passed,
  225. category_filter_reason,
  226. category_filter_json
  227. FROM hot_content_records
  228. WHERE id = %s
  229. LIMIT 1
  230. """
  231. with self.conn.cursor() as cursor:
  232. cursor.execute(sql, (record_id,))
  233. row = cursor.fetchone()
  234. if not row:
  235. return None
  236. category_filter_json = _json_loads(row.get("category_filter_json"))
  237. passed_raw = row.get("category_filter_passed")
  238. passed: bool | None
  239. if passed_raw is None:
  240. passed = None
  241. else:
  242. passed = bool(int(passed_raw))
  243. return {
  244. "id": int(row["id"]),
  245. "source": str(row.get("source") or ""),
  246. "title": str(row.get("title") or ""),
  247. "article_title": str(row.get("article_title") or ""),
  248. "article_body": str(row.get("article_body") or ""),
  249. "article_url": str(row.get("article_url") or ""),
  250. "execution_status": int(row.get("execution_status") or 0),
  251. "category_filter_passed": passed,
  252. "category_filter_reason": str(row.get("category_filter_reason") or ""),
  253. "category_filter_json": (
  254. category_filter_json if isinstance(category_filter_json, dict) else {}
  255. ),
  256. }
  257. def get_category_filter_status(self, record_id: int) -> dict[str, Any] | None:
  258. record = self.get_record_for_category_filter(record_id)
  259. if not record:
  260. return None
  261. matched_category = None
  262. category_filter_json = record.get("category_filter_json") or {}
  263. if isinstance(category_filter_json, dict):
  264. matched_category = category_filter_json.get("matched_category")
  265. return {
  266. "id": record["id"],
  267. "passed": record["category_filter_passed"],
  268. "reason": record["category_filter_reason"],
  269. "matched_category": matched_category,
  270. "execution_status": record["execution_status"],
  271. }
  272. def update_decode_result(
  273. self,
  274. *,
  275. record_id: int,
  276. status: int,
  277. request_json: dict[str, Any],
  278. response_json: dict[str, Any] | None,
  279. error_message: str | None = None,
  280. ) -> None:
  281. decode_request_result = {
  282. "request": request_json,
  283. "response": response_json,
  284. }
  285. sql = """
  286. UPDATE hot_content_records
  287. SET decode_request_result=%s,
  288. execution_status=%s,
  289. error_reason=%s,
  290. updated_at=NOW()
  291. WHERE id=%s
  292. """
  293. with self.conn.cursor() as cursor:
  294. cursor.execute(
  295. sql,
  296. (
  297. _json_dumps(decode_request_result),
  298. status,
  299. error_message,
  300. record_id,
  301. ),
  302. )
  303. def list_decode_result_candidates(self, *, limit: int) -> list[dict[str, Any]]:
  304. sql = """
  305. SELECT id, unique_key
  306. FROM hot_content_records
  307. WHERE execution_status IN (%s, %s, %s)
  308. AND contribution_points_json IS NULL
  309. ORDER BY updated_at ASC, id ASC
  310. LIMIT %s
  311. """
  312. with self.conn.cursor() as cursor:
  313. cursor.execute(
  314. sql,
  315. (
  316. ExecutionStatus.DECODE_SUBMITTED,
  317. ExecutionStatus.DECODE_SUCCESS,
  318. ExecutionStatus.DECODE_PENDING,
  319. limit,
  320. ),
  321. )
  322. rows = cursor.fetchall()
  323. return [
  324. {
  325. "id": int(row["id"]),
  326. "unique_key": str(row["unique_key"]),
  327. }
  328. for row in rows
  329. ]
  330. def save_decode_result_export(
  331. self,
  332. *,
  333. record_id: int,
  334. decode_result_json: dict[str, Any],
  335. contribution_points_json: dict[str, Any],
  336. ) -> None:
  337. sql = """
  338. UPDATE hot_content_records
  339. SET decode_result_json=%s,
  340. contribution_points_json=%s,
  341. execution_status=%s,
  342. error_reason=NULL,
  343. updated_at=NOW()
  344. WHERE id=%s
  345. """
  346. with self.conn.cursor() as cursor:
  347. cursor.execute(
  348. sql,
  349. (
  350. _json_dumps(decode_result_json),
  351. _json_dumps(contribution_points_json),
  352. ExecutionStatus.CONTRIBUTION_EXTRACTED,
  353. record_id,
  354. ),
  355. )
  356. def get_demand_cache_by_hour(self, *, cache_hour: datetime) -> dict[str, Any] | None:
  357. sql = """
  358. SELECT
  359. id,
  360. cache_hour,
  361. source_table,
  362. partition_dt,
  363. demand_name_set_json,
  364. item_count,
  365. updated_at
  366. FROM demand_pool_hourly_cache
  367. WHERE cache_hour=%s
  368. LIMIT 1
  369. """
  370. with self.conn.cursor() as cursor:
  371. cursor.execute(sql, (cache_hour,))
  372. row = cursor.fetchone()
  373. if not row:
  374. return None
  375. demand_name_set = _json_loads(row.get("demand_name_set_json")) or []
  376. if not isinstance(demand_name_set, list):
  377. demand_name_set = []
  378. return {
  379. "id": int(row["id"]),
  380. "cache_hour": row.get("cache_hour"),
  381. "source_table": str(row["source_table"]),
  382. "partition_dt": row.get("partition_dt"),
  383. "demand_name_set": [
  384. str(name).strip()
  385. for name in demand_name_set
  386. if str(name).strip()
  387. ],
  388. "item_count": int(row.get("item_count") or 0),
  389. "updated_at": row.get("updated_at"),
  390. }
  391. def save_demand_cache_set(
  392. self,
  393. *,
  394. cache_hour: datetime,
  395. source_table: str,
  396. partition_dt: str | None,
  397. excluded_strategy: str,
  398. top_n: int,
  399. demand_name_set: list[str],
  400. ) -> int:
  401. sql = """
  402. INSERT INTO demand_pool_hourly_cache (
  403. cache_hour,
  404. source_table,
  405. partition_dt,
  406. excluded_strategy,
  407. top_n,
  408. demand_name_set_json,
  409. item_count,
  410. created_at,
  411. updated_at
  412. )
  413. VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
  414. ON DUPLICATE KEY UPDATE
  415. id=LAST_INSERT_ID(id),
  416. source_table=VALUES(source_table),
  417. partition_dt=VALUES(partition_dt),
  418. excluded_strategy=VALUES(excluded_strategy),
  419. top_n=VALUES(top_n),
  420. demand_name_set_json=VALUES(demand_name_set_json),
  421. item_count=VALUES(item_count),
  422. updated_at=NOW()
  423. """
  424. normalized_names = _normalize_demand_names(demand_name_set)
  425. with self.conn.cursor() as cursor:
  426. cursor.execute(
  427. sql,
  428. (
  429. cache_hour,
  430. source_table,
  431. partition_dt,
  432. excluded_strategy,
  433. top_n,
  434. _json_dumps(normalized_names),
  435. len(normalized_names),
  436. ),
  437. )
  438. return int(cursor.lastrowid)
  439. def list_postprocess_candidates(self, *, limit: int) -> list[dict[str, Any]]:
  440. sql = """
  441. SELECT
  442. id,
  443. unique_key,
  444. source,
  445. title,
  446. article_title,
  447. article_body,
  448. demand_cache_run_id,
  449. decode_result_json,
  450. contribution_points_json,
  451. contribution_demand_match_json
  452. FROM hot_content_records
  453. WHERE contribution_points_json IS NOT NULL
  454. AND postprocess_status IN (%s, %s, %s)
  455. ORDER BY updated_at ASC, id ASC
  456. LIMIT %s
  457. """
  458. with self.conn.cursor() as cursor:
  459. cursor.execute(
  460. sql,
  461. (
  462. PostprocessStatus.PENDING,
  463. PostprocessStatus.DEMAND_MATCHED,
  464. PostprocessStatus.FAILED,
  465. limit,
  466. ),
  467. )
  468. rows = cursor.fetchall()
  469. return [
  470. {
  471. "id": int(row["id"]),
  472. "unique_key": str(row["unique_key"]),
  473. "source": str(row.get("source") or ""),
  474. "title": str(row.get("title") or ""),
  475. "article_title": row.get("article_title"),
  476. "article_body": row.get("article_body"),
  477. "demand_cache_run_id": row.get("demand_cache_run_id"),
  478. "decode_result_json": _json_loads(row.get("decode_result_json")),
  479. "contribution_points_json": _json_loads(row.get("contribution_points_json")),
  480. "contribution_demand_match_json": _json_loads(
  481. row.get("contribution_demand_match_json")
  482. ),
  483. }
  484. for row in rows
  485. ]
  486. def save_contribution_demand_match(
  487. self,
  488. *,
  489. record_id: int,
  490. demand_cache_run_id: int,
  491. match_json: dict[str, Any],
  492. ) -> None:
  493. sql = """
  494. UPDATE hot_content_records
  495. SET demand_cache_run_id=%s,
  496. contribution_demand_match_json=%s,
  497. postprocess_status=%s,
  498. postprocess_error_reason=NULL,
  499. updated_at=NOW()
  500. WHERE id=%s
  501. """
  502. with self.conn.cursor() as cursor:
  503. cursor.execute(
  504. sql,
  505. (
  506. demand_cache_run_id,
  507. _json_dumps(match_json),
  508. PostprocessStatus.DEMAND_MATCHED,
  509. record_id,
  510. ),
  511. )
  512. def save_wxindex_trend(
  513. self,
  514. *,
  515. record_id: int,
  516. trend_json: dict[str, Any],
  517. ) -> None:
  518. sql = """
  519. UPDATE hot_content_records
  520. SET wxindex_trend_json=%s,
  521. postprocess_status=%s,
  522. postprocess_error_reason=NULL,
  523. updated_at=NOW()
  524. WHERE id=%s
  525. """
  526. with self.conn.cursor() as cursor:
  527. cursor.execute(
  528. sql,
  529. (
  530. _json_dumps(trend_json),
  531. PostprocessStatus.WXINDEX_DONE,
  532. record_id,
  533. ),
  534. )
  535. def save_demand_quality(
  536. self,
  537. *,
  538. record_id: int,
  539. event_sense_json: dict[str, Any],
  540. senior_fit_json: dict[str, Any],
  541. update_status: bool = True,
  542. ) -> None:
  543. self._ensure_record_quality_columns()
  544. if update_status:
  545. sql = """
  546. UPDATE hot_content_records
  547. SET demand_event_sense_json=%s,
  548. demand_senior_fit_json=%s,
  549. postprocess_status=%s,
  550. postprocess_error_reason=NULL,
  551. updated_at=NOW()
  552. WHERE id=%s
  553. """
  554. params = (
  555. _json_dumps(event_sense_json),
  556. _json_dumps(senior_fit_json),
  557. PostprocessStatus.QUALITY_DONE,
  558. record_id,
  559. )
  560. else:
  561. sql = """
  562. UPDATE hot_content_records
  563. SET demand_event_sense_json=%s,
  564. demand_senior_fit_json=%s,
  565. updated_at=NOW()
  566. WHERE id=%s
  567. """
  568. params = (
  569. _json_dumps(event_sense_json),
  570. _json_dumps(senior_fit_json),
  571. record_id,
  572. )
  573. with self.conn.cursor() as cursor:
  574. cursor.execute(sql, params)
  575. def update_postprocess_status(
  576. self,
  577. *,
  578. record_id: int,
  579. status: int,
  580. error_message: str | None = None,
  581. ) -> None:
  582. sql = """
  583. UPDATE hot_content_records
  584. SET postprocess_status=%s,
  585. postprocess_error_reason=%s,
  586. updated_at=NOW()
  587. WHERE id=%s
  588. """
  589. with self.conn.cursor() as cursor:
  590. cursor.execute(sql, (status, error_message, record_id))
  591. def replace_demand_export_rows(
  592. self,
  593. *,
  594. record_id: int,
  595. source: str,
  596. hot_title: str,
  597. article_title: str,
  598. rows: list[dict[str, Any]],
  599. ) -> None:
  600. self._ensure_demand_export_table()
  601. delete_sql = "DELETE FROM hot_content_demand_exports WHERE record_id=%s"
  602. insert_sql = """
  603. INSERT INTO hot_content_demand_exports (
  604. record_id,
  605. source,
  606. hot_title,
  607. article_title,
  608. item_type,
  609. item_text,
  610. point_category,
  611. matched_demand,
  612. contribution_score,
  613. wxindex_keyword,
  614. all_hot_keywords,
  615. wxindex_latest_score,
  616. wxindex_trend,
  617. is_as_demand,
  618. event_sense_score,
  619. senior_fit_score,
  620. created_at,
  621. updated_at
  622. )
  623. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
  624. """
  625. with self.conn.cursor() as cursor:
  626. cursor.execute(delete_sql, (record_id,))
  627. insert_rows = [
  628. (
  629. record_id,
  630. source,
  631. hot_title,
  632. article_title,
  633. str(item.get("item_type") or ""),
  634. str(item.get("item_text") or ""),
  635. str(item.get("point_category") or ""),
  636. str(item.get("matched_demand") or ""),
  637. item.get("contribution_score"),
  638. str(item.get("wxindex_keyword") or ""),
  639. str(item.get("all_hot_keywords") or ""),
  640. float(item.get("wxindex_latest_score") or 0),
  641. str(item.get("wxindex_trend") or ""),
  642. int(item.get("is_as_demand") or 0),
  643. item.get("event_sense_score"),
  644. item.get("senior_fit_score"),
  645. )
  646. for item in rows
  647. if str(item.get("item_type") or "").strip()
  648. and str(item.get("item_text") or "").strip()
  649. ]
  650. if insert_rows:
  651. cursor.executemany(insert_sql, insert_rows)
  652. def list_odps_sync_records(self) -> list[dict[str, Any]]:
  653. """读取当天创建且已完成质量判断的新记录,供 ODPS 同步(不处理历史数据)。"""
  654. self._ensure_record_quality_columns()
  655. today_start = datetime.now(SHANGHAI_TZ).replace(
  656. hour=0,
  657. minute=0,
  658. second=0,
  659. microsecond=0,
  660. tzinfo=None,
  661. )
  662. today_end = today_start + timedelta(days=1)
  663. sql = """
  664. SELECT
  665. id,
  666. contribution_points_json,
  667. contribution_demand_match_json,
  668. wxindex_trend_json,
  669. demand_event_sense_json,
  670. demand_senior_fit_json
  671. FROM hot_content_records
  672. WHERE created_at >= %s
  673. AND created_at < %s
  674. AND postprocess_status = %s
  675. AND contribution_demand_match_json IS NOT NULL
  676. AND TRIM(CAST(contribution_demand_match_json AS CHAR)) <> ''
  677. ORDER BY id ASC
  678. """
  679. with self.conn.cursor() as cursor:
  680. cursor.execute(
  681. sql,
  682. (today_start, today_end, PostprocessStatus.QUALITY_DONE),
  683. )
  684. rows = cursor.fetchall()
  685. records: list[dict[str, Any]] = []
  686. for row in rows:
  687. records.append(
  688. {
  689. "id": int(row["id"]),
  690. "contribution_points_json": _json_loads(
  691. row.get("contribution_points_json")
  692. ),
  693. "contribution_demand_match_json": _json_loads(
  694. row.get("contribution_demand_match_json")
  695. ),
  696. "wxindex_trend_json": _json_loads(row.get("wxindex_trend_json")),
  697. "demand_event_sense_json": _json_loads(
  698. row.get("demand_event_sense_json")
  699. ),
  700. "demand_senior_fit_json": _json_loads(
  701. row.get("demand_senior_fit_json")
  702. ),
  703. }
  704. )
  705. return records
  706. def list_demand_export_groups(self) -> list[dict[str, Any]]:
  707. """读取主表当天创建的 record 对应导出分组,仅供 ODPS 当天分区同步(不跨天)。"""
  708. self._ensure_demand_export_table()
  709. today_start = datetime.now(SHANGHAI_TZ).replace(
  710. hour=0,
  711. minute=0,
  712. second=0,
  713. microsecond=0,
  714. tzinfo=None,
  715. )
  716. today_end = today_start + timedelta(days=1)
  717. sql = """
  718. SELECT
  719. e.record_id,
  720. e.item_type,
  721. e.item_text,
  722. e.point_category,
  723. e.matched_demand,
  724. e.wxindex_latest_score
  725. FROM hot_content_demand_exports e
  726. INNER JOIN hot_content_records r ON r.id = e.record_id
  727. WHERE r.created_at >= %s
  728. AND r.created_at < %s
  729. ORDER BY e.record_id ASC, e.id ASC
  730. """
  731. with self.conn.cursor() as cursor:
  732. cursor.execute(sql, (today_start, today_end))
  733. rows = cursor.fetchall()
  734. grouped: dict[int, list[dict[str, Any]]] = {}
  735. for row in rows:
  736. record_id = int(row["record_id"])
  737. grouped.setdefault(record_id, []).append(
  738. {
  739. "item_type": str(row.get("item_type") or ""),
  740. "item_text": str(row.get("item_text") or ""),
  741. "point_category": str(row.get("point_category") or ""),
  742. "matched_demand": str(row.get("matched_demand") or ""),
  743. "wxindex_latest_score": float(row.get("wxindex_latest_score") or 0),
  744. }
  745. )
  746. return [
  747. {"record_id": record_id, "export_rows": export_rows}
  748. for record_id, export_rows in grouped.items()
  749. ]
  750. def _ensure_demand_export_table(self) -> None:
  751. sql = """
  752. CREATE TABLE IF NOT EXISTS hot_content_demand_exports (
  753. id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  754. record_id BIGINT UNSIGNED NOT NULL,
  755. source VARCHAR(64) NOT NULL DEFAULT '',
  756. hot_title VARCHAR(1024) NOT NULL DEFAULT '',
  757. article_title VARCHAR(1024) NOT NULL DEFAULT '',
  758. item_type VARCHAR(32) NOT NULL COMMENT '元素/短语',
  759. item_text VARCHAR(1024) NOT NULL,
  760. point_category VARCHAR(32) NOT NULL DEFAULT '' COMMENT '点类型:灵感点/目的点/关键点',
  761. matched_demand VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '匹配到的需求',
  762. contribution_score DOUBLE NULL COMMENT '贡献分,仅元素有值',
  763. wxindex_keyword VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '获取微信指数的元素',
  764. all_hot_keywords VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '全部热点词',
  765. wxindex_latest_score DOUBLE NOT NULL DEFAULT 0,
  766. wxindex_trend VARCHAR(32) NOT NULL DEFAULT '' COMMENT '微信指数趋势',
  767. is_as_demand TINYINT NOT NULL DEFAULT 0 COMMENT '是否作为需求:0否 1是',
  768. created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
  769. updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  770. PRIMARY KEY (id),
  771. KEY idx_record_id (record_id),
  772. KEY idx_source_type (source, item_type)
  773. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
  774. """
  775. with self.conn.cursor() as cursor:
  776. cursor.execute(sql)
  777. self._ensure_demand_export_column(
  778. cursor,
  779. "matched_demand",
  780. """
  781. ALTER TABLE hot_content_demand_exports
  782. ADD COLUMN matched_demand VARCHAR(1024) NOT NULL DEFAULT ''
  783. COMMENT '匹配到的需求'
  784. AFTER item_text
  785. """,
  786. )
  787. self._ensure_demand_export_column(
  788. cursor,
  789. "point_category",
  790. """
  791. ALTER TABLE hot_content_demand_exports
  792. ADD COLUMN point_category VARCHAR(32) NOT NULL DEFAULT ''
  793. COMMENT '点类型:灵感点/目的点/关键点'
  794. AFTER item_text
  795. """,
  796. )
  797. self._ensure_demand_export_column(
  798. cursor,
  799. "contribution_score",
  800. """
  801. ALTER TABLE hot_content_demand_exports
  802. ADD COLUMN contribution_score DOUBLE NULL
  803. COMMENT '贡献分,仅词有值'
  804. AFTER matched_demand
  805. """,
  806. )
  807. self._ensure_demand_export_column(
  808. cursor,
  809. "wxindex_trend",
  810. """
  811. ALTER TABLE hot_content_demand_exports
  812. ADD COLUMN wxindex_trend VARCHAR(32) NOT NULL DEFAULT ''
  813. COMMENT '微信指数趋势'
  814. AFTER wxindex_latest_score
  815. """,
  816. )
  817. self._ensure_demand_export_column(
  818. cursor,
  819. "wxindex_keyword",
  820. """
  821. ALTER TABLE hot_content_demand_exports
  822. ADD COLUMN wxindex_keyword VARCHAR(1024) NOT NULL DEFAULT ''
  823. COMMENT '获取微信指数的词'
  824. AFTER contribution_score
  825. """,
  826. )
  827. self._ensure_demand_export_column(
  828. cursor,
  829. "all_hot_keywords",
  830. """
  831. ALTER TABLE hot_content_demand_exports
  832. ADD COLUMN all_hot_keywords VARCHAR(1024) NOT NULL DEFAULT ''
  833. COMMENT '全部热点词'
  834. AFTER wxindex_keyword
  835. """,
  836. )
  837. self._ensure_demand_export_column(
  838. cursor,
  839. "is_as_demand",
  840. """
  841. ALTER TABLE hot_content_demand_exports
  842. ADD COLUMN is_as_demand TINYINT NOT NULL DEFAULT 0
  843. COMMENT '是否作为需求:0否 1是'
  844. AFTER wxindex_trend
  845. """,
  846. )
  847. self._ensure_demand_export_column(
  848. cursor,
  849. "event_sense_score",
  850. """
  851. ALTER TABLE hot_content_demand_exports
  852. ADD COLUMN event_sense_score DOUBLE NULL
  853. COMMENT '事件性得分 0-10'
  854. AFTER is_as_demand
  855. """,
  856. )
  857. self._ensure_demand_export_column(
  858. cursor,
  859. "senior_fit_score",
  860. """
  861. ALTER TABLE hot_content_demand_exports
  862. ADD COLUMN senior_fit_score DOUBLE NULL
  863. COMMENT '老年性得分 0-10'
  864. AFTER event_sense_score
  865. """,
  866. )
  867. def _ensure_record_quality_columns(self) -> None:
  868. with self.conn.cursor() as cursor:
  869. for column_name, alter_sql in (
  870. (
  871. "demand_event_sense_json",
  872. """
  873. ALTER TABLE hot_content_records
  874. ADD COLUMN demand_event_sense_json JSON NULL
  875. COMMENT '需求事件性 LLM 评分结果'
  876. AFTER wxindex_trend_json
  877. """,
  878. ),
  879. (
  880. "demand_senior_fit_json",
  881. """
  882. ALTER TABLE hot_content_records
  883. ADD COLUMN demand_senior_fit_json JSON NULL
  884. COMMENT '需求老年性 LLM 评分结果'
  885. AFTER demand_event_sense_json
  886. """,
  887. ),
  888. ):
  889. cursor.execute(
  890. """
  891. SELECT COUNT(*) AS cnt
  892. FROM information_schema.COLUMNS
  893. WHERE TABLE_SCHEMA = DATABASE()
  894. AND TABLE_NAME = 'hot_content_records'
  895. AND COLUMN_NAME = %s
  896. """,
  897. (column_name,),
  898. )
  899. if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
  900. cursor.execute(alter_sql)
  901. def _ensure_category_filter_columns(self) -> None:
  902. with self.conn.cursor() as cursor:
  903. for column_name, alter_sql in (
  904. (
  905. "category_filter_json",
  906. """
  907. ALTER TABLE hot_content_records
  908. ADD COLUMN category_filter_json JSON NULL
  909. COMMENT '老年人兴趣分类筛选 LLM 结果'
  910. AFTER article_url
  911. """,
  912. ),
  913. (
  914. "category_filter_passed",
  915. """
  916. ALTER TABLE hot_content_records
  917. ADD COLUMN category_filter_passed TINYINT NULL
  918. COMMENT '分类筛选是否通过:1通过 0不通过 NULL未筛选'
  919. AFTER category_filter_json
  920. """,
  921. ),
  922. (
  923. "category_filter_reason",
  924. """
  925. ALTER TABLE hot_content_records
  926. ADD COLUMN category_filter_reason TEXT NULL
  927. COMMENT '分类筛选原因'
  928. AFTER category_filter_passed
  929. """,
  930. ),
  931. ):
  932. cursor.execute(
  933. """
  934. SELECT COUNT(*) AS cnt
  935. FROM information_schema.COLUMNS
  936. WHERE TABLE_SCHEMA = DATABASE()
  937. AND TABLE_NAME = 'hot_content_records'
  938. AND COLUMN_NAME = %s
  939. """,
  940. (column_name,),
  941. )
  942. if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
  943. cursor.execute(alter_sql)
  944. def _ensure_demand_export_column(
  945. self,
  946. cursor: Any,
  947. column_name: str,
  948. alter_sql: str,
  949. ) -> None:
  950. cursor.execute(
  951. """
  952. SELECT COUNT(*) AS cnt
  953. FROM information_schema.COLUMNS
  954. WHERE TABLE_SCHEMA = DATABASE()
  955. AND TABLE_NAME = 'hot_content_demand_exports'
  956. AND COLUMN_NAME = %s
  957. """,
  958. (column_name,),
  959. )
  960. if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
  961. cursor.execute(alter_sql)
  962. def list_synced_odps_demand_ids(
  963. self,
  964. *,
  965. partition_dt: str,
  966. strategy: str,
  967. ) -> set[str]:
  968. self._ensure_odps_sync_log_table()
  969. sql = """
  970. SELECT demand_id
  971. FROM hot_content_odps_sync_log
  972. WHERE partition_dt = %s
  973. AND strategy = %s
  974. """
  975. with self.conn.cursor() as cursor:
  976. cursor.execute(sql, (partition_dt, strategy))
  977. rows = cursor.fetchall()
  978. return {
  979. str(row.get("demand_id") or "").strip()
  980. for row in rows
  981. if str(row.get("demand_id") or "").strip()
  982. }
  983. def save_odps_sync_logs(self, rows: list[dict[str, Any]]) -> int:
  984. if not rows:
  985. return 0
  986. self._ensure_odps_sync_log_table()
  987. sql = """
  988. INSERT INTO hot_content_odps_sync_log (
  989. partition_dt,
  990. strategy,
  991. demand_id,
  992. demand_name,
  993. demand_type,
  994. record_id,
  995. weight
  996. )
  997. VALUES (%s, %s, %s, %s, %s, %s, %s)
  998. ON DUPLICATE KEY UPDATE
  999. demand_name = VALUES(demand_name),
  1000. demand_type = VALUES(demand_type),
  1001. record_id = VALUES(record_id),
  1002. weight = VALUES(weight),
  1003. synced_at = CURRENT_TIMESTAMP
  1004. """
  1005. insert_rows = [
  1006. (
  1007. str(item.get("partition_dt") or ""),
  1008. str(item.get("strategy") or ""),
  1009. str(item.get("demand_id") or ""),
  1010. str(item.get("demand_name") or ""),
  1011. str(item.get("demand_type") or ""),
  1012. int(item.get("record_id") or 0),
  1013. float(item["weight"]) if item.get("weight") is not None else None,
  1014. )
  1015. for item in rows
  1016. if str(item.get("demand_id") or "").strip()
  1017. ]
  1018. with self.conn.cursor() as cursor:
  1019. cursor.executemany(sql, insert_rows)
  1020. return len(insert_rows)
  1021. def _ensure_odps_sync_log_weight_column(self, cursor: Any) -> None:
  1022. cursor.execute(
  1023. """
  1024. SELECT COUNT(*) AS cnt
  1025. FROM information_schema.COLUMNS
  1026. WHERE TABLE_SCHEMA = DATABASE()
  1027. AND TABLE_NAME = 'hot_content_odps_sync_log'
  1028. AND COLUMN_NAME = 'weight'
  1029. """,
  1030. )
  1031. if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
  1032. cursor.execute(
  1033. """
  1034. ALTER TABLE hot_content_odps_sync_log
  1035. ADD COLUMN weight DOUBLE NULL DEFAULT NULL
  1036. COMMENT 'ODPS 需求权重(记录 wxindex 最高分 / 1000000)'
  1037. AFTER record_id
  1038. """
  1039. )
  1040. def _ensure_odps_sync_log_table(self) -> None:
  1041. sql = """
  1042. CREATE TABLE IF NOT EXISTS hot_content_odps_sync_log (
  1043. id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  1044. partition_dt VARCHAR(8) NOT NULL COMMENT 'ODPS 分区 dt',
  1045. strategy VARCHAR(128) NOT NULL COMMENT '需求 strategy',
  1046. demand_id CHAR(32) NOT NULL COMMENT 'ODPS demand_id',
  1047. demand_name VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '需求名',
  1048. demand_type VARCHAR(32) NOT NULL DEFAULT '' COMMENT '特征点/短语',
  1049. record_id BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '来源 hot_content_records.id',
  1050. weight DOUBLE NULL DEFAULT NULL COMMENT 'ODPS 需求权重(记录 wxindex 最高分 / 1000000)',
  1051. synced_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '写入 ODPS 时间',
  1052. PRIMARY KEY (id),
  1053. UNIQUE KEY uk_odps_sync (partition_dt, strategy, demand_id),
  1054. KEY idx_record_partition (record_id, partition_dt),
  1055. KEY idx_partition_strategy (partition_dt, strategy)
  1056. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
  1057. """
  1058. with self.conn.cursor() as cursor:
  1059. cursor.execute(sql)
  1060. self._ensure_odps_sync_log_weight_column(cursor)