# repository.py
"""MySQL repository for hot-content records."""

from __future__ import annotations

import hashlib
import json
from datetime import datetime
from typing import Any

try:
    import pymysql
    from pymysql.cursors import DictCursor
except ImportError:  # pragma: no cover - runtime dependency check
    # Importing this module must not fail when pymysql is absent; the
    # repository constructor checks these names for None and raises then.
    pymysql = None
    DictCursor = None

from app.hot_content.exceptions import HotContentFlowError
from app.hot_content.status import ExecutionStatus, PostprocessStatus
from app.hot_content.types import MysqlConfig
  16. def _json_dumps(data: Any) -> str:
  17. return json.dumps(data, ensure_ascii=False, separators=(",", ":"))
  18. def _json_loads(value: Any) -> Any:
  19. if value is None:
  20. return None
  21. if isinstance(value, (dict, list)):
  22. return value
  23. if isinstance(value, (bytes, bytearray)):
  24. value = value.decode("utf-8")
  25. if isinstance(value, str):
  26. return json.loads(value)
  27. return value
  28. def _normalize_demand_names(demand_name_set: list[str]) -> list[str]:
  29. names: list[str] = []
  30. seen: set[str] = set()
  31. for item in demand_name_set:
  32. name = str(item).strip()
  33. if not name or name in seen:
  34. continue
  35. seen.add(name)
  36. names.append(name)
  37. return names
  38. def unique_title_key(source: str, title: str) -> str:
  39. return hashlib.sha256(f"{source}\n{title}".encode("utf-8")).hexdigest()
  40. class HotContentRepository:
  41. def __init__(self, config: MysqlConfig):
  42. if pymysql is None or DictCursor is None:
  43. raise HotContentFlowError("missing dependency: pip install pymysql")
  44. self.conn = pymysql.connect(
  45. host=config.host,
  46. port=config.port,
  47. user=config.user,
  48. password=config.password,
  49. database=config.database,
  50. charset=config.charset,
  51. autocommit=True,
  52. cursorclass=DictCursor,
  53. )
  54. def close(self) -> None:
  55. self.conn.close()
  56. def upsert_record(self, *, source: str, title: str, rank: int | None) -> dict[str, Any]:
  57. key = unique_title_key(source, title)
  58. sql = """
  59. INSERT INTO hot_content_records (
  60. unique_key, source, title, hot_rank, execution_status, created_at, updated_at
  61. )
  62. VALUES (%s, %s, %s, %s, %s, NOW(), NOW())
  63. ON DUPLICATE KEY UPDATE
  64. id=LAST_INSERT_ID(id),
  65. hot_rank=VALUES(hot_rank),
  66. updated_at=NOW()
  67. """
  68. with self.conn.cursor() as cursor:
  69. cursor.execute(
  70. sql,
  71. (
  72. key,
  73. source,
  74. title,
  75. rank,
  76. ExecutionStatus.HOT_SAVED,
  77. ),
  78. )
  79. record_id = int(cursor.lastrowid)
  80. cursor.execute(
  81. """
  82. SELECT
  83. id,
  84. unique_key,
  85. execution_status,
  86. article_title,
  87. article_body,
  88. article_url,
  89. decode_request_result IS NOT NULL AS has_decode_request,
  90. contribution_points_json IS NOT NULL AS has_contribution_points
  91. FROM hot_content_records
  92. WHERE id = %s
  93. """,
  94. (record_id,),
  95. )
  96. row = cursor.fetchone()
  97. if not row:
  98. raise HotContentFlowError(f"missing hot_content_records id={record_id}")
  99. return {
  100. "id": int(row["id"]),
  101. "unique_key": str(row["unique_key"]),
  102. "execution_status": int(row["execution_status"]),
  103. "article_title": row.get("article_title"),
  104. "article_body": row.get("article_body"),
  105. "article_url": row.get("article_url"),
  106. "has_decode_request": bool(row.get("has_decode_request")),
  107. "has_contribution_points": bool(row.get("has_contribution_points")),
  108. }
  109. def update_status(
  110. self,
  111. *,
  112. record_id: int,
  113. status: int,
  114. error_message: str | None = None,
  115. ) -> None:
  116. sql = """
  117. UPDATE hot_content_records
  118. SET execution_status=%s, error_reason=%s, updated_at=NOW()
  119. WHERE id=%s
  120. """
  121. with self.conn.cursor() as cursor:
  122. cursor.execute(sql, (status, error_message, record_id))
  123. def update_article(
  124. self,
  125. *,
  126. record_id: int,
  127. article_title: str,
  128. article_body: str,
  129. url: str,
  130. ) -> None:
  131. sql = """
  132. UPDATE hot_content_records
  133. SET article_title=%s,
  134. article_body=%s,
  135. article_url=%s,
  136. execution_status=%s,
  137. error_reason=NULL,
  138. updated_at=NOW()
  139. WHERE id=%s
  140. """
  141. with self.conn.cursor() as cursor:
  142. cursor.execute(
  143. sql,
  144. (
  145. article_title,
  146. article_body,
  147. url,
  148. ExecutionStatus.CONTENT_OK,
  149. record_id,
  150. ),
  151. )
  152. def update_decode_result(
  153. self,
  154. *,
  155. record_id: int,
  156. status: int,
  157. request_json: dict[str, Any],
  158. response_json: dict[str, Any] | None,
  159. error_message: str | None = None,
  160. ) -> None:
  161. decode_request_result = {
  162. "request": request_json,
  163. "response": response_json,
  164. }
  165. sql = """
  166. UPDATE hot_content_records
  167. SET decode_request_result=%s,
  168. execution_status=%s,
  169. error_reason=%s,
  170. updated_at=NOW()
  171. WHERE id=%s
  172. """
  173. with self.conn.cursor() as cursor:
  174. cursor.execute(
  175. sql,
  176. (
  177. _json_dumps(decode_request_result),
  178. status,
  179. error_message,
  180. record_id,
  181. ),
  182. )
  183. def list_decode_result_candidates(self, *, limit: int) -> list[dict[str, Any]]:
  184. sql = """
  185. SELECT id, unique_key
  186. FROM hot_content_records
  187. WHERE execution_status IN (%s, %s, %s)
  188. AND contribution_points_json IS NULL
  189. ORDER BY updated_at ASC, id ASC
  190. LIMIT %s
  191. """
  192. with self.conn.cursor() as cursor:
  193. cursor.execute(
  194. sql,
  195. (
  196. ExecutionStatus.DECODE_SUBMITTED,
  197. ExecutionStatus.DECODE_SUCCESS,
  198. ExecutionStatus.DECODE_PENDING,
  199. limit,
  200. ),
  201. )
  202. rows = cursor.fetchall()
  203. return [
  204. {
  205. "id": int(row["id"]),
  206. "unique_key": str(row["unique_key"]),
  207. }
  208. for row in rows
  209. ]
  210. def save_decode_result_export(
  211. self,
  212. *,
  213. record_id: int,
  214. decode_result_json: dict[str, Any],
  215. contribution_points_json: dict[str, Any],
  216. ) -> None:
  217. sql = """
  218. UPDATE hot_content_records
  219. SET decode_result_json=%s,
  220. contribution_points_json=%s,
  221. execution_status=%s,
  222. error_reason=NULL,
  223. updated_at=NOW()
  224. WHERE id=%s
  225. """
  226. with self.conn.cursor() as cursor:
  227. cursor.execute(
  228. sql,
  229. (
  230. _json_dumps(decode_result_json),
  231. _json_dumps(contribution_points_json),
  232. ExecutionStatus.CONTRIBUTION_EXTRACTED,
  233. record_id,
  234. ),
  235. )
  236. def get_demand_cache_by_hour(self, *, cache_hour: datetime) -> dict[str, Any] | None:
  237. sql = """
  238. SELECT
  239. id,
  240. cache_hour,
  241. source_table,
  242. partition_dt,
  243. demand_name_set_json,
  244. item_count,
  245. updated_at
  246. FROM demand_pool_hourly_cache
  247. WHERE cache_hour=%s
  248. LIMIT 1
  249. """
  250. with self.conn.cursor() as cursor:
  251. cursor.execute(sql, (cache_hour,))
  252. row = cursor.fetchone()
  253. if not row:
  254. return None
  255. demand_name_set = _json_loads(row.get("demand_name_set_json")) or []
  256. if not isinstance(demand_name_set, list):
  257. demand_name_set = []
  258. return {
  259. "id": int(row["id"]),
  260. "cache_hour": row.get("cache_hour"),
  261. "source_table": str(row["source_table"]),
  262. "partition_dt": row.get("partition_dt"),
  263. "demand_name_set": [
  264. str(name).strip()
  265. for name in demand_name_set
  266. if str(name).strip()
  267. ],
  268. "item_count": int(row.get("item_count") or 0),
  269. "updated_at": row.get("updated_at"),
  270. }
  271. def save_demand_cache_set(
  272. self,
  273. *,
  274. cache_hour: datetime,
  275. source_table: str,
  276. partition_dt: str | None,
  277. excluded_strategy: str,
  278. top_n: int,
  279. demand_name_set: list[str],
  280. ) -> int:
  281. sql = """
  282. INSERT INTO demand_pool_hourly_cache (
  283. cache_hour,
  284. source_table,
  285. partition_dt,
  286. excluded_strategy,
  287. top_n,
  288. demand_name_set_json,
  289. item_count,
  290. created_at,
  291. updated_at
  292. )
  293. VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
  294. ON DUPLICATE KEY UPDATE
  295. id=LAST_INSERT_ID(id),
  296. source_table=VALUES(source_table),
  297. partition_dt=VALUES(partition_dt),
  298. excluded_strategy=VALUES(excluded_strategy),
  299. top_n=VALUES(top_n),
  300. demand_name_set_json=VALUES(demand_name_set_json),
  301. item_count=VALUES(item_count),
  302. updated_at=NOW()
  303. """
  304. normalized_names = _normalize_demand_names(demand_name_set)
  305. with self.conn.cursor() as cursor:
  306. cursor.execute(
  307. sql,
  308. (
  309. cache_hour,
  310. source_table,
  311. partition_dt,
  312. excluded_strategy,
  313. top_n,
  314. _json_dumps(normalized_names),
  315. len(normalized_names),
  316. ),
  317. )
  318. return int(cursor.lastrowid)
  319. def list_postprocess_candidates(self, *, limit: int) -> list[dict[str, Any]]:
  320. sql = """
  321. SELECT
  322. id,
  323. unique_key,
  324. source,
  325. title,
  326. article_title,
  327. article_body,
  328. demand_cache_run_id,
  329. decode_result_json,
  330. contribution_points_json,
  331. contribution_demand_match_json
  332. FROM hot_content_records
  333. WHERE contribution_points_json IS NOT NULL
  334. AND postprocess_status IN (%s, %s, %s)
  335. ORDER BY updated_at ASC, id ASC
  336. LIMIT %s
  337. """
  338. with self.conn.cursor() as cursor:
  339. cursor.execute(
  340. sql,
  341. (
  342. PostprocessStatus.PENDING,
  343. PostprocessStatus.DEMAND_MATCHED,
  344. PostprocessStatus.FAILED,
  345. limit,
  346. ),
  347. )
  348. rows = cursor.fetchall()
  349. return [
  350. {
  351. "id": int(row["id"]),
  352. "unique_key": str(row["unique_key"]),
  353. "source": str(row.get("source") or ""),
  354. "title": str(row.get("title") or ""),
  355. "article_title": row.get("article_title"),
  356. "article_body": row.get("article_body"),
  357. "demand_cache_run_id": row.get("demand_cache_run_id"),
  358. "decode_result_json": _json_loads(row.get("decode_result_json")),
  359. "contribution_points_json": _json_loads(row.get("contribution_points_json")),
  360. "contribution_demand_match_json": _json_loads(
  361. row.get("contribution_demand_match_json")
  362. ),
  363. }
  364. for row in rows
  365. ]
  366. def save_contribution_demand_match(
  367. self,
  368. *,
  369. record_id: int,
  370. demand_cache_run_id: int,
  371. match_json: dict[str, Any],
  372. ) -> None:
  373. sql = """
  374. UPDATE hot_content_records
  375. SET demand_cache_run_id=%s,
  376. contribution_demand_match_json=%s,
  377. postprocess_status=%s,
  378. postprocess_error_reason=NULL,
  379. updated_at=NOW()
  380. WHERE id=%s
  381. """
  382. with self.conn.cursor() as cursor:
  383. cursor.execute(
  384. sql,
  385. (
  386. demand_cache_run_id,
  387. _json_dumps(match_json),
  388. PostprocessStatus.DEMAND_MATCHED,
  389. record_id,
  390. ),
  391. )
  392. def save_wxindex_trend(
  393. self,
  394. *,
  395. record_id: int,
  396. trend_json: dict[str, Any],
  397. ) -> None:
  398. sql = """
  399. UPDATE hot_content_records
  400. SET wxindex_trend_json=%s,
  401. postprocess_status=%s,
  402. postprocess_error_reason=NULL,
  403. updated_at=NOW()
  404. WHERE id=%s
  405. """
  406. with self.conn.cursor() as cursor:
  407. cursor.execute(
  408. sql,
  409. (
  410. _json_dumps(trend_json),
  411. PostprocessStatus.WXINDEX_DONE,
  412. record_id,
  413. ),
  414. )
  415. def update_postprocess_status(
  416. self,
  417. *,
  418. record_id: int,
  419. status: int,
  420. error_message: str | None = None,
  421. ) -> None:
  422. sql = """
  423. UPDATE hot_content_records
  424. SET postprocess_status=%s,
  425. postprocess_error_reason=%s,
  426. updated_at=NOW()
  427. WHERE id=%s
  428. """
  429. with self.conn.cursor() as cursor:
  430. cursor.execute(sql, (status, error_message, record_id))
  431. def replace_demand_export_rows(
  432. self,
  433. *,
  434. record_id: int,
  435. source: str,
  436. hot_title: str,
  437. article_title: str,
  438. rows: list[dict[str, Any]],
  439. ) -> None:
  440. self._ensure_demand_export_table()
  441. delete_sql = "DELETE FROM hot_content_demand_exports WHERE record_id=%s"
  442. insert_sql = """
  443. INSERT INTO hot_content_demand_exports (
  444. record_id,
  445. source,
  446. hot_title,
  447. article_title,
  448. item_type,
  449. item_text,
  450. point_category,
  451. matched_demand,
  452. contribution_score,
  453. wxindex_keyword,
  454. wxindex_latest_score,
  455. wxindex_trend,
  456. created_at,
  457. updated_at
  458. )
  459. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
  460. """
  461. with self.conn.cursor() as cursor:
  462. cursor.execute(delete_sql, (record_id,))
  463. insert_rows = [
  464. (
  465. record_id,
  466. source,
  467. hot_title,
  468. article_title,
  469. str(item.get("item_type") or ""),
  470. str(item.get("item_text") or ""),
  471. str(item.get("point_category") or ""),
  472. str(item.get("matched_demand") or ""),
  473. item.get("contribution_score"),
  474. str(item.get("wxindex_keyword") or ""),
  475. float(item.get("wxindex_latest_score") or 0),
  476. str(item.get("wxindex_trend") or ""),
  477. )
  478. for item in rows
  479. if str(item.get("item_type") or "").strip()
  480. and str(item.get("item_text") or "").strip()
  481. ]
  482. if insert_rows:
  483. cursor.executemany(insert_sql, insert_rows)
  484. def list_demand_export_groups(self) -> list[dict[str, Any]]:
  485. self._ensure_demand_export_table()
  486. sql = """
  487. SELECT
  488. record_id,
  489. item_type,
  490. item_text,
  491. point_category,
  492. matched_demand,
  493. wxindex_latest_score
  494. FROM hot_content_demand_exports
  495. ORDER BY record_id ASC, id ASC
  496. """
  497. with self.conn.cursor() as cursor:
  498. cursor.execute(sql)
  499. rows = cursor.fetchall()
  500. grouped: dict[int, list[dict[str, Any]]] = {}
  501. for row in rows:
  502. record_id = int(row["record_id"])
  503. grouped.setdefault(record_id, []).append(
  504. {
  505. "item_type": str(row.get("item_type") or ""),
  506. "item_text": str(row.get("item_text") or ""),
  507. "point_category": str(row.get("point_category") or ""),
  508. "matched_demand": str(row.get("matched_demand") or ""),
  509. "wxindex_latest_score": float(row.get("wxindex_latest_score") or 0),
  510. }
  511. )
  512. return [
  513. {"record_id": record_id, "export_rows": export_rows}
  514. for record_id, export_rows in grouped.items()
  515. ]
  516. def _ensure_demand_export_table(self) -> None:
  517. sql = """
  518. CREATE TABLE IF NOT EXISTS hot_content_demand_exports (
  519. id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  520. record_id BIGINT UNSIGNED NOT NULL,
  521. source VARCHAR(64) NOT NULL DEFAULT '',
  522. hot_title VARCHAR(1024) NOT NULL DEFAULT '',
  523. article_title VARCHAR(1024) NOT NULL DEFAULT '',
  524. item_type VARCHAR(32) NOT NULL COMMENT '元素/短语',
  525. item_text VARCHAR(1024) NOT NULL,
  526. point_category VARCHAR(32) NOT NULL DEFAULT '' COMMENT '点类型:灵感点/目的点/关键点',
  527. matched_demand VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '匹配到的需求',
  528. contribution_score DOUBLE NULL COMMENT '贡献分,仅元素有值',
  529. wxindex_keyword VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '获取微信指数的元素',
  530. wxindex_latest_score DOUBLE NOT NULL DEFAULT 0,
  531. wxindex_trend VARCHAR(32) NOT NULL DEFAULT '' COMMENT '微信指数趋势',
  532. created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
  533. updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  534. PRIMARY KEY (id),
  535. KEY idx_record_id (record_id),
  536. KEY idx_source_type (source, item_type)
  537. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
  538. """
  539. with self.conn.cursor() as cursor:
  540. cursor.execute(sql)
  541. self._ensure_demand_export_column(
  542. cursor,
  543. "matched_demand",
  544. """
  545. ALTER TABLE hot_content_demand_exports
  546. ADD COLUMN matched_demand VARCHAR(1024) NOT NULL DEFAULT ''
  547. COMMENT '匹配到的需求'
  548. AFTER item_text
  549. """,
  550. )
  551. self._ensure_demand_export_column(
  552. cursor,
  553. "point_category",
  554. """
  555. ALTER TABLE hot_content_demand_exports
  556. ADD COLUMN point_category VARCHAR(32) NOT NULL DEFAULT ''
  557. COMMENT '点类型:灵感点/目的点/关键点'
  558. AFTER item_text
  559. """,
  560. )
  561. self._ensure_demand_export_column(
  562. cursor,
  563. "contribution_score",
  564. """
  565. ALTER TABLE hot_content_demand_exports
  566. ADD COLUMN contribution_score DOUBLE NULL
  567. COMMENT '贡献分,仅词有值'
  568. AFTER matched_demand
  569. """,
  570. )
  571. self._ensure_demand_export_column(
  572. cursor,
  573. "wxindex_trend",
  574. """
  575. ALTER TABLE hot_content_demand_exports
  576. ADD COLUMN wxindex_trend VARCHAR(32) NOT NULL DEFAULT ''
  577. COMMENT '微信指数趋势'
  578. AFTER wxindex_latest_score
  579. """,
  580. )
  581. self._ensure_demand_export_column(
  582. cursor,
  583. "wxindex_keyword",
  584. """
  585. ALTER TABLE hot_content_demand_exports
  586. ADD COLUMN wxindex_keyword VARCHAR(1024) NOT NULL DEFAULT ''
  587. COMMENT '获取微信指数的词'
  588. AFTER contribution_score
  589. """,
  590. )
  591. def _ensure_demand_export_column(
  592. self,
  593. cursor: Any,
  594. column_name: str,
  595. alter_sql: str,
  596. ) -> None:
  597. cursor.execute(
  598. """
  599. SELECT COUNT(*) AS cnt
  600. FROM information_schema.COLUMNS
  601. WHERE TABLE_SCHEMA = DATABASE()
  602. AND TABLE_NAME = 'hot_content_demand_exports'
  603. AND COLUMN_NAME = %s
  604. """,
  605. (column_name,),
  606. )
  607. if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
  608. cursor.execute(alter_sql)