repository.py 62 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752
  1. """热点内容 MySQL 仓储。"""
  2. from __future__ import annotations
  3. import hashlib
  4. import json
  5. from datetime import date, datetime, timedelta
  6. from typing import Any
  7. try:
  8. import pymysql
  9. from pymysql.cursors import DictCursor
  10. except ImportError: # pragma: no cover - runtime dependency check
  11. pymysql = None
  12. DictCursor = None
  13. from app.hot_content.exceptions import HotContentFlowError
  14. from app.hot_content.status import ExecutionStatus, PostprocessStatus
  15. from app.hot_content.timezone import SHANGHAI_TZ
  16. from app.hot_content.types import MysqlConfig
  17. def _json_dumps(data: Any) -> str:
  18. return json.dumps(data, ensure_ascii=False, separators=(",", ":"))
  19. def _json_loads(value: Any) -> Any:
  20. if value is None:
  21. return None
  22. if isinstance(value, (dict, list)):
  23. return value
  24. if isinstance(value, (bytes, bytearray)):
  25. value = value.decode("utf-8")
  26. if isinstance(value, str):
  27. return json.loads(value)
  28. return value
  29. def _normalize_demand_names(demand_name_set: list[str]) -> list[str]:
  30. names: list[str] = []
  31. seen: set[str] = set()
  32. for item in demand_name_set:
  33. name = str(item).strip()
  34. if not name or name in seen:
  35. continue
  36. seen.add(name)
  37. names.append(name)
  38. return names
  39. def unique_title_key(source: str, title: str) -> str:
  40. return hashlib.sha256(f"{source}\n{title}".encode("utf-8")).hexdigest()
  41. class HotContentRepository:
  42. def __init__(self, config: MysqlConfig):
  43. if pymysql is None or DictCursor is None:
  44. raise HotContentFlowError("missing dependency: pip install pymysql")
  45. self.conn = pymysql.connect(
  46. host=config.host,
  47. port=config.port,
  48. user=config.user,
  49. password=config.password,
  50. database=config.database,
  51. charset=config.charset,
  52. autocommit=True,
  53. cursorclass=DictCursor,
  54. )
  55. def close(self) -> None:
  56. self.conn.close()
  57. def upsert_record(self, *, source: str, title: str, rank: int | None) -> dict[str, Any]:
  58. key = unique_title_key(source, title)
  59. sql = """
  60. INSERT INTO hot_content_records (
  61. unique_key, source, title, hot_rank, execution_status, created_at, updated_at
  62. )
  63. VALUES (%s, %s, %s, %s, %s, NOW(), NOW())
  64. ON DUPLICATE KEY UPDATE
  65. id=LAST_INSERT_ID(id),
  66. hot_rank=VALUES(hot_rank),
  67. updated_at=NOW()
  68. """
  69. with self.conn.cursor() as cursor:
  70. cursor.execute(
  71. sql,
  72. (
  73. key,
  74. source,
  75. title,
  76. rank,
  77. ExecutionStatus.HOT_SAVED,
  78. ),
  79. )
  80. record_id = int(cursor.lastrowid)
  81. cursor.execute(
  82. """
  83. SELECT
  84. id,
  85. unique_key,
  86. execution_status,
  87. article_title,
  88. article_body,
  89. article_url,
  90. decode_request_result IS NOT NULL AS has_decode_request,
  91. contribution_points_json IS NOT NULL AS has_contribution_points
  92. FROM hot_content_records
  93. WHERE id = %s
  94. """,
  95. (record_id,),
  96. )
  97. row = cursor.fetchone()
  98. if not row:
  99. raise HotContentFlowError(f"missing hot_content_records id={record_id}")
  100. return {
  101. "id": int(row["id"]),
  102. "unique_key": str(row["unique_key"]),
  103. "execution_status": int(row["execution_status"]),
  104. "article_title": row.get("article_title"),
  105. "article_body": row.get("article_body"),
  106. "article_url": row.get("article_url"),
  107. "has_decode_request": bool(row.get("has_decode_request")),
  108. "has_contribution_points": bool(row.get("has_contribution_points")),
  109. }
  110. def update_status(
  111. self,
  112. *,
  113. record_id: int,
  114. status: int,
  115. error_message: str | None = None,
  116. ) -> None:
  117. sql = """
  118. UPDATE hot_content_records
  119. SET execution_status=%s, error_reason=%s, updated_at=NOW()
  120. WHERE id=%s
  121. """
  122. with self.conn.cursor() as cursor:
  123. cursor.execute(sql, (status, error_message, record_id))
  124. def update_article(
  125. self,
  126. *,
  127. record_id: int,
  128. article_title: str,
  129. article_body: str,
  130. url: str,
  131. ) -> None:
  132. sql = """
  133. UPDATE hot_content_records
  134. SET article_title=%s,
  135. article_body=%s,
  136. article_url=%s,
  137. execution_status=%s,
  138. error_reason=NULL,
  139. updated_at=NOW()
  140. WHERE id=%s
  141. """
  142. with self.conn.cursor() as cursor:
  143. cursor.execute(
  144. sql,
  145. (
  146. article_title,
  147. article_body,
  148. url,
  149. ExecutionStatus.CONTENT_OK,
  150. record_id,
  151. ),
  152. )
  153. def mark_no_valid_content(
  154. self,
  155. *,
  156. record_id: int,
  157. reason: str,
  158. ) -> None:
  159. """搜不到文章或缺标题/正文:仅更新 execution_status + error_reason,不动分类字段。"""
  160. sql = """
  161. UPDATE hot_content_records
  162. SET execution_status=%s,
  163. error_reason=%s,
  164. updated_at=NOW()
  165. WHERE id=%s
  166. """
  167. with self.conn.cursor() as cursor:
  168. cursor.execute(
  169. sql,
  170. (
  171. ExecutionStatus.NO_VALID_CONTENT,
  172. str(reason or "no valid content").strip(),
  173. record_id,
  174. ),
  175. )
  176. def update_category_filter_result(
  177. self,
  178. *,
  179. record_id: int,
  180. passed: bool,
  181. result_json: dict[str, Any],
  182. ) -> None:
  183. self._ensure_category_filter_columns()
  184. status = (
  185. ExecutionStatus.CATEGORY_FILTER_PASSED
  186. if passed
  187. else ExecutionStatus.CATEGORY_FILTER_REJECTED
  188. )
  189. reason = str(result_json.get("reason") or "").strip()
  190. error_message = None if passed else (reason or "category filter rejected")
  191. sql = """
  192. UPDATE hot_content_records
  193. SET execution_status=%s,
  194. category_filter_passed=%s,
  195. category_filter_reason=%s,
  196. category_filter_json=%s,
  197. error_reason=%s,
  198. updated_at=NOW()
  199. WHERE id=%s
  200. """
  201. with self.conn.cursor() as cursor:
  202. cursor.execute(
  203. sql,
  204. (
  205. status,
  206. 1 if passed else 0,
  207. reason or None,
  208. _json_dumps(result_json),
  209. error_message,
  210. record_id,
  211. ),
  212. )
  213. def get_record_for_category_filter(self, record_id: int) -> dict[str, Any] | None:
  214. self._ensure_category_filter_columns()
  215. sql = """
  216. SELECT
  217. id,
  218. source,
  219. title,
  220. article_title,
  221. article_body,
  222. article_url,
  223. execution_status,
  224. category_filter_passed,
  225. category_filter_reason,
  226. category_filter_json
  227. FROM hot_content_records
  228. WHERE id = %s
  229. LIMIT 1
  230. """
  231. with self.conn.cursor() as cursor:
  232. cursor.execute(sql, (record_id,))
  233. row = cursor.fetchone()
  234. if not row:
  235. return None
  236. category_filter_json = _json_loads(row.get("category_filter_json"))
  237. passed_raw = row.get("category_filter_passed")
  238. passed: bool | None
  239. if passed_raw is None:
  240. passed = None
  241. else:
  242. passed = bool(int(passed_raw))
  243. return {
  244. "id": int(row["id"]),
  245. "source": str(row.get("source") or ""),
  246. "title": str(row.get("title") or ""),
  247. "article_title": str(row.get("article_title") or ""),
  248. "article_body": str(row.get("article_body") or ""),
  249. "article_url": str(row.get("article_url") or ""),
  250. "execution_status": int(row.get("execution_status") or 0),
  251. "category_filter_passed": passed,
  252. "category_filter_reason": str(row.get("category_filter_reason") or ""),
  253. "category_filter_json": (
  254. category_filter_json if isinstance(category_filter_json, dict) else {}
  255. ),
  256. }
  257. def get_category_filter_status(self, record_id: int) -> dict[str, Any] | None:
  258. record = self.get_record_for_category_filter(record_id)
  259. if not record:
  260. return None
  261. matched_category = None
  262. category_filter_json = record.get("category_filter_json") or {}
  263. if isinstance(category_filter_json, dict):
  264. matched_category = category_filter_json.get("matched_category")
  265. return {
  266. "id": record["id"],
  267. "passed": record["category_filter_passed"],
  268. "reason": record["category_filter_reason"],
  269. "matched_category": matched_category,
  270. "execution_status": record["execution_status"],
  271. }
  272. def update_decode_result(
  273. self,
  274. *,
  275. record_id: int,
  276. status: int,
  277. request_json: dict[str, Any],
  278. response_json: dict[str, Any] | None,
  279. error_message: str | None = None,
  280. ) -> None:
  281. decode_request_result = {
  282. "request": request_json,
  283. "response": response_json,
  284. }
  285. sql = """
  286. UPDATE hot_content_records
  287. SET decode_request_result=%s,
  288. execution_status=%s,
  289. error_reason=%s,
  290. updated_at=NOW()
  291. WHERE id=%s
  292. """
  293. with self.conn.cursor() as cursor:
  294. cursor.execute(
  295. sql,
  296. (
  297. _json_dumps(decode_request_result),
  298. status,
  299. error_message,
  300. record_id,
  301. ),
  302. )
  303. def list_decode_result_candidates(self, *, limit: int) -> list[dict[str, Any]]:
  304. sql = """
  305. SELECT id, unique_key
  306. FROM hot_content_records
  307. WHERE execution_status IN (%s, %s, %s)
  308. AND contribution_points_json IS NULL
  309. ORDER BY updated_at ASC, id ASC
  310. LIMIT %s
  311. """
  312. with self.conn.cursor() as cursor:
  313. cursor.execute(
  314. sql,
  315. (
  316. ExecutionStatus.DECODE_SUBMITTED,
  317. ExecutionStatus.DECODE_SUCCESS,
  318. ExecutionStatus.DECODE_PENDING,
  319. limit,
  320. ),
  321. )
  322. rows = cursor.fetchall()
  323. return [
  324. {
  325. "id": int(row["id"]),
  326. "unique_key": str(row["unique_key"]),
  327. }
  328. for row in rows
  329. ]
  330. def save_decode_result_export(
  331. self,
  332. *,
  333. record_id: int,
  334. decode_result_json: dict[str, Any],
  335. contribution_points_json: dict[str, Any],
  336. ) -> None:
  337. sql = """
  338. UPDATE hot_content_records
  339. SET decode_result_json=%s,
  340. contribution_points_json=%s,
  341. execution_status=%s,
  342. error_reason=NULL,
  343. updated_at=NOW()
  344. WHERE id=%s
  345. """
  346. with self.conn.cursor() as cursor:
  347. cursor.execute(
  348. sql,
  349. (
  350. _json_dumps(decode_result_json),
  351. _json_dumps(contribution_points_json),
  352. ExecutionStatus.CONTRIBUTION_EXTRACTED,
  353. record_id,
  354. ),
  355. )
  356. def get_demand_cache_by_hour(self, *, cache_hour: datetime) -> dict[str, Any] | None:
  357. sql = """
  358. SELECT
  359. id,
  360. cache_hour,
  361. source_table,
  362. partition_dt,
  363. demand_name_set_json,
  364. item_count,
  365. updated_at
  366. FROM demand_pool_hourly_cache
  367. WHERE cache_hour=%s
  368. LIMIT 1
  369. """
  370. with self.conn.cursor() as cursor:
  371. cursor.execute(sql, (cache_hour,))
  372. row = cursor.fetchone()
  373. if not row:
  374. return None
  375. demand_name_set = _json_loads(row.get("demand_name_set_json")) or []
  376. if not isinstance(demand_name_set, list):
  377. demand_name_set = []
  378. return {
  379. "id": int(row["id"]),
  380. "cache_hour": row.get("cache_hour"),
  381. "source_table": str(row["source_table"]),
  382. "partition_dt": row.get("partition_dt"),
  383. "demand_name_set": [
  384. str(name).strip()
  385. for name in demand_name_set
  386. if str(name).strip()
  387. ],
  388. "item_count": int(row.get("item_count") or 0),
  389. "updated_at": row.get("updated_at"),
  390. }
  391. def save_demand_cache_set(
  392. self,
  393. *,
  394. cache_hour: datetime,
  395. source_table: str,
  396. partition_dt: str | None,
  397. excluded_strategy: str,
  398. top_n: int,
  399. demand_name_set: list[str],
  400. ) -> int:
  401. sql = """
  402. INSERT INTO demand_pool_hourly_cache (
  403. cache_hour,
  404. source_table,
  405. partition_dt,
  406. excluded_strategy,
  407. top_n,
  408. demand_name_set_json,
  409. item_count,
  410. created_at,
  411. updated_at
  412. )
  413. VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
  414. ON DUPLICATE KEY UPDATE
  415. id=LAST_INSERT_ID(id),
  416. source_table=VALUES(source_table),
  417. partition_dt=VALUES(partition_dt),
  418. excluded_strategy=VALUES(excluded_strategy),
  419. top_n=VALUES(top_n),
  420. demand_name_set_json=VALUES(demand_name_set_json),
  421. item_count=VALUES(item_count),
  422. updated_at=NOW()
  423. """
  424. normalized_names = _normalize_demand_names(demand_name_set)
  425. with self.conn.cursor() as cursor:
  426. cursor.execute(
  427. sql,
  428. (
  429. cache_hour,
  430. source_table,
  431. partition_dt,
  432. excluded_strategy,
  433. top_n,
  434. _json_dumps(normalized_names),
  435. len(normalized_names),
  436. ),
  437. )
  438. return int(cursor.lastrowid)
  439. def list_postprocess_candidates(self, *, limit: int) -> list[dict[str, Any]]:
  440. sql = """
  441. SELECT
  442. id,
  443. unique_key,
  444. source,
  445. title,
  446. created_at,
  447. article_title,
  448. article_body,
  449. demand_cache_run_id,
  450. decode_result_json,
  451. contribution_points_json,
  452. contribution_demand_match_json
  453. FROM hot_content_records
  454. WHERE contribution_points_json IS NOT NULL
  455. AND postprocess_status IN (%s, %s, %s)
  456. ORDER BY updated_at ASC, id ASC
  457. LIMIT %s
  458. """
  459. with self.conn.cursor() as cursor:
  460. cursor.execute(
  461. sql,
  462. (
  463. PostprocessStatus.PENDING,
  464. PostprocessStatus.DEMAND_MATCHED,
  465. PostprocessStatus.FAILED,
  466. limit,
  467. ),
  468. )
  469. rows = cursor.fetchall()
  470. return [
  471. {
  472. "id": int(row["id"]),
  473. "unique_key": str(row["unique_key"]),
  474. "source": str(row.get("source") or ""),
  475. "title": str(row.get("title") or ""),
  476. "created_at": row.get("created_at"),
  477. "article_title": row.get("article_title"),
  478. "article_body": row.get("article_body"),
  479. "demand_cache_run_id": row.get("demand_cache_run_id"),
  480. "decode_result_json": _json_loads(row.get("decode_result_json")),
  481. "contribution_points_json": _json_loads(row.get("contribution_points_json")),
  482. "contribution_demand_match_json": _json_loads(
  483. row.get("contribution_demand_match_json")
  484. ),
  485. }
  486. for row in rows
  487. ]
  488. def save_contribution_demand_match(
  489. self,
  490. *,
  491. record_id: int,
  492. demand_cache_run_id: int,
  493. match_json: dict[str, Any],
  494. ) -> None:
  495. sql = """
  496. UPDATE hot_content_records
  497. SET demand_cache_run_id=%s,
  498. contribution_demand_match_json=%s,
  499. postprocess_status=%s,
  500. postprocess_error_reason=NULL,
  501. updated_at=NOW()
  502. WHERE id=%s
  503. """
  504. with self.conn.cursor() as cursor:
  505. cursor.execute(
  506. sql,
  507. (
  508. demand_cache_run_id,
  509. _json_dumps(match_json),
  510. PostprocessStatus.DEMAND_MATCHED,
  511. record_id,
  512. ),
  513. )
  514. def save_wxindex_trend(
  515. self,
  516. *,
  517. record_id: int,
  518. trend_json: dict[str, Any],
  519. ) -> None:
  520. sql = """
  521. UPDATE hot_content_records
  522. SET wxindex_trend_json=%s,
  523. postprocess_status=%s,
  524. postprocess_error_reason=NULL,
  525. updated_at=NOW()
  526. WHERE id=%s
  527. """
  528. with self.conn.cursor() as cursor:
  529. cursor.execute(
  530. sql,
  531. (
  532. _json_dumps(trend_json),
  533. PostprocessStatus.WXINDEX_DONE,
  534. record_id,
  535. ),
  536. )
  537. def save_demand_quality(
  538. self,
  539. *,
  540. record_id: int,
  541. event_sense_json: dict[str, Any],
  542. senior_fit_json: dict[str, Any],
  543. update_status: bool = True,
  544. ) -> None:
  545. self._ensure_record_quality_columns()
  546. if update_status:
  547. sql = """
  548. UPDATE hot_content_records
  549. SET demand_event_sense_json=%s,
  550. demand_senior_fit_json=%s,
  551. postprocess_status=%s,
  552. postprocess_error_reason=NULL,
  553. updated_at=NOW()
  554. WHERE id=%s
  555. """
  556. params = (
  557. _json_dumps(event_sense_json),
  558. _json_dumps(senior_fit_json),
  559. PostprocessStatus.QUALITY_DONE,
  560. record_id,
  561. )
  562. else:
  563. sql = """
  564. UPDATE hot_content_records
  565. SET demand_event_sense_json=%s,
  566. demand_senior_fit_json=%s,
  567. updated_at=NOW()
  568. WHERE id=%s
  569. """
  570. params = (
  571. _json_dumps(event_sense_json),
  572. _json_dumps(senior_fit_json),
  573. record_id,
  574. )
  575. with self.conn.cursor() as cursor:
  576. cursor.execute(sql, params)
  577. def update_postprocess_status(
  578. self,
  579. *,
  580. record_id: int,
  581. status: int,
  582. error_message: str | None = None,
  583. ) -> None:
  584. sql = """
  585. UPDATE hot_content_records
  586. SET postprocess_status=%s,
  587. postprocess_error_reason=%s,
  588. updated_at=NOW()
  589. WHERE id=%s
  590. """
  591. with self.conn.cursor() as cursor:
  592. cursor.execute(sql, (status, error_message, record_id))
  593. def replace_demand_export_rows(
  594. self,
  595. *,
  596. record_id: int,
  597. source: str,
  598. hot_title: str,
  599. article_title: str,
  600. rows: list[dict[str, Any]],
  601. ) -> None:
  602. self._ensure_demand_export_table()
  603. delete_sql = "DELETE FROM hot_content_demand_exports WHERE record_id=%s"
  604. insert_sql = """
  605. INSERT INTO hot_content_demand_exports (
  606. record_id,
  607. source,
  608. hot_title,
  609. article_title,
  610. item_type,
  611. item_text,
  612. point_category,
  613. matched_demand,
  614. contribution_score,
  615. wxindex_keyword,
  616. all_hot_keywords,
  617. wxindex_latest_score,
  618. wxindex_trend,
  619. is_as_demand,
  620. event_sense_score,
  621. senior_fit_score,
  622. created_at,
  623. updated_at
  624. )
  625. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
  626. """
  627. with self.conn.cursor() as cursor:
  628. cursor.execute(delete_sql, (record_id,))
  629. insert_rows = [
  630. (
  631. record_id,
  632. source,
  633. hot_title,
  634. article_title,
  635. str(item.get("item_type") or ""),
  636. str(item.get("item_text") or ""),
  637. str(item.get("point_category") or ""),
  638. str(item.get("matched_demand") or ""),
  639. item.get("contribution_score"),
  640. str(item.get("wxindex_keyword") or ""),
  641. str(item.get("all_hot_keywords") or ""),
  642. float(item.get("wxindex_latest_score") or 0),
  643. str(item.get("wxindex_trend") or ""),
  644. int(item.get("is_as_demand") or 0),
  645. item.get("event_sense_score"),
  646. item.get("senior_fit_score"),
  647. )
  648. for item in rows
  649. if str(item.get("item_type") or "").strip()
  650. and str(item.get("item_text") or "").strip()
  651. ]
  652. if insert_rows:
  653. cursor.executemany(insert_sql, insert_rows)
  654. def list_odps_sync_records(self) -> list[dict[str, Any]]:
  655. """读取当天创建且已完成质量判断的新记录,供 ODPS 同步(不处理历史数据)。"""
  656. self._ensure_record_quality_columns()
  657. today_start = datetime.now(SHANGHAI_TZ).replace(
  658. hour=0,
  659. minute=0,
  660. second=0,
  661. microsecond=0,
  662. tzinfo=None,
  663. )
  664. today_end = today_start + timedelta(days=1)
  665. sql = """
  666. SELECT
  667. id,
  668. contribution_points_json,
  669. contribution_demand_match_json,
  670. wxindex_trend_json,
  671. demand_event_sense_json,
  672. demand_senior_fit_json
  673. FROM hot_content_records
  674. WHERE created_at >= %s
  675. AND created_at < %s
  676. AND postprocess_status = %s
  677. AND contribution_demand_match_json IS NOT NULL
  678. AND TRIM(CAST(contribution_demand_match_json AS CHAR)) <> ''
  679. ORDER BY id ASC
  680. """
  681. with self.conn.cursor() as cursor:
  682. cursor.execute(
  683. sql,
  684. (today_start, today_end, PostprocessStatus.QUALITY_DONE),
  685. )
  686. rows = cursor.fetchall()
  687. records: list[dict[str, Any]] = []
  688. for row in rows:
  689. records.append(
  690. {
  691. "id": int(row["id"]),
  692. "contribution_points_json": _json_loads(
  693. row.get("contribution_points_json")
  694. ),
  695. "contribution_demand_match_json": _json_loads(
  696. row.get("contribution_demand_match_json")
  697. ),
  698. "wxindex_trend_json": _json_loads(row.get("wxindex_trend_json")),
  699. "demand_event_sense_json": _json_loads(
  700. row.get("demand_event_sense_json")
  701. ),
  702. "demand_senior_fit_json": _json_loads(
  703. row.get("demand_senior_fit_json")
  704. ),
  705. }
  706. )
  707. return records
  708. def list_demand_export_groups(self) -> list[dict[str, Any]]:
  709. """读取主表当天创建的 record 对应导出分组,仅供 ODPS 当天分区同步(不跨天)。"""
  710. self._ensure_demand_export_table()
  711. today_start = datetime.now(SHANGHAI_TZ).replace(
  712. hour=0,
  713. minute=0,
  714. second=0,
  715. microsecond=0,
  716. tzinfo=None,
  717. )
  718. today_end = today_start + timedelta(days=1)
  719. sql = """
  720. SELECT
  721. e.record_id,
  722. e.item_type,
  723. e.item_text,
  724. e.point_category,
  725. e.matched_demand,
  726. e.wxindex_latest_score
  727. FROM hot_content_demand_exports e
  728. INNER JOIN hot_content_records r ON r.id = e.record_id
  729. WHERE r.created_at >= %s
  730. AND r.created_at < %s
  731. ORDER BY e.record_id ASC, e.id ASC
  732. """
  733. with self.conn.cursor() as cursor:
  734. cursor.execute(sql, (today_start, today_end))
  735. rows = cursor.fetchall()
  736. grouped: dict[int, list[dict[str, Any]]] = {}
  737. for row in rows:
  738. record_id = int(row["record_id"])
  739. grouped.setdefault(record_id, []).append(
  740. {
  741. "item_type": str(row.get("item_type") or ""),
  742. "item_text": str(row.get("item_text") or ""),
  743. "point_category": str(row.get("point_category") or ""),
  744. "matched_demand": str(row.get("matched_demand") or ""),
  745. "wxindex_latest_score": float(row.get("wxindex_latest_score") or 0),
  746. }
  747. )
  748. return [
  749. {"record_id": record_id, "export_rows": export_rows}
  750. for record_id, export_rows in grouped.items()
  751. ]
  752. def _ensure_demand_export_table(self) -> None:
  753. sql = """
  754. CREATE TABLE IF NOT EXISTS hot_content_demand_exports (
  755. id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  756. record_id BIGINT UNSIGNED NOT NULL,
  757. source VARCHAR(64) NOT NULL DEFAULT '',
  758. hot_title VARCHAR(1024) NOT NULL DEFAULT '',
  759. article_title VARCHAR(1024) NOT NULL DEFAULT '',
  760. item_type VARCHAR(32) NOT NULL COMMENT '元素/短语',
  761. item_text VARCHAR(1024) NOT NULL,
  762. point_category VARCHAR(32) NOT NULL DEFAULT '' COMMENT '点类型:灵感点/目的点/关键点',
  763. matched_demand VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '匹配到的需求',
  764. contribution_score DOUBLE NULL COMMENT '贡献分,仅元素有值',
  765. wxindex_keyword VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '获取微信指数的元素',
  766. all_hot_keywords VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '全部热点词',
  767. wxindex_latest_score DOUBLE NOT NULL DEFAULT 0,
  768. wxindex_trend VARCHAR(32) NOT NULL DEFAULT '' COMMENT '微信指数趋势',
  769. is_as_demand TINYINT NOT NULL DEFAULT 0 COMMENT '是否作为需求:0否 1是',
  770. created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
  771. updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  772. PRIMARY KEY (id),
  773. KEY idx_record_id (record_id),
  774. KEY idx_source_type (source, item_type)
  775. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
  776. """
  777. with self.conn.cursor() as cursor:
  778. cursor.execute(sql)
  779. self._ensure_demand_export_column(
  780. cursor,
  781. "matched_demand",
  782. """
  783. ALTER TABLE hot_content_demand_exports
  784. ADD COLUMN matched_demand VARCHAR(1024) NOT NULL DEFAULT ''
  785. COMMENT '匹配到的需求'
  786. AFTER item_text
  787. """,
  788. )
  789. self._ensure_demand_export_column(
  790. cursor,
  791. "point_category",
  792. """
  793. ALTER TABLE hot_content_demand_exports
  794. ADD COLUMN point_category VARCHAR(32) NOT NULL DEFAULT ''
  795. COMMENT '点类型:灵感点/目的点/关键点'
  796. AFTER item_text
  797. """,
  798. )
  799. self._ensure_demand_export_column(
  800. cursor,
  801. "contribution_score",
  802. """
  803. ALTER TABLE hot_content_demand_exports
  804. ADD COLUMN contribution_score DOUBLE NULL
  805. COMMENT '贡献分,仅词有值'
  806. AFTER matched_demand
  807. """,
  808. )
  809. self._ensure_demand_export_column(
  810. cursor,
  811. "wxindex_trend",
  812. """
  813. ALTER TABLE hot_content_demand_exports
  814. ADD COLUMN wxindex_trend VARCHAR(32) NOT NULL DEFAULT ''
  815. COMMENT '微信指数趋势'
  816. AFTER wxindex_latest_score
  817. """,
  818. )
  819. self._ensure_demand_export_column(
  820. cursor,
  821. "wxindex_keyword",
  822. """
  823. ALTER TABLE hot_content_demand_exports
  824. ADD COLUMN wxindex_keyword VARCHAR(1024) NOT NULL DEFAULT ''
  825. COMMENT '获取微信指数的词'
  826. AFTER contribution_score
  827. """,
  828. )
  829. self._ensure_demand_export_column(
  830. cursor,
  831. "all_hot_keywords",
  832. """
  833. ALTER TABLE hot_content_demand_exports
  834. ADD COLUMN all_hot_keywords VARCHAR(1024) NOT NULL DEFAULT ''
  835. COMMENT '全部热点词'
  836. AFTER wxindex_keyword
  837. """,
  838. )
  839. self._ensure_demand_export_column(
  840. cursor,
  841. "is_as_demand",
  842. """
  843. ALTER TABLE hot_content_demand_exports
  844. ADD COLUMN is_as_demand TINYINT NOT NULL DEFAULT 0
  845. COMMENT '是否作为需求:0否 1是'
  846. AFTER wxindex_trend
  847. """,
  848. )
  849. self._ensure_demand_export_column(
  850. cursor,
  851. "event_sense_score",
  852. """
  853. ALTER TABLE hot_content_demand_exports
  854. ADD COLUMN event_sense_score DOUBLE NULL
  855. COMMENT '事件性得分 0-10'
  856. AFTER is_as_demand
  857. """,
  858. )
  859. self._ensure_demand_export_column(
  860. cursor,
  861. "senior_fit_score",
  862. """
  863. ALTER TABLE hot_content_demand_exports
  864. ADD COLUMN senior_fit_score DOUBLE NULL
  865. COMMENT '老年性得分 0-10'
  866. AFTER event_sense_score
  867. """,
  868. )
  869. def _ensure_record_quality_columns(self) -> None:
  870. with self.conn.cursor() as cursor:
  871. for column_name, alter_sql in (
  872. (
  873. "demand_event_sense_json",
  874. """
  875. ALTER TABLE hot_content_records
  876. ADD COLUMN demand_event_sense_json JSON NULL
  877. COMMENT '需求事件性 LLM 评分结果'
  878. AFTER wxindex_trend_json
  879. """,
  880. ),
  881. (
  882. "demand_senior_fit_json",
  883. """
  884. ALTER TABLE hot_content_records
  885. ADD COLUMN demand_senior_fit_json JSON NULL
  886. COMMENT '需求老年性 LLM 评分结果'
  887. AFTER demand_event_sense_json
  888. """,
  889. ),
  890. ):
  891. cursor.execute(
  892. """
  893. SELECT COUNT(*) AS cnt
  894. FROM information_schema.COLUMNS
  895. WHERE TABLE_SCHEMA = DATABASE()
  896. AND TABLE_NAME = 'hot_content_records'
  897. AND COLUMN_NAME = %s
  898. """,
  899. (column_name,),
  900. )
  901. if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
  902. cursor.execute(alter_sql)
  903. def _ensure_category_filter_columns(self) -> None:
  904. with self.conn.cursor() as cursor:
  905. for column_name, alter_sql in (
  906. (
  907. "category_filter_json",
  908. """
  909. ALTER TABLE hot_content_records
  910. ADD COLUMN category_filter_json JSON NULL
  911. COMMENT '老年人兴趣分类筛选 LLM 结果'
  912. AFTER article_url
  913. """,
  914. ),
  915. (
  916. "category_filter_passed",
  917. """
  918. ALTER TABLE hot_content_records
  919. ADD COLUMN category_filter_passed TINYINT NULL
  920. COMMENT '分类筛选是否通过:1通过 0不通过 NULL未筛选'
  921. AFTER category_filter_json
  922. """,
  923. ),
  924. (
  925. "category_filter_reason",
  926. """
  927. ALTER TABLE hot_content_records
  928. ADD COLUMN category_filter_reason TEXT NULL
  929. COMMENT '分类筛选原因'
  930. AFTER category_filter_passed
  931. """,
  932. ),
  933. ):
  934. cursor.execute(
  935. """
  936. SELECT COUNT(*) AS cnt
  937. FROM information_schema.COLUMNS
  938. WHERE TABLE_SCHEMA = DATABASE()
  939. AND TABLE_NAME = 'hot_content_records'
  940. AND COLUMN_NAME = %s
  941. """,
  942. (column_name,),
  943. )
  944. if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
  945. cursor.execute(alter_sql)
  946. def _ensure_demand_export_column(
  947. self,
  948. cursor: Any,
  949. column_name: str,
  950. alter_sql: str,
  951. ) -> None:
  952. cursor.execute(
  953. """
  954. SELECT COUNT(*) AS cnt
  955. FROM information_schema.COLUMNS
  956. WHERE TABLE_SCHEMA = DATABASE()
  957. AND TABLE_NAME = 'hot_content_demand_exports'
  958. AND COLUMN_NAME = %s
  959. """,
  960. (column_name,),
  961. )
  962. if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
  963. cursor.execute(alter_sql)
  def list_synced_odps_demand_ids(
      self,
      *,
      partition_dt: str,
      strategy: str,
  ) -> set[str]:
      """Return the demand ids already logged as synced for one partition and strategy.

      Ids are whitespace-stripped; NULL or blank ids are left out.
      """
      self._ensure_odps_sync_log_table()
      query = """
          SELECT demand_id
          FROM hot_content_odps_sync_log
          WHERE partition_dt = %s
            AND strategy = %s
      """
      with self.conn.cursor() as cursor:
          cursor.execute(query, (partition_dt, strategy))
          fetched = cursor.fetchall()
      synced: set[str] = set()
      for record in fetched:
          demand_id = str(record.get("demand_id") or "").strip()
          if demand_id:
              synced.add(demand_id)
      return synced
  def save_odps_sync_logs(self, rows: list[dict[str, Any]]) -> int:
      """Upsert ODPS sync-log rows into ``hot_content_odps_sync_log``.

      Rows whose ``demand_id`` is missing or blank are dropped. The surviving
      ``demand_id`` is stored stripped, so the value written matches both the
      filter used here and the stripped ids returned by
      ``list_synced_odps_demand_ids``. On a duplicate
      ``(partition_dt, strategy, demand_id)`` key the name, type, record id and
      weight are overwritten and ``synced_at`` is refreshed.

      Args:
          rows: Sync-log dicts. Keys read: ``partition_dt``, ``strategy``,
              ``demand_id``, ``demand_name``, ``demand_type``, ``record_id`` and
              ``weight`` (``None`` weight is stored as NULL).

      Returns:
          Number of rows sent to the database after filtering; 0 when nothing
          valid was given, in which case the database is not touched at all.
      """
      insert_rows: list[tuple[str, str, str, str, str, int, float | None]] = []
      for item in rows or []:
          demand_id = str(item.get("demand_id") or "").strip()
          if not demand_id:
              continue
          weight = item.get("weight")
          insert_rows.append(
              (
                  str(item.get("partition_dt") or ""),
                  str(item.get("strategy") or ""),
                  # Store the stripped id: it is what the filter checked and
                  # what list_synced_odps_demand_ids compares against.
                  demand_id,
                  str(item.get("demand_name") or ""),
                  str(item.get("demand_type") or ""),
                  int(item.get("record_id") or 0),
                  float(weight) if weight is not None else None,
              )
          )
      if not insert_rows:
          # Nothing valid to write; skip the table DDL and the round trip.
          return 0
      self._ensure_odps_sync_log_table()
      sql = """
          INSERT INTO hot_content_odps_sync_log (
              partition_dt,
              strategy,
              demand_id,
              demand_name,
              demand_type,
              record_id,
              weight
          )
          VALUES (%s, %s, %s, %s, %s, %s, %s)
          ON DUPLICATE KEY UPDATE
              demand_name = VALUES(demand_name),
              demand_type = VALUES(demand_type),
              record_id = VALUES(record_id),
              weight = VALUES(weight),
              synced_at = CURRENT_TIMESTAMP
      """
      with self.conn.cursor() as cursor:
          cursor.executemany(sql, insert_rows)
      return len(insert_rows)
  1023. def _ensure_odps_sync_log_weight_column(self, cursor: Any) -> None:
  1024. cursor.execute(
  1025. """
  1026. SELECT COUNT(*) AS cnt
  1027. FROM information_schema.COLUMNS
  1028. WHERE TABLE_SCHEMA = DATABASE()
  1029. AND TABLE_NAME = 'hot_content_odps_sync_log'
  1030. AND COLUMN_NAME = 'weight'
  1031. """,
  1032. )
  1033. if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
  1034. cursor.execute(
  1035. """
  1036. ALTER TABLE hot_content_odps_sync_log
  1037. ADD COLUMN weight DOUBLE NULL DEFAULT NULL
  1038. COMMENT 'ODPS 需求权重(记录 wxindex 最高分 / 1000000)'
  1039. AFTER record_id
  1040. """
  1041. )
  def list_wxindex_word_scores(self, name: str) -> list[dict[str, Any]]:
      """Return the stored daily index scores of one word, oldest ``dt`` first.

      Each item is ``{"ymd": dt, "total_score": float}``. Rows with a blank
      ``dt`` or a score that cannot be converted to float are skipped. A blank
      ``name`` returns ``[]`` without touching the database.
      """
      word = str(name or "").strip()
      if not word:
          return []
      self._ensure_wxindex_words_table()
      query = """
          SELECT dt, total_score
          FROM hot_content_wxindex_words
          WHERE name = %s
          ORDER BY dt ASC
      """
      with self.conn.cursor() as cursor:
          cursor.execute(query, (word,))
          fetched = cursor.fetchall()
      series: list[dict[str, Any]] = []
      for record in fetched:
          day = str(record.get("dt") or "").strip()
          if not day:
              continue
          try:
              value = float(record["total_score"])
          except (TypeError, ValueError, KeyError):
              continue
          series.append({"ymd": day, "total_score": value})
      return series
  1067. def list_stale_wxindex_words(
  1068. self,
  1069. *,
  1070. end_ymd: str,
  1071. update_window_days: int = 7,
  1072. today: date | None = None,
  1073. ) -> list[dict[str, Any]]:
  1074. """返回更新窗口内、仍缺近 7 日区间数据的词。"""
  1075. target_end = str(end_ymd or "").strip()
  1076. if not target_end:
  1077. return []
  1078. current = today or datetime.now(SHANGHAI_TZ).date()
  1079. active_since = current - timedelta(days=max(update_window_days, 0))
  1080. self._ensure_wxindex_word_meta_table()
  1081. self._ensure_wxindex_words_table()
  1082. sql = """
  1083. SELECT
  1084. m.name,
  1085. m.event_created_at,
  1086. m.fetch_start_ymd,
  1087. MIN(w.dt) AS earliest_dt,
  1088. MAX(w.dt) AS latest_dt
  1089. FROM hot_content_wxindex_word_meta m
  1090. INNER JOIN hot_content_wxindex_words w ON w.name = m.name
  1091. WHERE DATE(m.event_created_at) >= %s
  1092. GROUP BY m.name, m.event_created_at, m.fetch_start_ymd
  1093. HAVING MAX(w.dt) < %s OR MIN(w.dt) > m.fetch_start_ymd
  1094. ORDER BY m.name ASC
  1095. """
  1096. with self.conn.cursor() as cursor:
  1097. cursor.execute(sql, (active_since, target_end))
  1098. rows = cursor.fetchall()
  1099. stale_words: list[dict[str, Any]] = []
  1100. for row in rows:
  1101. name = str(row.get("name") or "").strip()
  1102. fetch_start_ymd = str(row.get("fetch_start_ymd") or "").strip()
  1103. earliest_dt = str(row.get("earliest_dt") or "").strip()
  1104. latest_dt = str(row.get("latest_dt") or "").strip()
  1105. event_created_at = row.get("event_created_at")
  1106. if name and fetch_start_ymd and earliest_dt and latest_dt and event_created_at:
  1107. stale_words.append(
  1108. {
  1109. "name": name,
  1110. "event_created_at": event_created_at,
  1111. "fetch_start_ymd": fetch_start_ymd,
  1112. "earliest_dt": earliest_dt,
  1113. "latest_dt": latest_dt,
  1114. }
  1115. )
  1116. return stale_words
  def list_word_earliest_event_times(
      self,
      *,
      since_dt: datetime,
  ) -> dict[str, datetime]:
      """Map each word found in ``wxindex_trend_json`` to its earliest record time.

      Only records with ``created_at >= since_dt`` and a non-NULL
      ``wxindex_trend_json`` are scanned. Words are collected from two places:
      every ``$.wxindex_searches[*].keyword`` entry and the
      ``$.llm_selected_word`` value. Each is trimmed and blank ones are dropped.
      The value for a word is the minimum ``created_at`` among the records that
      mention it. Rows whose aggregate is not a ``datetime`` are skipped.

      Calls ``_ensure_record_quality_columns`` first, which may ALTER
      ``hot_content_records``.
      """
      self._ensure_record_quality_columns()
      query = """
          SELECT
              word_name,
              MIN(event_created_at) AS event_created_at
          FROM (
              SELECT
                  TRIM(searches.keyword) AS word_name,
                  r.created_at AS event_created_at
              FROM hot_content_records r
              JOIN JSON_TABLE(
                  r.wxindex_trend_json,
                  '$.wxindex_searches[*]' COLUMNS (
                      keyword VARCHAR(256) PATH '$.keyword'
                  )
              ) AS searches
              WHERE r.created_at >= %s
                AND r.wxindex_trend_json IS NOT NULL
                AND TRIM(searches.keyword) <> ''
              UNION ALL
              SELECT
                  TRIM(JSON_UNQUOTE(JSON_EXTRACT(r.wxindex_trend_json, '$.llm_selected_word'))) AS word_name,
                  r.created_at AS event_created_at
              FROM hot_content_records r
              WHERE r.created_at >= %s
                AND r.wxindex_trend_json IS NOT NULL
                AND TRIM(JSON_UNQUOTE(JSON_EXTRACT(r.wxindex_trend_json, '$.llm_selected_word'))) <> ''
          ) AS word_events
          WHERE word_name IS NOT NULL
            AND word_name <> ''
          GROUP BY word_name
      """
      # The same lower bound feeds both halves of the UNION ALL.
      bounds = (since_dt, since_dt)
      with self.conn.cursor() as cursor:
          cursor.execute(query, bounds)
          fetched = cursor.fetchall()
      earliest_by_word: dict[str, datetime] = {}
      for record in fetched:
          word = str(record.get("word_name") or "").strip()
          first_seen = record.get("event_created_at")
          if word and isinstance(first_seen, datetime):
              earliest_by_word[word] = first_seen
      return earliest_by_word
  def list_wxindex_word_bounds_without_meta(self) -> list[dict[str, Any]]:
      """Return the earliest ``dt`` and first insert time of words lacking a meta row.

      Each item holds ``name``, ``earliest_dt`` (``MIN(dt)``) and
      ``first_created_at`` (``MIN(created_at)``), ordered by name. Rows with a
      blank name or earliest dt are skipped.
      """
      self._ensure_wxindex_word_meta_table()
      self._ensure_wxindex_words_table()
      query = """
          SELECT
              w.name,
              MIN(w.dt) AS earliest_dt,
              MIN(w.created_at) AS first_created_at
          FROM hot_content_wxindex_words w
          LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name
          WHERE m.name IS NULL
          GROUP BY w.name
          ORDER BY w.name ASC
      """
      with self.conn.cursor() as cursor:
          cursor.execute(query)
          fetched = cursor.fetchall()
      bounds: list[dict[str, Any]] = []
      for record in fetched:
          word = str(record.get("name") or "").strip()
          earliest = str(record.get("earliest_dt") or "").strip()
          if not (word and earliest):
              continue
          bounds.append(
              {
                  "name": word,
                  "earliest_dt": earliest,
                  "first_created_at": record.get("first_created_at"),
              }
          )
      return bounds
  def list_wxindex_word_names_without_meta(self) -> list[str]:
      """Return the distinct stored word names that have no meta row, ordered by name.

      Names are whitespace-stripped and blank ones are dropped.
      """
      self._ensure_wxindex_word_meta_table()
      self._ensure_wxindex_words_table()
      query = """
          SELECT DISTINCT w.name
          FROM hot_content_wxindex_words w
          LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name
          WHERE m.name IS NULL
          ORDER BY w.name ASC
      """
      with self.conn.cursor() as cursor:
          cursor.execute(query)
          fetched = cursor.fetchall()
      names: list[str] = []
      for record in fetched:
          word = str(record.get("name") or "").strip()
          if word:
              names.append(word)
      return names
  1214. def get_wxindex_word_first_row_created_at(self, name: str) -> datetime | None:
  1215. word = str(name or "").strip()
  1216. if not word:
  1217. return None
  1218. self._ensure_wxindex_words_table()
  1219. sql = """
  1220. SELECT MIN(created_at) AS first_created_at
  1221. FROM hot_content_wxindex_words
  1222. WHERE name = %s
  1223. """
  1224. with self.conn.cursor() as cursor:
  1225. cursor.execute(sql, (word,))
  1226. row = cursor.fetchone() or {}
  1227. first_created_at = row.get("first_created_at")
  1228. return first_created_at if isinstance(first_created_at, datetime) else None
  def list_all_wxindex_word_meta(self) -> list[dict[str, Any]]:
      """Return every word meta row, ordered by name.

      A row is kept only when its stripped name and ``fetch_start_ymd`` are
      non-blank and ``event_created_at`` is not ``None``.
      """
      self._ensure_wxindex_word_meta_table()
      query = """
          SELECT name, event_created_at, fetch_start_ymd
          FROM hot_content_wxindex_word_meta
          ORDER BY name ASC
      """
      with self.conn.cursor() as cursor:
          cursor.execute(query)
          fetched = cursor.fetchall()
      metas: list[dict[str, Any]] = []
      for record in fetched:
          word = str(record.get("name") or "").strip()
          start_ymd = str(record.get("fetch_start_ymd") or "").strip()
          created_at = record.get("event_created_at")
          if not word or not start_ymd or created_at is None:
              continue
          metas.append(
              {
                  "name": word,
                  "event_created_at": created_at,
                  "fetch_start_ymd": start_ymd,
              }
          )
      return metas
  def update_wxindex_word_meta_fetch_start(
      self,
      *,
      name: str,
      fetch_start_ymd: str,
  ) -> None:
      """Overwrite only ``fetch_start_ymd`` on the meta row of ``name``.

      Both arguments are stripped. If either is blank afterwards,
      ``HotContentFlowError`` is raised before any database access. This is a
      plain ``UPDATE``, so an unknown name changes nothing.
      """
      word = str(name or "").strip()
      target_start = str(fetch_start_ymd or "").strip()
      if not (word and target_start):
          raise HotContentFlowError("invalid wxindex word meta fetch_start_ymd payload")
      self._ensure_wxindex_word_meta_table()
      with self.conn.cursor() as cursor:
          cursor.execute(
              """
              UPDATE hot_content_wxindex_word_meta
              SET fetch_start_ymd = %s
              WHERE name = %s
              """,
              (target_start, word),
          )
  def update_wxindex_word_meta(
      self,
      *,
      name: str,
      event_created_at: datetime,
      fetch_start_ymd: str,
  ) -> None:
      """Overwrite ``event_created_at`` and ``fetch_start_ymd`` on the meta row of ``name``.

      A timezone-aware ``event_created_at`` is converted to ``SHANGHAI_TZ`` and
      stored naive; a naive value is stored as given. A blank (after stripping)
      ``name`` or ``fetch_start_ymd`` raises ``HotContentFlowError``. This is a
      plain ``UPDATE``, so an unknown name changes nothing.
      """
      word = str(name or "").strip()
      target_start = str(fetch_start_ymd or "").strip()
      if not (word and target_start):
          raise HotContentFlowError("invalid wxindex word meta payload")
      self._ensure_wxindex_word_meta_table()
      event_at = (
          event_created_at.astimezone(SHANGHAI_TZ).replace(tzinfo=None)
          if event_created_at.tzinfo is not None
          else event_created_at
      )
      with self.conn.cursor() as cursor:
          cursor.execute(
              """
              UPDATE hot_content_wxindex_word_meta
              SET event_created_at = %s,
                  fetch_start_ymd = %s
              WHERE name = %s
              """,
              (event_at, target_start, word),
          )
  1294. def get_wxindex_word_meta(self, name: str) -> dict[str, Any] | None:
  1295. word = str(name or "").strip()
  1296. if not word:
  1297. return None
  1298. self._ensure_wxindex_word_meta_table()
  1299. sql = """
  1300. SELECT name, event_created_at, fetch_start_ymd
  1301. FROM hot_content_wxindex_word_meta
  1302. WHERE name = %s
  1303. """
  1304. with self.conn.cursor() as cursor:
  1305. cursor.execute(sql, (word,))
  1306. row = cursor.fetchone()
  1307. if not row:
  1308. return None
  1309. fetch_start_ymd = str(row.get("fetch_start_ymd") or "").strip()
  1310. event_created_at = row.get("event_created_at")
  1311. if not fetch_start_ymd or event_created_at is None:
  1312. return None
  1313. return {
  1314. "name": str(row.get("name") or "").strip(),
  1315. "event_created_at": event_created_at,
  1316. "fetch_start_ymd": fetch_start_ymd,
  1317. }
  def ensure_wxindex_word_meta(
      self,
      *,
      name: str,
      event_created_at: datetime,
      fetch_start_ymd: str,
  ) -> dict[str, Any]:
      """Upsert the meta row of ``name`` and return it as read back from the table.

      A timezone-aware ``event_created_at`` is converted to ``SHANGHAI_TZ`` and
      stored naive. A blank (after stripping) ``name`` or ``fetch_start_ymd``
      raises ``HotContentFlowError``; so does a row that cannot be read back
      after the write.

      NOTE(review): despite the name, an existing row is overwritten
      (``ON DUPLICATE KEY UPDATE``). That includes ``event_created_at``, whose
      column comment describes it as the *first* associated event time. Confirm
      callers want replace rather than insert-if-absent semantics.
      """
      word = str(name or "").strip()
      target_start = str(fetch_start_ymd or "").strip()
      if not (word and target_start):
          raise HotContentFlowError("invalid wxindex word meta payload")
      self._ensure_wxindex_word_meta_table()
      event_at = (
          event_created_at.astimezone(SHANGHAI_TZ).replace(tzinfo=None)
          if event_created_at.tzinfo is not None
          else event_created_at
      )
      upsert_sql = """
          INSERT INTO hot_content_wxindex_word_meta (
              name,
              event_created_at,
              fetch_start_ymd
          )
          VALUES (%s, %s, %s)
          ON DUPLICATE KEY UPDATE
              event_created_at = VALUES(event_created_at),
              fetch_start_ymd = VALUES(fetch_start_ymd)
      """
      with self.conn.cursor() as cursor:
          cursor.execute(upsert_sql, (word, event_at, target_start))
      persisted = self.get_wxindex_word_meta(word)
      if persisted is None:
          raise HotContentFlowError(f"failed to persist wxindex word meta: {word}")
      return persisted
  def list_low_max_wxindex_words(
      self,
      *,
      min_max_score: float,
  ) -> list[dict[str, Any]]:
      """Group stored scores by word and return words whose maximum is below a threshold.

      Each item has ``name``, ``max_score`` (float) and ``row_count`` (int),
      ordered by name. Rows with a blank name or non-numeric aggregates are
      skipped.

      Args:
          min_max_score: A word is returned when ``MAX(total_score)`` is strictly
              below this value.
      """
      self._ensure_wxindex_words_table()
      query = """
          SELECT
              name,
              MAX(total_score) AS max_score,
              COUNT(*) AS row_count
          FROM hot_content_wxindex_words
          GROUP BY name
          HAVING MAX(total_score) < %s
          ORDER BY name ASC
      """
      with self.conn.cursor() as cursor:
          cursor.execute(query, (min_max_score,))
          fetched = cursor.fetchall()
      low_words: list[dict[str, Any]] = []
      for record in fetched:
          word = str(record.get("name") or "").strip()
          if not word:
              continue
          try:
              peak = float(record["max_score"])
              count = int(record["row_count"])
          except (TypeError, ValueError, KeyError):
              continue
          low_words.append({"name": word, "max_score": peak, "row_count": count})
      return low_words
  def count_wxindex_words_outside_event_window(
      self,
      *,
      window_days: int = 7,
  ) -> int:
      """Count stored daily-score rows that fall outside their word's event window.

      For each word with a meta row, the window runs from
      ``DATE(event_created_at) - window_days`` to
      ``DATE(event_created_at) + window_days``. Both bounds are rendered as
      ``yyyymmdd`` strings and compared against ``dt``. Words without a meta row
      are not counted (inner join).

      Args:
          window_days: Half-width of the window in days. Negative values are
              clamped to 0; a negative width would invert the window and make
              every row count as "outside".

      Returns:
          Number of rows outside the window (0 if the query returns nothing).
      """
      # Same clamping as list_stale_wxindex_words; an inverted window matches every row.
      days = max(window_days, 0)
      self._ensure_wxindex_word_meta_table()
      self._ensure_wxindex_words_table()
      sql = """
          SELECT COUNT(*) AS row_count
          FROM hot_content_wxindex_words w
          INNER JOIN hot_content_wxindex_word_meta m ON m.name = w.name
          WHERE w.dt < DATE_FORMAT(
                  DATE_SUB(DATE(m.event_created_at), INTERVAL %s DAY),
                  '%%Y%%m%%d'
              )
             OR w.dt > DATE_FORMAT(
                  DATE_ADD(DATE(m.event_created_at), INTERVAL %s DAY),
                  '%%Y%%m%%d'
              )
      """
      with self.conn.cursor() as cursor:
          cursor.execute(sql, (days, days))
          row = cursor.fetchone() or {}
      return int(row.get("row_count") or 0)
  def list_wxindex_words_outside_event_window_samples(
      self,
      *,
      window_days: int = 7,
      limit: int = 20,
  ) -> list[dict[str, Any]]:
      """Return sample rows lying outside their word's event window.

      For each word with a meta row, the window runs from
      ``DATE(event_created_at) - window_days`` to
      ``DATE(event_created_at) + window_days``, rendered as ``yyyymmdd``.
      Each sample reports ``name``, ``dt``, ``event_created_at`` and the
      computed ``start_ymd``/``end_ymd`` bounds. Rows are ordered by name then
      dt, and rows with a blank name or dt are dropped.

      Args:
          window_days: Half-width of the window in days. Negative values are
              clamped to 0 so the window cannot invert (which would match
              every row).
          limit: Maximum number of rows fetched. Negative values are clamped
              to 0, since MySQL rejects a negative ``LIMIT``.
      """
      days = max(window_days, 0)
      max_rows = max(limit, 0)
      self._ensure_wxindex_word_meta_table()
      self._ensure_wxindex_words_table()
      sql = """
          SELECT
              w.name,
              w.dt,
              m.event_created_at,
              DATE_FORMAT(
                  DATE_SUB(DATE(m.event_created_at), INTERVAL %s DAY),
                  '%%Y%%m%%d'
              ) AS start_ymd,
              DATE_FORMAT(
                  DATE_ADD(DATE(m.event_created_at), INTERVAL %s DAY),
                  '%%Y%%m%%d'
              ) AS end_ymd
          FROM hot_content_wxindex_words w
          INNER JOIN hot_content_wxindex_word_meta m ON m.name = w.name
          WHERE w.dt < DATE_FORMAT(
                  DATE_SUB(DATE(m.event_created_at), INTERVAL %s DAY),
                  '%%Y%%m%%d'
              )
             OR w.dt > DATE_FORMAT(
                  DATE_ADD(DATE(m.event_created_at), INTERVAL %s DAY),
                  '%%Y%%m%%d'
              )
          ORDER BY w.name ASC, w.dt ASC
          LIMIT %s
      """
      with self.conn.cursor() as cursor:
          cursor.execute(sql, (days, days, days, days, max_rows))
          rows = cursor.fetchall()
      samples: list[dict[str, Any]] = []
      for row in rows:
          name = str(row.get("name") or "").strip()
          dt = str(row.get("dt") or "").strip()
          if not (name and dt):
              continue
          samples.append(
              {
                  "name": name,
                  "dt": dt,
                  "event_created_at": row.get("event_created_at"),
                  "start_ymd": str(row.get("start_ymd") or "").strip(),
                  "end_ymd": str(row.get("end_ymd") or "").strip(),
              }
          )
      return samples
  def delete_wxindex_words_outside_event_window(
      self,
      *,
      window_days: int = 7,
  ) -> int:
      """Delete daily-score rows that fall outside their word's event window.

      A row is deleted when its ``dt`` is earlier than
      ``DATE(event_created_at) - window_days`` or later than
      ``DATE(event_created_at) + window_days``, both rendered as ``yyyymmdd``.
      The event time comes from the word's row in
      ``hot_content_wxindex_word_meta``. Words without a meta row are left
      untouched (inner join).

      Args:
          window_days: Half-width of the kept window in days. Negative values
              are clamped to 0; a negative width would invert the window and
              delete every row of every word that has a meta row.

      Returns:
          Affected row count reported by the driver.
      """
      # Guard against an inverted window, which would match (and delete) all rows.
      days = max(window_days, 0)
      self._ensure_wxindex_word_meta_table()
      self._ensure_wxindex_words_table()
      sql = """
          DELETE w
          FROM hot_content_wxindex_words w
          INNER JOIN hot_content_wxindex_word_meta m ON m.name = w.name
          WHERE w.dt < DATE_FORMAT(
                  DATE_SUB(DATE(m.event_created_at), INTERVAL %s DAY),
                  '%%Y%%m%%d'
              )
             OR w.dt > DATE_FORMAT(
                  DATE_ADD(DATE(m.event_created_at), INTERVAL %s DAY),
                  '%%Y%%m%%d'
              )
      """
      with self.conn.cursor() as cursor:
          cursor.execute(sql, (days, days))
          return int(cursor.rowcount or 0)
  def count_wxindex_words_without_meta(self) -> int:
      """Count stored daily-score rows whose word has no row in the meta table."""
      self._ensure_wxindex_word_meta_table()
      self._ensure_wxindex_words_table()
      query = """
          SELECT COUNT(*) AS row_count
          FROM hot_content_wxindex_words w
          LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name
          WHERE m.name IS NULL
      """
      with self.conn.cursor() as cursor:
          cursor.execute(query)
          record = cursor.fetchone()
      return int((record or {}).get("row_count") or 0)
  def delete_wxindex_words_by_names(self, names: list[str]) -> int:
      """Delete every stored daily-score row of the given words.

      Names are whitespace-stripped and blank ones ignored. Returns the
      driver-reported affected row count, or 0 without touching the database
      when no usable name remains. Values are bound as parameters; only the
      placeholder list is built dynamically.
      """
      targets: list[str] = []
      for raw in names:
          candidate = str(raw or "").strip()
          if candidate:
              targets.append(candidate)
      if not targets:
          return 0
      self._ensure_wxindex_words_table()
      marks = ", ".join("%s" for _ in targets)
      sql = f"""
          DELETE FROM hot_content_wxindex_words
          WHERE name IN ({marks})
      """
      with self.conn.cursor() as cursor:
          cursor.execute(sql, tuple(targets))
          return int(cursor.rowcount or 0)
  def has_wxindex_word(self, name: str) -> bool:
      """Return True when ``get_wxindex_word_latest_dt`` finds a latest ``dt`` for ``name``."""
      latest = self.get_wxindex_word_latest_dt(name)
      return latest is not None
  1515. def get_wxindex_word_latest_dt(self, name: str) -> str | None:
  1516. word = str(name or "").strip()
  1517. if not word:
  1518. return None
  1519. self._ensure_wxindex_words_table()
  1520. sql = """
  1521. SELECT MAX(dt) AS latest_dt
  1522. FROM hot_content_wxindex_words
  1523. WHERE name = %s
  1524. """
  1525. with self.conn.cursor() as cursor:
  1526. cursor.execute(sql, (word,))
  1527. row = cursor.fetchone() or {}
  1528. latest_dt = str(row.get("latest_dt") or "").strip()
  1529. return latest_dt or None
  def save_wxindex_daily_scores(
      self,
      *,
      name: str,
      scores: list[dict[str, Any]],
  ) -> tuple[int, int]:
      """Insert per-day index scores for one word with ``INSERT IGNORE``.

      The date of each entry is read from ``ymd`` (falling back to ``dt``) and
      the value from ``total_score``. Entries that are not dicts, have a blank
      date or a score that is not float-convertible are dropped. Repeated dates
      within ``scores`` keep only the first valid entry. Rows are inserted one
      at a time.

      Returns:
          ``(inserted, skipped)``. ``skipped`` counts candidate rows for which
          the ``INSERT IGNORE`` affected nothing (e.g. the ``(name, dt)`` pair
          already existed under ``uk_name_dt``). ``(0, 0)`` when ``name`` is
          blank or nothing valid remains.
      """
      word = str(name or "").strip()
      if not word or not scores:
          return 0, 0
      self._ensure_wxindex_words_table()
      insert_sql = """
          INSERT IGNORE INTO hot_content_wxindex_words (
              name,
              dt,
              total_score
          )
          VALUES (%s, %s, %s)
      """
      pending: list[tuple[str, str, float]] = []
      seen_dates: set[str] = set()
      for entry in scores:
          if not isinstance(entry, dict):
              continue
          day = str(entry.get("ymd") or entry.get("dt") or "").strip()
          # A date is only marked as seen once its score parsed successfully.
          if not day or day in seen_dates:
              continue
          try:
              value = float(entry["total_score"])
          except (TypeError, ValueError, KeyError):
              continue
          seen_dates.add(day)
          pending.append((word, day, value))
      if not pending:
          return 0, 0
      inserted = 0
      with self.conn.cursor() as cursor:
          for params in pending:
              cursor.execute(insert_sql, params)
              inserted += int(cursor.rowcount or 0)
      return inserted, len(pending) - inserted
  def list_records_with_wxindex_trend_after(
      self,
      *,
      after_created_at: datetime,
  ) -> list[dict[str, Any]]:
      """Return records created strictly after ``after_created_at`` that carry wxindex trend JSON.

      Records whose ``wxindex_trend_json`` is NULL or blank text are excluded.
      Each item has ``id`` (int), ``created_at`` and the JSON decoded with
      ``_json_loads``, ordered by id.
      """
      query = """
          SELECT id, created_at, wxindex_trend_json
          FROM hot_content_records
          WHERE created_at > %s
            AND wxindex_trend_json IS NOT NULL
            AND TRIM(CAST(wxindex_trend_json AS CHAR)) <> ''
          ORDER BY id ASC
      """
      with self.conn.cursor() as cursor:
          cursor.execute(query, (after_created_at,))
          fetched = cursor.fetchall()
      return [
          {
              "id": int(record["id"]),
              "created_at": record.get("created_at"),
              "wxindex_trend_json": _json_loads(record.get("wxindex_trend_json")),
          }
          for record in fetched
      ]
  def list_records_with_wxindex_trend(
      self,
      *,
      since_dt: datetime,
  ) -> list[dict[str, Any]]:
      """Return records created at or after ``since_dt`` that carry wxindex trend JSON.

      Records whose ``wxindex_trend_json`` is NULL or blank text are excluded.
      Each item has ``id`` (int), ``created_at`` and the JSON decoded with
      ``_json_loads``, ordered by id.
      """
      query = """
          SELECT id, created_at, wxindex_trend_json
          FROM hot_content_records
          WHERE created_at >= %s
            AND wxindex_trend_json IS NOT NULL
            AND TRIM(CAST(wxindex_trend_json AS CHAR)) <> ''
          ORDER BY id ASC
      """
      with self.conn.cursor() as cursor:
          cursor.execute(query, (since_dt,))
          fetched = cursor.fetchall()
      return [
          {
              "id": int(record["id"]),
              "created_at": record.get("created_at"),
              "wxindex_trend_json": _json_loads(record.get("wxindex_trend_json")),
          }
          for record in fetched
      ]
  def _ensure_wxindex_word_meta_table(self) -> None:
      """Create ``hot_content_wxindex_word_meta`` if it does not exist yet.

      One row per word (primary key ``name``) holding the event creation time
      and the left edge (``fetch_start_ymd``) of its data window.
      """
      create_ddl = """
          CREATE TABLE IF NOT EXISTS hot_content_wxindex_word_meta (
              name VARCHAR(256) NOT NULL COMMENT '词',
              event_created_at DATETIME NOT NULL COMMENT '首次关联热点事件创建时间',
              fetch_start_ymd VARCHAR(8) NOT NULL COMMENT '数据窗口左边界:事件创建日往前7天',
              meta_created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '元数据创建时间',
              PRIMARY KEY (name),
              KEY idx_event_created_at (event_created_at)
          ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
      """
      with self.conn.cursor() as cursor:
          cursor.execute(create_ddl)
  def _ensure_wxindex_words_table(self) -> None:
      """Create ``hot_content_wxindex_words`` if it does not exist yet.

      Stores one daily index score per word and ``yyyymmdd`` date; the unique
      key ``uk_name_dt`` makes ``(name, dt)`` distinct.
      """
      create_ddl = """
          CREATE TABLE IF NOT EXISTS hot_content_wxindex_words (
              id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
              name VARCHAR(256) NOT NULL COMMENT '词',
              dt VARCHAR(8) NOT NULL COMMENT '日期 yyyymmdd',
              total_score DOUBLE NOT NULL COMMENT '微信指数',
              created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
              PRIMARY KEY (id),
              UNIQUE KEY uk_name_dt (name, dt),
              KEY idx_name (name),
              KEY idx_dt (dt)
          ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
      """
      with self.conn.cursor() as cursor:
          cursor.execute(create_ddl)
  1656. def _ensure_odps_sync_log_table(self) -> None:
  1657. sql = """
  1658. CREATE TABLE IF NOT EXISTS hot_content_odps_sync_log (
  1659. id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  1660. partition_dt VARCHAR(8) NOT NULL COMMENT 'ODPS 分区 dt',
  1661. strategy VARCHAR(128) NOT NULL COMMENT '需求 strategy',
  1662. demand_id CHAR(32) NOT NULL COMMENT 'ODPS demand_id',
  1663. demand_name VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '需求名',
  1664. demand_type VARCHAR(32) NOT NULL DEFAULT '' COMMENT '特征点/短语',
  1665. record_id BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '来源 hot_content_records.id',
  1666. weight DOUBLE NULL DEFAULT NULL COMMENT 'ODPS 需求权重(记录 wxindex 最高分 / 1000000)',
  1667. synced_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '写入 ODPS 时间',
  1668. PRIMARY KEY (id),
  1669. UNIQUE KEY uk_odps_sync (partition_dt, strategy, demand_id),
  1670. KEY idx_record_partition (record_id, partition_dt),
  1671. KEY idx_partition_strategy (partition_dt, strategy)
  1672. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
  1673. """
  1674. with self.conn.cursor() as cursor:
  1675. cursor.execute(sql)
  1676. self._ensure_odps_sync_log_weight_column(cursor)