wxindex_heat_pattern.py 57 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665
  1. """微信指数热度模式分析:持续高热、持续上涨、突然暴涨。"""
  2. from __future__ import annotations
  3. import csv
  4. import json
  5. from datetime import date, datetime, timedelta
  6. from pathlib import Path
  7. from typing import Any
  8. from app.hot_content.client import JsonApiClient
  9. from app.hot_content.demand_cache_service import DemandCacheService
  10. from app.hot_content.demand_hive_export import WEIGHT_DIVISOR, build_demand_id
  11. from app.hot_content.demand_quality import (
  12. TYPE_PHRASE,
  13. llm_score_senior_fit,
  14. lookup_quality_scores,
  15. )
  16. from app.hot_content.demand_pool_writer import sync_wxindex_word_rows_to_odps
  17. from app.hot_content.exceptions import HotContentFlowError
  18. from app.hot_content.postprocess_service import ContributionPostprocessService
  19. from app.hot_content.repository import HotContentRepository
  20. from app.hot_content.timezone import SHANGHAI_TZ
  21. from app.hot_content.types import FlowConfig
  22. from app.hot_content.wxindex_trend import (
  23. HEAT_RISING_ADJACENT_UP_RATIO,
  24. HEAT_RISING_OVERALL_CHANGE_RATE,
  25. HEAT_RISING_WINDOW_CHANGE_RATE,
  26. HEAT_SPIKE_BASELINE_FLOOR,
  27. HEAT_SPIKE_RATIO,
  28. extract_sorted_scores,
  29. is_wxindex_heat_rising_scores,
  30. is_wxindex_spike_scores,
  31. )
  32. from app.hot_content.wxindex_words import filter_scores_in_ymd_window
# Bounds on the number of in-window daily rows used by the analysis:
# fewer than MIN rows -> the analysis is skipped; more than MAX -> only the
# trailing MAX rows are kept (see prepare_analysis_scores).
WXINDEX_HEAT_MIN_DAYS = 7
WXINDEX_HEAT_MAX_DAYS = 14
# Every numeric score in the window must exceed this value for the word to
# count as "sustained high".
WXINDEX_SUSTAINED_HIGH_THRESHOLD = 10_000_000.0
# Passed as ``spike_days`` to is_wxindex_spike_scores.
WXINDEX_SPIKE_LOOKBACK_DAYS = 3
# Local aliases of the shared trend thresholds imported from wxindex_trend.
WXINDEX_SPIKE_RATIO = HEAT_SPIKE_RATIO
WXINDEX_SPIKE_BASELINE_FLOOR = HEAT_SPIKE_BASELINE_FLOOR
WXINDEX_RISING_OVERALL_CHANGE_RATE = HEAT_RISING_OVERALL_CHANGE_RATE
WXINDEX_RISING_WINDOW_CHANGE_RATE = HEAT_RISING_WINDOW_CHANGE_RATE
WXINDEX_RISING_ADJACENT_UP_RATIO = HEAT_RISING_ADJACENT_UP_RATIO
# Senior-fit threshold on a normalized scale: a score passes when
# ``score / 10.0`` is strictly greater than this value.
WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD = 0.6
# Default number of words sent to the LLM per scoring batch.
WXINDEX_WORD_LLM_BATCH_SIZE = 10
# Pattern identifiers stored in the ``patterns`` list of an analysis result.
PATTERN_SUSTAINED_HIGH = "sustained_high"
PATTERN_RISING = "rising"
PATTERN_SPIKE = "spike"
# Retain-reason labels (runtime values; persisted and compared as-is).
RETAIN_REASON_SUSTAINED_HIGH = "持续高热度"
RETAIN_REASON_RISING = "热度持续上涨"
RETAIN_REASON_SPIKE = "热度突然暴涨"
# Processing phases recorded under ``detail_json["phase"]``.
PHASE_ANALYSIS_SKIPPED = "analysis_skipped"
PHASE_HEAT_ANALYZED = "heat_analyzed"
PHASE_DEMAND_MATCHED = "demand_matched"
PHASE_SENIOR_FIT_SCORED = "senior_fit_scored"
PHASE_FINALIZED = "finalized"
# Phases for which _is_heat_analysis_done reports the heat analysis as done.
_HEAT_DONE_PHASES = frozenset(
    {
        PHASE_ANALYSIS_SKIPPED,
        PHASE_HEAT_ANALYZED,
        PHASE_DEMAND_MATCHED,
        PHASE_SENIOR_FIT_SCORED,
        PHASE_FINALIZED,
    }
)
# Analysis-related record fields, all None; spread into the init and skipped
# payloads so those fields start out unset.
_WXINDEX_RECORD_UNSET_ANALYSIS_FIELDS: dict[str, Any] = {
    "data_start_ymd": None,
    "data_end_ymd": None,
    "data_days": None,
    "is_sustained_high": None,
    "is_rising": None,
    "is_spike": None,
    "retain_reason": None,
    "is_internal_demand_matched": None,
    "matched_demand": None,
    "internal_demand_match_json": None,
    "senior_fit_score": None,
    "demand_senior_fit_json": None,
    "is_final_retained": None,
    "min_score": None,
    "max_score": None,
    "avg_score": None,
    "detail_json": None,
}
  83. def _parse_record_detail_json(record: dict[str, Any]) -> dict[str, Any]:
  84. detail = record.get("detail_json")
  85. if isinstance(detail, dict):
  86. return detail
  87. if isinstance(detail, str) and detail.strip():
  88. try:
  89. parsed = json.loads(detail)
  90. return parsed if isinstance(parsed, dict) else {}
  91. except json.JSONDecodeError:
  92. return {}
  93. return {}
  94. def _record_phase(record: dict[str, Any]) -> str:
  95. return str(_parse_record_detail_json(record).get("phase") or "").strip()
  96. def _nullable_bool_from_record(value: Any) -> bool | None:
  97. if value is None:
  98. return None
  99. return bool(value)
  100. def _parse_record_json_field(record: dict[str, Any], field: str) -> Any:
  101. value = record.get(field)
  102. if value is None:
  103. return None
  104. if isinstance(value, (dict, list)):
  105. return value
  106. if isinstance(value, (bytes, bytearray)):
  107. value = value.decode("utf-8")
  108. if isinstance(value, str):
  109. try:
  110. return json.loads(value)
  111. except json.JSONDecodeError:
  112. return None
  113. return value
  114. def _is_heat_analysis_done(record: dict[str, Any] | None) -> bool:
  115. if not record:
  116. return False
  117. return _record_phase(record) in _HEAT_DONE_PHASES
  118. def _is_analysis_skipped_record(record: dict[str, Any]) -> bool:
  119. return _record_phase(record) == PHASE_ANALYSIS_SKIPPED
  120. def _is_senior_fit_needed(
  121. *,
  122. retain_reason: str | None,
  123. is_internal_demand_matched: bool | None,
  124. ) -> bool:
  125. if not retain_reason:
  126. return False
  127. if retain_reason == RETAIN_REASON_SUSTAINED_HIGH:
  128. return bool(is_internal_demand_matched)
  129. return True
  130. def _is_finalized_record(record: dict[str, Any] | None) -> bool:
  131. if not record:
  132. return False
  133. return _record_phase(record) == PHASE_FINALIZED
  134. def _latest_score_ymd(scores: list[dict[str, Any]]) -> str | None:
  135. ymds = [_score_row_ymd(row) for row in scores if _score_row_ymd(row)]
  136. return max(ymds) if ymds else None
  137. def _should_rerun_heat_analysis(
  138. existing_record: dict[str, Any] | None,
  139. *,
  140. scores: list[dict[str, Any]],
  141. fetch_start_ymd: str,
  142. fetch_end_ymd: str,
  143. min_days: int,
  144. max_days: int,
  145. ) -> bool:
  146. if not _is_heat_analysis_done(existing_record):
  147. return True
  148. assert existing_record is not None
  149. latest_ymd = _latest_score_ymd(scores)
  150. record_end = str(existing_record.get("data_end_ymd") or "").strip()
  151. if latest_ymd and record_end and latest_ymd > record_end:
  152. return True
  153. if _is_analysis_skipped_record(existing_record):
  154. _, skip_reason = prepare_analysis_scores(
  155. scores,
  156. start_ymd=fetch_start_ymd,
  157. end_ymd=fetch_end_ymd,
  158. min_days=min_days,
  159. max_days=max_days,
  160. )
  161. if skip_reason is None:
  162. return True
  163. return False
  164. def _is_senior_fit_attempt_done(
  165. item: dict[str, Any],
  166. existing_record: dict[str, Any] | None,
  167. ) -> bool:
  168. if existing_record:
  169. phase = _record_phase(existing_record)
  170. if phase in (PHASE_SENIOR_FIT_SCORED, PHASE_FINALIZED):
  171. return True
  172. return item.get("senior_fit_score") is not None
  173. def _senior_fit_passed_from_score(score: Any, *, senior_threshold: float) -> bool:
  174. if score is None:
  175. return False
  176. try:
  177. return float(score) / 10.0 > senior_threshold
  178. except (TypeError, ValueError):
  179. return False
  180. def _rehydrate_result_from_record(record: dict[str, Any]) -> dict[str, Any]:
  181. detail = _parse_record_detail_json(record)
  182. retain_reason_raw = record.get("retain_reason")
  183. retain_reason = (
  184. str(retain_reason_raw).strip() if retain_reason_raw is not None else None
  185. ) or None
  186. def _bool_field(key: str) -> bool | None:
  187. value = record.get(key)
  188. if value is None:
  189. return None
  190. return bool(value)
  191. return {
  192. "skipped": False,
  193. "skip_reason": None,
  194. "data_days": record.get("data_days"),
  195. "data_start_ymd": record.get("data_start_ymd"),
  196. "data_end_ymd": record.get("data_end_ymd"),
  197. "fetch_start_ymd": record.get("fetch_start_ymd"),
  198. "fetch_end_ymd": record.get("fetch_end_ymd"),
  199. "min_score": record.get("min_score"),
  200. "max_score": record.get("max_score"),
  201. "avg_score": record.get("avg_score"),
  202. "is_sustained_high": _bool_field("is_sustained_high"),
  203. "is_rising": _bool_field("is_rising"),
  204. "is_spike": _bool_field("is_spike"),
  205. "retain_reason": retain_reason,
  206. "patterns": list(detail.get("patterns") or []),
  207. }
  208. def _rehydrate_pending_item_from_record(
  209. item: dict[str, Any],
  210. record: dict[str, Any],
  211. *,
  212. senior_threshold: float,
  213. ) -> dict[str, Any]:
  214. result = _rehydrate_result_from_record(record)
  215. retain_reason = result.get("retain_reason")
  216. is_internal_demand_matched = _nullable_bool_from_record(
  217. record.get("is_internal_demand_matched")
  218. )
  219. matched_demand_raw = record.get("matched_demand")
  220. matched_demand = (
  221. str(matched_demand_raw).strip() if matched_demand_raw is not None else None
  222. ) or None
  223. senior_fit_score = record.get("senior_fit_score")
  224. senior_fit_passed: bool | None = None
  225. is_final_retained = _nullable_bool_from_record(record.get("is_final_retained"))
  226. if senior_fit_score is not None:
  227. senior_fit_passed = _senior_fit_passed_from_score(
  228. senior_fit_score,
  229. senior_threshold=senior_threshold,
  230. )
  231. elif not _is_senior_fit_needed(
  232. retain_reason=retain_reason,
  233. is_internal_demand_matched=is_internal_demand_matched,
  234. ):
  235. senior_fit_passed = False
  236. if is_final_retained is None:
  237. is_final_retained = False
  238. return {
  239. "meta": item["meta"],
  240. "name": item["name"],
  241. "fetch_start_ymd": item["fetch_start_ymd"],
  242. "fetch_end_ymd": item["fetch_end_ymd"],
  243. "result": result,
  244. "retain_reason": retain_reason,
  245. "record_id": int(record.get("id") or item.get("record_id") or 0),
  246. "is_internal_demand_matched": is_internal_demand_matched,
  247. "matched_demand": matched_demand,
  248. "internal_demand_match_json": _parse_record_json_field(
  249. record,
  250. "internal_demand_match_json",
  251. ),
  252. "senior_fit_score": senior_fit_score,
  253. "demand_senior_fit_json": _parse_record_json_field(
  254. record,
  255. "demand_senior_fit_json",
  256. ),
  257. "senior_fit_passed": senior_fit_passed,
  258. "is_final_retained": is_final_retained,
  259. }
  260. def _build_skipped_export_row_from_record(
  261. *,
  262. record: dict[str, Any],
  263. meta: dict[str, Any],
  264. name: str,
  265. analyze_ymd: str,
  266. fetch_start_ymd: str,
  267. fetch_end_ymd: str,
  268. ) -> dict[str, Any]:
  269. detail = _parse_record_detail_json(record)
  270. return {
  271. "analyze_ymd": analyze_ymd,
  272. "name": name,
  273. "meta_id": meta.get("id"),
  274. "fetch_start_ymd": fetch_start_ymd,
  275. "fetch_end_ymd": fetch_end_ymd,
  276. "analysis_skipped": True,
  277. "skip_reason": detail.get("skip_reason") or "",
  278. "data_days": record.get("data_days") or "",
  279. "retain_reason": "",
  280. "is_sustained_high": False,
  281. "is_rising": False,
  282. "is_spike": False,
  283. "is_internal_demand_matched": "",
  284. "matched_demand": "",
  285. "senior_fit_score": "",
  286. "is_final_retained": False,
  287. "min_score": "",
  288. "max_score": "",
  289. "avg_score": "",
  290. }
  291. def _refresh_wxindex_heat_job_summary(
  292. summary: dict[str, Any],
  293. *,
  294. pending_items: list[dict[str, Any]],
  295. export_rows: list[dict[str, Any]],
  296. ) -> None:
  297. summary["analyzed"] = len(pending_items)
  298. summary["skipped"] = sum(1 for row in export_rows if row.get("analysis_skipped"))
  299. summary["retained"] = sum(1 for item in pending_items if item.get("retain_reason"))
  300. summary["sustained_high"] = sum(
  301. 1
  302. for item in pending_items
  303. if item.get("retain_reason") == RETAIN_REASON_SUSTAINED_HIGH
  304. )
  305. summary["rising"] = sum(
  306. 1 for item in pending_items if item.get("retain_reason") == RETAIN_REASON_RISING
  307. )
  308. summary["spike"] = sum(
  309. 1 for item in pending_items if item.get("retain_reason") == RETAIN_REASON_SPIKE
  310. )
  311. summary["internal_demand_matched"] = sum(
  312. 1 for item in pending_items if item.get("is_internal_demand_matched")
  313. )
  314. summary["senior_fit_candidates"] = sum(
  315. 1
  316. for item in pending_items
  317. if _is_senior_fit_needed(
  318. retain_reason=item.get("retain_reason"),
  319. is_internal_demand_matched=item.get("is_internal_demand_matched"),
  320. )
  321. )
  322. summary["senior_scored"] = sum(
  323. 1 for item in pending_items if item.get("senior_fit_score") is not None
  324. )
  325. summary["senior_fit_passed"] = sum(
  326. 1 for item in pending_items if item.get("senior_fit_passed")
  327. )
  328. summary["final_retained"] = sum(
  329. 1 for item in pending_items if item.get("is_final_retained")
  330. )
  331. def resolve_retain_reason(
  332. *,
  333. is_sustained_high: bool,
  334. is_rising: bool,
  335. is_spike: bool,
  336. ) -> str | None:
  337. """按 2->3->1 优先级确定保留原因(上涨 > 暴涨 > 持续高热度)。"""
  338. if is_rising:
  339. return RETAIN_REASON_RISING
  340. if is_spike:
  341. return RETAIN_REASON_SPIKE
  342. if is_sustained_high:
  343. return RETAIN_REASON_SUSTAINED_HIGH
  344. return None
  345. def _chunk_words(words: list[str], batch_size: int = WXINDEX_WORD_LLM_BATCH_SIZE) -> list[list[str]]:
  346. size = max(batch_size, 1)
  347. return [words[index : index + size] for index in range(0, len(words), size)]
  348. def _empty_senior_fit_result() -> dict[str, Any]:
  349. return {
  350. "senior_fit_score": None,
  351. "demand_senior_fit_json": {"source": "", "items": []},
  352. "passed": False,
  353. }
  354. def score_wxindex_words_senior_fit(
  355. *,
  356. words: list[str],
  357. config: FlowConfig,
  358. senior_threshold: float = WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD,
  359. batch_size: int = WXINDEX_WORD_LLM_BATCH_SIZE,
  360. ) -> dict[str, dict[str, Any]]:
  361. """批量对微信指数热词执行老年性 LLM 评分(每批最多 batch_size 个词)。"""
  362. cleaned: list[str] = []
  363. seen: set[str] = set()
  364. for raw in words:
  365. word = str(raw or "").strip()
  366. if not word or word in seen:
  367. continue
  368. seen.add(word)
  369. cleaned.append(word)
  370. results: dict[str, dict[str, Any]] = {
  371. word: _empty_senior_fit_result() for word in cleaned
  372. }
  373. for batch in _chunk_words(cleaned, batch_size=batch_size):
  374. candidates = [{"demand_type": TYPE_PHRASE, "demand_text": word} for word in batch]
  375. senior_fit_json = llm_score_senior_fit(
  376. channel_content_id=f"wxindex_words:{','.join(batch[:3])}",
  377. candidates=candidates,
  378. model=config.demand_quality_llm_model,
  379. max_attempts=config.demand_quality_llm_max_attempts,
  380. retry_sleep_seconds=config.demand_quality_llm_retry_sleep_seconds,
  381. max_tokens=config.demand_quality_llm_max_tokens,
  382. )
  383. senior_fit_json["threshold"] = senior_threshold * 10.0
  384. for word in batch:
  385. _, senior_score = lookup_quality_scores(
  386. demand_type=TYPE_PHRASE,
  387. demand_text=word,
  388. event_sense_json=None,
  389. senior_fit_json=senior_fit_json,
  390. )
  391. passed = (
  392. senior_score is not None
  393. and senior_score / 10.0 > senior_threshold
  394. )
  395. results[word] = {
  396. "senior_fit_score": senior_score,
  397. "demand_senior_fit_json": senior_fit_json,
  398. "passed": passed,
  399. }
  400. return results
  401. def score_wxindex_word_senior_fit(
  402. *,
  403. word: str,
  404. config: FlowConfig,
  405. senior_threshold: float = WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD,
  406. ) -> dict[str, Any]:
  407. """对单个微信指数热词执行老年性 LLM 评分。"""
  408. target_word = str(word or "").strip()
  409. if not target_word:
  410. return _empty_senior_fit_result()
  411. batch_results = score_wxindex_words_senior_fit(
  412. words=[target_word],
  413. config=config,
  414. senior_threshold=senior_threshold,
  415. batch_size=1,
  416. )
  417. return batch_results.get(target_word, _empty_senior_fit_result())
  418. def build_wxindex_odps_extend(retain_reason: str | None) -> str:
  419. method = str(retain_reason or "").strip()
  420. if not method:
  421. return "{}"
  422. return json.dumps({"method": method}, ensure_ascii=False)
  423. def build_wxindex_word_hive_row(
  424. *,
  425. wxindex_word_record_id: int,
  426. word: str,
  427. strategy: str,
  428. partition_dt: str,
  429. max_score: float | None,
  430. retain_reason: str | None = None,
  431. ) -> dict[str, Any]:
  432. normalized_name = str(word or "").strip()
  433. weight = 0.0
  434. if max_score is not None:
  435. weight = float(max_score) / WEIGHT_DIVISOR
  436. return {
  437. "record_id": wxindex_word_record_id,
  438. "strategy": strategy,
  439. "demand_id": build_demand_id(
  440. strategy=strategy,
  441. demand_name=normalized_name,
  442. partition_dt=partition_dt,
  443. ),
  444. "demand_name": normalized_name,
  445. "weight": weight,
  446. "type": TYPE_PHRASE,
  447. "video_count": None,
  448. "video_list": [],
  449. "extend": build_wxindex_odps_extend(retain_reason),
  450. "dt": partition_dt,
  451. }
  452. def build_wxindex_word_odps_sync_row(
  453. *,
  454. wxindex_word_record_id: int,
  455. word: str,
  456. strategy: str,
  457. partition_dt: str,
  458. max_score: float | None,
  459. retain_reason: str | None = None,
  460. ) -> dict[str, Any]:
  461. normalized_name = str(word or "").strip()
  462. weight = None
  463. if max_score is not None:
  464. weight = float(max_score) / WEIGHT_DIVISOR
  465. return {
  466. "partition_dt": partition_dt,
  467. "strategy": strategy,
  468. "demand_id": build_demand_id(
  469. strategy=strategy,
  470. demand_name=normalized_name,
  471. partition_dt=partition_dt,
  472. ),
  473. "demand_name": normalized_name,
  474. "demand_type": TYPE_PHRASE,
  475. "record_id": wxindex_word_record_id,
  476. "weight": weight,
  477. "extend": build_wxindex_odps_extend(retain_reason),
  478. }
  479. def prepare_analysis_scores(
  480. scores: list[dict[str, Any]],
  481. *,
  482. start_ymd: str,
  483. end_ymd: str,
  484. min_days: int = WXINDEX_HEAT_MIN_DAYS,
  485. max_days: int = WXINDEX_HEAT_MAX_DAYS,
  486. ) -> tuple[list[dict[str, Any]], str | None]:
  487. """截取目标区间内可用数据,不足 min_days 时返回 skip 原因。"""
  488. window_scores = filter_scores_in_ymd_window(
  489. scores,
  490. start_ymd=start_ymd,
  491. end_ymd=end_ymd,
  492. )
  493. if len(window_scores) > max_days:
  494. window_scores = window_scores[-max_days:]
  495. if len(window_scores) < min_days:
  496. return window_scores, "insufficient_days"
  497. return window_scores, None
  498. def _score_row_ymd(row: dict[str, Any]) -> str:
  499. return str(row.get("ymd") or row.get("dt") or "").strip()
  500. def _window_ymd_bounds(
  501. window_scores: list[dict[str, Any]],
  502. ) -> tuple[str | None, str | None]:
  503. if not window_scores:
  504. return None, None
  505. start_ymd = _score_row_ymd(window_scores[0]) or None
  506. end_ymd = _score_row_ymd(window_scores[-1]) or None
  507. return start_ymd, end_ymd
  508. def analyze_wxindex_heat_patterns(
  509. scores: list[dict[str, Any]],
  510. *,
  511. start_ymd: str,
  512. end_ymd: str,
  513. sustained_threshold: float = WXINDEX_SUSTAINED_HIGH_THRESHOLD,
  514. min_days: int = WXINDEX_HEAT_MIN_DAYS,
  515. max_days: int = WXINDEX_HEAT_MAX_DAYS,
  516. spike_days: int = WXINDEX_SPIKE_LOOKBACK_DAYS,
  517. spike_ratio: float = WXINDEX_SPIKE_RATIO,
  518. spike_baseline_floor: float = WXINDEX_SPIKE_BASELINE_FLOOR,
  519. rising_overall_change_rate: float = WXINDEX_RISING_OVERALL_CHANGE_RATE,
  520. rising_window_change_rate: float = WXINDEX_RISING_WINDOW_CHANGE_RATE,
  521. rising_adjacent_up_ratio: float = WXINDEX_RISING_ADJACENT_UP_RATIO,
  522. ) -> dict[str, Any]:
  523. """对目标区间数据判断三种热度模式。"""
  524. window_scores, skip_reason = prepare_analysis_scores(
  525. scores,
  526. start_ymd=start_ymd,
  527. end_ymd=end_ymd,
  528. min_days=min_days,
  529. max_days=max_days,
  530. )
  531. if skip_reason:
  532. data_start_ymd, data_end_ymd = _window_ymd_bounds(window_scores)
  533. return {
  534. "skipped": True,
  535. "skip_reason": skip_reason,
  536. "data_days": len(window_scores) if window_scores else None,
  537. "data_start_ymd": data_start_ymd,
  538. "data_end_ymd": data_end_ymd,
  539. "patterns": [],
  540. }
  541. numeric_scores = extract_sorted_scores(window_scores, max_points=max_days)
  542. data_start_ymd, data_end_ymd = _window_ymd_bounds(window_scores)
  543. is_sustained_high = all(score > sustained_threshold for score in numeric_scores)
  544. is_rising = is_wxindex_heat_rising_scores(
  545. numeric_scores,
  546. min_points=min_days,
  547. overall_change_rate=rising_overall_change_rate,
  548. window_change_rate_threshold=rising_window_change_rate,
  549. adjacent_up_ratio=rising_adjacent_up_ratio,
  550. )
  551. is_spike = is_wxindex_spike_scores(
  552. numeric_scores,
  553. spike_days=spike_days,
  554. min_points=min_days,
  555. spike_ratio=spike_ratio,
  556. baseline_floor=spike_baseline_floor,
  557. )
  558. retain_reason = resolve_retain_reason(
  559. is_sustained_high=is_sustained_high,
  560. is_rising=is_rising,
  561. is_spike=is_spike,
  562. )
  563. patterns: list[str] = []
  564. if retain_reason == RETAIN_REASON_SUSTAINED_HIGH:
  565. patterns.append(PATTERN_SUSTAINED_HIGH)
  566. elif retain_reason == RETAIN_REASON_RISING:
  567. patterns.append(PATTERN_RISING)
  568. elif retain_reason == RETAIN_REASON_SPIKE:
  569. patterns.append(PATTERN_SPIKE)
  570. return {
  571. "skipped": False,
  572. "skip_reason": None,
  573. "data_days": len(window_scores),
  574. "data_start_ymd": data_start_ymd,
  575. "data_end_ymd": data_end_ymd,
  576. "fetch_start_ymd": start_ymd,
  577. "fetch_end_ymd": end_ymd,
  578. "min_score": min(numeric_scores),
  579. "max_score": max(numeric_scores),
  580. "avg_score": sum(numeric_scores) / len(numeric_scores),
  581. "is_sustained_high": is_sustained_high,
  582. "is_rising": is_rising,
  583. "is_spike": is_spike,
  584. "retain_reason": retain_reason,
  585. "patterns": patterns,
  586. "scores": window_scores,
  587. }
  588. def build_wxindex_word_record_init_payload(
  589. *,
  590. meta: dict[str, Any],
  591. name: str,
  592. analyze_ymd: str,
  593. fetch_start_ymd: str,
  594. fetch_end_ymd: str,
  595. demand_cache_run_id: int | None = None,
  596. ) -> dict[str, Any]:
  597. """分析前写入追溯记录:仅含 meta 与抓取窗口,分析字段待后续更新。"""
  598. return {
  599. "name": name,
  600. "meta_id": meta.get("id"),
  601. "analyze_ymd": analyze_ymd,
  602. "fetch_start_ymd": fetch_start_ymd,
  603. "fetch_end_ymd": fetch_end_ymd,
  604. "demand_cache_run_id": demand_cache_run_id,
  605. **_WXINDEX_RECORD_UNSET_ANALYSIS_FIELDS,
  606. }
  607. def build_wxindex_word_record_skipped_payload(
  608. *,
  609. meta: dict[str, Any],
  610. name: str,
  611. analyze_ymd: str,
  612. fetch_start_ymd: str,
  613. fetch_end_ymd: str,
  614. result: dict[str, Any],
  615. demand_cache_run_id: int | None = None,
  616. ) -> dict[str, Any]:
  617. """分析被跳过时更新追溯记录。"""
  618. payload = {
  619. "name": name,
  620. "meta_id": meta.get("id"),
  621. "analyze_ymd": analyze_ymd,
  622. "fetch_start_ymd": fetch_start_ymd,
  623. "fetch_end_ymd": fetch_end_ymd,
  624. "demand_cache_run_id": demand_cache_run_id,
  625. **_WXINDEX_RECORD_UNSET_ANALYSIS_FIELDS,
  626. }
  627. payload.update(
  628. {
  629. "data_start_ymd": result.get("data_start_ymd"),
  630. "data_end_ymd": result.get("data_end_ymd"),
  631. "data_days": result.get("data_days"),
  632. "detail_json": {
  633. "phase": "analysis_skipped",
  634. "skip_reason": result.get("skip_reason"),
  635. "data_days": result.get("data_days"),
  636. },
  637. }
  638. )
  639. return payload
  640. def build_wxindex_word_record_payload(
  641. item: dict[str, Any],
  642. *,
  643. analyze_ymd: str,
  644. demand_cache_run_id: int | None,
  645. phase: str,
  646. senior_threshold: float,
  647. sustained_threshold: float,
  648. spike_days: int,
  649. spike_ratio: float,
  650. spike_baseline_floor: float,
  651. rising_overall_change_rate: float,
  652. rising_window_change_rate: float,
  653. rising_adjacent_up_ratio: float,
  654. ) -> dict[str, Any]:
  655. """根据 pending item 当前状态构建 records 表 payload。"""
  656. meta = item["meta"]
  657. name = item["name"]
  658. fetch_start_ymd = item["fetch_start_ymd"]
  659. fetch_end_ymd = item["fetch_end_ymd"]
  660. result = item["result"]
  661. retain_reason = item.get("retain_reason")
  662. return {
  663. "name": name,
  664. "meta_id": meta.get("id"),
  665. "analyze_ymd": analyze_ymd,
  666. "fetch_start_ymd": fetch_start_ymd,
  667. "fetch_end_ymd": fetch_end_ymd,
  668. "data_start_ymd": result.get("data_start_ymd"),
  669. "data_end_ymd": result.get("data_end_ymd"),
  670. "data_days": result.get("data_days"),
  671. "is_sustained_high": result.get("is_sustained_high"),
  672. "is_rising": result.get("is_rising"),
  673. "is_spike": result.get("is_spike"),
  674. "retain_reason": retain_reason,
  675. "is_internal_demand_matched": item.get("is_internal_demand_matched"),
  676. "matched_demand": item.get("matched_demand"),
  677. "demand_cache_run_id": demand_cache_run_id,
  678. "internal_demand_match_json": item.get("internal_demand_match_json"),
  679. "senior_fit_score": item.get("senior_fit_score"),
  680. "demand_senior_fit_json": item.get("demand_senior_fit_json"),
  681. "is_final_retained": item.get("is_final_retained"),
  682. "min_score": result.get("min_score"),
  683. "max_score": result.get("max_score"),
  684. "avg_score": result.get("avg_score"),
  685. "detail_json": {
  686. "phase": phase,
  687. "patterns": list(result.get("patterns") or []),
  688. "retain_reason": retain_reason,
  689. "retain_reason_priority": "2->3->1",
  690. "senior_fit_threshold": senior_threshold,
  691. "senior_fit_threshold_score": senior_threshold * 10.0,
  692. "is_final_retained": item.get("is_final_retained"),
  693. "sustained_threshold": sustained_threshold,
  694. "spike_days": spike_days,
  695. "spike_ratio": spike_ratio,
  696. "spike_baseline_floor": spike_baseline_floor,
  697. "rising_overall_change_rate": rising_overall_change_rate,
  698. "rising_window_change_rate": rising_window_change_rate,
  699. "rising_adjacent_up_ratio": rising_adjacent_up_ratio,
  700. },
  701. }
  702. def _persist_wxindex_word_record(
  703. repository: HotContentRepository,
  704. payload: dict[str, Any],
  705. *,
  706. dry_run: bool,
  707. skip_db_save: bool,
  708. verbose: bool,
  709. action: str,
  710. ) -> int:
  711. name = str(payload.get("name") or "").strip()
  712. if dry_run or skip_db_save:
  713. if verbose:
  714. label = "dry-run" if dry_run else "skip-db-save"
  715. print(f"[{label}] would {action} wxindex word record word={name}")
  716. return 0
  717. return repository.save_wxindex_word_record(payload)
  718. def _persist_pending_item_record(
  719. repository: HotContentRepository,
  720. item: dict[str, Any],
  721. *,
  722. analyze_ymd: str,
  723. demand_cache_run_id: int | None,
  724. phase: str,
  725. senior_threshold: float,
  726. sustained_threshold: float,
  727. spike_days: int,
  728. spike_ratio: float,
  729. spike_baseline_floor: float,
  730. rising_overall_change_rate: float,
  731. rising_window_change_rate: float,
  732. rising_adjacent_up_ratio: float,
  733. dry_run: bool,
  734. skip_db_save: bool,
  735. verbose: bool,
  736. ) -> int:
  737. payload = build_wxindex_word_record_payload(
  738. item,
  739. analyze_ymd=analyze_ymd,
  740. demand_cache_run_id=demand_cache_run_id,
  741. phase=phase,
  742. senior_threshold=senior_threshold,
  743. sustained_threshold=sustained_threshold,
  744. spike_days=spike_days,
  745. spike_ratio=spike_ratio,
  746. spike_baseline_floor=spike_baseline_floor,
  747. rising_overall_change_rate=rising_overall_change_rate,
  748. rising_window_change_rate=rising_window_change_rate,
  749. rising_adjacent_up_ratio=rising_adjacent_up_ratio,
  750. )
  751. record_id = _persist_wxindex_word_record(
  752. repository,
  753. payload,
  754. dry_run=dry_run,
  755. skip_db_save=skip_db_save,
  756. verbose=verbose,
  757. action=f"{phase} word={item['name']}",
  758. )
  759. if record_id:
  760. item["record_id"] = record_id
  761. return int(item.get("record_id") or 0)
  762. def _filter_candidates_awaiting_yesterday_score(
  763. repository: HotContentRepository,
  764. candidate_items: list[dict[str, Any]],
  765. *,
  766. yesterday_ymd: str,
  767. existing_records: dict[str, dict[str, Any]],
  768. verbose: bool,
  769. ) -> tuple[list[dict[str, Any]], int]:
  770. """初始化完成后:未完成热度分析且缺少昨日指数数据的词留待下次执行。"""
  771. names_to_check = [
  772. item["name"]
  773. for item in candidate_items
  774. if not _is_heat_analysis_done(existing_records.get(item["name"]))
  775. ]
  776. if not names_to_check:
  777. return candidate_items, 0
  778. names_with_yesterday = repository.list_wxindex_word_names_with_dt(
  779. names_to_check,
  780. dt=yesterday_ymd,
  781. )
  782. ready_items: list[dict[str, Any]] = []
  783. awaiting_count = 0
  784. for item in candidate_items:
  785. name = item["name"]
  786. if _is_heat_analysis_done(existing_records.get(name)):
  787. ready_items.append(item)
  788. continue
  789. if name in names_with_yesterday:
  790. ready_items.append(item)
  791. continue
  792. awaiting_count += 1
  793. if verbose:
  794. print(
  795. f"await yesterday score word={name} dt={yesterday_ymd}, skip this run"
  796. )
  797. return ready_items, awaiting_count
  798. def _init_candidate_wxindex_word_records(
  799. repository: HotContentRepository,
  800. candidate_items: list[dict[str, Any]],
  801. *,
  802. analyze_ymd: str,
  803. demand_cache_run_id: int | None,
  804. dry_run: bool,
  805. skip_db_save: bool,
  806. verbose: bool,
  807. ) -> None:
  808. if not candidate_items:
  809. return
  810. if dry_run or skip_db_save:
  811. if verbose:
  812. label = "dry-run" if dry_run else "skip-db-save"
  813. print(
  814. f"[{label}] would batch init wxindex word records "
  815. f"count={len(candidate_items)} analyze_ymd={analyze_ymd}"
  816. )
  817. return
  818. init_payloads = [
  819. build_wxindex_word_record_init_payload(
  820. meta=item["meta"],
  821. name=item["name"],
  822. analyze_ymd=analyze_ymd,
  823. fetch_start_ymd=item["fetch_start_ymd"],
  824. fetch_end_ymd=item["fetch_end_ymd"],
  825. demand_cache_run_id=demand_cache_run_id,
  826. )
  827. for item in candidate_items
  828. ]
  829. record_id_map = repository.init_wxindex_word_records(init_payloads)
  830. for item in candidate_items:
  831. item["record_id"] = int(record_id_map.get(item["name"]) or 0)
  832. if verbose:
  833. print(
  834. f"init wxindex word records count={len(candidate_items)} "
  835. f"analyze_ymd={analyze_ymd}"
  836. )
  837. def _apply_demand_match_to_item(
  838. item: dict[str, Any],
  839. match_result: dict[str, Any],
  840. ) -> None:
  841. item["internal_demand_match_json"] = match_result
  842. item["is_internal_demand_matched"] = bool(match_result.get("matched"))
  843. matched_demand = str(match_result.get("matched_demand") or "").strip()
  844. item["matched_demand"] = matched_demand or None
  845. if not item.get("is_internal_demand_matched"):
  846. item["is_final_retained"] = False
  847. def _apply_senior_fit_to_item(
  848. item: dict[str, Any],
  849. senior_result: dict[str, Any],
  850. ) -> None:
  851. passed = bool(senior_result.get("passed"))
  852. item["senior_fit_score"] = senior_result.get("senior_fit_score")
  853. item["demand_senior_fit_json"] = senior_result.get("demand_senior_fit_json")
  854. item["senior_fit_passed"] = passed
  855. item["is_final_retained"] = passed
  856. def build_wxindex_word_stats_payload(
  857. item: dict[str, Any],
  858. *,
  859. analyze_ymd: str,
  860. ) -> dict[str, Any]:
  861. """构建通过热度+老年性筛选的词统计 payload。"""
  862. meta = item["meta"]
  863. result = item["result"]
  864. return {
  865. "name": item["name"],
  866. "meta_id": meta.get("id"),
  867. "analyze_ymd": analyze_ymd,
  868. "wxindex_word_record_id": item.get("record_id"),
  869. "retain_reason": item.get("retain_reason"),
  870. "senior_fit_score": item.get("senior_fit_score"),
  871. "data_start_ymd": result.get("data_start_ymd"),
  872. "data_end_ymd": result.get("data_end_ymd"),
  873. "data_days": result.get("data_days"),
  874. "min_score": result.get("min_score"),
  875. "max_score": result.get("max_score"),
  876. "avg_score": result.get("avg_score"),
  877. "detail_json": {
  878. "demand_senior_fit_json": item.get("demand_senior_fit_json"),
  879. "is_sustained_high": result.get("is_sustained_high"),
  880. "is_rising": result.get("is_rising"),
  881. "is_spike": result.get("is_spike"),
  882. },
  883. }
  884. def _save_senior_fit_passed_stats(
  885. repository: HotContentRepository,
  886. items: list[dict[str, Any]],
  887. *,
  888. analyze_ymd: str,
  889. existing_stats_names: set[str] | None = None,
  890. dry_run: bool,
  891. skip_db_save: bool,
  892. verbose: bool,
  893. ) -> tuple[int, int]:
  894. existing = existing_stats_names or set()
  895. all_payloads = [
  896. build_wxindex_word_stats_payload(item, analyze_ymd=analyze_ymd)
  897. for item in items
  898. if item.get("senior_fit_passed")
  899. ]
  900. resumed = sum(1 for payload in all_payloads if payload["name"] in existing)
  901. payloads = [payload for payload in all_payloads if payload["name"] not in existing]
  902. if not payloads:
  903. return 0, resumed
  904. if dry_run or skip_db_save:
  905. if verbose:
  906. label = "dry-run" if dry_run else "skip-db-save"
  907. print(f"[{label}] would save wxindex word stats count={len(payloads)}")
  908. return len(payloads), resumed
  909. saved = repository.save_wxindex_word_stats_batch(payloads)
  910. if verbose:
  911. print(f"saved wxindex word stats count={saved} analyze_ymd={analyze_ymd}")
  912. return saved, resumed
  913. def _build_export_row_from_item(
  914. item: dict[str, Any],
  915. *,
  916. analyze_ymd: str,
  917. strategy: str,
  918. ) -> dict[str, Any]:
  919. meta = item["meta"]
  920. name = item["name"]
  921. result = item["result"]
  922. retain_reason = item.get("retain_reason")
  923. is_internal_demand_matched = item.get("is_internal_demand_matched")
  924. matched_demand = str(item.get("matched_demand") or "")
  925. senior_fit_score = item.get("senior_fit_score")
  926. is_final_retained = bool(item.get("is_final_retained"))
  927. return {
  928. "analyze_ymd": analyze_ymd,
  929. "name": name,
  930. "meta_id": meta.get("id"),
  931. "fetch_start_ymd": item["fetch_start_ymd"],
  932. "fetch_end_ymd": item["fetch_end_ymd"],
  933. "data_start_ymd": result.get("data_start_ymd"),
  934. "data_end_ymd": result.get("data_end_ymd"),
  935. "data_days": result.get("data_days"),
  936. "analysis_skipped": False,
  937. "skip_reason": "",
  938. "is_sustained_high": bool(result.get("is_sustained_high")),
  939. "is_rising": bool(result.get("is_rising")),
  940. "is_spike": bool(result.get("is_spike")),
  941. "retain_reason": retain_reason or "",
  942. "is_internal_demand_matched": (
  943. "" if is_internal_demand_matched is None else is_internal_demand_matched
  944. ),
  945. "matched_demand": matched_demand,
  946. "senior_fit_score": senior_fit_score if senior_fit_score is not None else "",
  947. "is_final_retained": is_final_retained,
  948. "min_score": result.get("min_score"),
  949. "max_score": result.get("max_score"),
  950. "avg_score": result.get("avg_score"),
  951. "demand_id": (
  952. build_demand_id(
  953. strategy=strategy,
  954. demand_name=name,
  955. partition_dt=analyze_ymd,
  956. )
  957. if is_final_retained and strategy
  958. else ""
  959. ),
  960. "weight": (
  961. float(result.get("max_score") or 0) / WEIGHT_DIVISOR
  962. if is_final_retained and result.get("max_score") is not None
  963. else ""
  964. ),
  965. }
  966. def run_wxindex_heat_pattern_daily_job(
  967. repository: HotContentRepository,
  968. *,
  969. config: FlowConfig | None = None,
  970. api_client: JsonApiClient | None = None,
  971. today: date | None = None,
  972. sustained_threshold: float = WXINDEX_SUSTAINED_HIGH_THRESHOLD,
  973. min_days: int = WXINDEX_HEAT_MIN_DAYS,
  974. max_days: int = WXINDEX_HEAT_MAX_DAYS,
  975. spike_days: int = WXINDEX_SPIKE_LOOKBACK_DAYS,
  976. spike_ratio: float = WXINDEX_SPIKE_RATIO,
  977. spike_baseline_floor: float = WXINDEX_SPIKE_BASELINE_FLOOR,
  978. rising_overall_change_rate: float = WXINDEX_RISING_OVERALL_CHANGE_RATE,
  979. rising_window_change_rate: float = WXINDEX_RISING_WINDOW_CHANGE_RATE,
  980. rising_adjacent_up_ratio: float = WXINDEX_RISING_ADJACENT_UP_RATIO,
  981. dry_run: bool = False,
  982. skip_odps: bool = False,
  983. skip_db_save: bool = False,
  984. verbose: bool = False,
  985. ) -> dict[str, Any]:
  986. """定时任务:分析仍在抓取窗口内的词,并写入热度模式结果表。"""
  987. current = today or datetime.now(SHANGHAI_TZ).date()
  988. analyze_ymd = current.strftime("%Y%m%d")
  989. meta_rows = repository.list_active_wxindex_word_meta(today=current)
  990. demand_name_set: list[str] = []
  991. demand_cache_run_id: int | None = None
  992. demand_cache_error: str | None = None
  993. postprocess_service: ContributionPostprocessService | None = None
  994. if config is not None:
  995. try:
  996. cache = DemandCacheService(config, repository).get_or_create_current_hour_cache()
  997. demand_name_set = list(cache.get("demand_name_set") or [])
  998. demand_cache_run_id = int(cache["id"])
  999. if demand_name_set:
  1000. postprocess_service = ContributionPostprocessService(
  1001. config,
  1002. repository,
  1003. api_client
  1004. or JsonApiClient(
  1005. timeout_seconds=config.request_timeout_seconds,
  1006. verify_ssl=config.https_verify_ssl,
  1007. ),
  1008. )
  1009. except HotContentFlowError as exc:
  1010. demand_cache_error = str(exc)
  1011. if verbose:
  1012. print(f"demand cache unavailable, skip demand match: {exc}")
  1013. summary: dict[str, Any] = {
  1014. "analyze_ymd": analyze_ymd,
  1015. "meta_count": len(meta_rows),
  1016. "analyzed": 0,
  1017. "skipped": 0,
  1018. "retained": 0,
  1019. "sustained_high": 0,
  1020. "rising": 0,
  1021. "spike": 0,
  1022. "internal_demand_matched": 0,
  1023. "senior_scored": 0,
  1024. "senior_fit_candidates": 0,
  1025. "senior_fit_passed": 0,
  1026. "stats_saved": 0,
  1027. "final_retained": 0,
  1028. "odps_synced": 0,
  1029. "odps_written": 0,
  1030. "demand_cache_run_id": demand_cache_run_id,
  1031. "demand_cache_error": demand_cache_error,
  1032. "demand_name_count": len(demand_name_set),
  1033. "llm_batch_size": WXINDEX_WORD_LLM_BATCH_SIZE,
  1034. "demand_match_batches": 0,
  1035. "senior_fit_batches": 0,
  1036. "records_initialized": 0,
  1037. "awaiting_yesterday_score": 0,
  1038. "heat_resumed": 0,
  1039. "demand_match_resumed": 0,
  1040. "senior_fit_resumed": 0,
  1041. "stats_resumed": 0,
  1042. "finalized_resumed": 0,
  1043. "dry_run": dry_run,
  1044. "skip_odps": skip_odps,
  1045. "skip_db_save": skip_db_save,
  1046. }
  1047. retained_words: list[dict[str, Any]] = []
  1048. export_rows: list[dict[str, Any]] = []
  1049. final_hive_rows: list[dict[str, Any]] = []
  1050. senior_threshold = WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD
  1051. strategy = str(config.hot_demand_pool_strategy or "").strip() if config else ""
  1052. pending_items: list[dict[str, Any]] = []
  1053. candidate_items: list[dict[str, Any]] = []
  1054. for meta in meta_rows:
  1055. name = str(meta.get("name") or "").strip()
  1056. fetch_start_ymd = str(meta.get("fetch_start_ymd") or "").strip()
  1057. fetch_end_ymd = str(meta.get("fetch_end_ymd") or "").strip()
  1058. if not name or not fetch_start_ymd or not fetch_end_ymd:
  1059. summary["skipped"] += 1
  1060. continue
  1061. candidate_items.append(
  1062. {
  1063. "meta": meta,
  1064. "name": name,
  1065. "fetch_start_ymd": fetch_start_ymd,
  1066. "fetch_end_ymd": fetch_end_ymd,
  1067. }
  1068. )
  1069. _init_candidate_wxindex_word_records(
  1070. repository,
  1071. candidate_items,
  1072. analyze_ymd=analyze_ymd,
  1073. demand_cache_run_id=demand_cache_run_id,
  1074. dry_run=dry_run,
  1075. skip_db_save=skip_db_save,
  1076. verbose=verbose,
  1077. )
  1078. summary["records_initialized"] = len(candidate_items)
  1079. existing_records: dict[str, dict[str, Any]] = {}
  1080. existing_stats_names: set[str] = set()
  1081. if not dry_run and not skip_db_save:
  1082. candidate_names = [item["name"] for item in candidate_items]
  1083. existing_records = repository.list_wxindex_word_records_by_analyze_ymd(
  1084. analyze_ymd=analyze_ymd,
  1085. names=candidate_names,
  1086. )
  1087. existing_stats_names = repository.list_wxindex_word_stats_names(
  1088. analyze_ymd=analyze_ymd,
  1089. names=candidate_names,
  1090. )
  1091. yesterday_ymd = (current - timedelta(days=1)).strftime("%Y%m%d")
  1092. candidate_items, awaiting_yesterday = _filter_candidates_awaiting_yesterday_score(
  1093. repository,
  1094. candidate_items,
  1095. yesterday_ymd=yesterday_ymd,
  1096. existing_records=existing_records,
  1097. verbose=verbose,
  1098. )
  1099. summary["awaiting_yesterday_score"] = awaiting_yesterday
  1100. for item in candidate_items:
  1101. if not item.get("record_id"):
  1102. existing = existing_records.get(item["name"])
  1103. if existing:
  1104. item["record_id"] = int(existing.get("id") or 0)
  1105. for item in candidate_items:
  1106. meta = item["meta"]
  1107. name = item["name"]
  1108. fetch_start_ymd = item["fetch_start_ymd"]
  1109. fetch_end_ymd = item["fetch_end_ymd"]
  1110. existing_record = existing_records.get(name)
  1111. scores = repository.list_wxindex_word_scores_in_range(
  1112. name,
  1113. start_ymd=fetch_start_ymd,
  1114. end_ymd=fetch_end_ymd,
  1115. )
  1116. if (
  1117. _is_heat_analysis_done(existing_record)
  1118. and not _should_rerun_heat_analysis(
  1119. existing_record,
  1120. scores=scores,
  1121. fetch_start_ymd=fetch_start_ymd,
  1122. fetch_end_ymd=fetch_end_ymd,
  1123. min_days=min_days,
  1124. max_days=max_days,
  1125. )
  1126. ):
  1127. summary["heat_resumed"] += 1
  1128. if _is_analysis_skipped_record(existing_record):
  1129. if verbose:
  1130. detail = _parse_record_detail_json(existing_record)
  1131. print(
  1132. f"resume skip word={name} reason={detail.get('skip_reason')} "
  1133. f"days={existing_record.get('data_days')}"
  1134. )
  1135. export_rows.append(
  1136. _build_skipped_export_row_from_record(
  1137. record=existing_record,
  1138. meta=meta,
  1139. name=name,
  1140. analyze_ymd=analyze_ymd,
  1141. fetch_start_ymd=fetch_start_ymd,
  1142. fetch_end_ymd=fetch_end_ymd,
  1143. )
  1144. )
  1145. continue
  1146. pending_item = _rehydrate_pending_item_from_record(
  1147. item,
  1148. existing_record,
  1149. senior_threshold=senior_threshold,
  1150. )
  1151. pending_items.append(pending_item)
  1152. if verbose:
  1153. print(
  1154. f"resume heat analyzed word={name} "
  1155. f"retain_reason={pending_item.get('retain_reason') or ''} "
  1156. f"data_days={pending_item['result'].get('data_days')}"
  1157. )
  1158. continue
  1159. result = analyze_wxindex_heat_patterns(
  1160. scores,
  1161. start_ymd=fetch_start_ymd,
  1162. end_ymd=fetch_end_ymd,
  1163. sustained_threshold=sustained_threshold,
  1164. min_days=min_days,
  1165. max_days=max_days,
  1166. spike_days=spike_days,
  1167. spike_ratio=spike_ratio,
  1168. spike_baseline_floor=spike_baseline_floor,
  1169. rising_overall_change_rate=rising_overall_change_rate,
  1170. rising_window_change_rate=rising_window_change_rate,
  1171. rising_adjacent_up_ratio=rising_adjacent_up_ratio,
  1172. )
  1173. if result.get("skipped"):
  1174. if verbose:
  1175. print(
  1176. f"skip word={name} reason={result.get('skip_reason')} "
  1177. f"days={result.get('data_days')}"
  1178. )
  1179. skipped_payload = build_wxindex_word_record_skipped_payload(
  1180. meta=meta,
  1181. name=name,
  1182. analyze_ymd=analyze_ymd,
  1183. fetch_start_ymd=fetch_start_ymd,
  1184. fetch_end_ymd=fetch_end_ymd,
  1185. result=result,
  1186. demand_cache_run_id=demand_cache_run_id,
  1187. )
  1188. _persist_wxindex_word_record(
  1189. repository,
  1190. skipped_payload,
  1191. dry_run=dry_run,
  1192. skip_db_save=skip_db_save,
  1193. verbose=verbose,
  1194. action="update skipped",
  1195. )
  1196. export_rows.append(
  1197. {
  1198. "analyze_ymd": analyze_ymd,
  1199. "name": name,
  1200. "meta_id": meta.get("id"),
  1201. "fetch_start_ymd": fetch_start_ymd,
  1202. "fetch_end_ymd": fetch_end_ymd,
  1203. "analysis_skipped": True,
  1204. "skip_reason": result.get("skip_reason"),
  1205. "data_days": result.get("data_days"),
  1206. "retain_reason": "",
  1207. "is_sustained_high": False,
  1208. "is_rising": False,
  1209. "is_spike": False,
  1210. "is_internal_demand_matched": "",
  1211. "matched_demand": "",
  1212. "senior_fit_score": "",
  1213. "is_final_retained": False,
  1214. "min_score": "",
  1215. "max_score": "",
  1216. "avg_score": "",
  1217. }
  1218. )
  1219. continue
  1220. retain_reason = str(result.get("retain_reason") or "").strip() or None
  1221. pending_item = {
  1222. "meta": meta,
  1223. "name": name,
  1224. "fetch_start_ymd": fetch_start_ymd,
  1225. "fetch_end_ymd": fetch_end_ymd,
  1226. "result": result,
  1227. "retain_reason": retain_reason,
  1228. "record_id": item.get("record_id", 0),
  1229. "is_internal_demand_matched": None,
  1230. "matched_demand": None,
  1231. "internal_demand_match_json": None,
  1232. "senior_fit_score": None,
  1233. "demand_senior_fit_json": None,
  1234. "senior_fit_passed": None,
  1235. "is_final_retained": None,
  1236. }
  1237. pending_items.append(pending_item)
  1238. _persist_pending_item_record(
  1239. repository,
  1240. pending_item,
  1241. analyze_ymd=analyze_ymd,
  1242. demand_cache_run_id=demand_cache_run_id,
  1243. phase="heat_analyzed",
  1244. senior_threshold=senior_threshold,
  1245. sustained_threshold=sustained_threshold,
  1246. spike_days=spike_days,
  1247. spike_ratio=spike_ratio,
  1248. spike_baseline_floor=spike_baseline_floor,
  1249. rising_overall_change_rate=rising_overall_change_rate,
  1250. rising_window_change_rate=rising_window_change_rate,
  1251. rising_adjacent_up_ratio=rising_adjacent_up_ratio,
  1252. dry_run=dry_run,
  1253. skip_db_save=skip_db_save,
  1254. verbose=verbose,
  1255. )
  1256. if verbose:
  1257. print(
  1258. f"heat analyzed word={name} retain_reason={retain_reason or ''} "
  1259. f"data_days={result.get('data_days')}"
  1260. )
  1261. demand_match_results: dict[str, dict[str, Any]] = {}
  1262. pending_by_name = {item["name"]: item for item in pending_items}
  1263. sustained_high_words = [
  1264. item["name"]
  1265. for item in pending_items
  1266. if item.get("retain_reason") == RETAIN_REASON_SUSTAINED_HIGH
  1267. and item.get("is_internal_demand_matched") is None
  1268. ]
  1269. for item in pending_items:
  1270. if item.get("retain_reason") != RETAIN_REASON_SUSTAINED_HIGH:
  1271. continue
  1272. if item.get("is_internal_demand_matched") is not None:
  1273. summary["demand_match_resumed"] += 1
  1274. def _persist_demand_match_updates(words: list[str]) -> None:
  1275. for word in words:
  1276. pending_item = pending_by_name.get(word)
  1277. if pending_item is None:
  1278. continue
  1279. match_result = demand_match_results.get(word) or {
  1280. "word": word,
  1281. "matched": False,
  1282. "matched_demand": "",
  1283. "match_list": [],
  1284. }
  1285. _apply_demand_match_to_item(pending_item, match_result)
  1286. _persist_pending_item_record(
  1287. repository,
  1288. pending_item,
  1289. analyze_ymd=analyze_ymd,
  1290. demand_cache_run_id=demand_cache_run_id,
  1291. phase="demand_matched",
  1292. senior_threshold=senior_threshold,
  1293. sustained_threshold=sustained_threshold,
  1294. spike_days=spike_days,
  1295. spike_ratio=spike_ratio,
  1296. spike_baseline_floor=spike_baseline_floor,
  1297. rising_overall_change_rate=rising_overall_change_rate,
  1298. rising_window_change_rate=rising_window_change_rate,
  1299. rising_adjacent_up_ratio=rising_adjacent_up_ratio,
  1300. dry_run=dry_run,
  1301. skip_db_save=skip_db_save,
  1302. verbose=verbose,
  1303. )
  1304. if sustained_high_words:
  1305. if postprocess_service is not None and demand_name_set:
  1306. for batch in _chunk_words(sustained_high_words):
  1307. summary["demand_match_batches"] += 1
  1308. if verbose:
  1309. print(f"demand match batch size={len(batch)} words={batch}")
  1310. demand_match_results.update(
  1311. postprocess_service.match_words_to_demand_pool(
  1312. words=batch,
  1313. demand_name_set=demand_name_set,
  1314. )
  1315. )
  1316. _persist_demand_match_updates(batch)
  1317. else:
  1318. for word in sustained_high_words:
  1319. demand_match_results[word] = {
  1320. "word": word,
  1321. "matched": False,
  1322. "matched_demand": "",
  1323. "match_list": [],
  1324. "skip_reason": "empty_demand_cache",
  1325. }
  1326. _persist_demand_match_updates(sustained_high_words)
  1327. senior_fit_candidate_names: list[str] = []
  1328. for item in pending_items:
  1329. retain_reason = item.get("retain_reason")
  1330. name = str(item.get("name") or "").strip()
  1331. if not retain_reason or not name:
  1332. continue
  1333. if _is_senior_fit_attempt_done(item, existing_records.get(name)):
  1334. summary["senior_fit_resumed"] += 1
  1335. continue
  1336. if retain_reason == RETAIN_REASON_SUSTAINED_HIGH:
  1337. if not item.get("is_internal_demand_matched"):
  1338. continue
  1339. senior_fit_candidate_names.append(name)
  1340. senior_fit_results: dict[str, dict[str, Any]] = {}
  1341. def _persist_senior_fit_updates(words: list[str]) -> None:
  1342. for word in words:
  1343. pending_item = pending_by_name.get(word)
  1344. senior_result = senior_fit_results.get(word)
  1345. if pending_item is None or senior_result is None:
  1346. continue
  1347. _apply_senior_fit_to_item(pending_item, senior_result)
  1348. _persist_pending_item_record(
  1349. repository,
  1350. pending_item,
  1351. analyze_ymd=analyze_ymd,
  1352. demand_cache_run_id=demand_cache_run_id,
  1353. phase="senior_fit_scored",
  1354. senior_threshold=senior_threshold,
  1355. sustained_threshold=sustained_threshold,
  1356. spike_days=spike_days,
  1357. spike_ratio=spike_ratio,
  1358. spike_baseline_floor=spike_baseline_floor,
  1359. rising_overall_change_rate=rising_overall_change_rate,
  1360. rising_window_change_rate=rising_window_change_rate,
  1361. rising_adjacent_up_ratio=rising_adjacent_up_ratio,
  1362. dry_run=dry_run,
  1363. skip_db_save=skip_db_save,
  1364. verbose=verbose,
  1365. )
  1366. if verbose and not pending_item.get("senior_fit_passed"):
  1367. print(
  1368. f"senior fit rejected word={word} "
  1369. f"score={pending_item.get('senior_fit_score')}"
  1370. )
  1371. if config is not None and senior_fit_candidate_names:
  1372. for batch in _chunk_words(senior_fit_candidate_names):
  1373. summary["senior_fit_batches"] += 1
  1374. if verbose:
  1375. print(f"senior fit batch size={len(batch)} words={batch}")
  1376. senior_fit_results.update(
  1377. score_wxindex_words_senior_fit(
  1378. words=batch,
  1379. config=config,
  1380. senior_threshold=senior_threshold,
  1381. batch_size=WXINDEX_WORD_LLM_BATCH_SIZE,
  1382. )
  1383. )
  1384. _persist_senior_fit_updates(batch)
  1385. stats_saved, stats_resumed = _save_senior_fit_passed_stats(
  1386. repository,
  1387. pending_items,
  1388. analyze_ymd=analyze_ymd,
  1389. existing_stats_names=existing_stats_names,
  1390. dry_run=dry_run,
  1391. skip_db_save=skip_db_save,
  1392. verbose=verbose,
  1393. )
  1394. summary["stats_saved"] = stats_saved
  1395. summary["stats_resumed"] = stats_resumed
  1396. for item in pending_items:
  1397. name = item["name"]
  1398. result = item["result"]
  1399. retain_reason = item.get("retain_reason")
  1400. if retain_reason:
  1401. retained_words.append(
  1402. {
  1403. "name": name,
  1404. "retain_reason": retain_reason,
  1405. "data_days": result.get("data_days"),
  1406. "data_start_ymd": result.get("data_start_ymd"),
  1407. "data_end_ymd": result.get("data_end_ymd"),
  1408. "is_internal_demand_matched": item.get("is_internal_demand_matched"),
  1409. "matched_demand": str(item.get("matched_demand") or ""),
  1410. "senior_fit_score": item.get("senior_fit_score"),
  1411. "is_final_retained": bool(item.get("is_final_retained")),
  1412. }
  1413. )
  1414. existing_record = existing_records.get(name)
  1415. if _is_finalized_record(existing_record):
  1416. summary["finalized_resumed"] += 1
  1417. record_id = int(item.get("record_id") or existing_record.get("id") or 0)
  1418. if verbose:
  1419. print(f"resume finalized word={name}")
  1420. else:
  1421. record_id = _persist_pending_item_record(
  1422. repository,
  1423. item,
  1424. analyze_ymd=analyze_ymd,
  1425. demand_cache_run_id=demand_cache_run_id,
  1426. phase="finalized",
  1427. senior_threshold=senior_threshold,
  1428. sustained_threshold=sustained_threshold,
  1429. spike_days=spike_days,
  1430. spike_ratio=spike_ratio,
  1431. spike_baseline_floor=spike_baseline_floor,
  1432. rising_overall_change_rate=rising_overall_change_rate,
  1433. rising_window_change_rate=rising_window_change_rate,
  1434. rising_adjacent_up_ratio=rising_adjacent_up_ratio,
  1435. dry_run=dry_run,
  1436. skip_db_save=skip_db_save,
  1437. verbose=verbose,
  1438. )
  1439. export_rows.append(
  1440. _build_export_row_from_item(
  1441. item,
  1442. analyze_ymd=analyze_ymd,
  1443. strategy=strategy,
  1444. )
  1445. )
  1446. if item.get("is_final_retained") and strategy:
  1447. final_hive_rows.append(
  1448. build_wxindex_word_hive_row(
  1449. wxindex_word_record_id=record_id,
  1450. word=name,
  1451. strategy=strategy,
  1452. partition_dt=analyze_ymd,
  1453. max_score=result.get("max_score"),
  1454. retain_reason=item.get("retain_reason"),
  1455. )
  1456. )
  1457. if (
  1458. final_hive_rows
  1459. and strategy
  1460. and config is not None
  1461. and not dry_run
  1462. and not skip_odps
  1463. ):
  1464. odps_summary = sync_wxindex_word_rows_to_odps(
  1465. config,
  1466. repository,
  1467. hive_rows=final_hive_rows,
  1468. partition_dt=analyze_ymd,
  1469. strategy=strategy,
  1470. )
  1471. summary["odps_written"] = odps_summary.get("written_count", 0)
  1472. summary["odps_synced"] = odps_summary.get("odps_synced", 0)
  1473. summary["odps_sync"] = odps_summary
  1474. elif dry_run or skip_odps:
  1475. summary["odps_written"] = len(final_hive_rows)
  1476. summary["odps_synced"] = len(final_hive_rows)
  1477. _refresh_wxindex_heat_job_summary(
  1478. summary,
  1479. pending_items=pending_items,
  1480. export_rows=export_rows,
  1481. )
  1482. summary["retained_words"] = retained_words
  1483. summary["export_rows"] = export_rows
  1484. return summary
# Column names, in output order, for the heat-pattern CSV export.
# write_wxindex_heat_pattern_csv falls back to this list when the caller
# does not pass its own fieldnames.
WXINDEX_HEAT_PATTERN_EXPORT_FIELDS = [
    "analyze_ymd",
    "name",
    "meta_id",
    "fetch_start_ymd",
    "fetch_end_ymd",
    "data_start_ymd",
    "data_end_ymd",
    "data_days",
    "analysis_skipped",
    "skip_reason",
    "is_sustained_high",
    "is_rising",
    "is_spike",
    "retain_reason",
    "is_internal_demand_matched",
    "matched_demand",
    "senior_fit_score",
    "is_final_retained",
    "min_score",
    "max_score",
    "avg_score",
    "demand_id",
    "weight",
]
  1510. def write_wxindex_heat_pattern_csv(
  1511. rows: list[dict[str, Any]],
  1512. output_path: str | Path,
  1513. *,
  1514. fieldnames: list[str] | None = None,
  1515. ) -> Path:
  1516. """将热度分析明细写入本地 CSV。"""
  1517. path = Path(output_path).expanduser()
  1518. if not path.is_absolute():
  1519. path = Path.cwd() / path
  1520. path.parent.mkdir(parents=True, exist_ok=True)
  1521. columns = fieldnames or WXINDEX_HEAT_PATTERN_EXPORT_FIELDS
  1522. with path.open("w", encoding="utf-8-sig", newline="") as handle:
  1523. writer = csv.DictWriter(handle, fieldnames=columns, extrasaction="ignore")
  1524. writer.writeheader()
  1525. for row in rows:
  1526. writer.writerow({column: row.get(column, "") for column in columns})
  1527. return path