repository.py 65 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826
  1. """热点内容 MySQL 仓储。"""
  2. from __future__ import annotations
  3. import hashlib
  4. import json
  5. from datetime import date, datetime, timedelta
  6. from typing import Any
  7. try:
  8. import pymysql
  9. from pymysql.cursors import DictCursor
  10. except ImportError: # pragma: no cover - runtime dependency check
  11. pymysql = None
  12. DictCursor = None
  13. from app.hot_content.exceptions import HotContentFlowError
  14. from app.hot_content.status import ExecutionStatus, PostprocessStatus
  15. from app.hot_content.timezone import SHANGHAI_TZ
  16. from app.hot_content.types import MysqlConfig
  17. def _json_dumps(data: Any) -> str:
  18. return json.dumps(data, ensure_ascii=False, separators=(",", ":"))
  19. def _json_loads(value: Any) -> Any:
  20. if value is None:
  21. return None
  22. if isinstance(value, (dict, list)):
  23. return value
  24. if isinstance(value, (bytes, bytearray)):
  25. value = value.decode("utf-8")
  26. if isinstance(value, str):
  27. return json.loads(value)
  28. return value
  29. def _normalize_demand_names(demand_name_set: list[str]) -> list[str]:
  30. names: list[str] = []
  31. seen: set[str] = set()
  32. for item in demand_name_set:
  33. name = str(item).strip()
  34. if not name or name in seen:
  35. continue
  36. seen.add(name)
  37. names.append(name)
  38. return names
  39. def unique_title_key(source: str, title: str) -> str:
  40. return hashlib.sha256(f"{source}\n{title}".encode("utf-8")).hexdigest()
  41. class HotContentRepository:
  42. def __init__(self, config: MysqlConfig):
  43. if pymysql is None or DictCursor is None:
  44. raise HotContentFlowError("missing dependency: pip install pymysql")
  45. self.conn = pymysql.connect(
  46. host=config.host,
  47. port=config.port,
  48. user=config.user,
  49. password=config.password,
  50. database=config.database,
  51. charset=config.charset,
  52. autocommit=True,
  53. cursorclass=DictCursor,
  54. )
  55. def close(self) -> None:
  56. self.conn.close()
  57. def upsert_record(self, *, source: str, title: str, rank: int | None) -> dict[str, Any]:
  58. key = unique_title_key(source, title)
  59. sql = """
  60. INSERT INTO hot_content_records (
  61. unique_key, source, title, hot_rank, execution_status, created_at, updated_at
  62. )
  63. VALUES (%s, %s, %s, %s, %s, NOW(), NOW())
  64. ON DUPLICATE KEY UPDATE
  65. id=LAST_INSERT_ID(id),
  66. hot_rank=VALUES(hot_rank),
  67. updated_at=NOW()
  68. """
  69. with self.conn.cursor() as cursor:
  70. cursor.execute(
  71. sql,
  72. (
  73. key,
  74. source,
  75. title,
  76. rank,
  77. ExecutionStatus.HOT_SAVED,
  78. ),
  79. )
  80. record_id = int(cursor.lastrowid)
  81. cursor.execute(
  82. """
  83. SELECT
  84. id,
  85. unique_key,
  86. execution_status,
  87. article_title,
  88. article_body,
  89. article_url,
  90. decode_request_result IS NOT NULL AS has_decode_request,
  91. contribution_points_json IS NOT NULL AS has_contribution_points
  92. FROM hot_content_records
  93. WHERE id = %s
  94. """,
  95. (record_id,),
  96. )
  97. row = cursor.fetchone()
  98. if not row:
  99. raise HotContentFlowError(f"missing hot_content_records id={record_id}")
  100. return {
  101. "id": int(row["id"]),
  102. "unique_key": str(row["unique_key"]),
  103. "execution_status": int(row["execution_status"]),
  104. "article_title": row.get("article_title"),
  105. "article_body": row.get("article_body"),
  106. "article_url": row.get("article_url"),
  107. "has_decode_request": bool(row.get("has_decode_request")),
  108. "has_contribution_points": bool(row.get("has_contribution_points")),
  109. }
  110. def update_status(
  111. self,
  112. *,
  113. record_id: int,
  114. status: int,
  115. error_message: str | None = None,
  116. ) -> None:
  117. sql = """
  118. UPDATE hot_content_records
  119. SET execution_status=%s, error_reason=%s, updated_at=NOW()
  120. WHERE id=%s
  121. """
  122. with self.conn.cursor() as cursor:
  123. cursor.execute(sql, (status, error_message, record_id))
  124. def update_article(
  125. self,
  126. *,
  127. record_id: int,
  128. article_title: str,
  129. article_body: str,
  130. url: str,
  131. ) -> None:
  132. sql = """
  133. UPDATE hot_content_records
  134. SET article_title=%s,
  135. article_body=%s,
  136. article_url=%s,
  137. execution_status=%s,
  138. error_reason=NULL,
  139. updated_at=NOW()
  140. WHERE id=%s
  141. """
  142. with self.conn.cursor() as cursor:
  143. cursor.execute(
  144. sql,
  145. (
  146. article_title,
  147. article_body,
  148. url,
  149. ExecutionStatus.CONTENT_OK,
  150. record_id,
  151. ),
  152. )
  153. def mark_no_valid_content(
  154. self,
  155. *,
  156. record_id: int,
  157. reason: str,
  158. ) -> None:
  159. """搜不到文章或缺标题/正文:仅更新 execution_status + error_reason,不动分类字段。"""
  160. sql = """
  161. UPDATE hot_content_records
  162. SET execution_status=%s,
  163. error_reason=%s,
  164. updated_at=NOW()
  165. WHERE id=%s
  166. """
  167. with self.conn.cursor() as cursor:
  168. cursor.execute(
  169. sql,
  170. (
  171. ExecutionStatus.NO_VALID_CONTENT,
  172. str(reason or "no valid content").strip(),
  173. record_id,
  174. ),
  175. )
  176. def update_category_filter_result(
  177. self,
  178. *,
  179. record_id: int,
  180. passed: bool,
  181. result_json: dict[str, Any],
  182. ) -> None:
  183. self._ensure_category_filter_columns()
  184. status = (
  185. ExecutionStatus.CATEGORY_FILTER_PASSED
  186. if passed
  187. else ExecutionStatus.CATEGORY_FILTER_REJECTED
  188. )
  189. reason = str(result_json.get("reason") or "").strip()
  190. error_message = None if passed else (reason or "category filter rejected")
  191. sql = """
  192. UPDATE hot_content_records
  193. SET execution_status=%s,
  194. category_filter_passed=%s,
  195. category_filter_reason=%s,
  196. category_filter_json=%s,
  197. error_reason=%s,
  198. updated_at=NOW()
  199. WHERE id=%s
  200. """
  201. with self.conn.cursor() as cursor:
  202. cursor.execute(
  203. sql,
  204. (
  205. status,
  206. 1 if passed else 0,
  207. reason or None,
  208. _json_dumps(result_json),
  209. error_message,
  210. record_id,
  211. ),
  212. )
  213. def get_record_for_category_filter(self, record_id: int) -> dict[str, Any] | None:
  214. self._ensure_category_filter_columns()
  215. sql = """
  216. SELECT
  217. id,
  218. source,
  219. title,
  220. article_title,
  221. article_body,
  222. article_url,
  223. execution_status,
  224. category_filter_passed,
  225. category_filter_reason,
  226. category_filter_json
  227. FROM hot_content_records
  228. WHERE id = %s
  229. LIMIT 1
  230. """
  231. with self.conn.cursor() as cursor:
  232. cursor.execute(sql, (record_id,))
  233. row = cursor.fetchone()
  234. if not row:
  235. return None
  236. category_filter_json = _json_loads(row.get("category_filter_json"))
  237. passed_raw = row.get("category_filter_passed")
  238. passed: bool | None
  239. if passed_raw is None:
  240. passed = None
  241. else:
  242. passed = bool(int(passed_raw))
  243. return {
  244. "id": int(row["id"]),
  245. "source": str(row.get("source") or ""),
  246. "title": str(row.get("title") or ""),
  247. "article_title": str(row.get("article_title") or ""),
  248. "article_body": str(row.get("article_body") or ""),
  249. "article_url": str(row.get("article_url") or ""),
  250. "execution_status": int(row.get("execution_status") or 0),
  251. "category_filter_passed": passed,
  252. "category_filter_reason": str(row.get("category_filter_reason") or ""),
  253. "category_filter_json": (
  254. category_filter_json if isinstance(category_filter_json, dict) else {}
  255. ),
  256. }
  257. def get_category_filter_status(self, record_id: int) -> dict[str, Any] | None:
  258. record = self.get_record_for_category_filter(record_id)
  259. if not record:
  260. return None
  261. matched_category = None
  262. category_filter_json = record.get("category_filter_json") or {}
  263. if isinstance(category_filter_json, dict):
  264. matched_category = category_filter_json.get("matched_category")
  265. return {
  266. "id": record["id"],
  267. "passed": record["category_filter_passed"],
  268. "reason": record["category_filter_reason"],
  269. "matched_category": matched_category,
  270. "execution_status": record["execution_status"],
  271. }
  272. def update_decode_result(
  273. self,
  274. *,
  275. record_id: int,
  276. status: int,
  277. request_json: dict[str, Any],
  278. response_json: dict[str, Any] | None,
  279. error_message: str | None = None,
  280. ) -> None:
  281. decode_request_result = {
  282. "request": request_json,
  283. "response": response_json,
  284. }
  285. sql = """
  286. UPDATE hot_content_records
  287. SET decode_request_result=%s,
  288. execution_status=%s,
  289. error_reason=%s,
  290. updated_at=NOW()
  291. WHERE id=%s
  292. """
  293. with self.conn.cursor() as cursor:
  294. cursor.execute(
  295. sql,
  296. (
  297. _json_dumps(decode_request_result),
  298. status,
  299. error_message,
  300. record_id,
  301. ),
  302. )
  303. def list_decode_result_candidates(self, *, limit: int) -> list[dict[str, Any]]:
  304. sql = """
  305. SELECT id, unique_key
  306. FROM hot_content_records
  307. WHERE execution_status IN (%s, %s, %s)
  308. AND contribution_points_json IS NULL
  309. ORDER BY updated_at ASC, id ASC
  310. LIMIT %s
  311. """
  312. with self.conn.cursor() as cursor:
  313. cursor.execute(
  314. sql,
  315. (
  316. ExecutionStatus.DECODE_SUBMITTED,
  317. ExecutionStatus.DECODE_SUCCESS,
  318. ExecutionStatus.DECODE_PENDING,
  319. limit,
  320. ),
  321. )
  322. rows = cursor.fetchall()
  323. return [
  324. {
  325. "id": int(row["id"]),
  326. "unique_key": str(row["unique_key"]),
  327. }
  328. for row in rows
  329. ]
  330. def save_decode_result_export(
  331. self,
  332. *,
  333. record_id: int,
  334. decode_result_json: dict[str, Any],
  335. contribution_points_json: dict[str, Any],
  336. ) -> None:
  337. sql = """
  338. UPDATE hot_content_records
  339. SET decode_result_json=%s,
  340. contribution_points_json=%s,
  341. execution_status=%s,
  342. error_reason=NULL,
  343. updated_at=NOW()
  344. WHERE id=%s
  345. """
  346. with self.conn.cursor() as cursor:
  347. cursor.execute(
  348. sql,
  349. (
  350. _json_dumps(decode_result_json),
  351. _json_dumps(contribution_points_json),
  352. ExecutionStatus.CONTRIBUTION_EXTRACTED,
  353. record_id,
  354. ),
  355. )
  356. def get_demand_cache_by_hour(self, *, cache_hour: datetime) -> dict[str, Any] | None:
  357. sql = """
  358. SELECT
  359. id,
  360. cache_hour,
  361. source_table,
  362. partition_dt,
  363. demand_name_set_json,
  364. item_count,
  365. updated_at
  366. FROM demand_pool_hourly_cache
  367. WHERE cache_hour=%s
  368. LIMIT 1
  369. """
  370. with self.conn.cursor() as cursor:
  371. cursor.execute(sql, (cache_hour,))
  372. row = cursor.fetchone()
  373. if not row:
  374. return None
  375. demand_name_set = _json_loads(row.get("demand_name_set_json")) or []
  376. if not isinstance(demand_name_set, list):
  377. demand_name_set = []
  378. return {
  379. "id": int(row["id"]),
  380. "cache_hour": row.get("cache_hour"),
  381. "source_table": str(row["source_table"]),
  382. "partition_dt": row.get("partition_dt"),
  383. "demand_name_set": [
  384. str(name).strip()
  385. for name in demand_name_set
  386. if str(name).strip()
  387. ],
  388. "item_count": int(row.get("item_count") or 0),
  389. "updated_at": row.get("updated_at"),
  390. }
  391. def save_demand_cache_set(
  392. self,
  393. *,
  394. cache_hour: datetime,
  395. source_table: str,
  396. partition_dt: str | None,
  397. excluded_strategy: str,
  398. top_n: int,
  399. demand_name_set: list[str],
  400. ) -> int:
  401. sql = """
  402. INSERT INTO demand_pool_hourly_cache (
  403. cache_hour,
  404. source_table,
  405. partition_dt,
  406. excluded_strategy,
  407. top_n,
  408. demand_name_set_json,
  409. item_count,
  410. created_at,
  411. updated_at
  412. )
  413. VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
  414. ON DUPLICATE KEY UPDATE
  415. id=LAST_INSERT_ID(id),
  416. source_table=VALUES(source_table),
  417. partition_dt=VALUES(partition_dt),
  418. excluded_strategy=VALUES(excluded_strategy),
  419. top_n=VALUES(top_n),
  420. demand_name_set_json=VALUES(demand_name_set_json),
  421. item_count=VALUES(item_count),
  422. updated_at=NOW()
  423. """
  424. normalized_names = _normalize_demand_names(demand_name_set)
  425. with self.conn.cursor() as cursor:
  426. cursor.execute(
  427. sql,
  428. (
  429. cache_hour,
  430. source_table,
  431. partition_dt,
  432. excluded_strategy,
  433. top_n,
  434. _json_dumps(normalized_names),
  435. len(normalized_names),
  436. ),
  437. )
  438. return int(cursor.lastrowid)
  439. def list_postprocess_candidates(self, *, limit: int) -> list[dict[str, Any]]:
  440. sql = """
  441. SELECT
  442. id,
  443. unique_key,
  444. source,
  445. title,
  446. created_at,
  447. article_title,
  448. article_body,
  449. demand_cache_run_id,
  450. decode_result_json,
  451. contribution_points_json,
  452. contribution_demand_match_json
  453. FROM hot_content_records
  454. WHERE contribution_points_json IS NOT NULL
  455. AND postprocess_status IN (%s, %s, %s)
  456. ORDER BY updated_at ASC, id ASC
  457. LIMIT %s
  458. """
  459. with self.conn.cursor() as cursor:
  460. cursor.execute(
  461. sql,
  462. (
  463. PostprocessStatus.PENDING,
  464. PostprocessStatus.DEMAND_MATCHED,
  465. PostprocessStatus.FAILED,
  466. limit,
  467. ),
  468. )
  469. rows = cursor.fetchall()
  470. return [
  471. {
  472. "id": int(row["id"]),
  473. "unique_key": str(row["unique_key"]),
  474. "source": str(row.get("source") or ""),
  475. "title": str(row.get("title") or ""),
  476. "created_at": row.get("created_at"),
  477. "article_title": row.get("article_title"),
  478. "article_body": row.get("article_body"),
  479. "demand_cache_run_id": row.get("demand_cache_run_id"),
  480. "decode_result_json": _json_loads(row.get("decode_result_json")),
  481. "contribution_points_json": _json_loads(row.get("contribution_points_json")),
  482. "contribution_demand_match_json": _json_loads(
  483. row.get("contribution_demand_match_json")
  484. ),
  485. }
  486. for row in rows
  487. ]
  488. def save_contribution_demand_match(
  489. self,
  490. *,
  491. record_id: int,
  492. demand_cache_run_id: int,
  493. match_json: dict[str, Any],
  494. ) -> None:
  495. sql = """
  496. UPDATE hot_content_records
  497. SET demand_cache_run_id=%s,
  498. contribution_demand_match_json=%s,
  499. postprocess_status=%s,
  500. postprocess_error_reason=NULL,
  501. updated_at=NOW()
  502. WHERE id=%s
  503. """
  504. with self.conn.cursor() as cursor:
  505. cursor.execute(
  506. sql,
  507. (
  508. demand_cache_run_id,
  509. _json_dumps(match_json),
  510. PostprocessStatus.DEMAND_MATCHED,
  511. record_id,
  512. ),
  513. )
  514. def save_wxindex_trend(
  515. self,
  516. *,
  517. record_id: int,
  518. trend_json: dict[str, Any],
  519. ) -> None:
  520. sql = """
  521. UPDATE hot_content_records
  522. SET wxindex_trend_json=%s,
  523. postprocess_status=%s,
  524. postprocess_error_reason=NULL,
  525. updated_at=NOW()
  526. WHERE id=%s
  527. """
  528. with self.conn.cursor() as cursor:
  529. cursor.execute(
  530. sql,
  531. (
  532. _json_dumps(trend_json),
  533. PostprocessStatus.WXINDEX_DONE,
  534. record_id,
  535. ),
  536. )
  537. def save_demand_quality(
  538. self,
  539. *,
  540. record_id: int,
  541. event_sense_json: dict[str, Any],
  542. senior_fit_json: dict[str, Any],
  543. update_status: bool = True,
  544. ) -> None:
  545. self._ensure_record_quality_columns()
  546. if update_status:
  547. sql = """
  548. UPDATE hot_content_records
  549. SET demand_event_sense_json=%s,
  550. demand_senior_fit_json=%s,
  551. postprocess_status=%s,
  552. postprocess_error_reason=NULL,
  553. updated_at=NOW()
  554. WHERE id=%s
  555. """
  556. params = (
  557. _json_dumps(event_sense_json),
  558. _json_dumps(senior_fit_json),
  559. PostprocessStatus.QUALITY_DONE,
  560. record_id,
  561. )
  562. else:
  563. sql = """
  564. UPDATE hot_content_records
  565. SET demand_event_sense_json=%s,
  566. demand_senior_fit_json=%s,
  567. updated_at=NOW()
  568. WHERE id=%s
  569. """
  570. params = (
  571. _json_dumps(event_sense_json),
  572. _json_dumps(senior_fit_json),
  573. record_id,
  574. )
  575. with self.conn.cursor() as cursor:
  576. cursor.execute(sql, params)
  577. def update_postprocess_status(
  578. self,
  579. *,
  580. record_id: int,
  581. status: int,
  582. error_message: str | None = None,
  583. ) -> None:
  584. sql = """
  585. UPDATE hot_content_records
  586. SET postprocess_status=%s,
  587. postprocess_error_reason=%s,
  588. updated_at=NOW()
  589. WHERE id=%s
  590. """
  591. with self.conn.cursor() as cursor:
  592. cursor.execute(sql, (status, error_message, record_id))
  593. def replace_demand_export_rows(
  594. self,
  595. *,
  596. record_id: int,
  597. source: str,
  598. hot_title: str,
  599. article_title: str,
  600. rows: list[dict[str, Any]],
  601. ) -> None:
  602. self._ensure_demand_export_table()
  603. delete_sql = "DELETE FROM hot_content_demand_exports WHERE record_id=%s"
  604. insert_sql = """
  605. INSERT INTO hot_content_demand_exports (
  606. record_id,
  607. source,
  608. hot_title,
  609. article_title,
  610. item_type,
  611. item_text,
  612. point_category,
  613. matched_demand,
  614. contribution_score,
  615. wxindex_keyword,
  616. all_hot_keywords,
  617. wxindex_latest_score,
  618. wxindex_trend,
  619. is_as_demand,
  620. event_sense_score,
  621. senior_fit_score,
  622. created_at,
  623. updated_at
  624. )
  625. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
  626. """
  627. with self.conn.cursor() as cursor:
  628. cursor.execute(delete_sql, (record_id,))
  629. insert_rows = [
  630. (
  631. record_id,
  632. source,
  633. hot_title,
  634. article_title,
  635. str(item.get("item_type") or ""),
  636. str(item.get("item_text") or ""),
  637. str(item.get("point_category") or ""),
  638. str(item.get("matched_demand") or ""),
  639. item.get("contribution_score"),
  640. str(item.get("wxindex_keyword") or ""),
  641. str(item.get("all_hot_keywords") or ""),
  642. float(item.get("wxindex_latest_score") or 0),
  643. str(item.get("wxindex_trend") or ""),
  644. int(item.get("is_as_demand") or 0),
  645. item.get("event_sense_score"),
  646. item.get("senior_fit_score"),
  647. )
  648. for item in rows
  649. if str(item.get("item_type") or "").strip()
  650. and str(item.get("item_text") or "").strip()
  651. ]
  652. if insert_rows:
  653. cursor.executemany(insert_sql, insert_rows)
  654. def list_odps_sync_records(self) -> list[dict[str, Any]]:
  655. """读取当天创建且已完成质量判断的新记录,供 ODPS 同步(不处理历史数据)。"""
  656. self._ensure_record_quality_columns()
  657. today_start = datetime.now(SHANGHAI_TZ).replace(
  658. hour=0,
  659. minute=0,
  660. second=0,
  661. microsecond=0,
  662. tzinfo=None,
  663. )
  664. today_end = today_start + timedelta(days=1)
  665. sql = """
  666. SELECT
  667. id,
  668. contribution_points_json,
  669. contribution_demand_match_json,
  670. wxindex_trend_json,
  671. demand_event_sense_json,
  672. demand_senior_fit_json
  673. FROM hot_content_records
  674. WHERE created_at >= %s
  675. AND created_at < %s
  676. AND postprocess_status = %s
  677. AND contribution_demand_match_json IS NOT NULL
  678. AND TRIM(CAST(contribution_demand_match_json AS CHAR)) <> ''
  679. ORDER BY id ASC
  680. """
  681. with self.conn.cursor() as cursor:
  682. cursor.execute(
  683. sql,
  684. (today_start, today_end, PostprocessStatus.QUALITY_DONE),
  685. )
  686. rows = cursor.fetchall()
  687. records: list[dict[str, Any]] = []
  688. for row in rows:
  689. records.append(
  690. {
  691. "id": int(row["id"]),
  692. "contribution_points_json": _json_loads(
  693. row.get("contribution_points_json")
  694. ),
  695. "contribution_demand_match_json": _json_loads(
  696. row.get("contribution_demand_match_json")
  697. ),
  698. "wxindex_trend_json": _json_loads(row.get("wxindex_trend_json")),
  699. "demand_event_sense_json": _json_loads(
  700. row.get("demand_event_sense_json")
  701. ),
  702. "demand_senior_fit_json": _json_loads(
  703. row.get("demand_senior_fit_json")
  704. ),
  705. }
  706. )
  707. return records
  708. def list_demand_export_groups(self) -> list[dict[str, Any]]:
  709. """读取主表当天创建的 record 对应导出分组,仅供 ODPS 当天分区同步(不跨天)。"""
  710. self._ensure_demand_export_table()
  711. today_start = datetime.now(SHANGHAI_TZ).replace(
  712. hour=0,
  713. minute=0,
  714. second=0,
  715. microsecond=0,
  716. tzinfo=None,
  717. )
  718. today_end = today_start + timedelta(days=1)
  719. sql = """
  720. SELECT
  721. e.record_id,
  722. e.item_type,
  723. e.item_text,
  724. e.point_category,
  725. e.matched_demand,
  726. e.wxindex_latest_score
  727. FROM hot_content_demand_exports e
  728. INNER JOIN hot_content_records r ON r.id = e.record_id
  729. WHERE r.created_at >= %s
  730. AND r.created_at < %s
  731. ORDER BY e.record_id ASC, e.id ASC
  732. """
  733. with self.conn.cursor() as cursor:
  734. cursor.execute(sql, (today_start, today_end))
  735. rows = cursor.fetchall()
  736. grouped: dict[int, list[dict[str, Any]]] = {}
  737. for row in rows:
  738. record_id = int(row["record_id"])
  739. grouped.setdefault(record_id, []).append(
  740. {
  741. "item_type": str(row.get("item_type") or ""),
  742. "item_text": str(row.get("item_text") or ""),
  743. "point_category": str(row.get("point_category") or ""),
  744. "matched_demand": str(row.get("matched_demand") or ""),
  745. "wxindex_latest_score": float(row.get("wxindex_latest_score") or 0),
  746. }
  747. )
  748. return [
  749. {"record_id": record_id, "export_rows": export_rows}
  750. for record_id, export_rows in grouped.items()
  751. ]
  752. def _ensure_demand_export_table(self) -> None:
  753. sql = """
  754. CREATE TABLE IF NOT EXISTS hot_content_demand_exports (
  755. id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  756. record_id BIGINT UNSIGNED NOT NULL,
  757. source VARCHAR(64) NOT NULL DEFAULT '',
  758. hot_title VARCHAR(1024) NOT NULL DEFAULT '',
  759. article_title VARCHAR(1024) NOT NULL DEFAULT '',
  760. item_type VARCHAR(32) NOT NULL COMMENT '元素/短语',
  761. item_text VARCHAR(1024) NOT NULL,
  762. point_category VARCHAR(32) NOT NULL DEFAULT '' COMMENT '点类型:灵感点/目的点/关键点',
  763. matched_demand VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '匹配到的需求',
  764. contribution_score DOUBLE NULL COMMENT '贡献分,仅元素有值',
  765. wxindex_keyword VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '获取微信指数的元素',
  766. all_hot_keywords VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '全部热点词',
  767. wxindex_latest_score DOUBLE NOT NULL DEFAULT 0,
  768. wxindex_trend VARCHAR(32) NOT NULL DEFAULT '' COMMENT '微信指数趋势',
  769. is_as_demand TINYINT NOT NULL DEFAULT 0 COMMENT '是否作为需求:0否 1是',
  770. created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
  771. updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  772. PRIMARY KEY (id),
  773. KEY idx_record_id (record_id),
  774. KEY idx_source_type (source, item_type)
  775. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
  776. """
  777. with self.conn.cursor() as cursor:
  778. cursor.execute(sql)
  779. self._ensure_demand_export_column(
  780. cursor,
  781. "matched_demand",
  782. """
  783. ALTER TABLE hot_content_demand_exports
  784. ADD COLUMN matched_demand VARCHAR(1024) NOT NULL DEFAULT ''
  785. COMMENT '匹配到的需求'
  786. AFTER item_text
  787. """,
  788. )
  789. self._ensure_demand_export_column(
  790. cursor,
  791. "point_category",
  792. """
  793. ALTER TABLE hot_content_demand_exports
  794. ADD COLUMN point_category VARCHAR(32) NOT NULL DEFAULT ''
  795. COMMENT '点类型:灵感点/目的点/关键点'
  796. AFTER item_text
  797. """,
  798. )
  799. self._ensure_demand_export_column(
  800. cursor,
  801. "contribution_score",
  802. """
  803. ALTER TABLE hot_content_demand_exports
  804. ADD COLUMN contribution_score DOUBLE NULL
  805. COMMENT '贡献分,仅词有值'
  806. AFTER matched_demand
  807. """,
  808. )
  809. self._ensure_demand_export_column(
  810. cursor,
  811. "wxindex_trend",
  812. """
  813. ALTER TABLE hot_content_demand_exports
  814. ADD COLUMN wxindex_trend VARCHAR(32) NOT NULL DEFAULT ''
  815. COMMENT '微信指数趋势'
  816. AFTER wxindex_latest_score
  817. """,
  818. )
  819. self._ensure_demand_export_column(
  820. cursor,
  821. "wxindex_keyword",
  822. """
  823. ALTER TABLE hot_content_demand_exports
  824. ADD COLUMN wxindex_keyword VARCHAR(1024) NOT NULL DEFAULT ''
  825. COMMENT '获取微信指数的词'
  826. AFTER contribution_score
  827. """,
  828. )
  829. self._ensure_demand_export_column(
  830. cursor,
  831. "all_hot_keywords",
  832. """
  833. ALTER TABLE hot_content_demand_exports
  834. ADD COLUMN all_hot_keywords VARCHAR(1024) NOT NULL DEFAULT ''
  835. COMMENT '全部热点词'
  836. AFTER wxindex_keyword
  837. """,
  838. )
  839. self._ensure_demand_export_column(
  840. cursor,
  841. "is_as_demand",
  842. """
  843. ALTER TABLE hot_content_demand_exports
  844. ADD COLUMN is_as_demand TINYINT NOT NULL DEFAULT 0
  845. COMMENT '是否作为需求:0否 1是'
  846. AFTER wxindex_trend
  847. """,
  848. )
  849. self._ensure_demand_export_column(
  850. cursor,
  851. "event_sense_score",
  852. """
  853. ALTER TABLE hot_content_demand_exports
  854. ADD COLUMN event_sense_score DOUBLE NULL
  855. COMMENT '事件性得分 0-10'
  856. AFTER is_as_demand
  857. """,
  858. )
  859. self._ensure_demand_export_column(
  860. cursor,
  861. "senior_fit_score",
  862. """
  863. ALTER TABLE hot_content_demand_exports
  864. ADD COLUMN senior_fit_score DOUBLE NULL
  865. COMMENT '老年性得分 0-10'
  866. AFTER event_sense_score
  867. """,
  868. )
  869. def _ensure_record_quality_columns(self) -> None:
  870. with self.conn.cursor() as cursor:
  871. for column_name, alter_sql in (
  872. (
  873. "demand_event_sense_json",
  874. """
  875. ALTER TABLE hot_content_records
  876. ADD COLUMN demand_event_sense_json JSON NULL
  877. COMMENT '需求事件性 LLM 评分结果'
  878. AFTER wxindex_trend_json
  879. """,
  880. ),
  881. (
  882. "demand_senior_fit_json",
  883. """
  884. ALTER TABLE hot_content_records
  885. ADD COLUMN demand_senior_fit_json JSON NULL
  886. COMMENT '需求老年性 LLM 评分结果'
  887. AFTER demand_event_sense_json
  888. """,
  889. ),
  890. ):
  891. cursor.execute(
  892. """
  893. SELECT COUNT(*) AS cnt
  894. FROM information_schema.COLUMNS
  895. WHERE TABLE_SCHEMA = DATABASE()
  896. AND TABLE_NAME = 'hot_content_records'
  897. AND COLUMN_NAME = %s
  898. """,
  899. (column_name,),
  900. )
  901. if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
  902. cursor.execute(alter_sql)
  903. def _ensure_category_filter_columns(self) -> None:
  904. with self.conn.cursor() as cursor:
  905. for column_name, alter_sql in (
  906. (
  907. "category_filter_json",
  908. """
  909. ALTER TABLE hot_content_records
  910. ADD COLUMN category_filter_json JSON NULL
  911. COMMENT '老年人兴趣分类筛选 LLM 结果'
  912. AFTER article_url
  913. """,
  914. ),
  915. (
  916. "category_filter_passed",
  917. """
  918. ALTER TABLE hot_content_records
  919. ADD COLUMN category_filter_passed TINYINT NULL
  920. COMMENT '分类筛选是否通过:1通过 0不通过 NULL未筛选'
  921. AFTER category_filter_json
  922. """,
  923. ),
  924. (
  925. "category_filter_reason",
  926. """
  927. ALTER TABLE hot_content_records
  928. ADD COLUMN category_filter_reason TEXT NULL
  929. COMMENT '分类筛选原因'
  930. AFTER category_filter_passed
  931. """,
  932. ),
  933. ):
  934. cursor.execute(
  935. """
  936. SELECT COUNT(*) AS cnt
  937. FROM information_schema.COLUMNS
  938. WHERE TABLE_SCHEMA = DATABASE()
  939. AND TABLE_NAME = 'hot_content_records'
  940. AND COLUMN_NAME = %s
  941. """,
  942. (column_name,),
  943. )
  944. if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
  945. cursor.execute(alter_sql)
  946. def _ensure_demand_export_column(
  947. self,
  948. cursor: Any,
  949. column_name: str,
  950. alter_sql: str,
  951. ) -> None:
  952. cursor.execute(
  953. """
  954. SELECT COUNT(*) AS cnt
  955. FROM information_schema.COLUMNS
  956. WHERE TABLE_SCHEMA = DATABASE()
  957. AND TABLE_NAME = 'hot_content_demand_exports'
  958. AND COLUMN_NAME = %s
  959. """,
  960. (column_name,),
  961. )
  962. if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
  963. cursor.execute(alter_sql)
  def list_synced_odps_demand_ids(
      self,
      *,
      partition_dt: str,
      strategy: str,
  ) -> set[str]:
      """Return the demand ids already logged as synced for a partition/strategy.

      Ids are whitespace-stripped, and blank or missing ids are dropped.
      """
      self._ensure_odps_sync_log_table()
      with self.conn.cursor() as cursor:
          cursor.execute(
              """
              SELECT demand_id
              FROM hot_content_odps_sync_log
              WHERE partition_dt = %s
              AND strategy = %s
              """,
              (partition_dt, strategy),
          )
          fetched = cursor.fetchall()
      cleaned = (str(entry.get("demand_id") or "").strip() for entry in fetched)
      return {demand_id for demand_id in cleaned if demand_id}
  def save_odps_sync_logs(self, rows: list[dict[str, Any]]) -> int:
      """Upsert ODPS sync-log rows and return how many rows were written.

      Rows whose ``demand_id`` is missing or blank are skipped. The stored
      ``demand_id`` is the whitespace-stripped value, so it agrees with the skip
      check and with what ``list_synced_odps_demand_ids`` returns. An existing
      entry for the same ``(partition_dt, strategy, demand_id)`` gets its name,
      type, record id, weight and ``synced_at`` refreshed.

      Args:
          rows: Dicts with ``partition_dt``, ``strategy``, ``demand_id``,
              ``demand_name``, ``demand_type``, ``record_id`` and an optional
              ``weight``.

      Returns:
          Number of rows sent to the upsert. This is 0 when nothing qualified,
          in which case the database is not touched.
      """
      if not rows:
          return 0
      insert_rows: list[tuple[Any, ...]] = []
      for item in rows:
          demand_id = str(item.get("demand_id") or "").strip()
          if not demand_id:
              continue
          weight = item.get("weight")
          insert_rows.append(
              (
                  str(item.get("partition_dt") or ""),
                  str(item.get("strategy") or ""),
                  demand_id,
                  str(item.get("demand_name") or ""),
                  str(item.get("demand_type") or ""),
                  int(item.get("record_id") or 0),
                  float(weight) if weight is not None else None,
              )
          )
      if not insert_rows:
          # Every row was filtered out; skip the table check and the empty batch.
          return 0
      self._ensure_odps_sync_log_table()
      sql = """
          INSERT INTO hot_content_odps_sync_log (
              partition_dt,
              strategy,
              demand_id,
              demand_name,
              demand_type,
              record_id,
              weight
          )
          VALUES (%s, %s, %s, %s, %s, %s, %s)
          ON DUPLICATE KEY UPDATE
              demand_name = VALUES(demand_name),
              demand_type = VALUES(demand_type),
              record_id = VALUES(record_id),
              weight = VALUES(weight),
              synced_at = CURRENT_TIMESTAMP
      """
      with self.conn.cursor() as cursor:
          cursor.executemany(sql, insert_rows)
      return len(insert_rows)
  1023. def _ensure_odps_sync_log_weight_column(self, cursor: Any) -> None:
  1024. cursor.execute(
  1025. """
  1026. SELECT COUNT(*) AS cnt
  1027. FROM information_schema.COLUMNS
  1028. WHERE TABLE_SCHEMA = DATABASE()
  1029. AND TABLE_NAME = 'hot_content_odps_sync_log'
  1030. AND COLUMN_NAME = 'weight'
  1031. """,
  1032. )
  1033. if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
  1034. cursor.execute(
  1035. """
  1036. ALTER TABLE hot_content_odps_sync_log
  1037. ADD COLUMN weight DOUBLE NULL DEFAULT NULL
  1038. COMMENT 'ODPS 需求权重(记录 wxindex 最高分 / 1000000)'
  1039. AFTER record_id
  1040. """
  1041. )
  1042. def list_wxindex_word_scores(self, name: str) -> list[dict[str, Any]]:
  1043. word = str(name or "").strip()
  1044. if not word:
  1045. return []
  1046. self._ensure_wxindex_words_table()
  1047. sql = """
  1048. SELECT dt, total_score
  1049. FROM hot_content_wxindex_words
  1050. WHERE name = %s
  1051. ORDER BY dt ASC
  1052. """
  1053. with self.conn.cursor() as cursor:
  1054. cursor.execute(sql, (word,))
  1055. rows = cursor.fetchall()
  1056. scores: list[dict[str, Any]] = []
  1057. for row in rows:
  1058. dt = str(row.get("dt") or "").strip()
  1059. if not dt:
  1060. continue
  1061. try:
  1062. total_score = float(row["total_score"])
  1063. except (TypeError, ValueError, KeyError):
  1064. continue
  1065. scores.append({"ymd": dt, "total_score": total_score})
  1066. return scores
  1067. def list_stale_wxindex_words(
  1068. self,
  1069. *,
  1070. end_ymd: str,
  1071. update_window_days: int = 7,
  1072. today: date | None = None,
  1073. ) -> list[dict[str, Any]]:
  1074. """返回更新窗口内、仍缺近 7 日区间数据的词。"""
  1075. target_end = str(end_ymd or "").strip()
  1076. if not target_end:
  1077. return []
  1078. current = today or datetime.now(SHANGHAI_TZ).date()
  1079. active_since = current - timedelta(days=max(update_window_days, 0))
  1080. self._ensure_wxindex_word_meta_table()
  1081. self._ensure_wxindex_words_table()
  1082. sql = """
  1083. SELECT
  1084. m.name,
  1085. m.event_created_at,
  1086. m.fetch_start_ymd,
  1087. MIN(w.dt) AS earliest_dt,
  1088. MAX(w.dt) AS latest_dt
  1089. FROM hot_content_wxindex_word_meta m
  1090. INNER JOIN hot_content_wxindex_words w ON w.name = m.name
  1091. WHERE DATE(m.event_created_at) >= %s
  1092. GROUP BY m.name, m.event_created_at, m.fetch_start_ymd
  1093. HAVING MAX(w.dt) < %s OR MIN(w.dt) > m.fetch_start_ymd
  1094. ORDER BY m.name ASC
  1095. """
  1096. with self.conn.cursor() as cursor:
  1097. cursor.execute(sql, (active_since, target_end))
  1098. rows = cursor.fetchall()
  1099. stale_words: list[dict[str, Any]] = []
  1100. for row in rows:
  1101. name = str(row.get("name") or "").strip()
  1102. fetch_start_ymd = str(row.get("fetch_start_ymd") or "").strip()
  1103. earliest_dt = str(row.get("earliest_dt") or "").strip()
  1104. latest_dt = str(row.get("latest_dt") or "").strip()
  1105. event_created_at = row.get("event_created_at")
  1106. if name and fetch_start_ymd and earliest_dt and latest_dt and event_created_at:
  1107. stale_words.append(
  1108. {
  1109. "name": name,
  1110. "event_created_at": event_created_at,
  1111. "fetch_start_ymd": fetch_start_ymd,
  1112. "earliest_dt": earliest_dt,
  1113. "latest_dt": latest_dt,
  1114. }
  1115. )
  1116. return stale_words
  def list_word_earliest_event_times(
      self,
      *,
      since_dt: datetime,
  ) -> dict[str, datetime]:
      """Map each search word seen in recent records to its earliest record time.

      Words are collected from two places in ``hot_content_records`` rows
      created at or after ``since_dt``:

      - ``wxindex_trend_json.wxindex_searches[*].keyword``
      - ``wxindex_trend_json.llm_selected_word``

      Words are trimmed, and the minimum ``created_at`` is kept per word.
      Entries whose time is not a ``datetime`` are dropped.
      """
      self._ensure_record_quality_columns()
      query = """
          SELECT
              word_name,
              MIN(event_created_at) AS event_created_at
          FROM (
              SELECT
                  TRIM(searches.keyword) AS word_name,
                  r.created_at AS event_created_at
              FROM hot_content_records r
              JOIN JSON_TABLE(
                  r.wxindex_trend_json,
                  '$.wxindex_searches[*]' COLUMNS (
                      keyword VARCHAR(256) PATH '$.keyword'
                  )
              ) AS searches
              WHERE r.created_at >= %s
              AND r.wxindex_trend_json IS NOT NULL
              AND TRIM(searches.keyword) <> ''
              UNION ALL
              SELECT
                  TRIM(JSON_UNQUOTE(JSON_EXTRACT(r.wxindex_trend_json, '$.llm_selected_word'))) AS word_name,
                  r.created_at AS event_created_at
              FROM hot_content_records r
              WHERE r.created_at >= %s
              AND r.wxindex_trend_json IS NOT NULL
              AND TRIM(JSON_UNQUOTE(JSON_EXTRACT(r.wxindex_trend_json, '$.llm_selected_word'))) <> ''
          ) AS word_events
          WHERE word_name IS NOT NULL
          AND word_name <> ''
          GROUP BY word_name
      """
      with self.conn.cursor() as cursor:
          cursor.execute(query, (since_dt, since_dt))
          fetched = cursor.fetchall()
      pairs = (
          (str(entry.get("word_name") or "").strip(), entry.get("event_created_at"))
          for entry in fetched
      )
      return {
          word: first_seen
          for word, first_seen in pairs
          if word and isinstance(first_seen, datetime)
      }
  def list_wxindex_word_bounds_without_meta(self) -> list[dict[str, Any]]:
      """List words that have daily-score rows but no meta row, with their bounds.

      Returns one dict per such word, ordered by name:
      ``{"name", "earliest_dt", "first_created_at"}``. Here ``earliest_dt`` is
      the smallest stored ``dt`` and ``first_created_at`` is the earliest row
      insertion time, passed through exactly as returned by the driver. Rows
      with a blank name or ``dt`` are dropped.
      """
      self._ensure_wxindex_word_meta_table()
      self._ensure_wxindex_words_table()
      query = """
          SELECT
              w.name,
              MIN(w.dt) AS earliest_dt,
              MIN(w.created_at) AS first_created_at
          FROM hot_content_wxindex_words w
          LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name
          WHERE m.name IS NULL
          GROUP BY w.name
          ORDER BY w.name ASC
      """
      with self.conn.cursor() as cursor:
          cursor.execute(query)
          fetched = cursor.fetchall()
      orphans: list[dict[str, Any]] = []
      for entry in fetched:
          word = str(entry.get("name") or "").strip()
          first_dt = str(entry.get("earliest_dt") or "").strip()
          if not (word and first_dt):
              continue
          orphans.append(
              {
                  "name": word,
                  "earliest_dt": first_dt,
                  "first_created_at": entry.get("first_created_at"),
              }
          )
      return orphans
  def list_wxindex_word_names_without_meta(self) -> list[str]:
      """Return the distinct names in ``hot_content_wxindex_words`` that lack a meta row.

      Names are stripped, blank ones are dropped, and the result is ordered by
      name.
      """
      self._ensure_wxindex_word_meta_table()
      self._ensure_wxindex_words_table()
      with self.conn.cursor() as cursor:
          cursor.execute(
              """
              SELECT DISTINCT w.name
              FROM hot_content_wxindex_words w
              LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name
              WHERE m.name IS NULL
              ORDER BY w.name ASC
              """
          )
          fetched = cursor.fetchall()
      stripped = (str(entry.get("name") or "").strip() for entry in fetched)
      return [word for word in stripped if word]
  1214. def get_wxindex_word_first_row_created_at(self, name: str) -> datetime | None:
  1215. word = str(name or "").strip()
  1216. if not word:
  1217. return None
  1218. self._ensure_wxindex_words_table()
  1219. sql = """
  1220. SELECT MIN(created_at) AS first_created_at
  1221. FROM hot_content_wxindex_words
  1222. WHERE name = %s
  1223. """
  1224. with self.conn.cursor() as cursor:
  1225. cursor.execute(sql, (word,))
  1226. row = cursor.fetchone() or {}
  1227. first_created_at = row.get("first_created_at")
  1228. return first_created_at if isinstance(first_created_at, datetime) else None
  def list_all_wxindex_word_meta(self) -> list[dict[str, Any]]:
      """Return every word-meta row, normalized, in ascending ``id`` order.

      Rows that ``_normalize_wxindex_word_meta_row`` rejects (returns ``None``
      for) are left out.
      """
      self._ensure_wxindex_word_meta_table()
      with self.conn.cursor() as cursor:
          cursor.execute(
              """
              SELECT id, name, event_created_at, fetch_start_ymd, fetch_end_ymd
              FROM hot_content_wxindex_word_meta
              ORDER BY id ASC
              """
          )
          fetched = cursor.fetchall()
      normalized = (self._normalize_wxindex_word_meta_row(entry) for entry in fetched)
      return [meta for meta in normalized if meta is not None]
  def update_wxindex_word_meta_fetch_start(
      self,
      *,
      name: str,
      fetch_start_ymd: str,
  ) -> None:
      """Overwrite ``fetch_start_ymd`` on the meta row for ``name``.

      Both values are stripped first.

      Raises:
          HotContentFlowError: If either value is blank after stripping.
      """
      word = str(name or "").strip()
      target_start = str(fetch_start_ymd or "").strip()
      if not (word and target_start):
          raise HotContentFlowError("invalid wxindex word meta fetch_start_ymd payload")
      self._ensure_wxindex_word_meta_table()
      with self.conn.cursor() as cursor:
          cursor.execute(
              """
              UPDATE hot_content_wxindex_word_meta
              SET fetch_start_ymd = %s
              WHERE name = %s
              """,
              (target_start, word),
          )
  def update_wxindex_word_meta(
      self,
      *,
      name: str,
      event_created_at: datetime,
      fetch_start_ymd: str,
      fetch_end_ymd: str,
  ) -> None:
      """Overwrite the event time and fetch window on the meta row for ``name``.

      A timezone-aware ``event_created_at`` is converted to ``SHANGHAI_TZ`` and
      stored naive. A naive value is stored unchanged.

      Raises:
          HotContentFlowError: If the name or either window bound is blank.
      """
      word = str(name or "").strip()
      window_start = str(fetch_start_ymd or "").strip()
      window_end = str(fetch_end_ymd or "").strip()
      if not all((word, window_start, window_end)):
          raise HotContentFlowError("invalid wxindex word meta payload")
      self._ensure_wxindex_word_meta_table()
      event_at = (
          event_created_at
          if event_created_at.tzinfo is None
          else event_created_at.astimezone(SHANGHAI_TZ).replace(tzinfo=None)
      )
      with self.conn.cursor() as cursor:
          cursor.execute(
              """
              UPDATE hot_content_wxindex_word_meta
              SET event_created_at = %s,
                  fetch_start_ymd = %s,
                  fetch_end_ymd = %s
              WHERE name = %s
              """,
              (event_at, window_start, window_end, word),
          )
  1289. def get_wxindex_word_meta(self, name: str) -> dict[str, Any] | None:
  1290. word = str(name or "").strip()
  1291. if not word:
  1292. return None
  1293. self._ensure_wxindex_word_meta_table()
  1294. sql = """
  1295. SELECT id, name, event_created_at, fetch_start_ymd, fetch_end_ymd
  1296. FROM hot_content_wxindex_word_meta
  1297. WHERE name = %s
  1298. """
  1299. with self.conn.cursor() as cursor:
  1300. cursor.execute(sql, (word,))
  1301. row = cursor.fetchone()
  1302. if not row:
  1303. return None
  1304. return self._normalize_wxindex_word_meta_row(row)
  def ensure_wxindex_word_meta(
      self,
      *,
      name: str,
      event_created_at: datetime,
      fetch_start_ymd: str,
      fetch_end_ymd: str,
  ) -> dict[str, Any]:
      """Insert or overwrite the meta row for ``name`` and return it as read back.

      A timezone-aware ``event_created_at`` is converted to ``SHANGHAI_TZ`` and
      stored naive. On a duplicate ``name`` (unique key) the event time and
      window bounds are replaced.

      Raises:
          HotContentFlowError: If the name or either window bound is blank, or
              if the row cannot be read back after the upsert.
      """
      word = str(name or "").strip()
      window_start = str(fetch_start_ymd or "").strip()
      window_end = str(fetch_end_ymd or "").strip()
      if not all((word, window_start, window_end)):
          raise HotContentFlowError("invalid wxindex word meta payload")
      self._ensure_wxindex_word_meta_table()
      event_at = (
          event_created_at
          if event_created_at.tzinfo is None
          else event_created_at.astimezone(SHANGHAI_TZ).replace(tzinfo=None)
      )
      upsert_sql = """
          INSERT INTO hot_content_wxindex_word_meta (
              name,
              event_created_at,
              fetch_start_ymd,
              fetch_end_ymd
          )
          VALUES (%s, %s, %s, %s)
          ON DUPLICATE KEY UPDATE
              event_created_at = VALUES(event_created_at),
              fetch_start_ymd = VALUES(fetch_start_ymd),
              fetch_end_ymd = VALUES(fetch_end_ymd)
      """
      with self.conn.cursor() as cursor:
          cursor.execute(upsert_sql, (word, event_at, window_start, window_end))
      stored = self.get_wxindex_word_meta(word)
      if stored is None:
          raise HotContentFlowError(f"failed to persist wxindex word meta: {word}")
      return stored
  def delete_wxindex_word_meta_by_names(self, names: list[str]) -> int:
      """Delete meta rows whose name is in ``names`` and return the affected count.

      Names are stripped and blank entries are ignored. When nothing remains,
      0 is returned without touching the database.
      """
      words = [cleaned for cleaned in (str(raw or "").strip() for raw in names) if cleaned]
      if not words:
          return 0
      self._ensure_wxindex_word_meta_table()
      marks = ", ".join("%s" for _ in words)
      statement = f"DELETE FROM hot_content_wxindex_word_meta WHERE name IN ({marks})"
      with self.conn.cursor() as cursor:
          cursor.execute(statement, tuple(words))
          return int(cursor.rowcount or 0)
  def list_low_max_wxindex_words(
      self,
      *,
      min_max_score: float,
  ) -> list[dict[str, Any]]:
      """Return words whose highest stored score is below ``min_max_score``.

      Scores are grouped by name. Each result is
      ``{"name", "max_score", "row_count"}``, ordered by name. Entries with a
      blank name or unparseable aggregates are skipped.
      """
      self._ensure_wxindex_words_table()
      with self.conn.cursor() as cursor:
          cursor.execute(
              """
              SELECT
                  name,
                  MAX(total_score) AS max_score,
                  COUNT(*) AS row_count
              FROM hot_content_wxindex_words
              GROUP BY name
              HAVING MAX(total_score) < %s
              ORDER BY name ASC
              """,
              (min_max_score,),
          )
          fetched = cursor.fetchall()
      low_words: list[dict[str, Any]] = []
      for entry in fetched:
          word = str(entry.get("name") or "").strip()
          if not word:
              continue
          try:
              peak, count = float(entry["max_score"]), int(entry["row_count"])
          except (TypeError, ValueError, KeyError):
              continue
          low_words.append({"name": word, "max_score": peak, "row_count": count})
      return low_words
  def count_wxindex_words_outside_event_window(
      self,
      *,
      window_days: int = 7,
  ) -> int:
      """Count daily-score rows whose ``dt`` lies outside their word's meta window.

      The window is ``[fetch_start_ymd, fetch_end_ymd]`` from
      ``hot_content_wxindex_word_meta``. A blank ``fetch_end_ymd`` (the column
      DEFAULT is '') falls back to DATE(event_created_at) + 7 days, formatted
      yyyymmdd. This is the same fallback ``_normalize_wxindex_word_meta_row``
      and the fetch_end backfill use. Comparing against '' directly would flag
      every row of such a word.

      Args:
          window_days: Unused. It is kept for backward compatibility; the window
              comes entirely from the meta row.

      Returns:
          Number of out-of-window rows, or 0 if the query yields nothing.
      """
      self._ensure_wxindex_word_meta_table()
      self._ensure_wxindex_words_table()
      # Executed without args, so the literal '%' in DATE_FORMAT is not escaped.
      sql = """
          SELECT COUNT(*) AS row_count
          FROM hot_content_wxindex_words w
          INNER JOIN hot_content_wxindex_word_meta m ON m.name = w.name
          WHERE w.dt < m.fetch_start_ymd
          OR w.dt > COALESCE(
              NULLIF(m.fetch_end_ymd, ''),
              DATE_FORMAT(DATE_ADD(DATE(m.event_created_at), INTERVAL 7 DAY), '%Y%m%d')
          )
      """
      with self.conn.cursor() as cursor:
          cursor.execute(sql)
          row = cursor.fetchone() or {}
      return int(row.get("row_count") or 0)
  def list_wxindex_words_outside_event_window_samples(
      self,
      *,
      window_days: int = 7,
      limit: int = 20,
  ) -> list[dict[str, Any]]:
      """Return up to ``limit`` example rows that lie outside their meta window.

      The window is ``[fetch_start_ymd, effective end]``. The effective end is
      ``fetch_end_ymd``, or DATE(event_created_at) + 7 days (yyyymmdd) when
      that column is blank, matching ``_normalize_wxindex_word_meta_row``.
      ``end_ymd`` in each sample reports that effective end, so it always
      explains why the row was flagged.

      Args:
          window_days: Unused. It is kept for backward compatibility.
          limit: Maximum number of samples, ordered by name then dt.

      Returns:
          Dicts with ``name``, ``dt``, ``event_created_at``, ``start_ymd`` and
          ``end_ymd``. Rows with a blank name or dt are dropped.
      """
      self._ensure_wxindex_word_meta_table()
      self._ensure_wxindex_words_table()
      # This query is executed WITH args (LIMIT), so the driver %-interpolates it
      # and the literal DATE_FORMAT percent signs must be doubled.
      sql = """
          SELECT
              w.name,
              w.dt,
              m.event_created_at,
              m.fetch_start_ymd AS start_ymd,
              COALESCE(
                  NULLIF(m.fetch_end_ymd, ''),
                  DATE_FORMAT(DATE_ADD(DATE(m.event_created_at), INTERVAL 7 DAY), '%%Y%%m%%d')
              ) AS end_ymd
          FROM hot_content_wxindex_words w
          INNER JOIN hot_content_wxindex_word_meta m ON m.name = w.name
          WHERE w.dt < m.fetch_start_ymd
          OR w.dt > COALESCE(
              NULLIF(m.fetch_end_ymd, ''),
              DATE_FORMAT(DATE_ADD(DATE(m.event_created_at), INTERVAL 7 DAY), '%%Y%%m%%d')
          )
          ORDER BY w.name ASC, w.dt ASC
          LIMIT %s
      """
      with self.conn.cursor() as cursor:
          cursor.execute(sql, (limit,))
          rows = cursor.fetchall()
      samples: list[dict[str, Any]] = []
      for row in rows:
          name = str(row.get("name") or "").strip()
          dt = str(row.get("dt") or "").strip()
          if name and dt:
              samples.append(
                  {
                      "name": name,
                      "dt": dt,
                      "event_created_at": row.get("event_created_at"),
                      "start_ymd": str(row.get("start_ymd") or "").strip(),
                      "end_ymd": str(row.get("end_ymd") or "").strip(),
                  }
              )
      return samples
  def delete_wxindex_words_outside_event_window(
      self,
      *,
      window_days: int = 7,
  ) -> int:
      """Delete daily-score rows whose ``dt`` lies outside their word's meta window.

      The window is ``[fetch_start_ymd, fetch_end_ymd]`` from the meta row. A
      blank ``fetch_end_ymd`` (column DEFAULT '') falls back to
      DATE(event_created_at) + 7 days (yyyymmdd), matching
      ``_normalize_wxindex_word_meta_row`` and the fetch_end backfill. Comparing
      against '' directly would delete every row of such a word.

      Args:
          window_days: Unused. It is kept for backward compatibility.

      Returns:
          The driver-reported affected row count (0 if unavailable).
      """
      self._ensure_wxindex_word_meta_table()
      self._ensure_wxindex_words_table()
      # Executed without args, so the literal '%' in DATE_FORMAT is not escaped.
      sql = """
          DELETE w
          FROM hot_content_wxindex_words w
          INNER JOIN hot_content_wxindex_word_meta m ON m.name = w.name
          WHERE w.dt < m.fetch_start_ymd
          OR w.dt > COALESCE(
              NULLIF(m.fetch_end_ymd, ''),
              DATE_FORMAT(DATE_ADD(DATE(m.event_created_at), INTERVAL 7 DAY), '%Y%m%d')
          )
      """
      with self.conn.cursor() as cursor:
          cursor.execute(sql)
          return int(cursor.rowcount or 0)
  def delete_wxindex_words_without_meta(self) -> int:
      """Delete daily-score rows whose word has no meta row.

      Returns:
          The driver-reported affected row count (0 if unavailable).
      """
      self._ensure_wxindex_word_meta_table()
      self._ensure_wxindex_words_table()
      statement = """
          DELETE w
          FROM hot_content_wxindex_words w
          LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name
          WHERE m.name IS NULL
      """
      with self.conn.cursor() as cursor:
          cursor.execute(statement)
          affected = cursor.rowcount
      return int(affected or 0)
  def count_wxindex_words_without_meta(self) -> int:
      """Count daily-score rows whose word has no meta row (0 if none)."""
      self._ensure_wxindex_word_meta_table()
      self._ensure_wxindex_words_table()
      with self.conn.cursor() as cursor:
          cursor.execute(
              """
              SELECT COUNT(*) AS row_count
              FROM hot_content_wxindex_words w
              LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name
              WHERE m.name IS NULL
              """
          )
          summary = cursor.fetchone()
      total = (summary or {}).get("row_count")
      return int(total or 0)
  def delete_wxindex_words_by_names(self, names: list[str]) -> int:
      """Delete every daily-score row for the given names.

      Names are stripped and blank entries are ignored. When nothing remains,
      0 is returned without touching the database. Otherwise the
      driver-reported affected row count is returned.
      """
      targets = [word for word in (str(raw or "").strip() for raw in names) if word]
      if not targets:
          return 0
      self._ensure_wxindex_words_table()
      marks = ", ".join("%s" for _ in targets)
      statement = f"""
          DELETE FROM hot_content_wxindex_words
          WHERE name IN ({marks})
      """
      with self.conn.cursor() as cursor:
          cursor.execute(statement, tuple(targets))
          return int(cursor.rowcount or 0)
  def has_wxindex_word(self, name: str) -> bool:
      """Return True when ``get_wxindex_word_latest_dt`` finds a latest date for ``name``."""
      latest_dt = self.get_wxindex_word_latest_dt(name)
      return latest_dt is not None
  1505. def get_wxindex_word_latest_dt(self, name: str) -> str | None:
  1506. word = str(name or "").strip()
  1507. if not word:
  1508. return None
  1509. self._ensure_wxindex_words_table()
  1510. sql = """
  1511. SELECT MAX(dt) AS latest_dt
  1512. FROM hot_content_wxindex_words
  1513. WHERE name = %s
  1514. """
  1515. with self.conn.cursor() as cursor:
  1516. cursor.execute(sql, (word,))
  1517. row = cursor.fetchone() or {}
  1518. latest_dt = str(row.get("latest_dt") or "").strip()
  1519. return latest_dt or None
  def save_wxindex_daily_scores(
      self,
      *,
      name: str,
      scores: list[dict[str, Any]],
  ) -> tuple[int, int]:
      """Insert daily index scores for one word, skipping duplicates.

      Each score dict supplies its date via ``ymd`` (or ``dt``) and a
      ``total_score`` that must convert to ``float``. Non-dict entries, blank
      dates and unparseable scores are dropped. Only the first valid entry per
      date is kept. Rows already present in the table are skipped by
      ``INSERT IGNORE``.

      Returns:
          ``(inserted, skipped)``, where ``skipped`` counts candidate rows the
          database ignored. A blank name, no scores, or no valid rows gives
          ``(0, 0)``.
      """
      word = str(name or "").strip()
      if not word or not scores:
          return 0, 0
      self._ensure_wxindex_words_table()
      statement = """
          INSERT IGNORE INTO hot_content_wxindex_words (
              name,
              dt,
              total_score
          )
          VALUES (%s, %s, %s)
      """
      pending: list[tuple[str, str, float]] = []
      seen_days: set[str] = set()
      for entry in scores:
          if not isinstance(entry, dict):
              continue
          day = str(entry.get("ymd") or entry.get("dt") or "").strip()
          if not day or day in seen_days:
              continue
          try:
              value = float(entry["total_score"])
          except (TypeError, ValueError, KeyError):
              continue
          seen_days.add(day)
          pending.append((word, day, value))
      if not pending:
          return 0, 0
      inserted = 0
      with self.conn.cursor() as cursor:
          for params in pending:
              cursor.execute(statement, params)
              inserted += int(cursor.rowcount or 0)
      return inserted, len(pending) - inserted
  def list_records_with_wxindex_trend_after(
      self,
      *,
      after_created_at: datetime,
  ) -> list[dict[str, Any]]:
      """Return records created strictly after ``after_created_at`` that carry trend JSON.

      Only rows with a non-NULL, non-blank ``wxindex_trend_json`` are returned,
      ordered by id. Each item has ``id`` (int), ``created_at`` (as returned by
      the driver) and ``wxindex_trend_json`` decoded via ``_json_loads``.
      """
      with self.conn.cursor() as cursor:
          cursor.execute(
              """
              SELECT id, created_at, wxindex_trend_json
              FROM hot_content_records
              WHERE created_at > %s
              AND wxindex_trend_json IS NOT NULL
              AND TRIM(CAST(wxindex_trend_json AS CHAR)) <> ''
              ORDER BY id ASC
              """,
              (after_created_at,),
          )
          fetched = cursor.fetchall()
      return [
          {
              "id": int(entry["id"]),
              "created_at": entry.get("created_at"),
              "wxindex_trend_json": _json_loads(entry.get("wxindex_trend_json")),
          }
          for entry in fetched
      ]
  def list_records_with_wxindex_trend(
      self,
      *,
      since_dt: datetime,
  ) -> list[dict[str, Any]]:
      """Return records created at or after ``since_dt`` that carry trend JSON.

      Only rows with a non-NULL, non-blank ``wxindex_trend_json`` are returned,
      ordered by id. Each item has ``id`` (int), ``created_at`` (as returned by
      the driver) and ``wxindex_trend_json`` decoded via ``_json_loads``.
      """
      with self.conn.cursor() as cursor:
          cursor.execute(
              """
              SELECT id, created_at, wxindex_trend_json
              FROM hot_content_records
              WHERE created_at >= %s
              AND wxindex_trend_json IS NOT NULL
              AND TRIM(CAST(wxindex_trend_json AS CHAR)) <> ''
              ORDER BY id ASC
              """,
              (since_dt,),
          )
          fetched = cursor.fetchall()
      return [
          {
              "id": int(entry["id"]),
              "created_at": entry.get("created_at"),
              "wxindex_trend_json": _json_loads(entry.get("wxindex_trend_json")),
          }
          for entry in fetched
      ]
  1617. def _ensure_wxindex_word_meta_table(self) -> None:
  1618. sql = """
  1619. CREATE TABLE IF NOT EXISTS hot_content_wxindex_word_meta (
  1620. id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键',
  1621. name VARCHAR(256) NOT NULL COMMENT '词',
  1622. event_created_at DATETIME NOT NULL COMMENT '首次关联热点事件创建时间',
  1623. fetch_start_ymd VARCHAR(8) NOT NULL COMMENT '数据窗口左边界:事件创建日往前7天',
  1624. fetch_end_ymd VARCHAR(8) NOT NULL DEFAULT '' COMMENT '数据窗口右边界:事件创建日后7天',
  1625. meta_created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '元数据创建时间',
  1626. PRIMARY KEY (id),
  1627. UNIQUE KEY uk_name (name),
  1628. KEY idx_event_created_at (event_created_at)
  1629. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
  1630. """
  1631. with self.conn.cursor() as cursor:
  1632. cursor.execute(sql)
  1633. self._ensure_wxindex_word_meta_id_column(cursor)
  1634. self._ensure_wxindex_word_meta_fetch_end_column(cursor)
  1635. def _ensure_wxindex_word_meta_fetch_end_column(self, cursor: Any) -> None:
  # Migration helper for hot_content_wxindex_word_meta.fetch_end_ymd.
  # `cursor` is an already-open cursor from the caller; for example,
  # _ensure_wxindex_word_meta_table passes its own.
  # Step 1: count matching entries in information_schema.COLUMNS for this schema.
  1636. cursor.execute(
  1637. """
  1638. SELECT COUNT(*) AS cnt
  1639. FROM information_schema.COLUMNS
  1640. WHERE TABLE_SCHEMA = DATABASE()
  1641. AND TABLE_NAME = 'hot_content_wxindex_word_meta'
  1642. AND COLUMN_NAME = 'fetch_end_ymd'
  1643. """
  1644. )
  # Step 2: a zero count means the column is missing. Add it as
  # VARCHAR(8) NOT NULL DEFAULT '' directly after fetch_start_ymd.
  1645. if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
  1646. cursor.execute(
  1647. """
  1648. ALTER TABLE hot_content_wxindex_word_meta
  1649. ADD COLUMN fetch_end_ymd VARCHAR(8) NOT NULL DEFAULT ''
  1650. COMMENT '数据窗口右边界:事件创建日后7天'
  1651. AFTER fetch_start_ymd
  1652. """
  1653. )
  # Step 3: backfill blank/NULL fetch_end_ymd as DATE(event_created_at) + 7
  # days, formatted yyyymmdd. The WHERE clause only touches blank rows, so
  # re-running it is harmless.
  # NOTE(review): indentation is missing in this copy, so it cannot be told
  # from here whether this UPDATE sits inside the `if` above (runs only right
  # after the ALTER) or runs on every call. Confirm against the original file.
  # NOTE(review): '%Y%m%d' is sent without query args. Presumably the driver
  # only %-interpolates when args are given; escape it as '%%' if args are
  # ever added.
  1654. cursor.execute(
  1655. """
  1656. UPDATE hot_content_wxindex_word_meta
  1657. SET fetch_end_ymd = DATE_FORMAT(
  1658. DATE_ADD(DATE(event_created_at), INTERVAL 7 DAY),
  1659. '%Y%m%d'
  1660. )
  1661. WHERE fetch_end_ymd IS NULL
  1662. OR TRIM(fetch_end_ymd) = ''
  1663. """
  1664. )
  1665. def _ensure_wxindex_word_meta_id_column(self, cursor: Any) -> None:
  1666. cursor.execute(
  1667. """
  1668. SELECT COUNT(*) AS cnt
  1669. FROM information_schema.COLUMNS
  1670. WHERE TABLE_SCHEMA = DATABASE()
  1671. AND TABLE_NAME = 'hot_content_wxindex_word_meta'
  1672. AND COLUMN_NAME = 'id'
  1673. """
  1674. )
  1675. if int((cursor.fetchone() or {}).get("cnt") or 0) > 0:
  1676. return
  1677. cursor.execute(
  1678. """
  1679. ALTER TABLE hot_content_wxindex_word_meta
  1680. DROP PRIMARY KEY,
  1681. ADD COLUMN id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY FIRST,
  1682. ADD UNIQUE KEY uk_name (name)
  1683. """
  1684. )
  1685. @staticmethod
  1686. def _normalize_wxindex_word_meta_row(row: dict[str, Any]) -> dict[str, Any] | None:
  1687. name = str(row.get("name") or "").strip()
  1688. fetch_start_ymd = str(row.get("fetch_start_ymd") or "").strip()
  1689. fetch_end_ymd = str(row.get("fetch_end_ymd") or "").strip()
  1690. event_created_at = row.get("event_created_at")
  1691. if not name or not fetch_start_ymd or event_created_at is None:
  1692. return None
  1693. if not fetch_end_ymd and isinstance(event_created_at, datetime):
  1694. event_date = event_created_at.date()
  1695. fetch_end_ymd = (event_date + timedelta(days=7)).strftime("%Y%m%d")
  1696. if not fetch_end_ymd:
  1697. return None
  1698. try:
  1699. meta_id = int(row.get("id"))
  1700. except (TypeError, ValueError):
  1701. return None
  1702. return {
  1703. "id": meta_id,
  1704. "name": name,
  1705. "event_created_at": event_created_at,
  1706. "fetch_start_ymd": fetch_start_ymd,
  1707. "fetch_end_ymd": fetch_end_ymd,
  1708. }
  1709. def _ensure_wxindex_words_table(self) -> None:
  1710. sql = """
  1711. CREATE TABLE IF NOT EXISTS hot_content_wxindex_words (
  1712. id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  1713. name VARCHAR(256) NOT NULL COMMENT '词',
  1714. dt VARCHAR(8) NOT NULL COMMENT '日期 yyyymmdd',
  1715. total_score DOUBLE NOT NULL COMMENT '微信指数',
  1716. created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  1717. PRIMARY KEY (id),
  1718. UNIQUE KEY uk_name_dt (name, dt),
  1719. KEY idx_name (name),
  1720. KEY idx_dt (dt)
  1721. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
  1722. """
  1723. with self.conn.cursor() as cursor:
  1724. cursor.execute(sql)
  1725. def _ensure_odps_sync_log_table(self) -> None:
  1726. sql = """
  1727. CREATE TABLE IF NOT EXISTS hot_content_odps_sync_log (
  1728. id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  1729. partition_dt VARCHAR(8) NOT NULL COMMENT 'ODPS 分区 dt',
  1730. strategy VARCHAR(128) NOT NULL COMMENT '需求 strategy',
  1731. demand_id CHAR(32) NOT NULL COMMENT 'ODPS demand_id',
  1732. demand_name VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '需求名',
  1733. demand_type VARCHAR(32) NOT NULL DEFAULT '' COMMENT '特征点/短语',
  1734. record_id BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '来源 hot_content_records.id',
  1735. weight DOUBLE NULL DEFAULT NULL COMMENT 'ODPS 需求权重(记录 wxindex 最高分 / 1000000)',
  1736. synced_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '写入 ODPS 时间',
  1737. PRIMARY KEY (id),
  1738. UNIQUE KEY uk_odps_sync (partition_dt, strategy, demand_id),
  1739. KEY idx_record_partition (record_id, partition_dt),
  1740. KEY idx_partition_strategy (partition_dt, strategy)
  1741. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
  1742. """
  1743. with self.conn.cursor() as cursor:
  1744. cursor.execute(sql)
  1745. self._ensure_odps_sync_log_weight_column(cursor)