wxindex_heat_pattern.py 56 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654
  1. """微信指数热度模式分析:持续高热、持续上涨、突然暴涨。"""
  2. from __future__ import annotations
  3. import csv
  4. import json
  5. from datetime import date, datetime, timedelta
  6. from pathlib import Path
  7. from typing import Any
  8. from app.hot_content.client import JsonApiClient
  9. from app.hot_content.demand_cache_service import DemandCacheService
  10. from app.hot_content.demand_hive_export import WEIGHT_DIVISOR, build_demand_id
  11. from app.hot_content.demand_quality import (
  12. TYPE_PHRASE,
  13. llm_score_senior_fit,
  14. lookup_quality_scores,
  15. )
  16. from app.hot_content.demand_pool_writer import sync_wxindex_word_rows_to_odps
  17. from app.hot_content.exceptions import HotContentFlowError
  18. from app.hot_content.postprocess_service import ContributionPostprocessService
  19. from app.hot_content.repository import HotContentRepository
  20. from app.hot_content.timezone import SHANGHAI_TZ
  21. from app.hot_content.types import FlowConfig
  22. from app.hot_content.wxindex_trend import (
  23. HEAT_RISING_ADJACENT_UP_RATIO,
  24. HEAT_RISING_OVERALL_CHANGE_RATE,
  25. HEAT_RISING_WINDOW_CHANGE_RATE,
  26. HEAT_SPIKE_BASELINE_FLOOR,
  27. HEAT_SPIKE_RATIO,
  28. extract_sorted_scores,
  29. is_wxindex_heat_rising_scores,
  30. is_wxindex_spike_scores,
  31. )
  32. from app.hot_content.wxindex_words import filter_scores_in_ymd_window
  33. WXINDEX_HEAT_MIN_DAYS = 7
  34. WXINDEX_HEAT_MAX_DAYS = 14
  35. WXINDEX_SUSTAINED_HIGH_THRESHOLD = 10_000_000.0
  36. WXINDEX_SPIKE_LOOKBACK_DAYS = 3
  37. WXINDEX_SPIKE_RATIO = HEAT_SPIKE_RATIO
  38. WXINDEX_SPIKE_BASELINE_FLOOR = HEAT_SPIKE_BASELINE_FLOOR
  39. WXINDEX_RISING_OVERALL_CHANGE_RATE = HEAT_RISING_OVERALL_CHANGE_RATE
  40. WXINDEX_RISING_WINDOW_CHANGE_RATE = HEAT_RISING_WINDOW_CHANGE_RATE
  41. WXINDEX_RISING_ADJACENT_UP_RATIO = HEAT_RISING_ADJACENT_UP_RATIO
  42. WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD = 0.6
  43. WXINDEX_WORD_LLM_BATCH_SIZE = 10
  44. PATTERN_SUSTAINED_HIGH = "sustained_high"
  45. PATTERN_RISING = "rising"
  46. PATTERN_SPIKE = "spike"
  47. RETAIN_REASON_SUSTAINED_HIGH = "持续高热度"
  48. RETAIN_REASON_RISING = "热度持续上涨"
  49. RETAIN_REASON_SPIKE = "热度突然暴涨"
  50. PHASE_ANALYSIS_SKIPPED = "analysis_skipped"
  51. PHASE_HEAT_ANALYZED = "heat_analyzed"
  52. PHASE_DEMAND_MATCHED = "demand_matched"
  53. PHASE_SENIOR_FIT_SCORED = "senior_fit_scored"
  54. PHASE_FINALIZED = "finalized"
  55. _HEAT_DONE_PHASES = frozenset(
  56. {
  57. PHASE_ANALYSIS_SKIPPED,
  58. PHASE_HEAT_ANALYZED,
  59. PHASE_DEMAND_MATCHED,
  60. PHASE_SENIOR_FIT_SCORED,
  61. PHASE_FINALIZED,
  62. }
  63. )
  64. _WXINDEX_RECORD_UNSET_ANALYSIS_FIELDS: dict[str, Any] = {
  65. "data_start_ymd": None,
  66. "data_end_ymd": None,
  67. "data_days": None,
  68. "is_sustained_high": None,
  69. "is_rising": None,
  70. "is_spike": None,
  71. "retain_reason": None,
  72. "is_internal_demand_matched": None,
  73. "matched_demand": None,
  74. "internal_demand_match_json": None,
  75. "senior_fit_score": None,
  76. "demand_senior_fit_json": None,
  77. "is_final_retained": None,
  78. "min_score": None,
  79. "max_score": None,
  80. "avg_score": None,
  81. "detail_json": None,
  82. }
  83. def _parse_record_detail_json(record: dict[str, Any]) -> dict[str, Any]:
  84. detail = record.get("detail_json")
  85. if isinstance(detail, dict):
  86. return detail
  87. if isinstance(detail, str) and detail.strip():
  88. try:
  89. parsed = json.loads(detail)
  90. return parsed if isinstance(parsed, dict) else {}
  91. except json.JSONDecodeError:
  92. return {}
  93. return {}
  94. def _record_phase(record: dict[str, Any]) -> str:
  95. return str(_parse_record_detail_json(record).get("phase") or "").strip()
  96. def _nullable_bool_from_record(value: Any) -> bool | None:
  97. if value is None:
  98. return None
  99. return bool(value)
  100. def _parse_record_json_field(record: dict[str, Any], field: str) -> Any:
  101. value = record.get(field)
  102. if value is None:
  103. return None
  104. if isinstance(value, (dict, list)):
  105. return value
  106. if isinstance(value, (bytes, bytearray)):
  107. value = value.decode("utf-8")
  108. if isinstance(value, str):
  109. try:
  110. return json.loads(value)
  111. except json.JSONDecodeError:
  112. return None
  113. return value
  114. def _is_heat_analysis_done(record: dict[str, Any] | None) -> bool:
  115. if not record:
  116. return False
  117. return _record_phase(record) in _HEAT_DONE_PHASES
  118. def _is_analysis_skipped_record(record: dict[str, Any]) -> bool:
  119. return _record_phase(record) == PHASE_ANALYSIS_SKIPPED
  120. def _is_senior_fit_needed(
  121. *,
  122. retain_reason: str | None,
  123. is_internal_demand_matched: bool | None,
  124. ) -> bool:
  125. if not retain_reason:
  126. return False
  127. if retain_reason == RETAIN_REASON_SUSTAINED_HIGH:
  128. return bool(is_internal_demand_matched)
  129. return True
  130. def _is_finalized_record(record: dict[str, Any] | None) -> bool:
  131. if not record:
  132. return False
  133. return _record_phase(record) == PHASE_FINALIZED
  134. def _latest_score_ymd(scores: list[dict[str, Any]]) -> str | None:
  135. ymds = [_score_row_ymd(row) for row in scores if _score_row_ymd(row)]
  136. return max(ymds) if ymds else None
  137. def _should_rerun_heat_analysis(
  138. existing_record: dict[str, Any] | None,
  139. *,
  140. scores: list[dict[str, Any]],
  141. fetch_start_ymd: str,
  142. fetch_end_ymd: str,
  143. min_days: int,
  144. max_days: int,
  145. ) -> bool:
  146. if not _is_heat_analysis_done(existing_record):
  147. return True
  148. assert existing_record is not None
  149. latest_ymd = _latest_score_ymd(scores)
  150. record_end = str(existing_record.get("data_end_ymd") or "").strip()
  151. if latest_ymd and record_end and latest_ymd > record_end:
  152. return True
  153. if _is_analysis_skipped_record(existing_record):
  154. _, skip_reason = prepare_analysis_scores(
  155. scores,
  156. start_ymd=fetch_start_ymd,
  157. end_ymd=fetch_end_ymd,
  158. min_days=min_days,
  159. max_days=max_days,
  160. )
  161. if skip_reason is None:
  162. return True
  163. return False
  164. def _is_senior_fit_attempt_done(
  165. item: dict[str, Any],
  166. existing_record: dict[str, Any] | None,
  167. ) -> bool:
  168. if existing_record:
  169. phase = _record_phase(existing_record)
  170. if phase in (PHASE_SENIOR_FIT_SCORED, PHASE_FINALIZED):
  171. return True
  172. return item.get("senior_fit_score") is not None
  173. def _senior_fit_passed_from_score(score: Any, *, senior_threshold: float) -> bool:
  174. if score is None:
  175. return False
  176. try:
  177. return float(score) / 10.0 > senior_threshold
  178. except (TypeError, ValueError):
  179. return False
  180. def _rehydrate_result_from_record(record: dict[str, Any]) -> dict[str, Any]:
  181. detail = _parse_record_detail_json(record)
  182. retain_reason_raw = record.get("retain_reason")
  183. retain_reason = (
  184. str(retain_reason_raw).strip() if retain_reason_raw is not None else None
  185. ) or None
  186. def _bool_field(key: str) -> bool | None:
  187. value = record.get(key)
  188. if value is None:
  189. return None
  190. return bool(value)
  191. return {
  192. "skipped": False,
  193. "skip_reason": None,
  194. "data_days": record.get("data_days"),
  195. "data_start_ymd": record.get("data_start_ymd"),
  196. "data_end_ymd": record.get("data_end_ymd"),
  197. "fetch_start_ymd": record.get("fetch_start_ymd"),
  198. "fetch_end_ymd": record.get("fetch_end_ymd"),
  199. "min_score": record.get("min_score"),
  200. "max_score": record.get("max_score"),
  201. "avg_score": record.get("avg_score"),
  202. "is_sustained_high": _bool_field("is_sustained_high"),
  203. "is_rising": _bool_field("is_rising"),
  204. "is_spike": _bool_field("is_spike"),
  205. "retain_reason": retain_reason,
  206. "patterns": list(detail.get("patterns") or []),
  207. }
  208. def _rehydrate_pending_item_from_record(
  209. item: dict[str, Any],
  210. record: dict[str, Any],
  211. *,
  212. senior_threshold: float,
  213. ) -> dict[str, Any]:
  214. result = _rehydrate_result_from_record(record)
  215. retain_reason = result.get("retain_reason")
  216. is_internal_demand_matched = _nullable_bool_from_record(
  217. record.get("is_internal_demand_matched")
  218. )
  219. matched_demand_raw = record.get("matched_demand")
  220. matched_demand = (
  221. str(matched_demand_raw).strip() if matched_demand_raw is not None else None
  222. ) or None
  223. senior_fit_score = record.get("senior_fit_score")
  224. senior_fit_passed: bool | None = None
  225. is_final_retained = _nullable_bool_from_record(record.get("is_final_retained"))
  226. if senior_fit_score is not None:
  227. senior_fit_passed = _senior_fit_passed_from_score(
  228. senior_fit_score,
  229. senior_threshold=senior_threshold,
  230. )
  231. elif not _is_senior_fit_needed(
  232. retain_reason=retain_reason,
  233. is_internal_demand_matched=is_internal_demand_matched,
  234. ):
  235. senior_fit_passed = False
  236. if is_final_retained is None:
  237. is_final_retained = False
  238. return {
  239. "meta": item["meta"],
  240. "name": item["name"],
  241. "fetch_start_ymd": item["fetch_start_ymd"],
  242. "fetch_end_ymd": item["fetch_end_ymd"],
  243. "result": result,
  244. "retain_reason": retain_reason,
  245. "record_id": int(record.get("id") or item.get("record_id") or 0),
  246. "is_internal_demand_matched": is_internal_demand_matched,
  247. "matched_demand": matched_demand,
  248. "internal_demand_match_json": _parse_record_json_field(
  249. record,
  250. "internal_demand_match_json",
  251. ),
  252. "senior_fit_score": senior_fit_score,
  253. "demand_senior_fit_json": _parse_record_json_field(
  254. record,
  255. "demand_senior_fit_json",
  256. ),
  257. "senior_fit_passed": senior_fit_passed,
  258. "is_final_retained": is_final_retained,
  259. }
  260. def _build_skipped_export_row_from_record(
  261. *,
  262. record: dict[str, Any],
  263. meta: dict[str, Any],
  264. name: str,
  265. analyze_ymd: str,
  266. fetch_start_ymd: str,
  267. fetch_end_ymd: str,
  268. ) -> dict[str, Any]:
  269. detail = _parse_record_detail_json(record)
  270. return {
  271. "analyze_ymd": analyze_ymd,
  272. "name": name,
  273. "meta_id": meta.get("id"),
  274. "fetch_start_ymd": fetch_start_ymd,
  275. "fetch_end_ymd": fetch_end_ymd,
  276. "analysis_skipped": True,
  277. "skip_reason": detail.get("skip_reason") or "",
  278. "data_days": record.get("data_days") or "",
  279. "retain_reason": "",
  280. "is_sustained_high": False,
  281. "is_rising": False,
  282. "is_spike": False,
  283. "is_internal_demand_matched": "",
  284. "matched_demand": "",
  285. "senior_fit_score": "",
  286. "is_final_retained": False,
  287. "min_score": "",
  288. "max_score": "",
  289. "avg_score": "",
  290. }
  291. def _refresh_wxindex_heat_job_summary(
  292. summary: dict[str, Any],
  293. *,
  294. pending_items: list[dict[str, Any]],
  295. export_rows: list[dict[str, Any]],
  296. ) -> None:
  297. summary["analyzed"] = len(pending_items)
  298. summary["skipped"] = sum(1 for row in export_rows if row.get("analysis_skipped"))
  299. summary["retained"] = sum(1 for item in pending_items if item.get("retain_reason"))
  300. summary["sustained_high"] = sum(
  301. 1
  302. for item in pending_items
  303. if item.get("retain_reason") == RETAIN_REASON_SUSTAINED_HIGH
  304. )
  305. summary["rising"] = sum(
  306. 1 for item in pending_items if item.get("retain_reason") == RETAIN_REASON_RISING
  307. )
  308. summary["spike"] = sum(
  309. 1 for item in pending_items if item.get("retain_reason") == RETAIN_REASON_SPIKE
  310. )
  311. summary["internal_demand_matched"] = sum(
  312. 1 for item in pending_items if item.get("is_internal_demand_matched")
  313. )
  314. summary["senior_fit_candidates"] = sum(
  315. 1
  316. for item in pending_items
  317. if _is_senior_fit_needed(
  318. retain_reason=item.get("retain_reason"),
  319. is_internal_demand_matched=item.get("is_internal_demand_matched"),
  320. )
  321. )
  322. summary["senior_scored"] = sum(
  323. 1 for item in pending_items if item.get("senior_fit_score") is not None
  324. )
  325. summary["senior_fit_passed"] = sum(
  326. 1 for item in pending_items if item.get("senior_fit_passed")
  327. )
  328. summary["final_retained"] = sum(
  329. 1 for item in pending_items if item.get("is_final_retained")
  330. )
  331. def resolve_retain_reason(
  332. *,
  333. is_sustained_high: bool,
  334. is_rising: bool,
  335. is_spike: bool,
  336. ) -> str | None:
  337. """按 2->3->1 优先级确定保留原因(上涨 > 暴涨 > 持续高热度)。"""
  338. if is_rising:
  339. return RETAIN_REASON_RISING
  340. if is_spike:
  341. return RETAIN_REASON_SPIKE
  342. if is_sustained_high:
  343. return RETAIN_REASON_SUSTAINED_HIGH
  344. return None
  345. def _chunk_words(words: list[str], batch_size: int = WXINDEX_WORD_LLM_BATCH_SIZE) -> list[list[str]]:
  346. size = max(batch_size, 1)
  347. return [words[index : index + size] for index in range(0, len(words), size)]
  348. def _empty_senior_fit_result() -> dict[str, Any]:
  349. return {
  350. "senior_fit_score": None,
  351. "demand_senior_fit_json": {"source": "", "items": []},
  352. "passed": False,
  353. }
  354. def score_wxindex_words_senior_fit(
  355. *,
  356. words: list[str],
  357. config: FlowConfig,
  358. senior_threshold: float = WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD,
  359. batch_size: int = WXINDEX_WORD_LLM_BATCH_SIZE,
  360. ) -> dict[str, dict[str, Any]]:
  361. """批量对微信指数热词执行老年性 LLM 评分(每批最多 batch_size 个词)。"""
  362. cleaned: list[str] = []
  363. seen: set[str] = set()
  364. for raw in words:
  365. word = str(raw or "").strip()
  366. if not word or word in seen:
  367. continue
  368. seen.add(word)
  369. cleaned.append(word)
  370. results: dict[str, dict[str, Any]] = {
  371. word: _empty_senior_fit_result() for word in cleaned
  372. }
  373. for batch in _chunk_words(cleaned, batch_size=batch_size):
  374. candidates = [{"demand_type": TYPE_PHRASE, "demand_text": word} for word in batch]
  375. senior_fit_json = llm_score_senior_fit(
  376. channel_content_id=f"wxindex_words:{','.join(batch[:3])}",
  377. candidates=candidates,
  378. model=config.demand_quality_llm_model,
  379. max_attempts=config.demand_quality_llm_max_attempts,
  380. retry_sleep_seconds=config.demand_quality_llm_retry_sleep_seconds,
  381. max_tokens=config.demand_quality_llm_max_tokens,
  382. )
  383. senior_fit_json["threshold"] = senior_threshold * 10.0
  384. for word in batch:
  385. _, senior_score = lookup_quality_scores(
  386. demand_type=TYPE_PHRASE,
  387. demand_text=word,
  388. event_sense_json=None,
  389. senior_fit_json=senior_fit_json,
  390. )
  391. passed = (
  392. senior_score is not None
  393. and senior_score / 10.0 > senior_threshold
  394. )
  395. results[word] = {
  396. "senior_fit_score": senior_score,
  397. "demand_senior_fit_json": senior_fit_json,
  398. "passed": passed,
  399. }
  400. return results
  401. def score_wxindex_word_senior_fit(
  402. *,
  403. word: str,
  404. config: FlowConfig,
  405. senior_threshold: float = WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD,
  406. ) -> dict[str, Any]:
  407. """对单个微信指数热词执行老年性 LLM 评分。"""
  408. target_word = str(word or "").strip()
  409. if not target_word:
  410. return _empty_senior_fit_result()
  411. batch_results = score_wxindex_words_senior_fit(
  412. words=[target_word],
  413. config=config,
  414. senior_threshold=senior_threshold,
  415. batch_size=1,
  416. )
  417. return batch_results.get(target_word, _empty_senior_fit_result())
  418. def build_wxindex_word_hive_row(
  419. *,
  420. wxindex_word_record_id: int,
  421. word: str,
  422. strategy: str,
  423. partition_dt: str,
  424. max_score: float | None,
  425. ) -> dict[str, Any]:
  426. normalized_name = str(word or "").strip()
  427. weight = 0.0
  428. if max_score is not None:
  429. weight = float(max_score) / WEIGHT_DIVISOR
  430. return {
  431. "record_id": wxindex_word_record_id,
  432. "strategy": strategy,
  433. "demand_id": build_demand_id(
  434. strategy=strategy,
  435. demand_name=normalized_name,
  436. partition_dt=partition_dt,
  437. ),
  438. "demand_name": normalized_name,
  439. "weight": weight,
  440. "type": TYPE_PHRASE,
  441. "video_count": None,
  442. "video_list": [],
  443. "extend": "{}",
  444. "dt": partition_dt,
  445. }
  446. def build_wxindex_word_odps_sync_row(
  447. *,
  448. wxindex_word_record_id: int,
  449. word: str,
  450. strategy: str,
  451. partition_dt: str,
  452. max_score: float | None,
  453. ) -> dict[str, Any]:
  454. normalized_name = str(word or "").strip()
  455. weight = None
  456. if max_score is not None:
  457. weight = float(max_score) / WEIGHT_DIVISOR
  458. return {
  459. "partition_dt": partition_dt,
  460. "strategy": strategy,
  461. "demand_id": build_demand_id(
  462. strategy=strategy,
  463. demand_name=normalized_name,
  464. partition_dt=partition_dt,
  465. ),
  466. "demand_name": normalized_name,
  467. "demand_type": TYPE_PHRASE,
  468. "record_id": wxindex_word_record_id,
  469. "weight": weight,
  470. }
  471. def prepare_analysis_scores(
  472. scores: list[dict[str, Any]],
  473. *,
  474. start_ymd: str,
  475. end_ymd: str,
  476. min_days: int = WXINDEX_HEAT_MIN_DAYS,
  477. max_days: int = WXINDEX_HEAT_MAX_DAYS,
  478. ) -> tuple[list[dict[str, Any]], str | None]:
  479. """截取目标区间内可用数据,不足 min_days 时返回 skip 原因。"""
  480. window_scores = filter_scores_in_ymd_window(
  481. scores,
  482. start_ymd=start_ymd,
  483. end_ymd=end_ymd,
  484. )
  485. if len(window_scores) > max_days:
  486. window_scores = window_scores[-max_days:]
  487. if len(window_scores) < min_days:
  488. return window_scores, "insufficient_days"
  489. return window_scores, None
  490. def _score_row_ymd(row: dict[str, Any]) -> str:
  491. return str(row.get("ymd") or row.get("dt") or "").strip()
  492. def _window_ymd_bounds(
  493. window_scores: list[dict[str, Any]],
  494. ) -> tuple[str | None, str | None]:
  495. if not window_scores:
  496. return None, None
  497. start_ymd = _score_row_ymd(window_scores[0]) or None
  498. end_ymd = _score_row_ymd(window_scores[-1]) or None
  499. return start_ymd, end_ymd
  500. def analyze_wxindex_heat_patterns(
  501. scores: list[dict[str, Any]],
  502. *,
  503. start_ymd: str,
  504. end_ymd: str,
  505. sustained_threshold: float = WXINDEX_SUSTAINED_HIGH_THRESHOLD,
  506. min_days: int = WXINDEX_HEAT_MIN_DAYS,
  507. max_days: int = WXINDEX_HEAT_MAX_DAYS,
  508. spike_days: int = WXINDEX_SPIKE_LOOKBACK_DAYS,
  509. spike_ratio: float = WXINDEX_SPIKE_RATIO,
  510. spike_baseline_floor: float = WXINDEX_SPIKE_BASELINE_FLOOR,
  511. rising_overall_change_rate: float = WXINDEX_RISING_OVERALL_CHANGE_RATE,
  512. rising_window_change_rate: float = WXINDEX_RISING_WINDOW_CHANGE_RATE,
  513. rising_adjacent_up_ratio: float = WXINDEX_RISING_ADJACENT_UP_RATIO,
  514. ) -> dict[str, Any]:
  515. """对目标区间数据判断三种热度模式。"""
  516. window_scores, skip_reason = prepare_analysis_scores(
  517. scores,
  518. start_ymd=start_ymd,
  519. end_ymd=end_ymd,
  520. min_days=min_days,
  521. max_days=max_days,
  522. )
  523. if skip_reason:
  524. data_start_ymd, data_end_ymd = _window_ymd_bounds(window_scores)
  525. return {
  526. "skipped": True,
  527. "skip_reason": skip_reason,
  528. "data_days": len(window_scores) if window_scores else None,
  529. "data_start_ymd": data_start_ymd,
  530. "data_end_ymd": data_end_ymd,
  531. "patterns": [],
  532. }
  533. numeric_scores = extract_sorted_scores(window_scores, max_points=max_days)
  534. data_start_ymd, data_end_ymd = _window_ymd_bounds(window_scores)
  535. is_sustained_high = all(score > sustained_threshold for score in numeric_scores)
  536. is_rising = is_wxindex_heat_rising_scores(
  537. numeric_scores,
  538. min_points=min_days,
  539. overall_change_rate=rising_overall_change_rate,
  540. window_change_rate_threshold=rising_window_change_rate,
  541. adjacent_up_ratio=rising_adjacent_up_ratio,
  542. )
  543. is_spike = is_wxindex_spike_scores(
  544. numeric_scores,
  545. spike_days=spike_days,
  546. min_points=min_days,
  547. spike_ratio=spike_ratio,
  548. baseline_floor=spike_baseline_floor,
  549. )
  550. retain_reason = resolve_retain_reason(
  551. is_sustained_high=is_sustained_high,
  552. is_rising=is_rising,
  553. is_spike=is_spike,
  554. )
  555. patterns: list[str] = []
  556. if retain_reason == RETAIN_REASON_SUSTAINED_HIGH:
  557. patterns.append(PATTERN_SUSTAINED_HIGH)
  558. elif retain_reason == RETAIN_REASON_RISING:
  559. patterns.append(PATTERN_RISING)
  560. elif retain_reason == RETAIN_REASON_SPIKE:
  561. patterns.append(PATTERN_SPIKE)
  562. return {
  563. "skipped": False,
  564. "skip_reason": None,
  565. "data_days": len(window_scores),
  566. "data_start_ymd": data_start_ymd,
  567. "data_end_ymd": data_end_ymd,
  568. "fetch_start_ymd": start_ymd,
  569. "fetch_end_ymd": end_ymd,
  570. "min_score": min(numeric_scores),
  571. "max_score": max(numeric_scores),
  572. "avg_score": sum(numeric_scores) / len(numeric_scores),
  573. "is_sustained_high": is_sustained_high,
  574. "is_rising": is_rising,
  575. "is_spike": is_spike,
  576. "retain_reason": retain_reason,
  577. "patterns": patterns,
  578. "scores": window_scores,
  579. }
  580. def build_wxindex_word_record_init_payload(
  581. *,
  582. meta: dict[str, Any],
  583. name: str,
  584. analyze_ymd: str,
  585. fetch_start_ymd: str,
  586. fetch_end_ymd: str,
  587. demand_cache_run_id: int | None = None,
  588. ) -> dict[str, Any]:
  589. """分析前写入追溯记录:仅含 meta 与抓取窗口,分析字段待后续更新。"""
  590. return {
  591. "name": name,
  592. "meta_id": meta.get("id"),
  593. "analyze_ymd": analyze_ymd,
  594. "fetch_start_ymd": fetch_start_ymd,
  595. "fetch_end_ymd": fetch_end_ymd,
  596. "demand_cache_run_id": demand_cache_run_id,
  597. **_WXINDEX_RECORD_UNSET_ANALYSIS_FIELDS,
  598. }
  599. def build_wxindex_word_record_skipped_payload(
  600. *,
  601. meta: dict[str, Any],
  602. name: str,
  603. analyze_ymd: str,
  604. fetch_start_ymd: str,
  605. fetch_end_ymd: str,
  606. result: dict[str, Any],
  607. demand_cache_run_id: int | None = None,
  608. ) -> dict[str, Any]:
  609. """分析被跳过时更新追溯记录。"""
  610. payload = {
  611. "name": name,
  612. "meta_id": meta.get("id"),
  613. "analyze_ymd": analyze_ymd,
  614. "fetch_start_ymd": fetch_start_ymd,
  615. "fetch_end_ymd": fetch_end_ymd,
  616. "demand_cache_run_id": demand_cache_run_id,
  617. **_WXINDEX_RECORD_UNSET_ANALYSIS_FIELDS,
  618. }
  619. payload.update(
  620. {
  621. "data_start_ymd": result.get("data_start_ymd"),
  622. "data_end_ymd": result.get("data_end_ymd"),
  623. "data_days": result.get("data_days"),
  624. "detail_json": {
  625. "phase": "analysis_skipped",
  626. "skip_reason": result.get("skip_reason"),
  627. "data_days": result.get("data_days"),
  628. },
  629. }
  630. )
  631. return payload
  632. def build_wxindex_word_record_payload(
  633. item: dict[str, Any],
  634. *,
  635. analyze_ymd: str,
  636. demand_cache_run_id: int | None,
  637. phase: str,
  638. senior_threshold: float,
  639. sustained_threshold: float,
  640. spike_days: int,
  641. spike_ratio: float,
  642. spike_baseline_floor: float,
  643. rising_overall_change_rate: float,
  644. rising_window_change_rate: float,
  645. rising_adjacent_up_ratio: float,
  646. ) -> dict[str, Any]:
  647. """根据 pending item 当前状态构建 records 表 payload。"""
  648. meta = item["meta"]
  649. name = item["name"]
  650. fetch_start_ymd = item["fetch_start_ymd"]
  651. fetch_end_ymd = item["fetch_end_ymd"]
  652. result = item["result"]
  653. retain_reason = item.get("retain_reason")
  654. return {
  655. "name": name,
  656. "meta_id": meta.get("id"),
  657. "analyze_ymd": analyze_ymd,
  658. "fetch_start_ymd": fetch_start_ymd,
  659. "fetch_end_ymd": fetch_end_ymd,
  660. "data_start_ymd": result.get("data_start_ymd"),
  661. "data_end_ymd": result.get("data_end_ymd"),
  662. "data_days": result.get("data_days"),
  663. "is_sustained_high": result.get("is_sustained_high"),
  664. "is_rising": result.get("is_rising"),
  665. "is_spike": result.get("is_spike"),
  666. "retain_reason": retain_reason,
  667. "is_internal_demand_matched": item.get("is_internal_demand_matched"),
  668. "matched_demand": item.get("matched_demand"),
  669. "demand_cache_run_id": demand_cache_run_id,
  670. "internal_demand_match_json": item.get("internal_demand_match_json"),
  671. "senior_fit_score": item.get("senior_fit_score"),
  672. "demand_senior_fit_json": item.get("demand_senior_fit_json"),
  673. "is_final_retained": item.get("is_final_retained"),
  674. "min_score": result.get("min_score"),
  675. "max_score": result.get("max_score"),
  676. "avg_score": result.get("avg_score"),
  677. "detail_json": {
  678. "phase": phase,
  679. "patterns": list(result.get("patterns") or []),
  680. "retain_reason": retain_reason,
  681. "retain_reason_priority": "2->3->1",
  682. "senior_fit_threshold": senior_threshold,
  683. "senior_fit_threshold_score": senior_threshold * 10.0,
  684. "is_final_retained": item.get("is_final_retained"),
  685. "sustained_threshold": sustained_threshold,
  686. "spike_days": spike_days,
  687. "spike_ratio": spike_ratio,
  688. "spike_baseline_floor": spike_baseline_floor,
  689. "rising_overall_change_rate": rising_overall_change_rate,
  690. "rising_window_change_rate": rising_window_change_rate,
  691. "rising_adjacent_up_ratio": rising_adjacent_up_ratio,
  692. },
  693. }
  694. def _persist_wxindex_word_record(
  695. repository: HotContentRepository,
  696. payload: dict[str, Any],
  697. *,
  698. dry_run: bool,
  699. skip_db_save: bool,
  700. verbose: bool,
  701. action: str,
  702. ) -> int:
  703. name = str(payload.get("name") or "").strip()
  704. if dry_run or skip_db_save:
  705. if verbose:
  706. label = "dry-run" if dry_run else "skip-db-save"
  707. print(f"[{label}] would {action} wxindex word record word={name}")
  708. return 0
  709. return repository.save_wxindex_word_record(payload)
  710. def _persist_pending_item_record(
  711. repository: HotContentRepository,
  712. item: dict[str, Any],
  713. *,
  714. analyze_ymd: str,
  715. demand_cache_run_id: int | None,
  716. phase: str,
  717. senior_threshold: float,
  718. sustained_threshold: float,
  719. spike_days: int,
  720. spike_ratio: float,
  721. spike_baseline_floor: float,
  722. rising_overall_change_rate: float,
  723. rising_window_change_rate: float,
  724. rising_adjacent_up_ratio: float,
  725. dry_run: bool,
  726. skip_db_save: bool,
  727. verbose: bool,
  728. ) -> int:
  729. payload = build_wxindex_word_record_payload(
  730. item,
  731. analyze_ymd=analyze_ymd,
  732. demand_cache_run_id=demand_cache_run_id,
  733. phase=phase,
  734. senior_threshold=senior_threshold,
  735. sustained_threshold=sustained_threshold,
  736. spike_days=spike_days,
  737. spike_ratio=spike_ratio,
  738. spike_baseline_floor=spike_baseline_floor,
  739. rising_overall_change_rate=rising_overall_change_rate,
  740. rising_window_change_rate=rising_window_change_rate,
  741. rising_adjacent_up_ratio=rising_adjacent_up_ratio,
  742. )
  743. record_id = _persist_wxindex_word_record(
  744. repository,
  745. payload,
  746. dry_run=dry_run,
  747. skip_db_save=skip_db_save,
  748. verbose=verbose,
  749. action=f"{phase} word={item['name']}",
  750. )
  751. if record_id:
  752. item["record_id"] = record_id
  753. return int(item.get("record_id") or 0)
  754. def _filter_candidates_awaiting_yesterday_score(
  755. repository: HotContentRepository,
  756. candidate_items: list[dict[str, Any]],
  757. *,
  758. yesterday_ymd: str,
  759. existing_records: dict[str, dict[str, Any]],
  760. verbose: bool,
  761. ) -> tuple[list[dict[str, Any]], int]:
  762. """初始化完成后:未完成热度分析且缺少昨日指数数据的词留待下次执行。"""
  763. names_to_check = [
  764. item["name"]
  765. for item in candidate_items
  766. if not _is_heat_analysis_done(existing_records.get(item["name"]))
  767. ]
  768. if not names_to_check:
  769. return candidate_items, 0
  770. names_with_yesterday = repository.list_wxindex_word_names_with_dt(
  771. names_to_check,
  772. dt=yesterday_ymd,
  773. )
  774. ready_items: list[dict[str, Any]] = []
  775. awaiting_count = 0
  776. for item in candidate_items:
  777. name = item["name"]
  778. if _is_heat_analysis_done(existing_records.get(name)):
  779. ready_items.append(item)
  780. continue
  781. if name in names_with_yesterday:
  782. ready_items.append(item)
  783. continue
  784. awaiting_count += 1
  785. if verbose:
  786. print(
  787. f"await yesterday score word={name} dt={yesterday_ymd}, skip this run"
  788. )
  789. return ready_items, awaiting_count
  790. def _init_candidate_wxindex_word_records(
  791. repository: HotContentRepository,
  792. candidate_items: list[dict[str, Any]],
  793. *,
  794. analyze_ymd: str,
  795. demand_cache_run_id: int | None,
  796. dry_run: bool,
  797. skip_db_save: bool,
  798. verbose: bool,
  799. ) -> None:
  800. if not candidate_items:
  801. return
  802. if dry_run or skip_db_save:
  803. if verbose:
  804. label = "dry-run" if dry_run else "skip-db-save"
  805. print(
  806. f"[{label}] would batch init wxindex word records "
  807. f"count={len(candidate_items)} analyze_ymd={analyze_ymd}"
  808. )
  809. return
  810. init_payloads = [
  811. build_wxindex_word_record_init_payload(
  812. meta=item["meta"],
  813. name=item["name"],
  814. analyze_ymd=analyze_ymd,
  815. fetch_start_ymd=item["fetch_start_ymd"],
  816. fetch_end_ymd=item["fetch_end_ymd"],
  817. demand_cache_run_id=demand_cache_run_id,
  818. )
  819. for item in candidate_items
  820. ]
  821. record_id_map = repository.init_wxindex_word_records(init_payloads)
  822. for item in candidate_items:
  823. item["record_id"] = int(record_id_map.get(item["name"]) or 0)
  824. if verbose:
  825. print(
  826. f"init wxindex word records count={len(candidate_items)} "
  827. f"analyze_ymd={analyze_ymd}"
  828. )
  829. def _apply_demand_match_to_item(
  830. item: dict[str, Any],
  831. match_result: dict[str, Any],
  832. ) -> None:
  833. item["internal_demand_match_json"] = match_result
  834. item["is_internal_demand_matched"] = bool(match_result.get("matched"))
  835. matched_demand = str(match_result.get("matched_demand") or "").strip()
  836. item["matched_demand"] = matched_demand or None
  837. if not item.get("is_internal_demand_matched"):
  838. item["is_final_retained"] = False
  839. def _apply_senior_fit_to_item(
  840. item: dict[str, Any],
  841. senior_result: dict[str, Any],
  842. ) -> None:
  843. passed = bool(senior_result.get("passed"))
  844. item["senior_fit_score"] = senior_result.get("senior_fit_score")
  845. item["demand_senior_fit_json"] = senior_result.get("demand_senior_fit_json")
  846. item["senior_fit_passed"] = passed
  847. item["is_final_retained"] = passed
  848. def build_wxindex_word_stats_payload(
  849. item: dict[str, Any],
  850. *,
  851. analyze_ymd: str,
  852. ) -> dict[str, Any]:
  853. """构建通过热度+老年性筛选的词统计 payload。"""
  854. meta = item["meta"]
  855. result = item["result"]
  856. return {
  857. "name": item["name"],
  858. "meta_id": meta.get("id"),
  859. "analyze_ymd": analyze_ymd,
  860. "wxindex_word_record_id": item.get("record_id"),
  861. "retain_reason": item.get("retain_reason"),
  862. "senior_fit_score": item.get("senior_fit_score"),
  863. "data_start_ymd": result.get("data_start_ymd"),
  864. "data_end_ymd": result.get("data_end_ymd"),
  865. "data_days": result.get("data_days"),
  866. "min_score": result.get("min_score"),
  867. "max_score": result.get("max_score"),
  868. "avg_score": result.get("avg_score"),
  869. "detail_json": {
  870. "demand_senior_fit_json": item.get("demand_senior_fit_json"),
  871. "is_sustained_high": result.get("is_sustained_high"),
  872. "is_rising": result.get("is_rising"),
  873. "is_spike": result.get("is_spike"),
  874. },
  875. }
  876. def _save_senior_fit_passed_stats(
  877. repository: HotContentRepository,
  878. items: list[dict[str, Any]],
  879. *,
  880. analyze_ymd: str,
  881. existing_stats_names: set[str] | None = None,
  882. dry_run: bool,
  883. skip_db_save: bool,
  884. verbose: bool,
  885. ) -> tuple[int, int]:
  886. existing = existing_stats_names or set()
  887. all_payloads = [
  888. build_wxindex_word_stats_payload(item, analyze_ymd=analyze_ymd)
  889. for item in items
  890. if item.get("senior_fit_passed")
  891. ]
  892. resumed = sum(1 for payload in all_payloads if payload["name"] in existing)
  893. payloads = [payload for payload in all_payloads if payload["name"] not in existing]
  894. if not payloads:
  895. return 0, resumed
  896. if dry_run or skip_db_save:
  897. if verbose:
  898. label = "dry-run" if dry_run else "skip-db-save"
  899. print(f"[{label}] would save wxindex word stats count={len(payloads)}")
  900. return len(payloads), resumed
  901. saved = repository.save_wxindex_word_stats_batch(payloads)
  902. if verbose:
  903. print(f"saved wxindex word stats count={saved} analyze_ymd={analyze_ymd}")
  904. return saved, resumed
  905. def _build_export_row_from_item(
  906. item: dict[str, Any],
  907. *,
  908. analyze_ymd: str,
  909. strategy: str,
  910. ) -> dict[str, Any]:
  911. meta = item["meta"]
  912. name = item["name"]
  913. result = item["result"]
  914. retain_reason = item.get("retain_reason")
  915. is_internal_demand_matched = item.get("is_internal_demand_matched")
  916. matched_demand = str(item.get("matched_demand") or "")
  917. senior_fit_score = item.get("senior_fit_score")
  918. is_final_retained = bool(item.get("is_final_retained"))
  919. return {
  920. "analyze_ymd": analyze_ymd,
  921. "name": name,
  922. "meta_id": meta.get("id"),
  923. "fetch_start_ymd": item["fetch_start_ymd"],
  924. "fetch_end_ymd": item["fetch_end_ymd"],
  925. "data_start_ymd": result.get("data_start_ymd"),
  926. "data_end_ymd": result.get("data_end_ymd"),
  927. "data_days": result.get("data_days"),
  928. "analysis_skipped": False,
  929. "skip_reason": "",
  930. "is_sustained_high": bool(result.get("is_sustained_high")),
  931. "is_rising": bool(result.get("is_rising")),
  932. "is_spike": bool(result.get("is_spike")),
  933. "retain_reason": retain_reason or "",
  934. "is_internal_demand_matched": (
  935. "" if is_internal_demand_matched is None else is_internal_demand_matched
  936. ),
  937. "matched_demand": matched_demand,
  938. "senior_fit_score": senior_fit_score if senior_fit_score is not None else "",
  939. "is_final_retained": is_final_retained,
  940. "min_score": result.get("min_score"),
  941. "max_score": result.get("max_score"),
  942. "avg_score": result.get("avg_score"),
  943. "demand_id": (
  944. build_demand_id(
  945. strategy=strategy,
  946. demand_name=name,
  947. partition_dt=analyze_ymd,
  948. )
  949. if is_final_retained and strategy
  950. else ""
  951. ),
  952. "weight": (
  953. float(result.get("max_score") or 0) / WEIGHT_DIVISOR
  954. if is_final_retained and result.get("max_score") is not None
  955. else ""
  956. ),
  957. }
  958. def run_wxindex_heat_pattern_daily_job(
  959. repository: HotContentRepository,
  960. *,
  961. config: FlowConfig | None = None,
  962. api_client: JsonApiClient | None = None,
  963. today: date | None = None,
  964. sustained_threshold: float = WXINDEX_SUSTAINED_HIGH_THRESHOLD,
  965. min_days: int = WXINDEX_HEAT_MIN_DAYS,
  966. max_days: int = WXINDEX_HEAT_MAX_DAYS,
  967. spike_days: int = WXINDEX_SPIKE_LOOKBACK_DAYS,
  968. spike_ratio: float = WXINDEX_SPIKE_RATIO,
  969. spike_baseline_floor: float = WXINDEX_SPIKE_BASELINE_FLOOR,
  970. rising_overall_change_rate: float = WXINDEX_RISING_OVERALL_CHANGE_RATE,
  971. rising_window_change_rate: float = WXINDEX_RISING_WINDOW_CHANGE_RATE,
  972. rising_adjacent_up_ratio: float = WXINDEX_RISING_ADJACENT_UP_RATIO,
  973. dry_run: bool = False,
  974. skip_odps: bool = False,
  975. skip_db_save: bool = False,
  976. verbose: bool = False,
  977. ) -> dict[str, Any]:
  978. """定时任务:分析仍在抓取窗口内的词,并写入热度模式结果表。"""
  979. current = today or datetime.now(SHANGHAI_TZ).date()
  980. analyze_ymd = current.strftime("%Y%m%d")
  981. meta_rows = repository.list_active_wxindex_word_meta(today=current)
  982. demand_name_set: list[str] = []
  983. demand_cache_run_id: int | None = None
  984. demand_cache_error: str | None = None
  985. postprocess_service: ContributionPostprocessService | None = None
  986. if config is not None:
  987. try:
  988. cache = DemandCacheService(config, repository).get_or_create_current_hour_cache()
  989. demand_name_set = list(cache.get("demand_name_set") or [])
  990. demand_cache_run_id = int(cache["id"])
  991. if demand_name_set:
  992. postprocess_service = ContributionPostprocessService(
  993. config,
  994. repository,
  995. api_client
  996. or JsonApiClient(
  997. timeout_seconds=config.request_timeout_seconds,
  998. verify_ssl=config.https_verify_ssl,
  999. ),
  1000. )
  1001. except HotContentFlowError as exc:
  1002. demand_cache_error = str(exc)
  1003. if verbose:
  1004. print(f"demand cache unavailable, skip demand match: {exc}")
  1005. summary: dict[str, Any] = {
  1006. "analyze_ymd": analyze_ymd,
  1007. "meta_count": len(meta_rows),
  1008. "analyzed": 0,
  1009. "skipped": 0,
  1010. "retained": 0,
  1011. "sustained_high": 0,
  1012. "rising": 0,
  1013. "spike": 0,
  1014. "internal_demand_matched": 0,
  1015. "senior_scored": 0,
  1016. "senior_fit_candidates": 0,
  1017. "senior_fit_passed": 0,
  1018. "stats_saved": 0,
  1019. "final_retained": 0,
  1020. "odps_synced": 0,
  1021. "odps_written": 0,
  1022. "demand_cache_run_id": demand_cache_run_id,
  1023. "demand_cache_error": demand_cache_error,
  1024. "demand_name_count": len(demand_name_set),
  1025. "llm_batch_size": WXINDEX_WORD_LLM_BATCH_SIZE,
  1026. "demand_match_batches": 0,
  1027. "senior_fit_batches": 0,
  1028. "records_initialized": 0,
  1029. "awaiting_yesterday_score": 0,
  1030. "heat_resumed": 0,
  1031. "demand_match_resumed": 0,
  1032. "senior_fit_resumed": 0,
  1033. "stats_resumed": 0,
  1034. "finalized_resumed": 0,
  1035. "dry_run": dry_run,
  1036. "skip_odps": skip_odps,
  1037. "skip_db_save": skip_db_save,
  1038. }
  1039. retained_words: list[dict[str, Any]] = []
  1040. export_rows: list[dict[str, Any]] = []
  1041. final_hive_rows: list[dict[str, Any]] = []
  1042. senior_threshold = WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD
  1043. strategy = str(config.hot_demand_pool_strategy or "").strip() if config else ""
  1044. pending_items: list[dict[str, Any]] = []
  1045. candidate_items: list[dict[str, Any]] = []
  1046. for meta in meta_rows:
  1047. name = str(meta.get("name") or "").strip()
  1048. fetch_start_ymd = str(meta.get("fetch_start_ymd") or "").strip()
  1049. fetch_end_ymd = str(meta.get("fetch_end_ymd") or "").strip()
  1050. if not name or not fetch_start_ymd or not fetch_end_ymd:
  1051. summary["skipped"] += 1
  1052. continue
  1053. candidate_items.append(
  1054. {
  1055. "meta": meta,
  1056. "name": name,
  1057. "fetch_start_ymd": fetch_start_ymd,
  1058. "fetch_end_ymd": fetch_end_ymd,
  1059. }
  1060. )
  1061. _init_candidate_wxindex_word_records(
  1062. repository,
  1063. candidate_items,
  1064. analyze_ymd=analyze_ymd,
  1065. demand_cache_run_id=demand_cache_run_id,
  1066. dry_run=dry_run,
  1067. skip_db_save=skip_db_save,
  1068. verbose=verbose,
  1069. )
  1070. summary["records_initialized"] = len(candidate_items)
  1071. existing_records: dict[str, dict[str, Any]] = {}
  1072. existing_stats_names: set[str] = set()
  1073. if not dry_run and not skip_db_save:
  1074. candidate_names = [item["name"] for item in candidate_items]
  1075. existing_records = repository.list_wxindex_word_records_by_analyze_ymd(
  1076. analyze_ymd=analyze_ymd,
  1077. names=candidate_names,
  1078. )
  1079. existing_stats_names = repository.list_wxindex_word_stats_names(
  1080. analyze_ymd=analyze_ymd,
  1081. names=candidate_names,
  1082. )
  1083. yesterday_ymd = (current - timedelta(days=1)).strftime("%Y%m%d")
  1084. candidate_items, awaiting_yesterday = _filter_candidates_awaiting_yesterday_score(
  1085. repository,
  1086. candidate_items,
  1087. yesterday_ymd=yesterday_ymd,
  1088. existing_records=existing_records,
  1089. verbose=verbose,
  1090. )
  1091. summary["awaiting_yesterday_score"] = awaiting_yesterday
  1092. for item in candidate_items:
  1093. if not item.get("record_id"):
  1094. existing = existing_records.get(item["name"])
  1095. if existing:
  1096. item["record_id"] = int(existing.get("id") or 0)
  1097. for item in candidate_items:
  1098. meta = item["meta"]
  1099. name = item["name"]
  1100. fetch_start_ymd = item["fetch_start_ymd"]
  1101. fetch_end_ymd = item["fetch_end_ymd"]
  1102. existing_record = existing_records.get(name)
  1103. scores = repository.list_wxindex_word_scores_in_range(
  1104. name,
  1105. start_ymd=fetch_start_ymd,
  1106. end_ymd=fetch_end_ymd,
  1107. )
  1108. if (
  1109. _is_heat_analysis_done(existing_record)
  1110. and not _should_rerun_heat_analysis(
  1111. existing_record,
  1112. scores=scores,
  1113. fetch_start_ymd=fetch_start_ymd,
  1114. fetch_end_ymd=fetch_end_ymd,
  1115. min_days=min_days,
  1116. max_days=max_days,
  1117. )
  1118. ):
  1119. summary["heat_resumed"] += 1
  1120. if _is_analysis_skipped_record(existing_record):
  1121. if verbose:
  1122. detail = _parse_record_detail_json(existing_record)
  1123. print(
  1124. f"resume skip word={name} reason={detail.get('skip_reason')} "
  1125. f"days={existing_record.get('data_days')}"
  1126. )
  1127. export_rows.append(
  1128. _build_skipped_export_row_from_record(
  1129. record=existing_record,
  1130. meta=meta,
  1131. name=name,
  1132. analyze_ymd=analyze_ymd,
  1133. fetch_start_ymd=fetch_start_ymd,
  1134. fetch_end_ymd=fetch_end_ymd,
  1135. )
  1136. )
  1137. continue
  1138. pending_item = _rehydrate_pending_item_from_record(
  1139. item,
  1140. existing_record,
  1141. senior_threshold=senior_threshold,
  1142. )
  1143. pending_items.append(pending_item)
  1144. if verbose:
  1145. print(
  1146. f"resume heat analyzed word={name} "
  1147. f"retain_reason={pending_item.get('retain_reason') or ''} "
  1148. f"data_days={pending_item['result'].get('data_days')}"
  1149. )
  1150. continue
  1151. result = analyze_wxindex_heat_patterns(
  1152. scores,
  1153. start_ymd=fetch_start_ymd,
  1154. end_ymd=fetch_end_ymd,
  1155. sustained_threshold=sustained_threshold,
  1156. min_days=min_days,
  1157. max_days=max_days,
  1158. spike_days=spike_days,
  1159. spike_ratio=spike_ratio,
  1160. spike_baseline_floor=spike_baseline_floor,
  1161. rising_overall_change_rate=rising_overall_change_rate,
  1162. rising_window_change_rate=rising_window_change_rate,
  1163. rising_adjacent_up_ratio=rising_adjacent_up_ratio,
  1164. )
  1165. if result.get("skipped"):
  1166. if verbose:
  1167. print(
  1168. f"skip word={name} reason={result.get('skip_reason')} "
  1169. f"days={result.get('data_days')}"
  1170. )
  1171. skipped_payload = build_wxindex_word_record_skipped_payload(
  1172. meta=meta,
  1173. name=name,
  1174. analyze_ymd=analyze_ymd,
  1175. fetch_start_ymd=fetch_start_ymd,
  1176. fetch_end_ymd=fetch_end_ymd,
  1177. result=result,
  1178. demand_cache_run_id=demand_cache_run_id,
  1179. )
  1180. _persist_wxindex_word_record(
  1181. repository,
  1182. skipped_payload,
  1183. dry_run=dry_run,
  1184. skip_db_save=skip_db_save,
  1185. verbose=verbose,
  1186. action="update skipped",
  1187. )
  1188. export_rows.append(
  1189. {
  1190. "analyze_ymd": analyze_ymd,
  1191. "name": name,
  1192. "meta_id": meta.get("id"),
  1193. "fetch_start_ymd": fetch_start_ymd,
  1194. "fetch_end_ymd": fetch_end_ymd,
  1195. "analysis_skipped": True,
  1196. "skip_reason": result.get("skip_reason"),
  1197. "data_days": result.get("data_days"),
  1198. "retain_reason": "",
  1199. "is_sustained_high": False,
  1200. "is_rising": False,
  1201. "is_spike": False,
  1202. "is_internal_demand_matched": "",
  1203. "matched_demand": "",
  1204. "senior_fit_score": "",
  1205. "is_final_retained": False,
  1206. "min_score": "",
  1207. "max_score": "",
  1208. "avg_score": "",
  1209. }
  1210. )
  1211. continue
  1212. retain_reason = str(result.get("retain_reason") or "").strip() or None
  1213. pending_item = {
  1214. "meta": meta,
  1215. "name": name,
  1216. "fetch_start_ymd": fetch_start_ymd,
  1217. "fetch_end_ymd": fetch_end_ymd,
  1218. "result": result,
  1219. "retain_reason": retain_reason,
  1220. "record_id": item.get("record_id", 0),
  1221. "is_internal_demand_matched": None,
  1222. "matched_demand": None,
  1223. "internal_demand_match_json": None,
  1224. "senior_fit_score": None,
  1225. "demand_senior_fit_json": None,
  1226. "senior_fit_passed": None,
  1227. "is_final_retained": None,
  1228. }
  1229. pending_items.append(pending_item)
  1230. _persist_pending_item_record(
  1231. repository,
  1232. pending_item,
  1233. analyze_ymd=analyze_ymd,
  1234. demand_cache_run_id=demand_cache_run_id,
  1235. phase="heat_analyzed",
  1236. senior_threshold=senior_threshold,
  1237. sustained_threshold=sustained_threshold,
  1238. spike_days=spike_days,
  1239. spike_ratio=spike_ratio,
  1240. spike_baseline_floor=spike_baseline_floor,
  1241. rising_overall_change_rate=rising_overall_change_rate,
  1242. rising_window_change_rate=rising_window_change_rate,
  1243. rising_adjacent_up_ratio=rising_adjacent_up_ratio,
  1244. dry_run=dry_run,
  1245. skip_db_save=skip_db_save,
  1246. verbose=verbose,
  1247. )
  1248. if verbose:
  1249. print(
  1250. f"heat analyzed word={name} retain_reason={retain_reason or ''} "
  1251. f"data_days={result.get('data_days')}"
  1252. )
  1253. demand_match_results: dict[str, dict[str, Any]] = {}
  1254. pending_by_name = {item["name"]: item for item in pending_items}
  1255. sustained_high_words = [
  1256. item["name"]
  1257. for item in pending_items
  1258. if item.get("retain_reason") == RETAIN_REASON_SUSTAINED_HIGH
  1259. and item.get("is_internal_demand_matched") is None
  1260. ]
  1261. for item in pending_items:
  1262. if item.get("retain_reason") != RETAIN_REASON_SUSTAINED_HIGH:
  1263. continue
  1264. if item.get("is_internal_demand_matched") is not None:
  1265. summary["demand_match_resumed"] += 1
  1266. def _persist_demand_match_updates(words: list[str]) -> None:
  1267. for word in words:
  1268. pending_item = pending_by_name.get(word)
  1269. if pending_item is None:
  1270. continue
  1271. match_result = demand_match_results.get(word) or {
  1272. "word": word,
  1273. "matched": False,
  1274. "matched_demand": "",
  1275. "match_list": [],
  1276. }
  1277. _apply_demand_match_to_item(pending_item, match_result)
  1278. _persist_pending_item_record(
  1279. repository,
  1280. pending_item,
  1281. analyze_ymd=analyze_ymd,
  1282. demand_cache_run_id=demand_cache_run_id,
  1283. phase="demand_matched",
  1284. senior_threshold=senior_threshold,
  1285. sustained_threshold=sustained_threshold,
  1286. spike_days=spike_days,
  1287. spike_ratio=spike_ratio,
  1288. spike_baseline_floor=spike_baseline_floor,
  1289. rising_overall_change_rate=rising_overall_change_rate,
  1290. rising_window_change_rate=rising_window_change_rate,
  1291. rising_adjacent_up_ratio=rising_adjacent_up_ratio,
  1292. dry_run=dry_run,
  1293. skip_db_save=skip_db_save,
  1294. verbose=verbose,
  1295. )
  1296. if sustained_high_words:
  1297. if postprocess_service is not None and demand_name_set:
  1298. for batch in _chunk_words(sustained_high_words):
  1299. summary["demand_match_batches"] += 1
  1300. if verbose:
  1301. print(f"demand match batch size={len(batch)} words={batch}")
  1302. demand_match_results.update(
  1303. postprocess_service.match_words_to_demand_pool(
  1304. words=batch,
  1305. demand_name_set=demand_name_set,
  1306. )
  1307. )
  1308. _persist_demand_match_updates(batch)
  1309. else:
  1310. for word in sustained_high_words:
  1311. demand_match_results[word] = {
  1312. "word": word,
  1313. "matched": False,
  1314. "matched_demand": "",
  1315. "match_list": [],
  1316. "skip_reason": "empty_demand_cache",
  1317. }
  1318. _persist_demand_match_updates(sustained_high_words)
  1319. senior_fit_candidate_names: list[str] = []
  1320. for item in pending_items:
  1321. retain_reason = item.get("retain_reason")
  1322. name = str(item.get("name") or "").strip()
  1323. if not retain_reason or not name:
  1324. continue
  1325. if _is_senior_fit_attempt_done(item, existing_records.get(name)):
  1326. summary["senior_fit_resumed"] += 1
  1327. continue
  1328. if retain_reason == RETAIN_REASON_SUSTAINED_HIGH:
  1329. if not item.get("is_internal_demand_matched"):
  1330. continue
  1331. senior_fit_candidate_names.append(name)
  1332. senior_fit_results: dict[str, dict[str, Any]] = {}
  1333. def _persist_senior_fit_updates(words: list[str]) -> None:
  1334. for word in words:
  1335. pending_item = pending_by_name.get(word)
  1336. senior_result = senior_fit_results.get(word)
  1337. if pending_item is None or senior_result is None:
  1338. continue
  1339. _apply_senior_fit_to_item(pending_item, senior_result)
  1340. _persist_pending_item_record(
  1341. repository,
  1342. pending_item,
  1343. analyze_ymd=analyze_ymd,
  1344. demand_cache_run_id=demand_cache_run_id,
  1345. phase="senior_fit_scored",
  1346. senior_threshold=senior_threshold,
  1347. sustained_threshold=sustained_threshold,
  1348. spike_days=spike_days,
  1349. spike_ratio=spike_ratio,
  1350. spike_baseline_floor=spike_baseline_floor,
  1351. rising_overall_change_rate=rising_overall_change_rate,
  1352. rising_window_change_rate=rising_window_change_rate,
  1353. rising_adjacent_up_ratio=rising_adjacent_up_ratio,
  1354. dry_run=dry_run,
  1355. skip_db_save=skip_db_save,
  1356. verbose=verbose,
  1357. )
  1358. if verbose and not pending_item.get("senior_fit_passed"):
  1359. print(
  1360. f"senior fit rejected word={word} "
  1361. f"score={pending_item.get('senior_fit_score')}"
  1362. )
  1363. if config is not None and senior_fit_candidate_names:
  1364. for batch in _chunk_words(senior_fit_candidate_names):
  1365. summary["senior_fit_batches"] += 1
  1366. if verbose:
  1367. print(f"senior fit batch size={len(batch)} words={batch}")
  1368. senior_fit_results.update(
  1369. score_wxindex_words_senior_fit(
  1370. words=batch,
  1371. config=config,
  1372. senior_threshold=senior_threshold,
  1373. batch_size=WXINDEX_WORD_LLM_BATCH_SIZE,
  1374. )
  1375. )
  1376. _persist_senior_fit_updates(batch)
  1377. stats_saved, stats_resumed = _save_senior_fit_passed_stats(
  1378. repository,
  1379. pending_items,
  1380. analyze_ymd=analyze_ymd,
  1381. existing_stats_names=existing_stats_names,
  1382. dry_run=dry_run,
  1383. skip_db_save=skip_db_save,
  1384. verbose=verbose,
  1385. )
  1386. summary["stats_saved"] = stats_saved
  1387. summary["stats_resumed"] = stats_resumed
  1388. for item in pending_items:
  1389. name = item["name"]
  1390. result = item["result"]
  1391. retain_reason = item.get("retain_reason")
  1392. if retain_reason:
  1393. retained_words.append(
  1394. {
  1395. "name": name,
  1396. "retain_reason": retain_reason,
  1397. "data_days": result.get("data_days"),
  1398. "data_start_ymd": result.get("data_start_ymd"),
  1399. "data_end_ymd": result.get("data_end_ymd"),
  1400. "is_internal_demand_matched": item.get("is_internal_demand_matched"),
  1401. "matched_demand": str(item.get("matched_demand") or ""),
  1402. "senior_fit_score": item.get("senior_fit_score"),
  1403. "is_final_retained": bool(item.get("is_final_retained")),
  1404. }
  1405. )
  1406. existing_record = existing_records.get(name)
  1407. if _is_finalized_record(existing_record):
  1408. summary["finalized_resumed"] += 1
  1409. record_id = int(item.get("record_id") or existing_record.get("id") or 0)
  1410. if verbose:
  1411. print(f"resume finalized word={name}")
  1412. else:
  1413. record_id = _persist_pending_item_record(
  1414. repository,
  1415. item,
  1416. analyze_ymd=analyze_ymd,
  1417. demand_cache_run_id=demand_cache_run_id,
  1418. phase="finalized",
  1419. senior_threshold=senior_threshold,
  1420. sustained_threshold=sustained_threshold,
  1421. spike_days=spike_days,
  1422. spike_ratio=spike_ratio,
  1423. spike_baseline_floor=spike_baseline_floor,
  1424. rising_overall_change_rate=rising_overall_change_rate,
  1425. rising_window_change_rate=rising_window_change_rate,
  1426. rising_adjacent_up_ratio=rising_adjacent_up_ratio,
  1427. dry_run=dry_run,
  1428. skip_db_save=skip_db_save,
  1429. verbose=verbose,
  1430. )
  1431. export_rows.append(
  1432. _build_export_row_from_item(
  1433. item,
  1434. analyze_ymd=analyze_ymd,
  1435. strategy=strategy,
  1436. )
  1437. )
  1438. if item.get("is_final_retained") and strategy:
  1439. final_hive_rows.append(
  1440. build_wxindex_word_hive_row(
  1441. wxindex_word_record_id=record_id,
  1442. word=name,
  1443. strategy=strategy,
  1444. partition_dt=analyze_ymd,
  1445. max_score=result.get("max_score"),
  1446. )
  1447. )
  1448. if (
  1449. final_hive_rows
  1450. and strategy
  1451. and config is not None
  1452. and not dry_run
  1453. and not skip_odps
  1454. ):
  1455. odps_summary = sync_wxindex_word_rows_to_odps(
  1456. config,
  1457. repository,
  1458. hive_rows=final_hive_rows,
  1459. partition_dt=analyze_ymd,
  1460. strategy=strategy,
  1461. )
  1462. summary["odps_written"] = odps_summary.get("written_count", 0)
  1463. summary["odps_synced"] = odps_summary.get("odps_synced", 0)
  1464. summary["odps_sync"] = odps_summary
  1465. elif dry_run or skip_odps:
  1466. summary["odps_written"] = len(final_hive_rows)
  1467. summary["odps_synced"] = len(final_hive_rows)
  1468. _refresh_wxindex_heat_job_summary(
  1469. summary,
  1470. pending_items=pending_items,
  1471. export_rows=export_rows,
  1472. )
  1473. summary["retained_words"] = retained_words
  1474. summary["export_rows"] = export_rows
  1475. return summary
  1476. WXINDEX_HEAT_PATTERN_EXPORT_FIELDS = [
  1477. "analyze_ymd",
  1478. "name",
  1479. "meta_id",
  1480. "fetch_start_ymd",
  1481. "fetch_end_ymd",
  1482. "data_start_ymd",
  1483. "data_end_ymd",
  1484. "data_days",
  1485. "analysis_skipped",
  1486. "skip_reason",
  1487. "is_sustained_high",
  1488. "is_rising",
  1489. "is_spike",
  1490. "retain_reason",
  1491. "is_internal_demand_matched",
  1492. "matched_demand",
  1493. "senior_fit_score",
  1494. "is_final_retained",
  1495. "min_score",
  1496. "max_score",
  1497. "avg_score",
  1498. "demand_id",
  1499. "weight",
  1500. ]
  1501. def write_wxindex_heat_pattern_csv(
  1502. rows: list[dict[str, Any]],
  1503. output_path: str | Path,
  1504. *,
  1505. fieldnames: list[str] | None = None,
  1506. ) -> Path:
  1507. """将热度分析明细写入本地 CSV。"""
  1508. path = Path(output_path).expanduser()
  1509. if not path.is_absolute():
  1510. path = Path.cwd() / path
  1511. path.parent.mkdir(parents=True, exist_ok=True)
  1512. columns = fieldnames or WXINDEX_HEAT_PATTERN_EXPORT_FIELDS
  1513. with path.open("w", encoding="utf-8-sig", newline="") as handle:
  1514. writer = csv.DictWriter(handle, fieldnames=columns, extrasaction="ignore")
  1515. writer.writeheader()
  1516. for row in rows:
  1517. writer.writerow({column: row.get(column, "") for column in columns})
  1518. return path