repository.py 94 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569
  1. """热点内容 MySQL 仓储。"""
  2. from __future__ import annotations
  3. import hashlib
  4. import json
  5. from datetime import date, datetime, timedelta
  6. from typing import Any
  7. try:
  8. import pymysql
  9. from pymysql.cursors import DictCursor
  10. except ImportError: # pragma: no cover - runtime dependency check
  11. pymysql = None
  12. DictCursor = None
  13. from app.hot_content.exceptions import HotContentFlowError
  14. from app.hot_content.status import ExecutionStatus, PostprocessStatus
  15. from app.hot_content.timezone import SHANGHAI_TZ
  16. from app.hot_content.types import MysqlConfig
  17. def _json_dumps(data: Any) -> str:
  18. return json.dumps(data, ensure_ascii=False, separators=(",", ":"))
  19. def _nullable_bool(value: Any) -> int | None:
  20. if value is None:
  21. return None
  22. return 1 if value else 0
  23. def _nullable_str(value: Any) -> str | None:
  24. if value is None:
  25. return None
  26. text = str(value).strip()
  27. return text if text else None
  28. def _nullable_json_dumps(value: Any) -> str | None:
  29. if value is None:
  30. return None
  31. return _json_dumps(value)
  32. def _wxindex_word_record_row(payload: dict[str, Any]) -> tuple[Any, ...]:
  33. name = str(payload.get("name") or "").strip()
  34. analyze_ymd = str(payload.get("analyze_ymd") or "").strip()
  35. if not name or not analyze_ymd:
  36. raise HotContentFlowError("invalid wxindex word record payload")
  37. return (
  38. name,
  39. payload.get("meta_id"),
  40. analyze_ymd,
  41. payload.get("fetch_start_ymd"),
  42. payload.get("fetch_end_ymd"),
  43. payload.get("data_start_ymd"),
  44. payload.get("data_end_ymd"),
  45. payload.get("data_days"),
  46. _nullable_bool(payload.get("is_sustained_high")),
  47. _nullable_bool(payload.get("is_rising")),
  48. _nullable_bool(payload.get("is_spike")),
  49. payload.get("retain_reason"),
  50. _nullable_bool(payload.get("is_internal_demand_matched")),
  51. _nullable_str(payload.get("matched_demand")),
  52. payload.get("demand_cache_run_id"),
  53. _nullable_json_dumps(payload.get("internal_demand_match_json")),
  54. payload.get("senior_fit_score"),
  55. _nullable_json_dumps(payload.get("demand_senior_fit_json")),
  56. _nullable_bool(payload.get("is_final_retained")),
  57. payload.get("min_score"),
  58. payload.get("max_score"),
  59. payload.get("avg_score"),
  60. _nullable_json_dumps(payload.get("detail_json")),
  61. )
# Full upsert of one hot_content_wxindex_word_records row.
# The 23 placeholders follow the column order of the tuple built by
# _wxindex_word_record_row. On a duplicate key, every inserted column except
# name and analyze_ymd is overwritten with the incoming value.
# NOTE(review): presumably (name, analyze_ymd) is the table's unique key;
# confirm against the DDL.
# NOTE(review): VALUES(col) inside ON DUPLICATE KEY UPDATE is deprecated as of
# MySQL 8.0.20. Keep it while MariaDB / older MySQL compatibility matters.
_WXINDEX_WORD_RECORD_UPSERT_SQL = """
INSERT INTO hot_content_wxindex_word_records (
name,
meta_id,
analyze_ymd,
fetch_start_ymd,
fetch_end_ymd,
data_start_ymd,
data_end_ymd,
data_days,
is_sustained_high,
is_rising,
is_spike,
retain_reason,
is_internal_demand_matched,
matched_demand,
demand_cache_run_id,
internal_demand_match_json,
senior_fit_score,
demand_senior_fit_json,
is_final_retained,
min_score,
max_score,
avg_score,
detail_json
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
meta_id = VALUES(meta_id),
fetch_start_ymd = VALUES(fetch_start_ymd),
fetch_end_ymd = VALUES(fetch_end_ymd),
data_start_ymd = VALUES(data_start_ymd),
data_end_ymd = VALUES(data_end_ymd),
data_days = VALUES(data_days),
is_sustained_high = VALUES(is_sustained_high),
is_rising = VALUES(is_rising),
is_spike = VALUES(is_spike),
retain_reason = VALUES(retain_reason),
is_internal_demand_matched = VALUES(is_internal_demand_matched),
matched_demand = VALUES(matched_demand),
demand_cache_run_id = VALUES(demand_cache_run_id),
internal_demand_match_json = VALUES(internal_demand_match_json),
senior_fit_score = VALUES(senior_fit_score),
demand_senior_fit_json = VALUES(demand_senior_fit_json),
is_final_retained = VALUES(is_final_retained),
min_score = VALUES(min_score),
max_score = VALUES(max_score),
avg_score = VALUES(avg_score),
detail_json = VALUES(detail_json)
"""
# "Init" variant: inserts the same 23 columns, but on a duplicate key it only
# refreshes meta_id, fetch_start_ymd, fetch_end_ymd and demand_cache_run_id.
# An existing row keeps its data window, flags, scores and JSON columns.
_WXINDEX_WORD_RECORD_INIT_UPSERT_SQL = """
INSERT INTO hot_content_wxindex_word_records (
name,
meta_id,
analyze_ymd,
fetch_start_ymd,
fetch_end_ymd,
data_start_ymd,
data_end_ymd,
data_days,
is_sustained_high,
is_rising,
is_spike,
retain_reason,
is_internal_demand_matched,
matched_demand,
demand_cache_run_id,
internal_demand_match_json,
senior_fit_score,
demand_senior_fit_json,
is_final_retained,
min_score,
max_score,
avg_score,
detail_json
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
meta_id = VALUES(meta_id),
fetch_start_ymd = VALUES(fetch_start_ymd),
fetch_end_ymd = VALUES(fetch_end_ymd),
demand_cache_run_id = VALUES(demand_cache_run_id)
"""
  145. def _wxindex_word_stats_row(payload: dict[str, Any]) -> tuple[Any, ...]:
  146. name = str(payload.get("name") or "").strip()
  147. analyze_ymd = str(payload.get("analyze_ymd") or "").strip()
  148. if not name or not analyze_ymd:
  149. raise HotContentFlowError("invalid wxindex word stats payload")
  150. return (
  151. name,
  152. payload.get("meta_id"),
  153. analyze_ymd,
  154. payload.get("wxindex_word_record_id"),
  155. payload.get("retain_reason"),
  156. payload.get("senior_fit_score"),
  157. payload.get("data_start_ymd"),
  158. payload.get("data_end_ymd"),
  159. payload.get("data_days"),
  160. payload.get("min_score"),
  161. payload.get("max_score"),
  162. payload.get("avg_score"),
  163. _nullable_json_dumps(payload.get("detail_json")),
  164. )
# Upsert of one hot_content_wxindex_word_stats row.
# The 13 placeholders follow the tuple order built by _wxindex_word_stats_row.
# On a duplicate key, every inserted column except name and analyze_ymd is
# overwritten with the incoming value.
# NOTE(review): presumably (name, analyze_ymd) is the unique key; confirm
# against the DDL.
_WXINDEX_WORD_STATS_UPSERT_SQL = """
INSERT INTO hot_content_wxindex_word_stats (
name,
meta_id,
analyze_ymd,
wxindex_word_record_id,
retain_reason,
senior_fit_score,
data_start_ymd,
data_end_ymd,
data_days,
min_score,
max_score,
avg_score,
detail_json
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
meta_id = VALUES(meta_id),
wxindex_word_record_id = VALUES(wxindex_word_record_id),
retain_reason = VALUES(retain_reason),
senior_fit_score = VALUES(senior_fit_score),
data_start_ymd = VALUES(data_start_ymd),
data_end_ymd = VALUES(data_end_ymd),
data_days = VALUES(data_days),
min_score = VALUES(min_score),
max_score = VALUES(max_score),
avg_score = VALUES(avg_score),
detail_json = VALUES(detail_json)
"""
  195. def _json_loads(value: Any) -> Any:
  196. if value is None:
  197. return None
  198. if isinstance(value, (dict, list)):
  199. return value
  200. if isinstance(value, (bytes, bytearray)):
  201. value = value.decode("utf-8")
  202. if isinstance(value, str):
  203. return json.loads(value)
  204. return value
  205. def _normalize_demand_names(demand_name_set: list[str]) -> list[str]:
  206. names: list[str] = []
  207. seen: set[str] = set()
  208. for item in demand_name_set:
  209. name = str(item).strip()
  210. if not name or name in seen:
  211. continue
  212. seen.add(name)
  213. names.append(name)
  214. return names
  215. def unique_title_key(source: str, title: str) -> str:
  216. return hashlib.sha256(f"{source}\n{title}".encode("utf-8")).hexdigest()
  217. class HotContentRepository:
  218. def __init__(self, config: MysqlConfig):
  219. if pymysql is None or DictCursor is None:
  220. raise HotContentFlowError("missing dependency: pip install pymysql")
  221. self.conn = pymysql.connect(
  222. host=config.host,
  223. port=config.port,
  224. user=config.user,
  225. password=config.password,
  226. database=config.database,
  227. charset=config.charset,
  228. autocommit=True,
  229. cursorclass=DictCursor,
  230. )
  231. def close(self) -> None:
  232. self.conn.close()
  233. def upsert_record(self, *, source: str, title: str, rank: int | None) -> dict[str, Any]:
  234. key = unique_title_key(source, title)
  235. sql = """
  236. INSERT INTO hot_content_records (
  237. unique_key, source, title, hot_rank, execution_status, created_at, updated_at
  238. )
  239. VALUES (%s, %s, %s, %s, %s, NOW(), NOW())
  240. ON DUPLICATE KEY UPDATE
  241. id=LAST_INSERT_ID(id),
  242. hot_rank=VALUES(hot_rank),
  243. updated_at=NOW()
  244. """
  245. with self.conn.cursor() as cursor:
  246. cursor.execute(
  247. sql,
  248. (
  249. key,
  250. source,
  251. title,
  252. rank,
  253. ExecutionStatus.HOT_SAVED,
  254. ),
  255. )
  256. record_id = int(cursor.lastrowid)
  257. cursor.execute(
  258. """
  259. SELECT
  260. id,
  261. unique_key,
  262. execution_status,
  263. article_title,
  264. article_body,
  265. article_url,
  266. decode_request_result IS NOT NULL AS has_decode_request,
  267. contribution_points_json IS NOT NULL AS has_contribution_points
  268. FROM hot_content_records
  269. WHERE id = %s
  270. """,
  271. (record_id,),
  272. )
  273. row = cursor.fetchone()
  274. if not row:
  275. raise HotContentFlowError(f"missing hot_content_records id={record_id}")
  276. return {
  277. "id": int(row["id"]),
  278. "unique_key": str(row["unique_key"]),
  279. "execution_status": int(row["execution_status"]),
  280. "article_title": row.get("article_title"),
  281. "article_body": row.get("article_body"),
  282. "article_url": row.get("article_url"),
  283. "has_decode_request": bool(row.get("has_decode_request")),
  284. "has_contribution_points": bool(row.get("has_contribution_points")),
  285. }
  286. def update_status(
  287. self,
  288. *,
  289. record_id: int,
  290. status: int,
  291. error_message: str | None = None,
  292. ) -> None:
  293. sql = """
  294. UPDATE hot_content_records
  295. SET execution_status=%s, error_reason=%s, updated_at=NOW()
  296. WHERE id=%s
  297. """
  298. with self.conn.cursor() as cursor:
  299. cursor.execute(sql, (status, error_message, record_id))
  300. def update_article(
  301. self,
  302. *,
  303. record_id: int,
  304. article_title: str,
  305. article_body: str,
  306. url: str,
  307. ) -> None:
  308. sql = """
  309. UPDATE hot_content_records
  310. SET article_title=%s,
  311. article_body=%s,
  312. article_url=%s,
  313. execution_status=%s,
  314. error_reason=NULL,
  315. updated_at=NOW()
  316. WHERE id=%s
  317. """
  318. with self.conn.cursor() as cursor:
  319. cursor.execute(
  320. sql,
  321. (
  322. article_title,
  323. article_body,
  324. url,
  325. ExecutionStatus.CONTENT_OK,
  326. record_id,
  327. ),
  328. )
  329. def mark_no_valid_content(
  330. self,
  331. *,
  332. record_id: int,
  333. reason: str,
  334. ) -> None:
  335. """搜不到文章或缺标题/正文:仅更新 execution_status + error_reason,不动分类字段。"""
  336. sql = """
  337. UPDATE hot_content_records
  338. SET execution_status=%s,
  339. error_reason=%s,
  340. updated_at=NOW()
  341. WHERE id=%s
  342. """
  343. with self.conn.cursor() as cursor:
  344. cursor.execute(
  345. sql,
  346. (
  347. ExecutionStatus.NO_VALID_CONTENT,
  348. str(reason or "no valid content").strip(),
  349. record_id,
  350. ),
  351. )
  352. def update_category_filter_result(
  353. self,
  354. *,
  355. record_id: int,
  356. passed: bool,
  357. result_json: dict[str, Any],
  358. ) -> None:
  359. self._ensure_category_filter_columns()
  360. status = (
  361. ExecutionStatus.CATEGORY_FILTER_PASSED
  362. if passed
  363. else ExecutionStatus.CATEGORY_FILTER_REJECTED
  364. )
  365. reason = str(result_json.get("reason") or "").strip()
  366. error_message = None if passed else (reason or "category filter rejected")
  367. sql = """
  368. UPDATE hot_content_records
  369. SET execution_status=%s,
  370. category_filter_passed=%s,
  371. category_filter_reason=%s,
  372. category_filter_json=%s,
  373. error_reason=%s,
  374. updated_at=NOW()
  375. WHERE id=%s
  376. """
  377. with self.conn.cursor() as cursor:
  378. cursor.execute(
  379. sql,
  380. (
  381. status,
  382. 1 if passed else 0,
  383. reason or None,
  384. _json_dumps(result_json),
  385. error_message,
  386. record_id,
  387. ),
  388. )
  389. def get_record_for_category_filter(self, record_id: int) -> dict[str, Any] | None:
  390. self._ensure_category_filter_columns()
  391. sql = """
  392. SELECT
  393. id,
  394. source,
  395. title,
  396. article_title,
  397. article_body,
  398. article_url,
  399. execution_status,
  400. category_filter_passed,
  401. category_filter_reason,
  402. category_filter_json
  403. FROM hot_content_records
  404. WHERE id = %s
  405. LIMIT 1
  406. """
  407. with self.conn.cursor() as cursor:
  408. cursor.execute(sql, (record_id,))
  409. row = cursor.fetchone()
  410. if not row:
  411. return None
  412. category_filter_json = _json_loads(row.get("category_filter_json"))
  413. passed_raw = row.get("category_filter_passed")
  414. passed: bool | None
  415. if passed_raw is None:
  416. passed = None
  417. else:
  418. passed = bool(int(passed_raw))
  419. return {
  420. "id": int(row["id"]),
  421. "source": str(row.get("source") or ""),
  422. "title": str(row.get("title") or ""),
  423. "article_title": str(row.get("article_title") or ""),
  424. "article_body": str(row.get("article_body") or ""),
  425. "article_url": str(row.get("article_url") or ""),
  426. "execution_status": int(row.get("execution_status") or 0),
  427. "category_filter_passed": passed,
  428. "category_filter_reason": str(row.get("category_filter_reason") or ""),
  429. "category_filter_json": (
  430. category_filter_json if isinstance(category_filter_json, dict) else {}
  431. ),
  432. }
  433. def get_category_filter_status(self, record_id: int) -> dict[str, Any] | None:
  434. record = self.get_record_for_category_filter(record_id)
  435. if not record:
  436. return None
  437. matched_category = None
  438. category_filter_json = record.get("category_filter_json") or {}
  439. if isinstance(category_filter_json, dict):
  440. matched_category = category_filter_json.get("matched_category")
  441. return {
  442. "id": record["id"],
  443. "passed": record["category_filter_passed"],
  444. "reason": record["category_filter_reason"],
  445. "matched_category": matched_category,
  446. "execution_status": record["execution_status"],
  447. }
  448. def update_decode_result(
  449. self,
  450. *,
  451. record_id: int,
  452. status: int,
  453. request_json: dict[str, Any],
  454. response_json: dict[str, Any] | None,
  455. error_message: str | None = None,
  456. ) -> None:
  457. decode_request_result = {
  458. "request": request_json,
  459. "response": response_json,
  460. }
  461. sql = """
  462. UPDATE hot_content_records
  463. SET decode_request_result=%s,
  464. execution_status=%s,
  465. error_reason=%s,
  466. updated_at=NOW()
  467. WHERE id=%s
  468. """
  469. with self.conn.cursor() as cursor:
  470. cursor.execute(
  471. sql,
  472. (
  473. _json_dumps(decode_request_result),
  474. status,
  475. error_message,
  476. record_id,
  477. ),
  478. )
  479. def list_decode_result_candidates(self, *, limit: int) -> list[dict[str, Any]]:
  480. sql = """
  481. SELECT id, unique_key
  482. FROM hot_content_records
  483. WHERE execution_status IN (%s, %s, %s)
  484. AND contribution_points_json IS NULL
  485. ORDER BY updated_at ASC, id ASC
  486. LIMIT %s
  487. """
  488. with self.conn.cursor() as cursor:
  489. cursor.execute(
  490. sql,
  491. (
  492. ExecutionStatus.DECODE_SUBMITTED,
  493. ExecutionStatus.DECODE_SUCCESS,
  494. ExecutionStatus.DECODE_PENDING,
  495. limit,
  496. ),
  497. )
  498. rows = cursor.fetchall()
  499. return [
  500. {
  501. "id": int(row["id"]),
  502. "unique_key": str(row["unique_key"]),
  503. }
  504. for row in rows
  505. ]
  506. def save_decode_result_export(
  507. self,
  508. *,
  509. record_id: int,
  510. decode_result_json: dict[str, Any],
  511. contribution_points_json: dict[str, Any],
  512. ) -> None:
  513. sql = """
  514. UPDATE hot_content_records
  515. SET decode_result_json=%s,
  516. contribution_points_json=%s,
  517. execution_status=%s,
  518. error_reason=NULL,
  519. updated_at=NOW()
  520. WHERE id=%s
  521. """
  522. with self.conn.cursor() as cursor:
  523. cursor.execute(
  524. sql,
  525. (
  526. _json_dumps(decode_result_json),
  527. _json_dumps(contribution_points_json),
  528. ExecutionStatus.CONTRIBUTION_EXTRACTED,
  529. record_id,
  530. ),
  531. )
  532. def get_demand_cache_by_hour(self, *, cache_hour: datetime) -> dict[str, Any] | None:
  533. sql = """
  534. SELECT
  535. id,
  536. cache_hour,
  537. source_table,
  538. partition_dt,
  539. demand_name_set_json,
  540. item_count,
  541. updated_at
  542. FROM demand_pool_hourly_cache
  543. WHERE cache_hour=%s
  544. LIMIT 1
  545. """
  546. with self.conn.cursor() as cursor:
  547. cursor.execute(sql, (cache_hour,))
  548. row = cursor.fetchone()
  549. if not row:
  550. return None
  551. demand_name_set = _json_loads(row.get("demand_name_set_json")) or []
  552. if not isinstance(demand_name_set, list):
  553. demand_name_set = []
  554. return {
  555. "id": int(row["id"]),
  556. "cache_hour": row.get("cache_hour"),
  557. "source_table": str(row["source_table"]),
  558. "partition_dt": row.get("partition_dt"),
  559. "demand_name_set": [
  560. str(name).strip()
  561. for name in demand_name_set
  562. if str(name).strip()
  563. ],
  564. "item_count": int(row.get("item_count") or 0),
  565. "updated_at": row.get("updated_at"),
  566. }
  567. def save_demand_cache_set(
  568. self,
  569. *,
  570. cache_hour: datetime,
  571. source_table: str,
  572. partition_dt: str | None,
  573. excluded_strategy: str,
  574. top_n: int,
  575. demand_name_set: list[str],
  576. ) -> int:
  577. sql = """
  578. INSERT INTO demand_pool_hourly_cache (
  579. cache_hour,
  580. source_table,
  581. partition_dt,
  582. excluded_strategy,
  583. top_n,
  584. demand_name_set_json,
  585. item_count,
  586. created_at,
  587. updated_at
  588. )
  589. VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
  590. ON DUPLICATE KEY UPDATE
  591. id=LAST_INSERT_ID(id),
  592. source_table=VALUES(source_table),
  593. partition_dt=VALUES(partition_dt),
  594. excluded_strategy=VALUES(excluded_strategy),
  595. top_n=VALUES(top_n),
  596. demand_name_set_json=VALUES(demand_name_set_json),
  597. item_count=VALUES(item_count),
  598. updated_at=NOW()
  599. """
  600. normalized_names = _normalize_demand_names(demand_name_set)
  601. with self.conn.cursor() as cursor:
  602. cursor.execute(
  603. sql,
  604. (
  605. cache_hour,
  606. source_table,
  607. partition_dt,
  608. excluded_strategy,
  609. top_n,
  610. _json_dumps(normalized_names),
  611. len(normalized_names),
  612. ),
  613. )
  614. return int(cursor.lastrowid)
  615. def list_postprocess_candidates(self, *, limit: int) -> list[dict[str, Any]]:
  616. sql = """
  617. SELECT
  618. id,
  619. unique_key,
  620. source,
  621. title,
  622. created_at,
  623. article_title,
  624. article_body,
  625. demand_cache_run_id,
  626. postprocess_status,
  627. decode_result_json,
  628. contribution_points_json,
  629. contribution_demand_match_json,
  630. wxindex_trend_json,
  631. demand_event_sense_json,
  632. demand_senior_fit_json
  633. FROM hot_content_records
  634. WHERE contribution_points_json IS NOT NULL
  635. AND postprocess_status IN (%s, %s, %s, %s)
  636. ORDER BY updated_at ASC, id ASC
  637. LIMIT %s
  638. """
  639. with self.conn.cursor() as cursor:
  640. cursor.execute(
  641. sql,
  642. (
  643. PostprocessStatus.PENDING,
  644. PostprocessStatus.DEMAND_MATCHED,
  645. PostprocessStatus.WXINDEX_DONE,
  646. PostprocessStatus.FAILED,
  647. limit,
  648. ),
  649. )
  650. rows = cursor.fetchall()
  651. return [
  652. {
  653. "id": int(row["id"]),
  654. "unique_key": str(row["unique_key"]),
  655. "source": str(row.get("source") or ""),
  656. "title": str(row.get("title") or ""),
  657. "created_at": row.get("created_at"),
  658. "article_title": row.get("article_title"),
  659. "article_body": row.get("article_body"),
  660. "demand_cache_run_id": row.get("demand_cache_run_id"),
  661. "postprocess_status": int(row.get("postprocess_status") or 0),
  662. "decode_result_json": _json_loads(row.get("decode_result_json")),
  663. "contribution_points_json": _json_loads(row.get("contribution_points_json")),
  664. "contribution_demand_match_json": _json_loads(
  665. row.get("contribution_demand_match_json")
  666. ),
  667. "wxindex_trend_json": _json_loads(row.get("wxindex_trend_json")),
  668. "demand_event_sense_json": _json_loads(
  669. row.get("demand_event_sense_json")
  670. ),
  671. "demand_senior_fit_json": _json_loads(row.get("demand_senior_fit_json")),
  672. }
  673. for row in rows
  674. ]
  675. def save_contribution_demand_match(
  676. self,
  677. *,
  678. record_id: int,
  679. demand_cache_run_id: int,
  680. match_json: dict[str, Any],
  681. ) -> None:
  682. sql = """
  683. UPDATE hot_content_records
  684. SET demand_cache_run_id=%s,
  685. contribution_demand_match_json=%s,
  686. postprocess_status=%s,
  687. postprocess_error_reason=NULL,
  688. updated_at=NOW()
  689. WHERE id=%s
  690. """
  691. with self.conn.cursor() as cursor:
  692. cursor.execute(
  693. sql,
  694. (
  695. demand_cache_run_id,
  696. _json_dumps(match_json),
  697. PostprocessStatus.DEMAND_MATCHED,
  698. record_id,
  699. ),
  700. )
  701. def save_wxindex_trend(
  702. self,
  703. *,
  704. record_id: int,
  705. trend_json: dict[str, Any],
  706. ) -> None:
  707. sql = """
  708. UPDATE hot_content_records
  709. SET wxindex_trend_json=%s,
  710. postprocess_status=%s,
  711. postprocess_error_reason=NULL,
  712. updated_at=NOW()
  713. WHERE id=%s
  714. """
  715. with self.conn.cursor() as cursor:
  716. cursor.execute(
  717. sql,
  718. (
  719. _json_dumps(trend_json),
  720. PostprocessStatus.WXINDEX_DONE,
  721. record_id,
  722. ),
  723. )
  def save_demand_quality(
      self,
      *,
      record_id: int,
      event_sense_json: dict[str, Any],
      senior_fit_json: dict[str, Any],
      update_status: bool = True,
  ) -> None:
      """Store the demand event-sense and senior-fit LLM scores of one record.

      Ensures the two JSON columns exist, then writes both payloads (serialized
      with ``_json_dumps``) and refreshes ``updated_at``.

      Args:
          record_id: ``hot_content_records.id`` of the row to update.
          event_sense_json: event-sense scoring result.
          senior_fit_json: senior-fit scoring result.
          update_status: when true, also set ``postprocess_status`` to
              ``PostprocessStatus.QUALITY_DONE`` and clear
              ``postprocess_error_reason``; when false the status columns are
              left untouched.
      """
      self._ensure_record_quality_columns()
      assignments = [
          "demand_event_sense_json=%s",
          "demand_senior_fit_json=%s",
      ]
      values: list[Any] = [
          _json_dumps(event_sense_json),
          _json_dumps(senior_fit_json),
      ]
      if update_status:
          assignments += ["postprocess_status=%s", "postprocess_error_reason=NULL"]
          values.append(PostprocessStatus.QUALITY_DONE)
      assignments.append("updated_at=NOW()")
      values.append(record_id)
      statement = (
          "UPDATE hot_content_records SET "
          + ", ".join(assignments)
          + " WHERE id=%s"
      )
      with self.conn.cursor() as cursor:
          cursor.execute(statement, tuple(values))
  764. def update_postprocess_status(
  765. self,
  766. *,
  767. record_id: int,
  768. status: int,
  769. error_message: str | None = None,
  770. ) -> None:
  771. sql = """
  772. UPDATE hot_content_records
  773. SET postprocess_status=%s,
  774. postprocess_error_reason=%s,
  775. updated_at=NOW()
  776. WHERE id=%s
  777. """
  778. with self.conn.cursor() as cursor:
  779. cursor.execute(sql, (status, error_message, record_id))
  def replace_demand_export_rows(
      self,
      *,
      record_id: int,
      source: str,
      hot_title: str,
      article_title: str,
      rows: list[dict[str, Any]],
  ) -> None:
      """Replace all ``hot_content_demand_exports`` rows of one record.

      The export table is ensured first. Then every existing row for
      ``record_id`` is deleted, and ``rows`` are inserted in order. Items whose
      ``item_type`` or ``item_text`` is missing or blank are skipped. If
      nothing remains, the record ends up with no export rows.

      Text fields fall back to ``""``, ``wxindex_latest_score`` to ``0.0`` and
      ``is_as_demand`` to ``0``. ``contribution_score``, ``event_sense_score``
      and ``senior_fit_score`` are passed through as-is (possibly ``None``).
      """
      self._ensure_demand_export_table()
      value_columns = (
          "record_id",
          "source",
          "hot_title",
          "article_title",
          "item_type",
          "item_text",
          "point_category",
          "matched_demand",
          "contribution_score",
          "wxindex_keyword",
          "all_hot_keywords",
          "wxindex_latest_score",
          "wxindex_trend",
          "is_as_demand",
          "event_sense_score",
          "senior_fit_score",
      )
      insert_sql = (
          "INSERT INTO hot_content_demand_exports ("
          + ", ".join(value_columns)
          + ", created_at, updated_at) VALUES ("
          + ", ".join(["%s"] * len(value_columns))
          + ", NOW(), NOW())"
      )

      def text_of(item: dict[str, Any], key: str) -> str:
          # Missing keys and falsy values (None, "", 0, False) all become "".
          return str(item.get(key) or "")

      with self.conn.cursor() as cursor:
          cursor.execute(
              "DELETE FROM hot_content_demand_exports WHERE record_id=%s",
              (record_id,),
          )
          batch: list[tuple[Any, ...]] = []
          for item in rows:
              if not text_of(item, "item_type").strip():
                  continue
              if not text_of(item, "item_text").strip():
                  continue
              batch.append(
                  (
                      record_id,
                      source,
                      hot_title,
                      article_title,
                      text_of(item, "item_type"),
                      text_of(item, "item_text"),
                      text_of(item, "point_category"),
                      text_of(item, "matched_demand"),
                      item.get("contribution_score"),
                      text_of(item, "wxindex_keyword"),
                      text_of(item, "all_hot_keywords"),
                      float(item.get("wxindex_latest_score") or 0),
                      text_of(item, "wxindex_trend"),
                      int(item.get("is_as_demand") or 0),
                      item.get("event_sense_score"),
                      item.get("senior_fit_score"),
                  )
              )
          if batch:
              cursor.executemany(insert_sql, batch)
  def list_odps_sync_records(self, *, partition_dt: str | None = None) -> list[dict[str, Any]]:
      """Read quality-checked records created on one day, for ODPS sync.

      Records are selected by ``created_at`` (not ``updated_at``) within
      ``[day 00:00, next day 00:00)``. They must have ``postprocess_status``
      equal to ``PostprocessStatus.QUALITY_DONE`` and a non-NULL, non-blank
      ``contribution_demand_match_json``.

      Args:
          partition_dt: day to read, as ``yyyymmdd``. Surrounding whitespace
              is ignored, the same way ``list_synced_odps_demand_ids`` and
              ``count_odps_sync_log_rows`` normalize it. ``None`` or a blank
              string means today in ``SHANGHAI_TZ``.

      Returns:
          One dict per record in ascending ``id`` order, containing ``id`` as
          ``int`` plus the five JSON columns decoded with ``_json_loads``.

      Raises:
          ValueError: if a non-blank ``partition_dt`` is not ``yyyymmdd``.
      """
      self._ensure_record_quality_columns()
      normalized_partition_dt = str(partition_dt or "").strip()
      if normalized_partition_dt:
          created_day_start = datetime.strptime(normalized_partition_dt, "%Y%m%d")
      else:
          # Midnight of the current Shanghai day. tzinfo is dropped so the bound
          # parameter is a naive datetime, like the strptime branch above.
          created_day_start = datetime.now(SHANGHAI_TZ).replace(
              hour=0,
              minute=0,
              second=0,
              microsecond=0,
              tzinfo=None,
          )
      created_day_end = created_day_start + timedelta(days=1)
      sql = """
          SELECT
              id,
              contribution_points_json,
              contribution_demand_match_json,
              wxindex_trend_json,
              demand_event_sense_json,
              demand_senior_fit_json
          FROM hot_content_records
          WHERE created_at >= %s
            AND created_at < %s
            AND postprocess_status = %s
            AND contribution_demand_match_json IS NOT NULL
            AND TRIM(CAST(contribution_demand_match_json AS CHAR)) <> ''
          ORDER BY id ASC
      """
      with self.conn.cursor() as cursor:
          cursor.execute(
              sql,
              (created_day_start, created_day_end, PostprocessStatus.QUALITY_DONE),
          )
          rows = cursor.fetchall()
      json_columns = (
          "contribution_points_json",
          "contribution_demand_match_json",
          "wxindex_trend_json",
          "demand_event_sense_json",
          "demand_senior_fit_json",
      )
      return [
          {
              "id": int(row["id"]),
              **{column: _json_loads(row.get(column)) for column in json_columns},
          }
          for row in rows
      ]
  def list_demand_export_groups(self) -> list[dict[str, Any]]:
      """Return today's export rows, grouped by record.

      Only export rows whose parent ``hot_content_records`` row was created
      today are read. Today is the current ``SHANGHAI_TZ`` calendar day,
      bound as a naive datetime. This feeds the same-day ODPS partition sync
      and never crosses days.

      Returns:
          ``[{"record_id": int, "export_rows": [...]}, ...]`` in ascending
          ``record_id`` order. Each ``export_rows`` list is ordered by export
          row id and holds ``item_type``, ``item_text``, ``point_category``,
          ``matched_demand`` (strings, ``""`` when empty) and
          ``wxindex_latest_score`` (float, ``0.0`` when empty).
      """
      self._ensure_demand_export_table()
      day_start = datetime.now(SHANGHAI_TZ).replace(
          hour=0, minute=0, second=0, microsecond=0, tzinfo=None
      )
      day_end = day_start + timedelta(days=1)
      query = """
          SELECT
              e.record_id,
              e.item_type,
              e.item_text,
              e.point_category,
              e.matched_demand,
              e.wxindex_latest_score
          FROM hot_content_demand_exports e
          INNER JOIN hot_content_records r ON r.id = e.record_id
          WHERE r.created_at >= %s
            AND r.created_at < %s
          ORDER BY e.record_id ASC, e.id ASC
      """
      with self.conn.cursor() as cursor:
          cursor.execute(query, (day_start, day_end))
          fetched = cursor.fetchall()
      by_record: dict[int, list[dict[str, Any]]] = {}
      for row in fetched:
          bucket = by_record.setdefault(int(row["record_id"]), [])
          bucket.append(
              {
                  "item_type": str(row.get("item_type") or ""),
                  "item_text": str(row.get("item_text") or ""),
                  "point_category": str(row.get("point_category") or ""),
                  "matched_demand": str(row.get("matched_demand") or ""),
                  "wxindex_latest_score": float(row.get("wxindex_latest_score") or 0),
              }
          )
      return [
          {"record_id": rid, "export_rows": items}
          for rid, items in by_record.items()
      ]
  def _ensure_demand_export_table(self) -> None:
      """Create ``hot_content_demand_exports`` if missing and back-fill columns.

      After ``CREATE TABLE IF NOT EXISTS``, each late-added column is passed to
      ``_ensure_demand_export_column`` on the same cursor. The ALTER is applied
      only when that column is absent. The list order matters: several ALTERs
      place their column ``AFTER`` a column that an earlier entry adds
      (e.g. ``contribution_score`` after ``matched_demand``,
      ``senior_fit_score`` after ``event_sense_score``).
      """
      create_sql = """
          CREATE TABLE IF NOT EXISTS hot_content_demand_exports (
              id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
              record_id BIGINT UNSIGNED NOT NULL,
              source VARCHAR(64) NOT NULL DEFAULT '',
              hot_title VARCHAR(1024) NOT NULL DEFAULT '',
              article_title VARCHAR(1024) NOT NULL DEFAULT '',
              item_type VARCHAR(32) NOT NULL COMMENT '元素/短语',
              item_text VARCHAR(1024) NOT NULL,
              point_category VARCHAR(32) NOT NULL DEFAULT '' COMMENT '点类型:灵感点/目的点/关键点',
              matched_demand VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '匹配到的需求',
              contribution_score DOUBLE NULL COMMENT '贡献分,仅元素有值',
              wxindex_keyword VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '获取微信指数的元素',
              all_hot_keywords VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '全部热点词',
              wxindex_latest_score DOUBLE NOT NULL DEFAULT 0,
              wxindex_trend VARCHAR(32) NOT NULL DEFAULT '' COMMENT '微信指数趋势',
              is_as_demand TINYINT NOT NULL DEFAULT 0 COMMENT '是否作为需求:0否 1是',
              created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
              updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
              PRIMARY KEY (id),
              KEY idx_record_id (record_id),
              KEY idx_source_type (source, item_type)
          ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
      """
      # (column, DDL) pairs applied in order; see the docstring for why the
      # order must not change.
      late_columns = (
          (
              "matched_demand",
              """
              ALTER TABLE hot_content_demand_exports
              ADD COLUMN matched_demand VARCHAR(1024) NOT NULL DEFAULT ''
              COMMENT '匹配到的需求'
              AFTER item_text
              """,
          ),
          (
              "point_category",
              """
              ALTER TABLE hot_content_demand_exports
              ADD COLUMN point_category VARCHAR(32) NOT NULL DEFAULT ''
              COMMENT '点类型:灵感点/目的点/关键点'
              AFTER item_text
              """,
          ),
          (
              "contribution_score",
              """
              ALTER TABLE hot_content_demand_exports
              ADD COLUMN contribution_score DOUBLE NULL
              COMMENT '贡献分,仅词有值'
              AFTER matched_demand
              """,
          ),
          (
              "wxindex_trend",
              """
              ALTER TABLE hot_content_demand_exports
              ADD COLUMN wxindex_trend VARCHAR(32) NOT NULL DEFAULT ''
              COMMENT '微信指数趋势'
              AFTER wxindex_latest_score
              """,
          ),
          (
              "wxindex_keyword",
              """
              ALTER TABLE hot_content_demand_exports
              ADD COLUMN wxindex_keyword VARCHAR(1024) NOT NULL DEFAULT ''
              COMMENT '获取微信指数的词'
              AFTER contribution_score
              """,
          ),
          (
              "all_hot_keywords",
              """
              ALTER TABLE hot_content_demand_exports
              ADD COLUMN all_hot_keywords VARCHAR(1024) NOT NULL DEFAULT ''
              COMMENT '全部热点词'
              AFTER wxindex_keyword
              """,
          ),
          (
              "is_as_demand",
              """
              ALTER TABLE hot_content_demand_exports
              ADD COLUMN is_as_demand TINYINT NOT NULL DEFAULT 0
              COMMENT '是否作为需求:0否 1是'
              AFTER wxindex_trend
              """,
          ),
          (
              "event_sense_score",
              """
              ALTER TABLE hot_content_demand_exports
              ADD COLUMN event_sense_score DOUBLE NULL
              COMMENT '事件性得分 0-10'
              AFTER is_as_demand
              """,
          ),
          (
              "senior_fit_score",
              """
              ALTER TABLE hot_content_demand_exports
              ADD COLUMN senior_fit_score DOUBLE NULL
              COMMENT '老年性得分 0-10'
              AFTER event_sense_score
              """,
          ),
      )
      with self.conn.cursor() as cursor:
          cursor.execute(create_sql)
          for column_name, alter_sql in late_columns:
              self._ensure_demand_export_column(cursor, column_name, alter_sql)
  def _ensure_record_quality_columns(self) -> None:
      """Add the demand-quality JSON columns to ``hot_content_records`` if absent.

      Each column is looked up in ``information_schema.COLUMNS`` for the
      current database. Its ALTER runs only when the lookup counts zero
      matches. ``demand_senior_fit_json`` is checked second because its ALTER
      places it after ``demand_event_sense_json``.
      """
      probe_sql = """
          SELECT COUNT(*) AS cnt
          FROM information_schema.COLUMNS
          WHERE TABLE_SCHEMA = DATABASE()
            AND TABLE_NAME = 'hot_content_records'
            AND COLUMN_NAME = %s
      """
      wanted = (
          (
              "demand_event_sense_json",
              """
              ALTER TABLE hot_content_records
              ADD COLUMN demand_event_sense_json JSON NULL
              COMMENT '需求事件性 LLM 评分结果'
              AFTER wxindex_trend_json
              """,
          ),
          (
              "demand_senior_fit_json",
              """
              ALTER TABLE hot_content_records
              ADD COLUMN demand_senior_fit_json JSON NULL
              COMMENT '需求老年性 LLM 评分结果'
              AFTER demand_event_sense_json
              """,
          ),
      )
      with self.conn.cursor() as cursor:
          for column_name, alter_sql in wanted:
              cursor.execute(probe_sql, (column_name,))
              found = cursor.fetchone() or {}
              if not int(found.get("cnt") or 0):
                  cursor.execute(alter_sql)
  def _ensure_category_filter_columns(self) -> None:
      """Add the category-filter columns to ``hot_content_records`` if absent.

      Checks, in order, ``category_filter_json``, ``category_filter_passed``
      and ``category_filter_reason`` against ``information_schema.COLUMNS``
      for the current database. Each missing one is ALTERed in. The order
      matters because each ALTER positions its column after the previous one.
      """
      probe_sql = """
          SELECT COUNT(*) AS cnt
          FROM information_schema.COLUMNS
          WHERE TABLE_SCHEMA = DATABASE()
            AND TABLE_NAME = 'hot_content_records'
            AND COLUMN_NAME = %s
      """
      wanted = (
          (
              "category_filter_json",
              """
              ALTER TABLE hot_content_records
              ADD COLUMN category_filter_json JSON NULL
              COMMENT '老年人兴趣分类筛选 LLM 结果'
              AFTER article_url
              """,
          ),
          (
              "category_filter_passed",
              """
              ALTER TABLE hot_content_records
              ADD COLUMN category_filter_passed TINYINT NULL
              COMMENT '分类筛选是否通过:1通过 0不通过 NULL未筛选'
              AFTER category_filter_json
              """,
          ),
          (
              "category_filter_reason",
              """
              ALTER TABLE hot_content_records
              ADD COLUMN category_filter_reason TEXT NULL
              COMMENT '分类筛选原因'
              AFTER category_filter_passed
              """,
          ),
      )
      with self.conn.cursor() as cursor:
          for column_name, alter_sql in wanted:
              cursor.execute(probe_sql, (column_name,))
              found = cursor.fetchone() or {}
              if not int(found.get("cnt") or 0):
                  cursor.execute(alter_sql)
  def _ensure_demand_export_column(
      self,
      cursor: Any,
      column_name: str,
      alter_sql: str,
  ) -> None:
      """Run ``alter_sql`` unless ``column_name`` already exists.

      The existence check queries ``information_schema.COLUMNS`` for
      ``hot_content_demand_exports`` in the current database, using the
      caller's ``cursor``. ``alter_sql`` is executed as-is on that cursor.
      """
      cursor.execute(
          """
          SELECT COUNT(*) AS cnt
          FROM information_schema.COLUMNS
          WHERE TABLE_SCHEMA = DATABASE()
            AND TABLE_NAME = 'hot_content_demand_exports'
            AND COLUMN_NAME = %s
          """,
          (column_name,),
      )
      row = cursor.fetchone() or {}
      if int(row.get("cnt") or 0) != 0:
          return
      cursor.execute(alter_sql)
  def list_synced_odps_demand_ids(
      self,
      *,
      partition_dt: str,
  ) -> set[str]:
      """Return the demand ids already logged for one ODPS partition day.

      Used for cross-pipeline de-duplication against
      ``hot_content_odps_sync_log``. ``partition_dt`` is stripped, and a blank
      value returns an empty set without touching the database. Returned ids
      are stripped, and blank ids are dropped.
      """
      partition = str(partition_dt or "").strip()
      if not partition:
          return set()
      self._ensure_odps_sync_log_table()
      with self.conn.cursor() as cursor:
          cursor.execute(
              """
              SELECT demand_id
              FROM hot_content_odps_sync_log
              WHERE partition_dt = %s
              """,
              (partition,),
          )
          fetched = cursor.fetchall()
      return {
          demand_id
          for row in fetched
          if (demand_id := str(row.get("demand_id") or "").strip())
      }
  def count_odps_sync_log_rows(self, *, partition_dt: str) -> int:
      """Count the sync-log rows already written for one partition day.

      Used for daily-quota calculation. ``partition_dt`` is stripped, and a
      blank value returns ``0`` without touching the database.
      """
      partition = str(partition_dt or "").strip()
      if not partition:
          return 0
      self._ensure_odps_sync_log_table()
      with self.conn.cursor() as cursor:
          cursor.execute(
              """
              SELECT COUNT(*) AS cnt
              FROM hot_content_odps_sync_log
              WHERE partition_dt = %s
              """,
              (partition,),
          )
          result = cursor.fetchone()
      return int((result or {}).get("cnt") or 0)
  def save_odps_sync_logs(self, rows: list[dict[str, Any]]) -> int:
      """Upsert ODPS sync-log rows and return how many were sent.

      Rows whose ``demand_id`` is missing or blank are skipped. On a duplicate
      key (``ON DUPLICATE KEY UPDATE``), the demand name/type, record id,
      weight and extend payload are overwritten and ``synced_at`` is refreshed.

      Args:
          rows: dicts with ``partition_dt``, ``strategy``, ``demand_id``,
              ``demand_name``, ``demand_type`` (stringified, ``""`` when
              empty), ``record_id`` (int, ``0`` when empty), optional
              ``weight`` (float or NULL) and ``extend`` (text, ``"{}"`` when
              empty).

      Returns:
          Number of rows passed to ``executemany``. This is ``0`` for empty
          input or when no row has a ``demand_id``, and in that case no
          batch is issued.
      """
      if not rows:
          return 0
      self._ensure_odps_sync_log_table()
      sql = """
          INSERT INTO hot_content_odps_sync_log (
              partition_dt,
              strategy,
              demand_id,
              demand_name,
              demand_type,
              record_id,
              weight,
              extend
          )
          VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
          ON DUPLICATE KEY UPDATE
              demand_name = VALUES(demand_name),
              demand_type = VALUES(demand_type),
              record_id = VALUES(record_id),
              weight = VALUES(weight),
              extend = VALUES(extend),
              synced_at = CURRENT_TIMESTAMP
      """
      insert_rows = [
          (
              str(item.get("partition_dt") or ""),
              str(item.get("strategy") or ""),
              str(item.get("demand_id") or ""),
              str(item.get("demand_name") or ""),
              str(item.get("demand_type") or ""),
              int(item.get("record_id") or 0),
              float(item["weight"]) if item.get("weight") is not None else None,
              str(item.get("extend") or "{}"),
          )
          for item in rows
          if str(item.get("demand_id") or "").strip()
      ]
      if not insert_rows:
          # Every row lacked a demand_id. Skip the empty executemany round
          # trip, the same way replace_demand_export_rows guards its batch.
          return 0
      with self.conn.cursor() as cursor:
          cursor.executemany(sql, insert_rows)
      return len(insert_rows)
  def _ensure_odps_sync_log_weight_column(self, cursor: Any) -> None:
      """Add the nullable ``weight`` column to ``hot_content_odps_sync_log`` if absent.

      Works on the caller's ``cursor``. The column is placed after
      ``record_id``. Per its column comment it holds the ODPS demand weight
      (the record's highest wxindex score / 1000000).
      """
      cursor.execute(
          """
          SELECT COUNT(*) AS cnt
          FROM information_schema.COLUMNS
          WHERE TABLE_SCHEMA = DATABASE()
            AND TABLE_NAME = 'hot_content_odps_sync_log'
            AND COLUMN_NAME = 'weight'
          """,
      )
      present = int((cursor.fetchone() or {}).get("cnt") or 0)
      if present:
          return
      cursor.execute(
          """
          ALTER TABLE hot_content_odps_sync_log
          ADD COLUMN weight DOUBLE NULL DEFAULT NULL
          COMMENT 'ODPS 需求权重(记录 wxindex 最高分 / 1000000)'
          AFTER record_id
          """
      )
  def _ensure_odps_sync_log_extend_column(self, cursor: Any) -> None:
      """Add the nullable ``extend`` TEXT column to ``hot_content_odps_sync_log`` if absent.

      Works on the caller's ``cursor``. The column is placed after ``weight``,
      so the weight column is presumably ensured first (confirm at the call
      site).
      """
      cursor.execute(
          """
          SELECT COUNT(*) AS cnt
          FROM information_schema.COLUMNS
          WHERE TABLE_SCHEMA = DATABASE()
            AND TABLE_NAME = 'hot_content_odps_sync_log'
            AND COLUMN_NAME = 'extend'
          """,
      )
      present = int((cursor.fetchone() or {}).get("cnt") or 0)
      if present:
          return
      cursor.execute(
          """
          ALTER TABLE hot_content_odps_sync_log
          ADD COLUMN extend TEXT NULL DEFAULT NULL
          COMMENT 'ODPS extend 扩展字段 JSON'
          AFTER weight
          """
      )
  def list_wxindex_word_scores(self, name: str) -> list[dict[str, Any]]:
      """Return the full stored WeChat-index history of one word.

      Reads ``hot_content_wxindex_words`` rows for ``name`` (stripped) in
      ascending ``dt`` order. Rows are skipped when ``dt`` is blank or when
      ``total_score`` is missing or not float-convertible.

      Returns:
          ``[{"ymd": dt, "total_score": float}, ...]``. A blank name returns
          ``[]`` without querying.
      """
      word = str(name or "").strip()
      if not word:
          return []
      self._ensure_wxindex_words_table()
      with self.conn.cursor() as cursor:
          cursor.execute(
              """
              SELECT dt, total_score
              FROM hot_content_wxindex_words
              WHERE name = %s
              ORDER BY dt ASC
              """,
              (word,),
          )
          fetched = cursor.fetchall()
      history: list[dict[str, Any]] = []
      for row in fetched:
          ymd = str(row.get("dt") or "").strip()
          if ymd:
              try:
                  score = float(row["total_score"])
              except (TypeError, ValueError, KeyError):
                  continue
              history.append({"ymd": ymd, "total_score": score})
      return history
  def list_wxindex_word_scores_in_range(
      self,
      name: str,
      *,
      start_ymd: str,
      end_ymd: str,
  ) -> list[dict[str, Any]]:
      """Return one word's WeChat-index scores with ``start_ymd <= dt <= end_ymd``.

      All three arguments are stripped. If any is blank, ``[]`` is returned
      without querying. Rows come back in ascending ``dt`` order. Rows are
      skipped when ``dt`` is blank or when ``total_score`` is missing or not
      float-convertible.

      Returns:
          ``[{"ymd": dt, "total_score": float}, ...]``.
      """
      word = str(name or "").strip()
      start = str(start_ymd or "").strip()
      end = str(end_ymd or "").strip()
      bounds = (word, start, end)
      if not all(bounds):
          return []
      self._ensure_wxindex_words_table()
      with self.conn.cursor() as cursor:
          cursor.execute(
              """
              SELECT dt, total_score
              FROM hot_content_wxindex_words
              WHERE name = %s
                AND dt >= %s
                AND dt <= %s
              ORDER BY dt ASC
              """,
              bounds,
          )
          fetched = cursor.fetchall()
      history: list[dict[str, Any]] = []
      for row in fetched:
          ymd = str(row.get("dt") or "").strip()
          if ymd:
              try:
                  score = float(row["total_score"])
              except (TypeError, ValueError, KeyError):
                  continue
              history.append({"ymd": ymd, "total_score": score})
      return history
  1331. def list_active_wxindex_word_meta(
  1332. self,
  1333. *,
  1334. today: date | None = None,
  1335. ) -> list[dict[str, Any]]:
  1336. """返回当天仍在抓取窗口内的 meta(today < fetch_end_ymd)。"""
  1337. current = today or datetime.now(SHANGHAI_TZ).date()
  1338. today_ymd = current.strftime("%Y%m%d")
  1339. self._ensure_wxindex_word_meta_table()
  1340. sql = """
  1341. SELECT id, name, event_created_at, fetch_start_ymd, fetch_end_ymd
  1342. FROM hot_content_wxindex_word_meta
  1343. WHERE fetch_end_ymd > %s
  1344. ORDER BY id ASC
  1345. """
  1346. with self.conn.cursor() as cursor:
  1347. cursor.execute(sql, (today_ymd,))
  1348. rows = cursor.fetchall()
  1349. result: list[dict[str, Any]] = []
  1350. for row in rows:
  1351. meta = self._normalize_wxindex_word_meta_row(row)
  1352. if meta is not None:
  1353. result.append(meta)
  1354. return result
  def save_wxindex_word_record(self, payload: dict[str, Any]) -> int:
      """Upsert one word-analysis trace record and return its row id.

      ``payload`` is converted by ``_wxindex_word_record_row``. Positions 0 and
      2 of the resulting row are read back as ``name`` and ``analyze_ymd``,
      and the row is re-selected by that pair after
      ``_WXINDEX_WORD_RECORD_UPSERT_SQL`` runs.

      Returns:
          ``hot_content_wxindex_word_records.id``, or ``0`` when the lookup
          finds nothing.
      """
      record_values = _wxindex_word_record_row(payload)
      word_name, ymd = str(record_values[0]), str(record_values[2])
      self._ensure_wxindex_word_records_table()
      with self.conn.cursor() as cursor:
          cursor.execute(_WXINDEX_WORD_RECORD_UPSERT_SQL, record_values)
          cursor.execute(
              """
              SELECT id
              FROM hot_content_wxindex_word_records
              WHERE name = %s
                AND analyze_ymd = %s
              """,
              (word_name, ymd),
          )
          found = cursor.fetchone() or {}
          return int(found.get("id") or 0)
  def init_wxindex_word_records(
      self,
      payloads: list[dict[str, Any]],
  ) -> dict[str, int]:
      """Batch-initialise trace records and return their ids keyed by name.

      All payloads are converted by ``_wxindex_word_record_row`` and upserted
      with a single ``executemany`` of ``_WXINDEX_WORD_RECORD_INIT_UPSERT_SQL``.
      The original note says re-runs only refresh the meta / fetch-window
      fields. That SQL is defined elsewhere, so confirm it there.

      Returns:
          ``{name: id}`` from ``map_wxindex_word_record_ids``. An empty
          ``payloads`` returns ``{}`` without touching the database.
      """
      if not payloads:
          return {}
      prepared = [_wxindex_word_record_row(payload) for payload in payloads]
      word_names = [str(values[0]) for values in prepared]
      # NOTE(review): ids are looked up with the LAST payload's analyze_ymd
      # only. This presumably assumes one analyze_ymd per batch; confirm.
      batch_ymd = str(prepared[-1][2])
      self._ensure_wxindex_word_records_table()
      with self.conn.cursor() as cursor:
          cursor.executemany(_WXINDEX_WORD_RECORD_INIT_UPSERT_SQL, prepared)
      return self.map_wxindex_word_record_ids(analyze_ymd=batch_ymd, names=word_names)
  def map_wxindex_word_record_ids(
      self,
      *,
      analyze_ymd: str,
      names: list[str],
  ) -> dict[str, int]:
      """Map word names to their trace-record ids for one ``analyze_ymd``.

      ``analyze_ymd`` and every name are stripped, and blank names are
      dropped. If the day is blank or no name survives, ``{}`` is returned
      without querying. Result keys are the stripped ``name`` values read
      back from the table. A NULL id maps to ``0``.
      """
      ymd = str(analyze_ymd or "").strip()
      wanted = [cleaned for cleaned in (str(n or "").strip() for n in names) if cleaned]
      if not (ymd and wanted):
          return {}
      query = (
          "SELECT id, name FROM hot_content_wxindex_word_records "
          "WHERE analyze_ymd = %s AND name IN ("
          + ", ".join("%s" for _ in wanted)
          + ")"
      )
      with self.conn.cursor() as cursor:
          cursor.execute(query, [ymd, *wanted])
          fetched = cursor.fetchall()
      mapping: dict[str, int] = {}
      for row in fetched:
          key = str(row.get("name") or "").strip()
          if key:
              mapping[key] = int(row.get("id") or 0)
      return mapping
  1419. def list_wxindex_word_records_by_analyze_ymd(
  1420. self,
  1421. *,
  1422. analyze_ymd: str,
  1423. names: list[str] | None = None,
  1424. ) -> dict[str, dict[str, Any]]:
  1425. """按 analyze_ymd 批量读取追溯记录,供同日重跑跳过已完成阶段。"""
  1426. normalized_analyze_ymd = str(analyze_ymd or "").strip()
  1427. if not normalized_analyze_ymd:
  1428. return {}
  1429. normalized_names = [
  1430. str(name or "").strip() for name in (names or []) if str(name or "").strip()
  1431. ]
  1432. self._ensure_wxindex_word_records_table()
  1433. params: list[Any] = [normalized_analyze_ymd]
  1434. name_filter = ""
  1435. if normalized_names:
  1436. placeholders = ", ".join(["%s"] * len(normalized_names))
  1437. name_filter = f" AND name IN ({placeholders})"
  1438. params.extend(normalized_names)
  1439. sql = f"""
  1440. SELECT
  1441. id,
  1442. name,
  1443. meta_id,
  1444. analyze_ymd,
  1445. fetch_start_ymd,
  1446. fetch_end_ymd,
  1447. data_start_ymd,
  1448. data_end_ymd,
  1449. data_days,
  1450. is_sustained_high,
  1451. is_rising,
  1452. is_spike,
  1453. retain_reason,
  1454. is_internal_demand_matched,
  1455. matched_demand,
  1456. demand_cache_run_id,
  1457. internal_demand_match_json,
  1458. senior_fit_score,
  1459. demand_senior_fit_json,
  1460. is_final_retained,
  1461. min_score,
  1462. max_score,
  1463. avg_score,
  1464. detail_json
  1465. FROM hot_content_wxindex_word_records
  1466. WHERE analyze_ymd = %s{name_filter}
  1467. """
  1468. with self.conn.cursor() as cursor:
  1469. cursor.execute(sql, params)
  1470. rows = cursor.fetchall()
  1471. result: dict[str, dict[str, Any]] = {}
  1472. for row in rows or []:
  1473. name = str(row.get("name") or "").strip()
  1474. if not name:
  1475. continue
  1476. record = dict(row)
  1477. record["internal_demand_match_json"] = _json_loads(
  1478. record.get("internal_demand_match_json")
  1479. )
  1480. record["demand_senior_fit_json"] = _json_loads(
  1481. record.get("demand_senior_fit_json")
  1482. )
  1483. record["detail_json"] = _json_loads(record.get("detail_json"))
  1484. result[name] = record
  1485. return result
  1486. def list_wxindex_word_stats_names(
  1487. self,
  1488. *,
  1489. analyze_ymd: str,
  1490. names: list[str] | None = None,
  1491. ) -> set[str]:
  1492. """返回指定 analyze_ymd 下已写入 stats 表的词名集合。"""
  1493. normalized_analyze_ymd = str(analyze_ymd or "").strip()
  1494. if not normalized_analyze_ymd:
  1495. return set()
  1496. normalized_names = [
  1497. str(name or "").strip() for name in (names or []) if str(name or "").strip()
  1498. ]
  1499. self._ensure_wxindex_word_stats_table()
  1500. params: list[Any] = [normalized_analyze_ymd]
  1501. name_filter = ""
  1502. if normalized_names:
  1503. placeholders = ", ".join(["%s"] * len(normalized_names))
  1504. name_filter = f" AND name IN ({placeholders})"
  1505. params.extend(normalized_names)
  1506. sql = f"""
  1507. SELECT name
  1508. FROM hot_content_wxindex_word_stats
  1509. WHERE analyze_ymd = %s{name_filter}
  1510. """
  1511. with self.conn.cursor() as cursor:
  1512. cursor.execute(sql, params)
  1513. rows = cursor.fetchall()
  1514. return {
  1515. str(row.get("name") or "").strip()
  1516. for row in rows or []
  1517. if str(row.get("name") or "").strip()
  1518. }
  def save_wxindex_word_stats_batch(
      self,
      payloads: list[dict[str, Any]],
  ) -> int:
      """Batch-upsert stats for words that passed the heat and senior-fit filters.

      Each payload is converted by ``_wxindex_word_stats_row``, and all rows
      are sent in one ``executemany`` of ``_WXINDEX_WORD_STATS_UPSERT_SQL``.

      Returns:
          Number of rows sent. An empty ``payloads`` returns ``0`` without
          touching the database.
      """
      if not payloads:
          return 0
      stats_rows = list(map(_wxindex_word_stats_row, payloads))
      self._ensure_wxindex_word_stats_table()
      with self.conn.cursor() as cursor:
          cursor.executemany(_WXINDEX_WORD_STATS_UPSERT_SQL, stats_rows)
      return len(stats_rows)
  def _ensure_wxindex_word_stats_table(self) -> None:
      """Create ``hot_content_wxindex_word_stats`` if it does not exist.

      The table is unique per ``(name, analyze_ymd)`` and indexed by
      ``analyze_ymd`` and ``retain_reason``.
      """
      with self.conn.cursor() as cursor:
          cursor.execute(
              """
              CREATE TABLE IF NOT EXISTS hot_content_wxindex_word_stats (
                  id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键',
                  name VARCHAR(256) NOT NULL COMMENT '词',
                  meta_id BIGINT UNSIGNED NULL COMMENT '关联 meta.id',
                  analyze_ymd VARCHAR(8) NOT NULL COMMENT '分析日期 yyyymmdd',
                  wxindex_word_record_id BIGINT UNSIGNED NULL COMMENT '关联 records.id',
                  retain_reason VARCHAR(64) NULL COMMENT '热度保留原因',
                  senior_fit_score DOUBLE NULL COMMENT '老年性得分 0-10',
                  data_start_ymd VARCHAR(8) NULL COMMENT '分析数据起始日',
                  data_end_ymd VARCHAR(8) NULL COMMENT '分析数据结束日',
                  data_days INT NULL COMMENT '分析天数',
                  min_score DOUBLE NULL COMMENT '区间最低热度',
                  max_score DOUBLE NULL COMMENT '区间最高热度',
                  avg_score DOUBLE NULL COMMENT '区间平均热度',
                  detail_json JSON NULL COMMENT '扩展详情',
                  created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
                  updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
                      ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
                  PRIMARY KEY (id),
                  UNIQUE KEY uk_name_analyze_ymd (name, analyze_ymd),
                  KEY idx_analyze_ymd (analyze_ymd),
                  KEY idx_retain_reason (retain_reason)
              ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
              """
          )
  def _ensure_wxindex_word_records_table(self) -> None:
      """Create ``hot_content_wxindex_word_records`` if needed and migrate it.

      After ``CREATE TABLE IF NOT EXISTS``, the same cursor is handed, in this
      order, to the retain-column, senior-column and nullable-column
      migrations. The nullable step only inspects columns that exist, so it
      runs last and sees columns the earlier steps may have just added
      (e.g. ``is_final_retained`` from the senior-column step).
      """
      ddl = """
          CREATE TABLE IF NOT EXISTS hot_content_wxindex_word_records (
              id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键',
              name VARCHAR(256) NOT NULL COMMENT '词',
              meta_id BIGINT UNSIGNED NULL COMMENT '关联 meta.id',
              analyze_ymd VARCHAR(8) NOT NULL COMMENT '分析日期 yyyymmdd',
              fetch_start_ymd VARCHAR(8) NULL COMMENT 'meta 抓取起始日',
              fetch_end_ymd VARCHAR(8) NULL COMMENT 'meta 抓取结束日',
              data_start_ymd VARCHAR(8) NULL COMMENT '实际分析数据起始日',
              data_end_ymd VARCHAR(8) NULL COMMENT '实际分析数据结束日',
              data_days INT NULL COMMENT '实际分析天数',
              is_sustained_high TINYINT(1) NULL COMMENT '持续热度>1000万',
              is_rising TINYINT(1) NULL COMMENT '热度持续上涨',
              is_spike TINYINT(1) NULL COMMENT '最近3天突然暴涨',
              retain_reason VARCHAR(64) NULL COMMENT '保留原因(按2->3->1优先级)',
              is_internal_demand_matched TINYINT(1) NULL COMMENT '是否匹配票圈内部需求',
              matched_demand VARCHAR(1024) NULL COMMENT '匹配到的内部需求',
              demand_cache_run_id BIGINT UNSIGNED NULL COMMENT '需求池缓存ID',
              internal_demand_match_json JSON NULL COMMENT '内部需求匹配详情',
              senior_fit_score DOUBLE NULL COMMENT '老年性得分 0-10',
              demand_senior_fit_json JSON NULL COMMENT '老年性 LLM 评分结果',
              is_final_retained TINYINT(1) NULL COMMENT '老年性达标且最终保留',
              min_score DOUBLE NULL COMMENT '区间最低热度',
              max_score DOUBLE NULL COMMENT '区间最高热度',
              avg_score DOUBLE NULL COMMENT '区间平均热度',
              detail_json JSON NULL COMMENT '分析详情',
              created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
              updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
                  ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
              PRIMARY KEY (id),
              UNIQUE KEY uk_name_analyze_ymd (name, analyze_ymd),
              KEY idx_analyze_ymd (analyze_ymd),
              KEY idx_patterns (is_sustained_high, is_rising, is_spike),
              KEY idx_retain_reason (retain_reason)
          ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
      """
      migrations = (
          self._ensure_wxindex_word_records_retain_columns,
          self._ensure_wxindex_word_records_senior_columns,
          self._ensure_wxindex_word_records_nullable_columns,
      )
      with self.conn.cursor() as cursor:
          cursor.execute(ddl)
          for migrate in migrations:
              migrate(cursor)
  def _ensure_wxindex_word_records_nullable_columns(self, cursor: Any) -> None:
      """Relax legacy NOT NULL columns of ``hot_content_wxindex_word_records``.

      One ``information_schema.COLUMNS`` query reads ``IS_NULLABLE`` for every
      listed column. Each column that reports ``NO`` (case-insensitive) gets
      its ``MODIFY COLUMN ... NULL`` clause. All such clauses are applied in
      a single ``ALTER TABLE``, in the order listed below. Columns that are
      absent or already nullable are left alone. If nothing needs changing,
      no ALTER is issued.
      """
      specs = (
          ("fetch_start_ymd", "MODIFY COLUMN fetch_start_ymd VARCHAR(8) NULL COMMENT 'meta 抓取起始日'"),
          ("fetch_end_ymd", "MODIFY COLUMN fetch_end_ymd VARCHAR(8) NULL COMMENT 'meta 抓取结束日'"),
          ("data_start_ymd", "MODIFY COLUMN data_start_ymd VARCHAR(8) NULL COMMENT '实际分析数据起始日'"),
          ("data_end_ymd", "MODIFY COLUMN data_end_ymd VARCHAR(8) NULL COMMENT '实际分析数据结束日'"),
          ("data_days", "MODIFY COLUMN data_days INT NULL COMMENT '实际分析天数'"),
          ("is_sustained_high", "MODIFY COLUMN is_sustained_high TINYINT(1) NULL COMMENT '持续热度>1000万'"),
          ("is_rising", "MODIFY COLUMN is_rising TINYINT(1) NULL COMMENT '热度持续上涨'"),
          ("is_spike", "MODIFY COLUMN is_spike TINYINT(1) NULL COMMENT '最近3天突然暴涨'"),
          ("matched_demand", "MODIFY COLUMN matched_demand VARCHAR(1024) NULL COMMENT '匹配到的内部需求'"),
          ("is_final_retained", "MODIFY COLUMN is_final_retained TINYINT(1) NULL COMMENT '老年性达标且最终保留'"),
      )
      column_names = [column for column, _ in specs]
      marks = ", ".join("%s" for _ in column_names)
      cursor.execute(
          f"""
          SELECT COLUMN_NAME, IS_NULLABLE
          FROM information_schema.COLUMNS
          WHERE TABLE_SCHEMA = DATABASE()
            AND TABLE_NAME = 'hot_content_wxindex_word_records'
            AND COLUMN_NAME IN ({marks})
          """,
          column_names,
      )
      is_nullable: dict[str, str] = {}
      for row in cursor.fetchall() or []:
          is_nullable[str(row.get("COLUMN_NAME") or "")] = str(row.get("IS_NULLABLE") or "").upper()
      pending = [clause for column, clause in specs if is_nullable.get(column) == "NO"]
      if pending:
          cursor.execute(
              "ALTER TABLE hot_content_wxindex_word_records " + ", ".join(pending)
          )
  def _ensure_wxindex_word_records_senior_columns(self, cursor: Any) -> None:
      """Add the senior-fit columns to ``hot_content_wxindex_word_records`` if absent.

      Checks ``senior_fit_score``, ``demand_senior_fit_json`` and
      ``is_final_retained`` in that order against
      ``information_schema.COLUMNS``, on the caller's ``cursor``. Each
      missing one is ALTERed in. Each ALTER positions its column after the
      previous one, which is why the order is fixed.
      """
      probe_sql = """
          SELECT COUNT(*) AS cnt
          FROM information_schema.COLUMNS
          WHERE TABLE_SCHEMA = DATABASE()
            AND TABLE_NAME = 'hot_content_wxindex_word_records'
            AND COLUMN_NAME = %s
      """
      additions = (
          (
              "senior_fit_score",
              "ALTER TABLE hot_content_wxindex_word_records "
              "ADD COLUMN senior_fit_score DOUBLE NULL "
              "COMMENT '老年性得分 0-10' "
              "AFTER internal_demand_match_json",
          ),
          (
              "demand_senior_fit_json",
              "ALTER TABLE hot_content_wxindex_word_records "
              "ADD COLUMN demand_senior_fit_json JSON NULL "
              "COMMENT '老年性 LLM 评分结果' "
              "AFTER senior_fit_score",
          ),
          (
              "is_final_retained",
              "ALTER TABLE hot_content_wxindex_word_records "
              "ADD COLUMN is_final_retained TINYINT(1) NULL "
              "COMMENT '老年性达标且最终保留' "
              "AFTER demand_senior_fit_json",
          ),
      )
      for column_name, alter_sql in additions:
          cursor.execute(probe_sql, (column_name,))
          found = cursor.fetchone() or {}
          if not int(found.get("cnt") or 0):
              cursor.execute(alter_sql)
  1702. def _ensure_wxindex_word_records_retain_columns(self, cursor: Any) -> None:
  1703. columns = {
  1704. "retain_reason": (
  1705. "ALTER TABLE hot_content_wxindex_word_records "
  1706. "ADD COLUMN retain_reason VARCHAR(64) NULL "
  1707. "COMMENT '保留原因(按2->3->1优先级)' "
  1708. "AFTER is_spike"
  1709. ),
  1710. "is_internal_demand_matched": (
  1711. "ALTER TABLE hot_content_wxindex_word_records "
  1712. "ADD COLUMN is_internal_demand_matched TINYINT(1) NULL "
  1713. "COMMENT '是否匹配票圈内部需求' "
  1714. "AFTER retain_reason"
  1715. ),
  1716. "matched_demand": (
  1717. "ALTER TABLE hot_content_wxindex_word_records "
  1718. "ADD COLUMN matched_demand VARCHAR(1024) NULL "
  1719. "COMMENT '匹配到的内部需求' "
  1720. "AFTER is_internal_demand_matched"
  1721. ),
  1722. "demand_cache_run_id": (
  1723. "ALTER TABLE hot_content_wxindex_word_records "
  1724. "ADD COLUMN demand_cache_run_id BIGINT UNSIGNED NULL "
  1725. "COMMENT '需求池缓存ID' "
  1726. "AFTER matched_demand"
  1727. ),
  1728. "internal_demand_match_json": (
  1729. "ALTER TABLE hot_content_wxindex_word_records "
  1730. "ADD COLUMN internal_demand_match_json JSON NULL "
  1731. "COMMENT '内部需求匹配详情' "
  1732. "AFTER demand_cache_run_id"
  1733. ),
  1734. }
  1735. for column_name, alter_sql in columns.items():
  1736. cursor.execute(
  1737. """
  1738. SELECT COUNT(*) AS cnt
  1739. FROM information_schema.COLUMNS
  1740. WHERE TABLE_SCHEMA = DATABASE()
  1741. AND TABLE_NAME = 'hot_content_wxindex_word_records'
  1742. AND COLUMN_NAME = %s
  1743. """,
  1744. (column_name,),
  1745. )
  1746. if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
  1747. cursor.execute(alter_sql)
  1748. def list_stale_wxindex_words(
  1749. self,
  1750. *,
  1751. end_ymd: str,
  1752. update_window_days: int = 7,
  1753. today: date | None = None,
  1754. ) -> list[dict[str, Any]]:
  1755. """返回更新窗口内、仍缺近 7 日区间数据的词。"""
  1756. target_end = str(end_ymd or "").strip()
  1757. if not target_end:
  1758. return []
  1759. current = today or datetime.now(SHANGHAI_TZ).date()
  1760. active_since = current - timedelta(days=max(update_window_days, 0))
  1761. self._ensure_wxindex_word_meta_table()
  1762. self._ensure_wxindex_words_table()
  1763. sql = """
  1764. SELECT
  1765. m.name,
  1766. m.event_created_at,
  1767. m.fetch_start_ymd,
  1768. MIN(w.dt) AS earliest_dt,
  1769. MAX(w.dt) AS latest_dt
  1770. FROM hot_content_wxindex_word_meta m
  1771. INNER JOIN hot_content_wxindex_words w ON w.name = m.name
  1772. WHERE DATE(m.event_created_at) >= %s
  1773. GROUP BY m.name, m.event_created_at, m.fetch_start_ymd
  1774. HAVING MAX(w.dt) < %s OR MIN(w.dt) > m.fetch_start_ymd
  1775. ORDER BY m.name ASC
  1776. """
  1777. with self.conn.cursor() as cursor:
  1778. cursor.execute(sql, (active_since, target_end))
  1779. rows = cursor.fetchall()
  1780. stale_words: list[dict[str, Any]] = []
  1781. for row in rows:
  1782. name = str(row.get("name") or "").strip()
  1783. fetch_start_ymd = str(row.get("fetch_start_ymd") or "").strip()
  1784. earliest_dt = str(row.get("earliest_dt") or "").strip()
  1785. latest_dt = str(row.get("latest_dt") or "").strip()
  1786. event_created_at = row.get("event_created_at")
  1787. if name and fetch_start_ymd and earliest_dt and latest_dt and event_created_at:
  1788. stale_words.append(
  1789. {
  1790. "name": name,
  1791. "event_created_at": event_created_at,
  1792. "fetch_start_ymd": fetch_start_ymd,
  1793. "earliest_dt": earliest_dt,
  1794. "latest_dt": latest_dt,
  1795. }
  1796. )
  1797. return stale_words
  def list_word_earliest_event_times(
      self,
      *,
      since_dt: datetime,
  ) -> dict[str, datetime]:
      """Map each search word to the earliest record time it appears in.

      Words come from ``hot_content_records.wxindex_trend_json`` of records
      created at or after ``since_dt``, from two sources: every
      ``$.wxindex_searches[*].keyword`` and the ``$.llm_selected_word`` value.
      Both sources are trimmed, blank words are dropped, and the minimum
      ``created_at`` is taken per word. ``JSON_TABLE`` requires MySQL 8.0+.

      Args:
          since_dt: Inclusive lower bound on ``hot_content_records.created_at``.

      Returns:
          ``{word: earliest created_at}``. Rows whose aggregated timestamp is not
          a ``datetime`` instance are skipped.
      """
      self._ensure_record_quality_columns()
      sql = """
          SELECT
              word_name,
              MIN(event_created_at) AS event_created_at
          FROM (
              SELECT
                  TRIM(searches.keyword) AS word_name,
                  r.created_at AS event_created_at
              FROM hot_content_records r
              JOIN JSON_TABLE(
                  r.wxindex_trend_json,
                  '$.wxindex_searches[*]' COLUMNS (
                      keyword VARCHAR(256) PATH '$.keyword'
                  )
              ) AS searches
              WHERE r.created_at >= %s
                AND r.wxindex_trend_json IS NOT NULL
                AND TRIM(searches.keyword) <> ''
              UNION ALL
              SELECT
                  TRIM(JSON_UNQUOTE(JSON_EXTRACT(r.wxindex_trend_json, '$.llm_selected_word'))) AS word_name,
                  r.created_at AS event_created_at
              FROM hot_content_records r
              WHERE r.created_at >= %s
                AND r.wxindex_trend_json IS NOT NULL
                AND TRIM(JSON_UNQUOTE(JSON_EXTRACT(r.wxindex_trend_json, '$.llm_selected_word'))) <> ''
          ) AS word_events
          WHERE word_name IS NOT NULL
            AND word_name <> ''
          GROUP BY word_name
      """
      with self.conn.cursor() as cursor:
          # The same bound is applied to both UNION branches.
          cursor.execute(sql, (since_dt, since_dt))
          rows = cursor.fetchall()
      earliest: dict[str, datetime] = {}
      for row in rows:
          word = str(row.get("word_name") or "").strip()
          first_seen = row.get("event_created_at")
          if not word or not isinstance(first_seen, datetime):
              continue
          earliest[word] = first_seen
      return earliest
  def list_wxindex_word_bounds_without_meta(self) -> list[dict[str, Any]]:
      """List words that have daily rows but no meta row, with their first dates.

      Returns:
          One dict per orphan word, ordered by name, with ``name``, the smallest
          stored ``dt`` as ``earliest_dt`` and the smallest row ``created_at`` as
          ``first_created_at``. Rows with a blank name or ``earliest_dt`` are
          skipped.
      """
      self._ensure_wxindex_word_meta_table()
      self._ensure_wxindex_words_table()
      query = """
          SELECT
              w.name,
              MIN(w.dt) AS earliest_dt,
              MIN(w.created_at) AS first_created_at
          FROM hot_content_wxindex_words w
          LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name
          WHERE m.name IS NULL
          GROUP BY w.name
          ORDER BY w.name ASC
      """
      with self.conn.cursor() as cursor:
          cursor.execute(query)
          rows = cursor.fetchall()
      orphans: list[dict[str, Any]] = []
      for row in rows:
          word = str(row.get("name") or "").strip()
          earliest = str(row.get("earliest_dt") or "").strip()
          if not (word and earliest):
              continue
          orphans.append(
              {
                  "name": word,
                  "earliest_dt": earliest,
                  "first_created_at": row.get("first_created_at"),
              }
          )
      return orphans
  def list_wxindex_word_names_without_meta(self) -> list[str]:
      """Return the distinct words that have daily rows but no meta row.

      Returns:
          Stripped, non-empty names, ordered by name.
      """
      self._ensure_wxindex_word_meta_table()
      self._ensure_wxindex_words_table()
      query = """
          SELECT DISTINCT w.name
          FROM hot_content_wxindex_words w
          LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name
          WHERE m.name IS NULL
          ORDER BY w.name ASC
      """
      with self.conn.cursor() as cursor:
          cursor.execute(query)
          rows = cursor.fetchall()
      names: list[str] = []
      for row in rows:
          cleaned = str(row.get("name") or "").strip()
          if cleaned:
              names.append(cleaned)
      return names
  1895. def get_wxindex_word_first_row_created_at(self, name: str) -> datetime | None:
  1896. word = str(name or "").strip()
  1897. if not word:
  1898. return None
  1899. self._ensure_wxindex_words_table()
  1900. sql = """
  1901. SELECT MIN(created_at) AS first_created_at
  1902. FROM hot_content_wxindex_words
  1903. WHERE name = %s
  1904. """
  1905. with self.conn.cursor() as cursor:
  1906. cursor.execute(sql, (word,))
  1907. row = cursor.fetchone() or {}
  1908. first_created_at = row.get("first_created_at")
  1909. return first_created_at if isinstance(first_created_at, datetime) else None
  def list_all_wxindex_word_meta(self) -> list[dict[str, Any]]:
      """Load every meta row, ordered by id, in normalized form.

      Each row is passed through ``_normalize_wxindex_word_meta_row``. Rows for
      which it returns ``None`` are dropped.
      """
      self._ensure_wxindex_word_meta_table()
      query = """
          SELECT id, name, event_created_at, fetch_start_ymd, fetch_end_ymd
          FROM hot_content_wxindex_word_meta
          ORDER BY id ASC
      """
      with self.conn.cursor() as cursor:
          cursor.execute(query)
          rows = cursor.fetchall()
      normalized = (self._normalize_wxindex_word_meta_row(row) for row in rows)
      return [meta for meta in normalized if meta is not None]
  def update_wxindex_word_meta_fetch_start(
      self,
      *,
      name: str,
      fetch_start_ymd: str,
  ) -> None:
      """Overwrite ``fetch_start_ymd`` on the meta row for ``name``.

      Args:
          name: The word; surrounding whitespace is stripped.
          fetch_start_ymd: New left window boundary (``yyyymmdd``), stripped.

      Raises:
          HotContentFlowError: If either value is blank after stripping.
      """
      word = str(name or "").strip()
      target_start = str(fetch_start_ymd or "").strip()
      if not (word and target_start):
          raise HotContentFlowError("invalid wxindex word meta fetch_start_ymd payload")
      self._ensure_wxindex_word_meta_table()
      statement = """
          UPDATE hot_content_wxindex_word_meta
          SET fetch_start_ymd = %s
          WHERE name = %s
      """
      with self.conn.cursor() as cursor:
          cursor.execute(statement, (target_start, word))
  def update_wxindex_word_meta(
      self,
      *,
      name: str,
      event_created_at: datetime,
      fetch_start_ymd: str,
      fetch_end_ymd: str,
  ) -> None:
      """Overwrite the event time and fetch window on an existing meta row.

      A timezone-aware ``event_created_at`` is converted to ``SHANGHAI_TZ`` and
      stored as a naive datetime. Naive values are stored unchanged.

      Args:
          name: The word; surrounding whitespace is stripped.
          event_created_at: Event creation time to store.
          fetch_start_ymd: Left window boundary (``yyyymmdd``), stripped.
          fetch_end_ymd: Right window boundary (``yyyymmdd``), stripped.

      Raises:
          HotContentFlowError: If any of the string inputs is blank after stripping.
      """
      word, target_start, target_end = (
          str(value or "").strip() for value in (name, fetch_start_ymd, fetch_end_ymd)
      )
      if not all((word, target_start, target_end)):
          raise HotContentFlowError("invalid wxindex word meta payload")
      self._ensure_wxindex_word_meta_table()
      naive_event_at = (
          event_created_at
          if event_created_at.tzinfo is None
          else event_created_at.astimezone(SHANGHAI_TZ).replace(tzinfo=None)
      )
      statement = """
          UPDATE hot_content_wxindex_word_meta
          SET event_created_at = %s,
              fetch_start_ymd = %s,
              fetch_end_ymd = %s
          WHERE name = %s
      """
      with self.conn.cursor() as cursor:
          cursor.execute(statement, (naive_event_at, target_start, target_end, word))
  1970. def get_wxindex_word_meta(self, name: str) -> dict[str, Any] | None:
  1971. word = str(name or "").strip()
  1972. if not word:
  1973. return None
  1974. self._ensure_wxindex_word_meta_table()
  1975. sql = """
  1976. SELECT id, name, event_created_at, fetch_start_ymd, fetch_end_ymd
  1977. FROM hot_content_wxindex_word_meta
  1978. WHERE name = %s
  1979. """
  1980. with self.conn.cursor() as cursor:
  1981. cursor.execute(sql, (word,))
  1982. row = cursor.fetchone()
  1983. if not row:
  1984. return None
  1985. return self._normalize_wxindex_word_meta_row(row)
  def ensure_wxindex_word_meta(
      self,
      *,
      name: str,
      event_created_at: datetime,
      fetch_start_ymd: str,
      fetch_end_ymd: str,
  ) -> dict[str, Any]:
      """Insert or update (upsert) the meta row for a word, then return it.

      Uses ``INSERT ... ON DUPLICATE KEY UPDATE``, so an existing row has its
      event time and fetch window overwritten. A timezone-aware
      ``event_created_at`` is converted to ``SHANGHAI_TZ`` and stored naive.

      Args:
          name: The word; surrounding whitespace is stripped.
          event_created_at: Event creation time to store.
          fetch_start_ymd: Left window boundary (``yyyymmdd``), stripped.
          fetch_end_ymd: Right window boundary (``yyyymmdd``), stripped.

      Returns:
          The row as read back by ``get_wxindex_word_meta``.

      Raises:
          HotContentFlowError: If an input is blank, or if the row cannot be
              read back after the write.
      """
      word = str(name or "").strip()
      target_start = str(fetch_start_ymd or "").strip()
      target_end = str(fetch_end_ymd or "").strip()
      if not (word and target_start and target_end):
          raise HotContentFlowError("invalid wxindex word meta payload")
      self._ensure_wxindex_word_meta_table()
      if event_created_at.tzinfo is None:
          naive_event_at = event_created_at
      else:
          naive_event_at = event_created_at.astimezone(SHANGHAI_TZ).replace(tzinfo=None)
      upsert = """
          INSERT INTO hot_content_wxindex_word_meta (
              name,
              event_created_at,
              fetch_start_ymd,
              fetch_end_ymd
          )
          VALUES (%s, %s, %s, %s)
          ON DUPLICATE KEY UPDATE
              event_created_at = VALUES(event_created_at),
              fetch_start_ymd = VALUES(fetch_start_ymd),
              fetch_end_ymd = VALUES(fetch_end_ymd)
      """
      with self.conn.cursor() as cursor:
          cursor.execute(upsert, (word, naive_event_at, target_start, target_end))
      stored = self.get_wxindex_word_meta(word)
      if stored is None:
          raise HotContentFlowError(f"failed to persist wxindex word meta: {word}")
      return stored
  def delete_wxindex_word_meta_by_names(self, names: list[str]) -> int:
      """Delete meta rows for the given words.

      Args:
          names: Words to delete; entries are stripped and blanks/``None`` ignored.

      Returns:
          The cursor's ``rowcount`` (0 if falsy), or 0 when no usable names remain.
      """
      words = [cleaned for cleaned in (str(n or "").strip() for n in names) if cleaned]
      if not words:
          return 0
      self._ensure_wxindex_word_meta_table()
      placeholders = ", ".join("%s" for _ in words)
      sql = f"DELETE FROM hot_content_wxindex_word_meta WHERE name IN ({placeholders})"
      with self.conn.cursor() as cursor:
          cursor.execute(sql, tuple(words))
          deleted = cursor.rowcount
      return int(deleted or 0)
  2033. def list_low_max_wxindex_words(
  2034. self,
  2035. *,
  2036. min_max_score: float,
  2037. ) -> list[dict[str, Any]]:
  2038. """按 name 聚合,返回最大值低于阈值的词。"""
  2039. self._ensure_wxindex_words_table()
  2040. sql = """
  2041. SELECT
  2042. name,
  2043. MAX(total_score) AS max_score,
  2044. COUNT(*) AS row_count
  2045. FROM hot_content_wxindex_words
  2046. GROUP BY name
  2047. HAVING MAX(total_score) < %s
  2048. ORDER BY name ASC
  2049. """
  2050. with self.conn.cursor() as cursor:
  2051. cursor.execute(sql, (min_max_score,))
  2052. rows = cursor.fetchall()
  2053. low_words: list[dict[str, Any]] = []
  2054. for row in rows:
  2055. name = str(row.get("name") or "").strip()
  2056. if not name:
  2057. continue
  2058. try:
  2059. max_score = float(row["max_score"])
  2060. row_count = int(row["row_count"])
  2061. except (TypeError, ValueError, KeyError):
  2062. continue
  2063. low_words.append(
  2064. {
  2065. "name": name,
  2066. "max_score": max_score,
  2067. "row_count": row_count,
  2068. }
  2069. )
  2070. return low_words
  def count_wxindex_words_outside_event_window(
      self,
      *,
      window_days: int = 7,
  ) -> int:
      """Count daily rows whose ``dt`` lies outside their word's meta fetch window.

      The window is ``[fetch_start_ymd, fetch_end_ymd]`` from the meta table.
      Words without a meta row are not counted (INNER JOIN).

      Args:
          window_days: Not used by the query; the window comes entirely from the
              meta table. Presumably kept so existing callers keep working.

      Returns:
          Number of out-of-window rows (0 when the result is empty).
      """
      self._ensure_wxindex_word_meta_table()
      self._ensure_wxindex_words_table()
      query = """
          SELECT COUNT(*) AS row_count
          FROM hot_content_wxindex_words w
          INNER JOIN hot_content_wxindex_word_meta m ON m.name = w.name
          WHERE w.dt < m.fetch_start_ymd
             OR w.dt > m.fetch_end_ymd
      """
      with self.conn.cursor() as cursor:
          cursor.execute(query)
          result = cursor.fetchone() or {}
      return int(result.get("row_count") or 0)
  def list_wxindex_words_outside_event_window_samples(
      self,
      *,
      window_days: int = 7,
      limit: int = 20,
  ) -> list[dict[str, Any]]:
      """Return up to ``limit`` daily rows that fall outside their meta fetch window.

      Args:
          window_days: Not used by the query; the window comes from the meta
              table's ``fetch_start_ymd``/``fetch_end_ymd``.
          limit: Maximum number of rows to fetch.

      Returns:
          Dicts with ``name``, ``dt``, ``event_created_at``, ``start_ymd`` and
          ``end_ymd``, ordered by name then dt. Rows with a blank name or dt are
          skipped.
      """
      self._ensure_wxindex_word_meta_table()
      self._ensure_wxindex_words_table()
      query = """
          SELECT
              w.name,
              w.dt,
              m.event_created_at,
              m.fetch_start_ymd AS start_ymd,
              m.fetch_end_ymd AS end_ymd
          FROM hot_content_wxindex_words w
          INNER JOIN hot_content_wxindex_word_meta m ON m.name = w.name
          WHERE w.dt < m.fetch_start_ymd
             OR w.dt > m.fetch_end_ymd
          ORDER BY w.name ASC, w.dt ASC
          LIMIT %s
      """
      with self.conn.cursor() as cursor:
          cursor.execute(query, (limit,))
          rows = cursor.fetchall()
      samples: list[dict[str, Any]] = []
      for row in rows:
          word = str(row.get("name") or "").strip()
          day = str(row.get("dt") or "").strip()
          if not (word and day):
              continue
          samples.append(
              {
                  "name": word,
                  "dt": day,
                  "event_created_at": row.get("event_created_at"),
                  "start_ymd": str(row.get("start_ymd") or "").strip(),
                  "end_ymd": str(row.get("end_ymd") or "").strip(),
              }
          )
      return samples
  def delete_wxindex_words_outside_event_window(
      self,
      *,
      window_days: int = 7,
  ) -> int:
      """Delete daily rows whose ``dt`` lies outside their word's meta fetch window.

      Words without a meta row are untouched (INNER JOIN).

      Args:
          window_days: Not used by the statement; the window comes from the meta
              table's ``fetch_start_ymd``/``fetch_end_ymd``.

      Returns:
          The cursor's ``rowcount`` (0 if falsy).
      """
      self._ensure_wxindex_word_meta_table()
      self._ensure_wxindex_words_table()
      statement = """
          DELETE w
          FROM hot_content_wxindex_words w
          INNER JOIN hot_content_wxindex_word_meta m ON m.name = w.name
          WHERE w.dt < m.fetch_start_ymd
             OR w.dt > m.fetch_end_ymd
      """
      with self.conn.cursor() as cursor:
          cursor.execute(statement)
          affected = cursor.rowcount
      return int(affected or 0)
  def delete_wxindex_words_without_meta(self) -> int:
      """Delete every daily row whose word has no meta row.

      Returns:
          The cursor's ``rowcount`` (0 if falsy).
      """
      self._ensure_wxindex_word_meta_table()
      self._ensure_wxindex_words_table()
      statement = """
          DELETE w
          FROM hot_content_wxindex_words w
          LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name
          WHERE m.name IS NULL
      """
      with self.conn.cursor() as cursor:
          cursor.execute(statement)
          affected = cursor.rowcount
      return int(affected or 0)
  def count_wxindex_words_without_meta(self) -> int:
      """Count daily rows whose word has no meta row.

      Returns:
          The row count (0 when the result is empty).
      """
      self._ensure_wxindex_word_meta_table()
      self._ensure_wxindex_words_table()
      query = """
          SELECT COUNT(*) AS row_count
          FROM hot_content_wxindex_words w
          LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name
          WHERE m.name IS NULL
      """
      with self.conn.cursor() as cursor:
          cursor.execute(query)
          result = cursor.fetchone() or {}
      return int(result.get("row_count") or 0)
  def delete_wxindex_words_by_names(self, names: list[str]) -> int:
      """Delete all daily rows for the given words.

      Args:
          names: Words to delete; entries are stripped and blanks/``None`` ignored.

      Returns:
          The cursor's ``rowcount`` (0 if falsy), or 0 when no usable names remain.
      """
      cleaned: list[str] = []
      for raw in names:
          word = str(raw or "").strip()
          if word:
              cleaned.append(word)
      if not cleaned:
          return 0
      self._ensure_wxindex_words_table()
      marks = ", ".join("%s" for _ in cleaned)
      statement = f"""
          DELETE FROM hot_content_wxindex_words
          WHERE name IN ({marks})
      """
      with self.conn.cursor() as cursor:
          cursor.execute(statement, tuple(cleaned))
          affected = cursor.rowcount
      return int(affected or 0)
  def list_wxindex_word_names_with_dt(
      self,
      names: list[str],
      *,
      dt: str,
  ) -> set[str]:
      """Return which of ``names`` have a daily row for date ``dt``.

      Args:
          names: Candidate words; entries are stripped and blanks ignored.
          dt: Date (``yyyymmdd``) to check; stripped.

      Returns:
          The subset of names that have a row on ``dt``. Empty when ``dt`` or
          every name is blank.
      """
      target_dt = str(dt or "").strip()
      candidates = [word for word in (str(n or "").strip() for n in names) if word]
      if not (target_dt and candidates):
          return set()
      self._ensure_wxindex_words_table()
      marks = ", ".join("%s" for _ in candidates)
      query = f"""
          SELECT DISTINCT name
          FROM hot_content_wxindex_words
          WHERE dt = %s
            AND name IN ({marks})
      """
      with self.conn.cursor() as cursor:
          cursor.execute(query, [target_dt, *candidates])
          rows = cursor.fetchall()
      present: set[str] = set()
      for row in rows or []:
          word = str(row.get("name") or "").strip()
          if word:
              present.add(word)
      return present
  def has_wxindex_word(self, name: str) -> bool:
      """Return True when ``name`` has at least one daily row stored.

      Delegates to ``get_wxindex_word_latest_dt`` and checks for a non-None result.
      """
      latest = self.get_wxindex_word_latest_dt(name)
      return latest is not None
  2215. def get_wxindex_word_latest_dt(self, name: str) -> str | None:
  2216. word = str(name or "").strip()
  2217. if not word:
  2218. return None
  2219. self._ensure_wxindex_words_table()
  2220. sql = """
  2221. SELECT MAX(dt) AS latest_dt
  2222. FROM hot_content_wxindex_words
  2223. WHERE name = %s
  2224. """
  2225. with self.conn.cursor() as cursor:
  2226. cursor.execute(sql, (word,))
  2227. row = cursor.fetchone() or {}
  2228. latest_dt = str(row.get("latest_dt") or "").strip()
  2229. return latest_dt or None
  def save_wxindex_daily_scores(
      self,
      *,
      name: str,
      scores: list[dict[str, Any]],
  ) -> tuple[int, int]:
      """Insert per-day index scores for one word, skipping rows that already exist.

      Each item of ``scores`` should be a dict with a date under ``"ymd"``
      (falling back to ``"dt"``) and a numeric ``"total_score"``. Before anything
      is written, the following are dropped:

      - items that are not dicts;
      - items with no date;
      - items whose score is missing or cannot be converted to ``float``;
      - items whose score is non-finite (NaN / ±inf);
      - repeated dates within the batch (the first occurrence wins).

      Rows are written with ``INSERT IGNORE``, so a ``(name, dt)`` pair that
      already exists is left untouched.

      Args:
          name: The word; surrounding whitespace is stripped.
          scores: Raw daily score items.

      Returns:
          ``(inserted, skipped)``. ``skipped`` counts the valid, de-duplicated
          rows whose insert affected no row (e.g. already present). Items dropped
          during validation are counted in neither.
      """
      import math  # local import: only this method needs it

      word = str(name or "").strip()
      if not word or not scores:
          return 0, 0
      self._ensure_wxindex_words_table()
      sql = """
          INSERT IGNORE INTO hot_content_wxindex_words (
              name,
              dt,
              total_score
          )
          VALUES (%s, %s, %s)
      """
      rows: list[tuple[str, str, float]] = []
      seen: set[tuple[str, str]] = set()
      for item in scores:
          if not isinstance(item, dict):
              continue
          dt = str(item.get("ymd") or item.get("dt") or "").strip()
          if not dt:
              continue
          try:
              total_score = float(item["total_score"])
          except (TypeError, ValueError, KeyError):
              continue
          # float() accepts "nan"/"inf", but MySQL DOUBLE columns cannot hold
          # non-finite values and MySQL drivers refuse to send them. Sending one
          # would abort the loop below after some rows were already inserted.
          if not math.isfinite(total_score):
              continue
          key = (word, dt)
          if key in seen:
              continue
          seen.add(key)
          rows.append((word, dt, total_score))
      if not rows:
          return 0, 0
      inserted = 0
      with self.conn.cursor() as cursor:
          for row in rows:
              cursor.execute(sql, row)
              # INSERT IGNORE reports 0 affected rows for an existing (name, dt).
              inserted += int(cursor.rowcount or 0)
      skipped = len(rows) - inserted
      return inserted, skipped
  def list_records_with_wxindex_trend_after(
      self,
      *,
      after_created_at: datetime,
  ) -> list[dict[str, Any]]:
      """Return records created strictly after ``after_created_at`` that have trend JSON.

      Records whose ``wxindex_trend_json`` is NULL or blank are excluded.

      Args:
          after_created_at: Exclusive lower bound on ``created_at``.

      Returns:
          Dicts with ``id`` (int), ``created_at`` and the decoded
          ``wxindex_trend_json`` (via ``_json_loads``), ordered by id.
      """
      query = """
          SELECT id, created_at, wxindex_trend_json
          FROM hot_content_records
          WHERE created_at > %s
            AND wxindex_trend_json IS NOT NULL
            AND TRIM(CAST(wxindex_trend_json AS CHAR)) <> ''
          ORDER BY id ASC
      """
      with self.conn.cursor() as cursor:
          cursor.execute(query, (after_created_at,))
          rows = cursor.fetchall()
      return [
          {
              "id": int(row["id"]),
              "created_at": row.get("created_at"),
              "wxindex_trend_json": _json_loads(row.get("wxindex_trend_json")),
          }
          for row in rows
      ]
  def list_records_with_wxindex_trend(
      self,
      *,
      since_dt: datetime,
  ) -> list[dict[str, Any]]:
      """Return records created at or after ``since_dt`` that have trend JSON.

      Records whose ``wxindex_trend_json`` is NULL or blank are excluded.

      Args:
          since_dt: Inclusive lower bound on ``created_at``.

      Returns:
          Dicts with ``id`` (int), ``created_at`` and the decoded
          ``wxindex_trend_json`` (via ``_json_loads``), ordered by id.
      """
      query = """
          SELECT id, created_at, wxindex_trend_json
          FROM hot_content_records
          WHERE created_at >= %s
            AND wxindex_trend_json IS NOT NULL
            AND TRIM(CAST(wxindex_trend_json AS CHAR)) <> ''
          ORDER BY id ASC
      """
      with self.conn.cursor() as cursor:
          cursor.execute(query, (since_dt,))
          rows = cursor.fetchall()
      return [
          {
              "id": int(row["id"]),
              "created_at": row.get("created_at"),
              "wxindex_trend_json": _json_loads(row.get("wxindex_trend_json")),
          }
          for row in rows
      ]
  2327. def _ensure_wxindex_word_meta_table(self) -> None:
  2328. sql = """
  2329. CREATE TABLE IF NOT EXISTS hot_content_wxindex_word_meta (
  2330. id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键',
  2331. name VARCHAR(256) NOT NULL COMMENT '词',
  2332. event_created_at DATETIME NOT NULL COMMENT '首次关联热点事件创建时间',
  2333. fetch_start_ymd VARCHAR(8) NOT NULL COMMENT '数据窗口左边界:事件创建日往前7天',
  2334. fetch_end_ymd VARCHAR(8) NOT NULL DEFAULT '' COMMENT '数据窗口右边界:事件创建日后7天',
  2335. meta_created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '元数据创建时间',
  2336. PRIMARY KEY (id),
  2337. UNIQUE KEY uk_name (name),
  2338. KEY idx_event_created_at (event_created_at)
  2339. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
  2340. """
  2341. with self.conn.cursor() as cursor:
  2342. cursor.execute(sql)
  2343. self._ensure_wxindex_word_meta_id_column(cursor)
  2344. self._ensure_wxindex_word_meta_fetch_end_column(cursor)
  2345. def _ensure_wxindex_word_meta_fetch_end_column(self, cursor: Any) -> None:
  2346. cursor.execute(
  2347. """
  2348. SELECT COUNT(*) AS cnt
  2349. FROM information_schema.COLUMNS
  2350. WHERE TABLE_SCHEMA = DATABASE()
  2351. AND TABLE_NAME = 'hot_content_wxindex_word_meta'
  2352. AND COLUMN_NAME = 'fetch_end_ymd'
  2353. """
  2354. )
  2355. if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
  2356. cursor.execute(
  2357. """
  2358. ALTER TABLE hot_content_wxindex_word_meta
  2359. ADD COLUMN fetch_end_ymd VARCHAR(8) NOT NULL DEFAULT ''
  2360. COMMENT '数据窗口右边界:事件创建日后7天'
  2361. AFTER fetch_start_ymd
  2362. """
  2363. )
  2364. cursor.execute(
  2365. """
  2366. UPDATE hot_content_wxindex_word_meta
  2367. SET fetch_end_ymd = DATE_FORMAT(
  2368. DATE_ADD(DATE(event_created_at), INTERVAL 7 DAY),
  2369. '%Y%m%d'
  2370. )
  2371. WHERE fetch_end_ymd IS NULL
  2372. OR TRIM(fetch_end_ymd) = ''
  2373. """
  2374. )
  2375. def _ensure_wxindex_word_meta_id_column(self, cursor: Any) -> None:
  2376. cursor.execute(
  2377. """
  2378. SELECT COUNT(*) AS cnt
  2379. FROM information_schema.COLUMNS
  2380. WHERE TABLE_SCHEMA = DATABASE()
  2381. AND TABLE_NAME = 'hot_content_wxindex_word_meta'
  2382. AND COLUMN_NAME = 'id'
  2383. """
  2384. )
  2385. if int((cursor.fetchone() or {}).get("cnt") or 0) > 0:
  2386. return
  2387. cursor.execute(
  2388. """
  2389. ALTER TABLE hot_content_wxindex_word_meta
  2390. DROP PRIMARY KEY,
  2391. ADD COLUMN id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY FIRST,
  2392. ADD UNIQUE KEY uk_name (name)
  2393. """
  2394. )
  2395. @staticmethod
  2396. def _normalize_wxindex_word_meta_row(row: dict[str, Any]) -> dict[str, Any] | None:
  2397. name = str(row.get("name") or "").strip()
  2398. fetch_start_ymd = str(row.get("fetch_start_ymd") or "").strip()
  2399. fetch_end_ymd = str(row.get("fetch_end_ymd") or "").strip()
  2400. event_created_at = row.get("event_created_at")
  2401. if not name or not fetch_start_ymd or event_created_at is None:
  2402. return None
  2403. if not fetch_end_ymd and isinstance(event_created_at, datetime):
  2404. event_date = event_created_at.date()
  2405. fetch_end_ymd = (event_date + timedelta(days=7)).strftime("%Y%m%d")
  2406. if not fetch_end_ymd:
  2407. return None
  2408. try:
  2409. meta_id = int(row.get("id"))
  2410. except (TypeError, ValueError):
  2411. return None
  2412. return {
  2413. "id": meta_id,
  2414. "name": name,
  2415. "event_created_at": event_created_at,
  2416. "fetch_start_ymd": fetch_start_ymd,
  2417. "fetch_end_ymd": fetch_end_ymd,
  2418. }
  2419. def _ensure_wxindex_words_table(self) -> None:
  2420. sql = """
  2421. CREATE TABLE IF NOT EXISTS hot_content_wxindex_words (
  2422. id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  2423. name VARCHAR(256) NOT NULL COMMENT '词',
  2424. dt VARCHAR(8) NOT NULL COMMENT '日期 yyyymmdd',
  2425. total_score DOUBLE NOT NULL COMMENT '微信指数',
  2426. created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  2427. PRIMARY KEY (id),
  2428. UNIQUE KEY uk_name_dt (name, dt),
  2429. KEY idx_name (name),
  2430. KEY idx_dt (dt)
  2431. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
  2432. """
  2433. with self.conn.cursor() as cursor:
  2434. cursor.execute(sql)
  2435. def _ensure_odps_sync_log_table(self) -> None:
  2436. sql = """
  2437. CREATE TABLE IF NOT EXISTS hot_content_odps_sync_log (
  2438. id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  2439. partition_dt VARCHAR(8) NOT NULL COMMENT 'ODPS 分区 dt',
  2440. strategy VARCHAR(128) NOT NULL COMMENT '需求 strategy',
  2441. demand_id CHAR(32) NOT NULL COMMENT 'ODPS demand_id',
  2442. demand_name VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '需求名',
  2443. demand_type VARCHAR(32) NOT NULL DEFAULT '' COMMENT '特征点/短语',
  2444. record_id BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '来源 hot_content_records.id',
  2445. weight DOUBLE NULL DEFAULT NULL COMMENT 'ODPS 需求权重(记录 wxindex 最高分 / 1000000)',
  2446. extend TEXT NULL DEFAULT NULL COMMENT 'ODPS extend 扩展字段 JSON',
  2447. synced_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '写入 ODPS 时间',
  2448. PRIMARY KEY (id),
  2449. UNIQUE KEY uk_odps_sync (partition_dt, strategy, demand_id),
  2450. KEY idx_record_partition (record_id, partition_dt),
  2451. KEY idx_partition_strategy (partition_dt, strategy)
  2452. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
  2453. """
  2454. with self.conn.cursor() as cursor:
  2455. cursor.execute(sql)
  2456. self._ensure_odps_sync_log_weight_column(cursor)
  2457. self._ensure_odps_sync_log_extend_column(cursor)