wxindex_heat_pattern.py 57 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667
  1. """微信指数热度模式分析:持续高热、持续上涨、突然暴涨。"""
  2. from __future__ import annotations
  3. import csv
  4. import json
  5. from datetime import date, datetime, timedelta
  6. from pathlib import Path
  7. from typing import Any
  8. from app.hot_content.client import JsonApiClient
  9. from app.hot_content.demand_cache_service import DemandCacheService
  10. from app.hot_content.demand_hive_export import WEIGHT_DIVISOR, build_demand_id
  11. from app.hot_content.demand_quality import (
  12. TYPE_PHRASE,
  13. llm_score_senior_fit,
  14. lookup_quality_scores,
  15. )
  16. from app.hot_content.demand_pool_writer import sync_wxindex_word_rows_to_odps
  17. from app.hot_content.exceptions import HotContentFlowError
  18. from app.hot_content.postprocess_service import ContributionPostprocessService
  19. from app.hot_content.repository import HotContentRepository
  20. from app.hot_content.timezone import SHANGHAI_TZ
  21. from app.hot_content.types import FlowConfig
  22. from app.hot_content.wxindex_trend import (
  23. HEAT_RISING_ADJACENT_UP_RATIO,
  24. HEAT_RISING_OVERALL_CHANGE_RATE,
  25. HEAT_RISING_RECENT_DROP_RATE,
  26. HEAT_RISING_WINDOW_CHANGE_RATE,
  27. HEAT_SPIKE_BASELINE_FLOOR,
  28. HEAT_SPIKE_RATIO,
  29. extract_sorted_scores,
  30. is_wxindex_heat_rising_scores,
  31. is_wxindex_spike_scores,
  32. )
  33. from app.hot_content.wxindex_words import filter_scores_in_ymd_window
  34. WXINDEX_HEAT_MIN_DAYS = 7
  35. WXINDEX_HEAT_MAX_DAYS = 14
  36. WXINDEX_SUSTAINED_HIGH_THRESHOLD = 10_000_000.0
  37. WXINDEX_SPIKE_LOOKBACK_DAYS = 3
  38. WXINDEX_SPIKE_RATIO = HEAT_SPIKE_RATIO
  39. WXINDEX_SPIKE_BASELINE_FLOOR = HEAT_SPIKE_BASELINE_FLOOR
  40. WXINDEX_RISING_OVERALL_CHANGE_RATE = HEAT_RISING_OVERALL_CHANGE_RATE
  41. WXINDEX_RISING_WINDOW_CHANGE_RATE = HEAT_RISING_WINDOW_CHANGE_RATE
  42. WXINDEX_RISING_ADJACENT_UP_RATIO = HEAT_RISING_ADJACENT_UP_RATIO
  43. WXINDEX_RISING_RECENT_DROP_RATE = HEAT_RISING_RECENT_DROP_RATE
  44. WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD = 0.6
  45. WXINDEX_WORD_LLM_BATCH_SIZE = 10
  46. PATTERN_SUSTAINED_HIGH = "sustained_high"
  47. PATTERN_RISING = "rising"
  48. PATTERN_SPIKE = "spike"
  49. RETAIN_REASON_SUSTAINED_HIGH = "持续高热度"
  50. RETAIN_REASON_RISING = "热度持续上涨"
  51. RETAIN_REASON_SPIKE = "热度突然暴涨"
  52. PHASE_ANALYSIS_SKIPPED = "analysis_skipped"
  53. PHASE_HEAT_ANALYZED = "heat_analyzed"
  54. PHASE_DEMAND_MATCHED = "demand_matched"
  55. PHASE_SENIOR_FIT_SCORED = "senior_fit_scored"
  56. PHASE_FINALIZED = "finalized"
  57. _HEAT_DONE_PHASES = frozenset(
  58. {
  59. PHASE_ANALYSIS_SKIPPED,
  60. PHASE_HEAT_ANALYZED,
  61. PHASE_DEMAND_MATCHED,
  62. PHASE_SENIOR_FIT_SCORED,
  63. PHASE_FINALIZED,
  64. }
  65. )
  66. _WXINDEX_RECORD_UNSET_ANALYSIS_FIELDS: dict[str, Any] = {
  67. "data_start_ymd": None,
  68. "data_end_ymd": None,
  69. "data_days": None,
  70. "is_sustained_high": None,
  71. "is_rising": None,
  72. "is_spike": None,
  73. "retain_reason": None,
  74. "is_internal_demand_matched": None,
  75. "matched_demand": None,
  76. "internal_demand_match_json": None,
  77. "senior_fit_score": None,
  78. "demand_senior_fit_json": None,
  79. "is_final_retained": None,
  80. "min_score": None,
  81. "max_score": None,
  82. "avg_score": None,
  83. "detail_json": None,
  84. }
  85. def _parse_record_detail_json(record: dict[str, Any]) -> dict[str, Any]:
  86. detail = record.get("detail_json")
  87. if isinstance(detail, dict):
  88. return detail
  89. if isinstance(detail, str) and detail.strip():
  90. try:
  91. parsed = json.loads(detail)
  92. return parsed if isinstance(parsed, dict) else {}
  93. except json.JSONDecodeError:
  94. return {}
  95. return {}
  96. def _record_phase(record: dict[str, Any]) -> str:
  97. return str(_parse_record_detail_json(record).get("phase") or "").strip()
  98. def _nullable_bool_from_record(value: Any) -> bool | None:
  99. if value is None:
  100. return None
  101. return bool(value)
  102. def _parse_record_json_field(record: dict[str, Any], field: str) -> Any:
  103. value = record.get(field)
  104. if value is None:
  105. return None
  106. if isinstance(value, (dict, list)):
  107. return value
  108. if isinstance(value, (bytes, bytearray)):
  109. value = value.decode("utf-8")
  110. if isinstance(value, str):
  111. try:
  112. return json.loads(value)
  113. except json.JSONDecodeError:
  114. return None
  115. return value
  116. def _is_heat_analysis_done(record: dict[str, Any] | None) -> bool:
  117. if not record:
  118. return False
  119. return _record_phase(record) in _HEAT_DONE_PHASES
  120. def _is_analysis_skipped_record(record: dict[str, Any]) -> bool:
  121. return _record_phase(record) == PHASE_ANALYSIS_SKIPPED
  122. def _is_senior_fit_needed(
  123. *,
  124. retain_reason: str | None,
  125. is_internal_demand_matched: bool | None,
  126. ) -> bool:
  127. if not retain_reason:
  128. return False
  129. if retain_reason == RETAIN_REASON_SUSTAINED_HIGH:
  130. return bool(is_internal_demand_matched)
  131. return True
  132. def _is_finalized_record(record: dict[str, Any] | None) -> bool:
  133. if not record:
  134. return False
  135. return _record_phase(record) == PHASE_FINALIZED
  136. def _latest_score_ymd(scores: list[dict[str, Any]]) -> str | None:
  137. ymds = [_score_row_ymd(row) for row in scores if _score_row_ymd(row)]
  138. return max(ymds) if ymds else None
  139. def _should_rerun_heat_analysis(
  140. existing_record: dict[str, Any] | None,
  141. *,
  142. scores: list[dict[str, Any]],
  143. fetch_start_ymd: str,
  144. fetch_end_ymd: str,
  145. min_days: int,
  146. max_days: int,
  147. ) -> bool:
  148. if not _is_heat_analysis_done(existing_record):
  149. return True
  150. assert existing_record is not None
  151. latest_ymd = _latest_score_ymd(scores)
  152. record_end = str(existing_record.get("data_end_ymd") or "").strip()
  153. if latest_ymd and record_end and latest_ymd > record_end:
  154. return True
  155. if _is_analysis_skipped_record(existing_record):
  156. _, skip_reason = prepare_analysis_scores(
  157. scores,
  158. start_ymd=fetch_start_ymd,
  159. end_ymd=fetch_end_ymd,
  160. min_days=min_days,
  161. max_days=max_days,
  162. )
  163. if skip_reason is None:
  164. return True
  165. return False
  166. def _is_senior_fit_attempt_done(
  167. item: dict[str, Any],
  168. existing_record: dict[str, Any] | None,
  169. ) -> bool:
  170. if existing_record:
  171. phase = _record_phase(existing_record)
  172. if phase in (PHASE_SENIOR_FIT_SCORED, PHASE_FINALIZED):
  173. return True
  174. return item.get("senior_fit_score") is not None
  175. def _senior_fit_passed_from_score(score: Any, *, senior_threshold: float) -> bool:
  176. if score is None:
  177. return False
  178. try:
  179. return float(score) / 10.0 > senior_threshold
  180. except (TypeError, ValueError):
  181. return False
  182. def _rehydrate_result_from_record(record: dict[str, Any]) -> dict[str, Any]:
  183. detail = _parse_record_detail_json(record)
  184. retain_reason_raw = record.get("retain_reason")
  185. retain_reason = (
  186. str(retain_reason_raw).strip() if retain_reason_raw is not None else None
  187. ) or None
  188. def _bool_field(key: str) -> bool | None:
  189. value = record.get(key)
  190. if value is None:
  191. return None
  192. return bool(value)
  193. return {
  194. "skipped": False,
  195. "skip_reason": None,
  196. "data_days": record.get("data_days"),
  197. "data_start_ymd": record.get("data_start_ymd"),
  198. "data_end_ymd": record.get("data_end_ymd"),
  199. "fetch_start_ymd": record.get("fetch_start_ymd"),
  200. "fetch_end_ymd": record.get("fetch_end_ymd"),
  201. "min_score": record.get("min_score"),
  202. "max_score": record.get("max_score"),
  203. "avg_score": record.get("avg_score"),
  204. "is_sustained_high": _bool_field("is_sustained_high"),
  205. "is_rising": _bool_field("is_rising"),
  206. "is_spike": _bool_field("is_spike"),
  207. "retain_reason": retain_reason,
  208. "patterns": list(detail.get("patterns") or []),
  209. }
  210. def _rehydrate_pending_item_from_record(
  211. item: dict[str, Any],
  212. record: dict[str, Any],
  213. *,
  214. senior_threshold: float,
  215. ) -> dict[str, Any]:
  216. result = _rehydrate_result_from_record(record)
  217. retain_reason = result.get("retain_reason")
  218. is_internal_demand_matched = _nullable_bool_from_record(
  219. record.get("is_internal_demand_matched")
  220. )
  221. matched_demand_raw = record.get("matched_demand")
  222. matched_demand = (
  223. str(matched_demand_raw).strip() if matched_demand_raw is not None else None
  224. ) or None
  225. senior_fit_score = record.get("senior_fit_score")
  226. senior_fit_passed: bool | None = None
  227. is_final_retained = _nullable_bool_from_record(record.get("is_final_retained"))
  228. if senior_fit_score is not None:
  229. senior_fit_passed = _senior_fit_passed_from_score(
  230. senior_fit_score,
  231. senior_threshold=senior_threshold,
  232. )
  233. elif not _is_senior_fit_needed(
  234. retain_reason=retain_reason,
  235. is_internal_demand_matched=is_internal_demand_matched,
  236. ):
  237. senior_fit_passed = False
  238. if is_final_retained is None:
  239. is_final_retained = False
  240. return {
  241. "meta": item["meta"],
  242. "name": item["name"],
  243. "fetch_start_ymd": item["fetch_start_ymd"],
  244. "fetch_end_ymd": item["fetch_end_ymd"],
  245. "result": result,
  246. "retain_reason": retain_reason,
  247. "record_id": int(record.get("id") or item.get("record_id") or 0),
  248. "is_internal_demand_matched": is_internal_demand_matched,
  249. "matched_demand": matched_demand,
  250. "internal_demand_match_json": _parse_record_json_field(
  251. record,
  252. "internal_demand_match_json",
  253. ),
  254. "senior_fit_score": senior_fit_score,
  255. "demand_senior_fit_json": _parse_record_json_field(
  256. record,
  257. "demand_senior_fit_json",
  258. ),
  259. "senior_fit_passed": senior_fit_passed,
  260. "is_final_retained": is_final_retained,
  261. }
  262. def _build_skipped_export_row_from_record(
  263. *,
  264. record: dict[str, Any],
  265. meta: dict[str, Any],
  266. name: str,
  267. analyze_ymd: str,
  268. fetch_start_ymd: str,
  269. fetch_end_ymd: str,
  270. ) -> dict[str, Any]:
  271. detail = _parse_record_detail_json(record)
  272. return {
  273. "analyze_ymd": analyze_ymd,
  274. "name": name,
  275. "meta_id": meta.get("id"),
  276. "fetch_start_ymd": fetch_start_ymd,
  277. "fetch_end_ymd": fetch_end_ymd,
  278. "analysis_skipped": True,
  279. "skip_reason": detail.get("skip_reason") or "",
  280. "data_days": record.get("data_days") or "",
  281. "retain_reason": "",
  282. "is_sustained_high": False,
  283. "is_rising": False,
  284. "is_spike": False,
  285. "is_internal_demand_matched": "",
  286. "matched_demand": "",
  287. "senior_fit_score": "",
  288. "is_final_retained": False,
  289. "min_score": "",
  290. "max_score": "",
  291. "avg_score": "",
  292. }
  293. def _refresh_wxindex_heat_job_summary(
  294. summary: dict[str, Any],
  295. *,
  296. pending_items: list[dict[str, Any]],
  297. export_rows: list[dict[str, Any]],
  298. ) -> None:
  299. summary["analyzed"] = len(pending_items)
  300. summary["skipped"] = sum(1 for row in export_rows if row.get("analysis_skipped"))
  301. summary["retained"] = sum(1 for item in pending_items if item.get("retain_reason"))
  302. summary["sustained_high"] = sum(
  303. 1
  304. for item in pending_items
  305. if item.get("retain_reason") == RETAIN_REASON_SUSTAINED_HIGH
  306. )
  307. summary["rising"] = sum(
  308. 1 for item in pending_items if item.get("retain_reason") == RETAIN_REASON_RISING
  309. )
  310. summary["spike"] = sum(
  311. 1 for item in pending_items if item.get("retain_reason") == RETAIN_REASON_SPIKE
  312. )
  313. summary["internal_demand_matched"] = sum(
  314. 1 for item in pending_items if item.get("is_internal_demand_matched")
  315. )
  316. summary["senior_fit_candidates"] = sum(
  317. 1
  318. for item in pending_items
  319. if _is_senior_fit_needed(
  320. retain_reason=item.get("retain_reason"),
  321. is_internal_demand_matched=item.get("is_internal_demand_matched"),
  322. )
  323. )
  324. summary["senior_scored"] = sum(
  325. 1 for item in pending_items if item.get("senior_fit_score") is not None
  326. )
  327. summary["senior_fit_passed"] = sum(
  328. 1 for item in pending_items if item.get("senior_fit_passed")
  329. )
  330. summary["final_retained"] = sum(
  331. 1 for item in pending_items if item.get("is_final_retained")
  332. )
  333. def resolve_retain_reason(
  334. *,
  335. is_sustained_high: bool,
  336. is_rising: bool,
  337. is_spike: bool,
  338. ) -> str | None:
  339. """按 2->3->1 优先级确定保留原因(上涨 > 暴涨 > 持续高热度)。"""
  340. if is_rising:
  341. return RETAIN_REASON_RISING
  342. if is_spike:
  343. return RETAIN_REASON_SPIKE
  344. if is_sustained_high:
  345. return RETAIN_REASON_SUSTAINED_HIGH
  346. return None
  347. def _chunk_words(words: list[str], batch_size: int = WXINDEX_WORD_LLM_BATCH_SIZE) -> list[list[str]]:
  348. size = max(batch_size, 1)
  349. return [words[index : index + size] for index in range(0, len(words), size)]
  350. def _empty_senior_fit_result() -> dict[str, Any]:
  351. return {
  352. "senior_fit_score": None,
  353. "demand_senior_fit_json": {"source": "", "items": []},
  354. "passed": False,
  355. }
  356. def score_wxindex_words_senior_fit(
  357. *,
  358. words: list[str],
  359. config: FlowConfig,
  360. senior_threshold: float = WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD,
  361. batch_size: int = WXINDEX_WORD_LLM_BATCH_SIZE,
  362. ) -> dict[str, dict[str, Any]]:
  363. """批量对微信指数热词执行老年性 LLM 评分(每批最多 batch_size 个词)。"""
  364. cleaned: list[str] = []
  365. seen: set[str] = set()
  366. for raw in words:
  367. word = str(raw or "").strip()
  368. if not word or word in seen:
  369. continue
  370. seen.add(word)
  371. cleaned.append(word)
  372. results: dict[str, dict[str, Any]] = {
  373. word: _empty_senior_fit_result() for word in cleaned
  374. }
  375. for batch in _chunk_words(cleaned, batch_size=batch_size):
  376. candidates = [{"demand_type": TYPE_PHRASE, "demand_text": word} for word in batch]
  377. senior_fit_json = llm_score_senior_fit(
  378. channel_content_id=f"wxindex_words:{','.join(batch[:3])}",
  379. candidates=candidates,
  380. model=config.demand_quality_llm_model,
  381. max_attempts=config.demand_quality_llm_max_attempts,
  382. retry_sleep_seconds=config.demand_quality_llm_retry_sleep_seconds,
  383. max_tokens=config.demand_quality_llm_max_tokens,
  384. )
  385. senior_fit_json["threshold"] = senior_threshold * 10.0
  386. for word in batch:
  387. _, senior_score = lookup_quality_scores(
  388. demand_type=TYPE_PHRASE,
  389. demand_text=word,
  390. event_sense_json=None,
  391. senior_fit_json=senior_fit_json,
  392. )
  393. passed = (
  394. senior_score is not None
  395. and senior_score / 10.0 > senior_threshold
  396. )
  397. results[word] = {
  398. "senior_fit_score": senior_score,
  399. "demand_senior_fit_json": senior_fit_json,
  400. "passed": passed,
  401. }
  402. return results
  403. def score_wxindex_word_senior_fit(
  404. *,
  405. word: str,
  406. config: FlowConfig,
  407. senior_threshold: float = WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD,
  408. ) -> dict[str, Any]:
  409. """对单个微信指数热词执行老年性 LLM 评分。"""
  410. target_word = str(word or "").strip()
  411. if not target_word:
  412. return _empty_senior_fit_result()
  413. batch_results = score_wxindex_words_senior_fit(
  414. words=[target_word],
  415. config=config,
  416. senior_threshold=senior_threshold,
  417. batch_size=1,
  418. )
  419. return batch_results.get(target_word, _empty_senior_fit_result())
  420. def build_wxindex_odps_extend(retain_reason: str | None) -> str:
  421. method = str(retain_reason or "").strip()
  422. if not method:
  423. return "{}"
  424. return json.dumps({"method": method}, ensure_ascii=False)
  425. def build_wxindex_word_hive_row(
  426. *,
  427. wxindex_word_record_id: int,
  428. word: str,
  429. strategy: str,
  430. partition_dt: str,
  431. max_score: float | None,
  432. retain_reason: str | None = None,
  433. ) -> dict[str, Any]:
  434. normalized_name = str(word or "").strip()
  435. weight = 0.0
  436. if max_score is not None:
  437. weight = float(max_score) / WEIGHT_DIVISOR
  438. return {
  439. "record_id": wxindex_word_record_id,
  440. "strategy": strategy,
  441. "demand_id": build_demand_id(
  442. strategy=strategy,
  443. demand_name=normalized_name,
  444. partition_dt=partition_dt,
  445. ),
  446. "demand_name": normalized_name,
  447. "weight": weight,
  448. "type": TYPE_PHRASE,
  449. "video_count": None,
  450. "video_list": [],
  451. "extend": build_wxindex_odps_extend(retain_reason),
  452. "dt": partition_dt,
  453. }
  454. def build_wxindex_word_odps_sync_row(
  455. *,
  456. wxindex_word_record_id: int,
  457. word: str,
  458. strategy: str,
  459. partition_dt: str,
  460. max_score: float | None,
  461. retain_reason: str | None = None,
  462. ) -> dict[str, Any]:
  463. normalized_name = str(word or "").strip()
  464. weight = None
  465. if max_score is not None:
  466. weight = float(max_score) / WEIGHT_DIVISOR
  467. return {
  468. "partition_dt": partition_dt,
  469. "strategy": strategy,
  470. "demand_id": build_demand_id(
  471. strategy=strategy,
  472. demand_name=normalized_name,
  473. partition_dt=partition_dt,
  474. ),
  475. "demand_name": normalized_name,
  476. "demand_type": TYPE_PHRASE,
  477. "record_id": wxindex_word_record_id,
  478. "weight": weight,
  479. "extend": build_wxindex_odps_extend(retain_reason),
  480. }
  481. def prepare_analysis_scores(
  482. scores: list[dict[str, Any]],
  483. *,
  484. start_ymd: str,
  485. end_ymd: str,
  486. min_days: int = WXINDEX_HEAT_MIN_DAYS,
  487. max_days: int = WXINDEX_HEAT_MAX_DAYS,
  488. ) -> tuple[list[dict[str, Any]], str | None]:
  489. """截取目标区间内可用数据,不足 min_days 时返回 skip 原因。"""
  490. window_scores = filter_scores_in_ymd_window(
  491. scores,
  492. start_ymd=start_ymd,
  493. end_ymd=end_ymd,
  494. )
  495. if len(window_scores) > max_days:
  496. window_scores = window_scores[-max_days:]
  497. if len(window_scores) < min_days:
  498. return window_scores, "insufficient_days"
  499. return window_scores, None
  500. def _score_row_ymd(row: dict[str, Any]) -> str:
  501. return str(row.get("ymd") or row.get("dt") or "").strip()
  502. def _window_ymd_bounds(
  503. window_scores: list[dict[str, Any]],
  504. ) -> tuple[str | None, str | None]:
  505. if not window_scores:
  506. return None, None
  507. start_ymd = _score_row_ymd(window_scores[0]) or None
  508. end_ymd = _score_row_ymd(window_scores[-1]) or None
  509. return start_ymd, end_ymd
  510. def analyze_wxindex_heat_patterns(
  511. scores: list[dict[str, Any]],
  512. *,
  513. start_ymd: str,
  514. end_ymd: str,
  515. sustained_threshold: float = WXINDEX_SUSTAINED_HIGH_THRESHOLD,
  516. min_days: int = WXINDEX_HEAT_MIN_DAYS,
  517. max_days: int = WXINDEX_HEAT_MAX_DAYS,
  518. spike_days: int = WXINDEX_SPIKE_LOOKBACK_DAYS,
  519. spike_ratio: float = WXINDEX_SPIKE_RATIO,
  520. spike_baseline_floor: float = WXINDEX_SPIKE_BASELINE_FLOOR,
  521. rising_overall_change_rate: float = WXINDEX_RISING_OVERALL_CHANGE_RATE,
  522. rising_window_change_rate: float = WXINDEX_RISING_WINDOW_CHANGE_RATE,
  523. rising_adjacent_up_ratio: float = WXINDEX_RISING_ADJACENT_UP_RATIO,
  524. ) -> dict[str, Any]:
  525. """对目标区间数据判断三种热度模式。"""
  526. window_scores, skip_reason = prepare_analysis_scores(
  527. scores,
  528. start_ymd=start_ymd,
  529. end_ymd=end_ymd,
  530. min_days=min_days,
  531. max_days=max_days,
  532. )
  533. if skip_reason:
  534. data_start_ymd, data_end_ymd = _window_ymd_bounds(window_scores)
  535. return {
  536. "skipped": True,
  537. "skip_reason": skip_reason,
  538. "data_days": len(window_scores) if window_scores else None,
  539. "data_start_ymd": data_start_ymd,
  540. "data_end_ymd": data_end_ymd,
  541. "patterns": [],
  542. }
  543. numeric_scores = extract_sorted_scores(window_scores, max_points=max_days)
  544. data_start_ymd, data_end_ymd = _window_ymd_bounds(window_scores)
  545. is_sustained_high = all(score > sustained_threshold for score in numeric_scores)
  546. is_rising = is_wxindex_heat_rising_scores(
  547. numeric_scores,
  548. min_points=min_days,
  549. overall_change_rate=rising_overall_change_rate,
  550. window_change_rate_threshold=rising_window_change_rate,
  551. adjacent_up_ratio=rising_adjacent_up_ratio,
  552. )
  553. is_spike = is_wxindex_spike_scores(
  554. numeric_scores,
  555. spike_days=spike_days,
  556. min_points=min_days,
  557. spike_ratio=spike_ratio,
  558. baseline_floor=spike_baseline_floor,
  559. )
  560. retain_reason = resolve_retain_reason(
  561. is_sustained_high=is_sustained_high,
  562. is_rising=is_rising,
  563. is_spike=is_spike,
  564. )
  565. patterns: list[str] = []
  566. if retain_reason == RETAIN_REASON_SUSTAINED_HIGH:
  567. patterns.append(PATTERN_SUSTAINED_HIGH)
  568. elif retain_reason == RETAIN_REASON_RISING:
  569. patterns.append(PATTERN_RISING)
  570. elif retain_reason == RETAIN_REASON_SPIKE:
  571. patterns.append(PATTERN_SPIKE)
  572. return {
  573. "skipped": False,
  574. "skip_reason": None,
  575. "data_days": len(window_scores),
  576. "data_start_ymd": data_start_ymd,
  577. "data_end_ymd": data_end_ymd,
  578. "fetch_start_ymd": start_ymd,
  579. "fetch_end_ymd": end_ymd,
  580. "min_score": min(numeric_scores),
  581. "max_score": max(numeric_scores),
  582. "avg_score": sum(numeric_scores) / len(numeric_scores),
  583. "is_sustained_high": is_sustained_high,
  584. "is_rising": is_rising,
  585. "is_spike": is_spike,
  586. "retain_reason": retain_reason,
  587. "patterns": patterns,
  588. "scores": window_scores,
  589. }
  590. def build_wxindex_word_record_init_payload(
  591. *,
  592. meta: dict[str, Any],
  593. name: str,
  594. analyze_ymd: str,
  595. fetch_start_ymd: str,
  596. fetch_end_ymd: str,
  597. demand_cache_run_id: int | None = None,
  598. ) -> dict[str, Any]:
  599. """分析前写入追溯记录:仅含 meta 与抓取窗口,分析字段待后续更新。"""
  600. return {
  601. "name": name,
  602. "meta_id": meta.get("id"),
  603. "analyze_ymd": analyze_ymd,
  604. "fetch_start_ymd": fetch_start_ymd,
  605. "fetch_end_ymd": fetch_end_ymd,
  606. "demand_cache_run_id": demand_cache_run_id,
  607. **_WXINDEX_RECORD_UNSET_ANALYSIS_FIELDS,
  608. }
  609. def build_wxindex_word_record_skipped_payload(
  610. *,
  611. meta: dict[str, Any],
  612. name: str,
  613. analyze_ymd: str,
  614. fetch_start_ymd: str,
  615. fetch_end_ymd: str,
  616. result: dict[str, Any],
  617. demand_cache_run_id: int | None = None,
  618. ) -> dict[str, Any]:
  619. """分析被跳过时更新追溯记录。"""
  620. payload = {
  621. "name": name,
  622. "meta_id": meta.get("id"),
  623. "analyze_ymd": analyze_ymd,
  624. "fetch_start_ymd": fetch_start_ymd,
  625. "fetch_end_ymd": fetch_end_ymd,
  626. "demand_cache_run_id": demand_cache_run_id,
  627. **_WXINDEX_RECORD_UNSET_ANALYSIS_FIELDS,
  628. }
  629. payload.update(
  630. {
  631. "data_start_ymd": result.get("data_start_ymd"),
  632. "data_end_ymd": result.get("data_end_ymd"),
  633. "data_days": result.get("data_days"),
  634. "detail_json": {
  635. "phase": "analysis_skipped",
  636. "skip_reason": result.get("skip_reason"),
  637. "data_days": result.get("data_days"),
  638. },
  639. }
  640. )
  641. return payload
  642. def build_wxindex_word_record_payload(
  643. item: dict[str, Any],
  644. *,
  645. analyze_ymd: str,
  646. demand_cache_run_id: int | None,
  647. phase: str,
  648. senior_threshold: float,
  649. sustained_threshold: float,
  650. spike_days: int,
  651. spike_ratio: float,
  652. spike_baseline_floor: float,
  653. rising_overall_change_rate: float,
  654. rising_window_change_rate: float,
  655. rising_adjacent_up_ratio: float,
  656. ) -> dict[str, Any]:
  657. """根据 pending item 当前状态构建 records 表 payload。"""
  658. meta = item["meta"]
  659. name = item["name"]
  660. fetch_start_ymd = item["fetch_start_ymd"]
  661. fetch_end_ymd = item["fetch_end_ymd"]
  662. result = item["result"]
  663. retain_reason = item.get("retain_reason")
  664. return {
  665. "name": name,
  666. "meta_id": meta.get("id"),
  667. "analyze_ymd": analyze_ymd,
  668. "fetch_start_ymd": fetch_start_ymd,
  669. "fetch_end_ymd": fetch_end_ymd,
  670. "data_start_ymd": result.get("data_start_ymd"),
  671. "data_end_ymd": result.get("data_end_ymd"),
  672. "data_days": result.get("data_days"),
  673. "is_sustained_high": result.get("is_sustained_high"),
  674. "is_rising": result.get("is_rising"),
  675. "is_spike": result.get("is_spike"),
  676. "retain_reason": retain_reason,
  677. "is_internal_demand_matched": item.get("is_internal_demand_matched"),
  678. "matched_demand": item.get("matched_demand"),
  679. "demand_cache_run_id": demand_cache_run_id,
  680. "internal_demand_match_json": item.get("internal_demand_match_json"),
  681. "senior_fit_score": item.get("senior_fit_score"),
  682. "demand_senior_fit_json": item.get("demand_senior_fit_json"),
  683. "is_final_retained": item.get("is_final_retained"),
  684. "min_score": result.get("min_score"),
  685. "max_score": result.get("max_score"),
  686. "avg_score": result.get("avg_score"),
  687. "detail_json": {
  688. "phase": phase,
  689. "patterns": list(result.get("patterns") or []),
  690. "retain_reason": retain_reason,
  691. "retain_reason_priority": "2->3->1",
  692. "senior_fit_threshold": senior_threshold,
  693. "senior_fit_threshold_score": senior_threshold * 10.0,
  694. "is_final_retained": item.get("is_final_retained"),
  695. "sustained_threshold": sustained_threshold,
  696. "spike_days": spike_days,
  697. "spike_ratio": spike_ratio,
  698. "spike_baseline_floor": spike_baseline_floor,
  699. "rising_overall_change_rate": rising_overall_change_rate,
  700. "rising_window_change_rate": rising_window_change_rate,
  701. "rising_adjacent_up_ratio": rising_adjacent_up_ratio,
  702. },
  703. }
  704. def _persist_wxindex_word_record(
  705. repository: HotContentRepository,
  706. payload: dict[str, Any],
  707. *,
  708. dry_run: bool,
  709. skip_db_save: bool,
  710. verbose: bool,
  711. action: str,
  712. ) -> int:
  713. name = str(payload.get("name") or "").strip()
  714. if dry_run or skip_db_save:
  715. if verbose:
  716. label = "dry-run" if dry_run else "skip-db-save"
  717. print(f"[{label}] would {action} wxindex word record word={name}")
  718. return 0
  719. return repository.save_wxindex_word_record(payload)
  720. def _persist_pending_item_record(
  721. repository: HotContentRepository,
  722. item: dict[str, Any],
  723. *,
  724. analyze_ymd: str,
  725. demand_cache_run_id: int | None,
  726. phase: str,
  727. senior_threshold: float,
  728. sustained_threshold: float,
  729. spike_days: int,
  730. spike_ratio: float,
  731. spike_baseline_floor: float,
  732. rising_overall_change_rate: float,
  733. rising_window_change_rate: float,
  734. rising_adjacent_up_ratio: float,
  735. dry_run: bool,
  736. skip_db_save: bool,
  737. verbose: bool,
  738. ) -> int:
  739. payload = build_wxindex_word_record_payload(
  740. item,
  741. analyze_ymd=analyze_ymd,
  742. demand_cache_run_id=demand_cache_run_id,
  743. phase=phase,
  744. senior_threshold=senior_threshold,
  745. sustained_threshold=sustained_threshold,
  746. spike_days=spike_days,
  747. spike_ratio=spike_ratio,
  748. spike_baseline_floor=spike_baseline_floor,
  749. rising_overall_change_rate=rising_overall_change_rate,
  750. rising_window_change_rate=rising_window_change_rate,
  751. rising_adjacent_up_ratio=rising_adjacent_up_ratio,
  752. )
  753. record_id = _persist_wxindex_word_record(
  754. repository,
  755. payload,
  756. dry_run=dry_run,
  757. skip_db_save=skip_db_save,
  758. verbose=verbose,
  759. action=f"{phase} word={item['name']}",
  760. )
  761. if record_id:
  762. item["record_id"] = record_id
  763. return int(item.get("record_id") or 0)
  764. def _filter_candidates_awaiting_yesterday_score(
  765. repository: HotContentRepository,
  766. candidate_items: list[dict[str, Any]],
  767. *,
  768. yesterday_ymd: str,
  769. existing_records: dict[str, dict[str, Any]],
  770. verbose: bool,
  771. ) -> tuple[list[dict[str, Any]], int]:
  772. """初始化完成后:未完成热度分析且缺少昨日指数数据的词留待下次执行。"""
  773. names_to_check = [
  774. item["name"]
  775. for item in candidate_items
  776. if not _is_heat_analysis_done(existing_records.get(item["name"]))
  777. ]
  778. if not names_to_check:
  779. return candidate_items, 0
  780. names_with_yesterday = repository.list_wxindex_word_names_with_dt(
  781. names_to_check,
  782. dt=yesterday_ymd,
  783. )
  784. ready_items: list[dict[str, Any]] = []
  785. awaiting_count = 0
  786. for item in candidate_items:
  787. name = item["name"]
  788. if _is_heat_analysis_done(existing_records.get(name)):
  789. ready_items.append(item)
  790. continue
  791. if name in names_with_yesterday:
  792. ready_items.append(item)
  793. continue
  794. awaiting_count += 1
  795. if verbose:
  796. print(
  797. f"await yesterday score word={name} dt={yesterday_ymd}, skip this run"
  798. )
  799. return ready_items, awaiting_count
  800. def _init_candidate_wxindex_word_records(
  801. repository: HotContentRepository,
  802. candidate_items: list[dict[str, Any]],
  803. *,
  804. analyze_ymd: str,
  805. demand_cache_run_id: int | None,
  806. dry_run: bool,
  807. skip_db_save: bool,
  808. verbose: bool,
  809. ) -> None:
  810. if not candidate_items:
  811. return
  812. if dry_run or skip_db_save:
  813. if verbose:
  814. label = "dry-run" if dry_run else "skip-db-save"
  815. print(
  816. f"[{label}] would batch init wxindex word records "
  817. f"count={len(candidate_items)} analyze_ymd={analyze_ymd}"
  818. )
  819. return
  820. init_payloads = [
  821. build_wxindex_word_record_init_payload(
  822. meta=item["meta"],
  823. name=item["name"],
  824. analyze_ymd=analyze_ymd,
  825. fetch_start_ymd=item["fetch_start_ymd"],
  826. fetch_end_ymd=item["fetch_end_ymd"],
  827. demand_cache_run_id=demand_cache_run_id,
  828. )
  829. for item in candidate_items
  830. ]
  831. record_id_map = repository.init_wxindex_word_records(init_payloads)
  832. for item in candidate_items:
  833. item["record_id"] = int(record_id_map.get(item["name"]) or 0)
  834. if verbose:
  835. print(
  836. f"init wxindex word records count={len(candidate_items)} "
  837. f"analyze_ymd={analyze_ymd}"
  838. )
  839. def _apply_demand_match_to_item(
  840. item: dict[str, Any],
  841. match_result: dict[str, Any],
  842. ) -> None:
  843. item["internal_demand_match_json"] = match_result
  844. item["is_internal_demand_matched"] = bool(match_result.get("matched"))
  845. matched_demand = str(match_result.get("matched_demand") or "").strip()
  846. item["matched_demand"] = matched_demand or None
  847. if not item.get("is_internal_demand_matched"):
  848. item["is_final_retained"] = False
  849. def _apply_senior_fit_to_item(
  850. item: dict[str, Any],
  851. senior_result: dict[str, Any],
  852. ) -> None:
  853. passed = bool(senior_result.get("passed"))
  854. item["senior_fit_score"] = senior_result.get("senior_fit_score")
  855. item["demand_senior_fit_json"] = senior_result.get("demand_senior_fit_json")
  856. item["senior_fit_passed"] = passed
  857. item["is_final_retained"] = passed
  858. def build_wxindex_word_stats_payload(
  859. item: dict[str, Any],
  860. *,
  861. analyze_ymd: str,
  862. ) -> dict[str, Any]:
  863. """构建通过热度+老年性筛选的词统计 payload。"""
  864. meta = item["meta"]
  865. result = item["result"]
  866. return {
  867. "name": item["name"],
  868. "meta_id": meta.get("id"),
  869. "analyze_ymd": analyze_ymd,
  870. "wxindex_word_record_id": item.get("record_id"),
  871. "retain_reason": item.get("retain_reason"),
  872. "senior_fit_score": item.get("senior_fit_score"),
  873. "data_start_ymd": result.get("data_start_ymd"),
  874. "data_end_ymd": result.get("data_end_ymd"),
  875. "data_days": result.get("data_days"),
  876. "min_score": result.get("min_score"),
  877. "max_score": result.get("max_score"),
  878. "avg_score": result.get("avg_score"),
  879. "detail_json": {
  880. "demand_senior_fit_json": item.get("demand_senior_fit_json"),
  881. "is_sustained_high": result.get("is_sustained_high"),
  882. "is_rising": result.get("is_rising"),
  883. "is_spike": result.get("is_spike"),
  884. },
  885. }
  886. def _save_senior_fit_passed_stats(
  887. repository: HotContentRepository,
  888. items: list[dict[str, Any]],
  889. *,
  890. analyze_ymd: str,
  891. existing_stats_names: set[str] | None = None,
  892. dry_run: bool,
  893. skip_db_save: bool,
  894. verbose: bool,
  895. ) -> tuple[int, int]:
  896. existing = existing_stats_names or set()
  897. all_payloads = [
  898. build_wxindex_word_stats_payload(item, analyze_ymd=analyze_ymd)
  899. for item in items
  900. if item.get("senior_fit_passed")
  901. ]
  902. resumed = sum(1 for payload in all_payloads if payload["name"] in existing)
  903. payloads = [payload for payload in all_payloads if payload["name"] not in existing]
  904. if not payloads:
  905. return 0, resumed
  906. if dry_run or skip_db_save:
  907. if verbose:
  908. label = "dry-run" if dry_run else "skip-db-save"
  909. print(f"[{label}] would save wxindex word stats count={len(payloads)}")
  910. return len(payloads), resumed
  911. saved = repository.save_wxindex_word_stats_batch(payloads)
  912. if verbose:
  913. print(f"saved wxindex word stats count={saved} analyze_ymd={analyze_ymd}")
  914. return saved, resumed
  915. def _build_export_row_from_item(
  916. item: dict[str, Any],
  917. *,
  918. analyze_ymd: str,
  919. strategy: str,
  920. ) -> dict[str, Any]:
  921. meta = item["meta"]
  922. name = item["name"]
  923. result = item["result"]
  924. retain_reason = item.get("retain_reason")
  925. is_internal_demand_matched = item.get("is_internal_demand_matched")
  926. matched_demand = str(item.get("matched_demand") or "")
  927. senior_fit_score = item.get("senior_fit_score")
  928. is_final_retained = bool(item.get("is_final_retained"))
  929. return {
  930. "analyze_ymd": analyze_ymd,
  931. "name": name,
  932. "meta_id": meta.get("id"),
  933. "fetch_start_ymd": item["fetch_start_ymd"],
  934. "fetch_end_ymd": item["fetch_end_ymd"],
  935. "data_start_ymd": result.get("data_start_ymd"),
  936. "data_end_ymd": result.get("data_end_ymd"),
  937. "data_days": result.get("data_days"),
  938. "analysis_skipped": False,
  939. "skip_reason": "",
  940. "is_sustained_high": bool(result.get("is_sustained_high")),
  941. "is_rising": bool(result.get("is_rising")),
  942. "is_spike": bool(result.get("is_spike")),
  943. "retain_reason": retain_reason or "",
  944. "is_internal_demand_matched": (
  945. "" if is_internal_demand_matched is None else is_internal_demand_matched
  946. ),
  947. "matched_demand": matched_demand,
  948. "senior_fit_score": senior_fit_score if senior_fit_score is not None else "",
  949. "is_final_retained": is_final_retained,
  950. "min_score": result.get("min_score"),
  951. "max_score": result.get("max_score"),
  952. "avg_score": result.get("avg_score"),
  953. "demand_id": (
  954. build_demand_id(
  955. strategy=strategy,
  956. demand_name=name,
  957. partition_dt=analyze_ymd,
  958. )
  959. if is_final_retained and strategy
  960. else ""
  961. ),
  962. "weight": (
  963. float(result.get("max_score") or 0) / WEIGHT_DIVISOR
  964. if is_final_retained and result.get("max_score") is not None
  965. else ""
  966. ),
  967. }
def run_wxindex_heat_pattern_daily_job(
    repository: HotContentRepository,
    *,
    config: FlowConfig | None = None,
    api_client: JsonApiClient | None = None,
    today: date | None = None,
    sustained_threshold: float = WXINDEX_SUSTAINED_HIGH_THRESHOLD,
    min_days: int = WXINDEX_HEAT_MIN_DAYS,
    max_days: int = WXINDEX_HEAT_MAX_DAYS,
    spike_days: int = WXINDEX_SPIKE_LOOKBACK_DAYS,
    spike_ratio: float = WXINDEX_SPIKE_RATIO,
    spike_baseline_floor: float = WXINDEX_SPIKE_BASELINE_FLOOR,
    rising_overall_change_rate: float = WXINDEX_RISING_OVERALL_CHANGE_RATE,
    rising_window_change_rate: float = WXINDEX_RISING_WINDOW_CHANGE_RATE,
    rising_adjacent_up_ratio: float = WXINDEX_RISING_ADJACENT_UP_RATIO,
    dry_run: bool = False,
    skip_odps: bool = False,
    skip_db_save: bool = False,
    verbose: bool = False,
) -> dict[str, Any]:
    """Scheduled job: analyze words still inside their fetch window and write
    the heat-pattern result records.

    Phases, in order:

    1. Load active word meta rows. When ``config`` is given, also load the
       current-hour demand cache. A ``HotContentFlowError`` is recorded in the
       summary instead of being raised.
    2. Batch-initialize one record per candidate word.
    3. Defer words that have not finished heat analysis and have no data for
       yesterday yet.
    4. Run heat analysis per word, or resume from an existing record.
    5. Match ``RETAIN_REASON_SUSTAINED_HIGH`` words against the demand pool.
    6. Score senior fit for retained candidates (only when ``config`` is set).
    7. Save stats rows for words that passed senior fit.
    8. Finalize every pending record and build export rows and hive rows.
    9. Sync the final hive rows to ODPS unless ``dry_run`` / ``skip_odps``.

    Each phase persists per-word progress (phases "heat_analyzed",
    "demand_matched", "senior_fit_scored", "finalized"). With DB writes
    enabled, a re-run on the same ``analyze_ymd`` reads existing records and
    counts restored work under the ``*_resumed`` summary keys.

    Args:
        repository: Data access layer used for all reads and writes.
        config: Flow config. Without it, demand matching, senior-fit scoring
            and ODPS sync are skipped, and ``strategy`` is empty.
        api_client: Optional client passed to the postprocess service. A new
            ``JsonApiClient`` is built from ``config`` when omitted.
        today: Analysis date. Defaults to the current date in ``SHANGHAI_TZ``.
        sustained_threshold, min_days, max_days, spike_*, rising_*: Heat
            analysis parameters. They are forwarded to
            ``analyze_wxindex_heat_patterns`` and recorded in record payloads.
        dry_run: Write nothing to the DB and skip ODPS.
        skip_odps: Skip the ODPS sync only.
        skip_db_save: Skip DB writes (resume state is then not read either).
        verbose: Print per-word progress.

    Returns:
        The summary dict of counters, plus ``retained_words`` and
        ``export_rows``. ``odps_sync`` is added when the ODPS sync runs.
    """
    current = today or datetime.now(SHANGHAI_TZ).date()
    analyze_ymd = current.strftime("%Y%m%d")
    meta_rows = repository.list_active_wxindex_word_meta(today=current)
    # Demand-cache state. It stays empty/None when config is None or the cache
    # cannot be loaded; demand matching then falls back to "unmatched".
    demand_name_set: list[str] = []
    demand_cache_run_id: int | None = None
    demand_cache_error: str | None = None
    postprocess_service: ContributionPostprocessService | None = None
    if config is not None:
        try:
            cache = DemandCacheService(config, repository).get_or_create_current_hour_cache()
            demand_name_set = list(cache.get("demand_name_set") or [])
            demand_cache_run_id = int(cache["id"])
            # The matching service is only built when there are demand names to match against.
            if demand_name_set:
                postprocess_service = ContributionPostprocessService(
                    config,
                    repository,
                    api_client
                    or JsonApiClient(
                        timeout_seconds=config.request_timeout_seconds,
                        verify_ssl=config.https_verify_ssl,
                    ),
                )
        except HotContentFlowError as exc:
            # Non-fatal: remember the error in the summary and continue without demand matching.
            demand_cache_error = str(exc)
            if verbose:
                print(f"demand cache unavailable, skip demand match: {exc}")
    summary: dict[str, Any] = {
        "analyze_ymd": analyze_ymd,
        "meta_count": len(meta_rows),
        "analyzed": 0,
        "skipped": 0,
        "retained": 0,
        "sustained_high": 0,
        "rising": 0,
        "spike": 0,
        "internal_demand_matched": 0,
        "senior_scored": 0,
        "senior_fit_candidates": 0,
        "senior_fit_passed": 0,
        "stats_saved": 0,
        "final_retained": 0,
        "odps_synced": 0,
        "odps_written": 0,
        "demand_cache_run_id": demand_cache_run_id,
        "demand_cache_error": demand_cache_error,
        "demand_name_count": len(demand_name_set),
        "llm_batch_size": WXINDEX_WORD_LLM_BATCH_SIZE,
        "demand_match_batches": 0,
        "senior_fit_batches": 0,
        "records_initialized": 0,
        "awaiting_yesterday_score": 0,
        "heat_resumed": 0,
        "demand_match_resumed": 0,
        "senior_fit_resumed": 0,
        "stats_resumed": 0,
        "finalized_resumed": 0,
        "dry_run": dry_run,
        "skip_odps": skip_odps,
        "skip_db_save": skip_db_save,
    }
    retained_words: list[dict[str, Any]] = []
    export_rows: list[dict[str, Any]] = []
    final_hive_rows: list[dict[str, Any]] = []
    senior_threshold = WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD
    # Without a demand-pool strategy, no demand ids or hive rows are produced.
    strategy = str(config.hot_demand_pool_strategy or "").strip() if config else ""
    pending_items: list[dict[str, Any]] = []
    candidate_items: list[dict[str, Any]] = []
    # Phase 1: keep only meta rows that have a name and a complete fetch window.
    for meta in meta_rows:
        name = str(meta.get("name") or "").strip()
        fetch_start_ymd = str(meta.get("fetch_start_ymd") or "").strip()
        fetch_end_ymd = str(meta.get("fetch_end_ymd") or "").strip()
        if not name or not fetch_start_ymd or not fetch_end_ymd:
            summary["skipped"] += 1
            continue
        candidate_items.append(
            {
                "meta": meta,
                "name": name,
                "fetch_start_ymd": fetch_start_ymd,
                "fetch_end_ymd": fetch_end_ymd,
            }
        )
    # Phase 2: batch-create this run's records. The helper is a no-op under dry-run / skip-db-save.
    _init_candidate_wxindex_word_records(
        repository,
        candidate_items,
        analyze_ymd=analyze_ymd,
        demand_cache_run_id=demand_cache_run_id,
        dry_run=dry_run,
        skip_db_save=skip_db_save,
        verbose=verbose,
    )
    # NOTE: this counts candidates even when the init above was skipped.
    summary["records_initialized"] = len(candidate_items)
    # Resume state from earlier runs on the same analyze_ymd. It is only read
    # when DB writes are enabled, so dry-run / skip-db-save runs start fresh.
    existing_records: dict[str, dict[str, Any]] = {}
    existing_stats_names: set[str] = set()
    if not dry_run and not skip_db_save:
        candidate_names = [item["name"] for item in candidate_items]
        existing_records = repository.list_wxindex_word_records_by_analyze_ymd(
            analyze_ymd=analyze_ymd,
            names=candidate_names,
        )
        existing_stats_names = repository.list_wxindex_word_stats_names(
            analyze_ymd=analyze_ymd,
            names=candidate_names,
        )
    # Phase 3: defer not-yet-analyzed words whose yesterday data is missing.
    yesterday_ymd = (current - timedelta(days=1)).strftime("%Y%m%d")
    candidate_items, awaiting_yesterday = _filter_candidates_awaiting_yesterday_score(
        repository,
        candidate_items,
        yesterday_ymd=yesterday_ymd,
        existing_records=existing_records,
        verbose=verbose,
    )
    summary["awaiting_yesterday_score"] = awaiting_yesterday
    # Backfill record ids the batch init did not assign, using the existing records.
    for item in candidate_items:
        if not item.get("record_id"):
            existing = existing_records.get(item["name"])
            if existing:
                item["record_id"] = int(existing.get("id") or 0)
    # Phase 4: heat analysis, or resume from an already-analyzed record.
    for item in candidate_items:
        meta = item["meta"]
        name = item["name"]
        fetch_start_ymd = item["fetch_start_ymd"]
        fetch_end_ymd = item["fetch_end_ymd"]
        existing_record = existing_records.get(name)
        # Scores inside the fetch window. They are also used to decide whether
        # an existing analysis must be rerun.
        scores = repository.list_wxindex_word_scores_in_range(
            name,
            start_ymd=fetch_start_ymd,
            end_ymd=fetch_end_ymd,
        )
        if (
            _is_heat_analysis_done(existing_record)
            and not _should_rerun_heat_analysis(
                existing_record,
                scores=scores,
                fetch_start_ymd=fetch_start_ymd,
                fetch_end_ymd=fetch_end_ymd,
                min_days=min_days,
                max_days=max_days,
            )
        ):
            summary["heat_resumed"] += 1
            if _is_analysis_skipped_record(existing_record):
                # Previously skipped: re-emit its export row; it never becomes a pending item.
                if verbose:
                    detail = _parse_record_detail_json(existing_record)
                    print(
                        f"resume skip word={name} reason={detail.get('skip_reason')} "
                        f"days={existing_record.get('data_days')}"
                    )
                export_rows.append(
                    _build_skipped_export_row_from_record(
                        record=existing_record,
                        meta=meta,
                        name=name,
                        analyze_ymd=analyze_ymd,
                        fetch_start_ymd=fetch_start_ymd,
                        fetch_end_ymd=fetch_end_ymd,
                    )
                )
                continue
            # Previously analyzed: rebuild the pending item from the stored record.
            pending_item = _rehydrate_pending_item_from_record(
                item,
                existing_record,
                senior_threshold=senior_threshold,
            )
            pending_items.append(pending_item)
            if verbose:
                print(
                    f"resume heat analyzed word={name} "
                    f"retain_reason={pending_item.get('retain_reason') or ''} "
                    f"data_days={pending_item['result'].get('data_days')}"
                )
            continue
        result = analyze_wxindex_heat_patterns(
            scores,
            start_ymd=fetch_start_ymd,
            end_ymd=fetch_end_ymd,
            sustained_threshold=sustained_threshold,
            min_days=min_days,
            max_days=max_days,
            spike_days=spike_days,
            spike_ratio=spike_ratio,
            spike_baseline_floor=spike_baseline_floor,
            rising_overall_change_rate=rising_overall_change_rate,
            rising_window_change_rate=rising_window_change_rate,
            rising_adjacent_up_ratio=rising_adjacent_up_ratio,
        )
        if result.get("skipped"):
            # Analysis declined this word: persist a "skipped" record and an export row, then move on.
            if verbose:
                print(
                    f"skip word={name} reason={result.get('skip_reason')} "
                    f"days={result.get('data_days')}"
                )
            skipped_payload = build_wxindex_word_record_skipped_payload(
                meta=meta,
                name=name,
                analyze_ymd=analyze_ymd,
                fetch_start_ymd=fetch_start_ymd,
                fetch_end_ymd=fetch_end_ymd,
                result=result,
                demand_cache_run_id=demand_cache_run_id,
            )
            _persist_wxindex_word_record(
                repository,
                skipped_payload,
                dry_run=dry_run,
                skip_db_save=skip_db_save,
                verbose=verbose,
                action="update skipped",
            )
            export_rows.append(
                {
                    "analyze_ymd": analyze_ymd,
                    "name": name,
                    "meta_id": meta.get("id"),
                    "fetch_start_ymd": fetch_start_ymd,
                    "fetch_end_ymd": fetch_end_ymd,
                    "analysis_skipped": True,
                    "skip_reason": result.get("skip_reason"),
                    "data_days": result.get("data_days"),
                    "retain_reason": "",
                    "is_sustained_high": False,
                    "is_rising": False,
                    "is_spike": False,
                    "is_internal_demand_matched": "",
                    "matched_demand": "",
                    "senior_fit_score": "",
                    "is_final_retained": False,
                    "min_score": "",
                    "max_score": "",
                    "avg_score": "",
                }
            )
            continue
        retain_reason = str(result.get("retain_reason") or "").strip() or None
        # Fresh pending item; the demand and senior-fit fields stay None until later phases fill them.
        pending_item = {
            "meta": meta,
            "name": name,
            "fetch_start_ymd": fetch_start_ymd,
            "fetch_end_ymd": fetch_end_ymd,
            "result": result,
            "retain_reason": retain_reason,
            "record_id": item.get("record_id", 0),
            "is_internal_demand_matched": None,
            "matched_demand": None,
            "internal_demand_match_json": None,
            "senior_fit_score": None,
            "demand_senior_fit_json": None,
            "senior_fit_passed": None,
            "is_final_retained": None,
        }
        pending_items.append(pending_item)
        _persist_pending_item_record(
            repository,
            pending_item,
            analyze_ymd=analyze_ymd,
            demand_cache_run_id=demand_cache_run_id,
            phase="heat_analyzed",
            senior_threshold=senior_threshold,
            sustained_threshold=sustained_threshold,
            spike_days=spike_days,
            spike_ratio=spike_ratio,
            spike_baseline_floor=spike_baseline_floor,
            rising_overall_change_rate=rising_overall_change_rate,
            rising_window_change_rate=rising_window_change_rate,
            rising_adjacent_up_ratio=rising_adjacent_up_ratio,
            dry_run=dry_run,
            skip_db_save=skip_db_save,
            verbose=verbose,
        )
        if verbose:
            print(
                f"heat analyzed word={name} retain_reason={retain_reason or ''} "
                f"data_days={result.get('data_days')}"
            )
    # Phase 5: internal-demand matching, only for sustained-high words without a match result yet.
    demand_match_results: dict[str, dict[str, Any]] = {}
    pending_by_name = {item["name"]: item for item in pending_items}
    sustained_high_words = [
        item["name"]
        for item in pending_items
        if item.get("retain_reason") == RETAIN_REASON_SUSTAINED_HIGH
        and item.get("is_internal_demand_matched") is None
    ]
    # Count sustained-high items whose match result was restored from an earlier run.
    for item in pending_items:
        if item.get("retain_reason") != RETAIN_REASON_SUSTAINED_HIGH:
            continue
        if item.get("is_internal_demand_matched") is not None:
            summary["demand_match_resumed"] += 1
    def _persist_demand_match_updates(words: list[str]) -> None:
        # Apply each word's match result (defaulting to "not matched") and persist phase "demand_matched".
        for word in words:
            pending_item = pending_by_name.get(word)
            if pending_item is None:
                continue
            match_result = demand_match_results.get(word) or {
                "word": word,
                "matched": False,
                "matched_demand": "",
                "match_list": [],
            }
            _apply_demand_match_to_item(pending_item, match_result)
            _persist_pending_item_record(
                repository,
                pending_item,
                analyze_ymd=analyze_ymd,
                demand_cache_run_id=demand_cache_run_id,
                phase="demand_matched",
                senior_threshold=senior_threshold,
                sustained_threshold=sustained_threshold,
                spike_days=spike_days,
                spike_ratio=spike_ratio,
                spike_baseline_floor=spike_baseline_floor,
                rising_overall_change_rate=rising_overall_change_rate,
                rising_window_change_rate=rising_window_change_rate,
                rising_adjacent_up_ratio=rising_adjacent_up_ratio,
                dry_run=dry_run,
                skip_db_save=skip_db_save,
                verbose=verbose,
            )
    if sustained_high_words:
        if postprocess_service is not None and demand_name_set:
            # Persist after every batch. Matched words are excluded from
            # sustained_high_words on a re-run, so finished batches are not redone.
            for batch in _chunk_words(sustained_high_words):
                summary["demand_match_batches"] += 1
                if verbose:
                    print(f"demand match batch size={len(batch)} words={batch}")
                demand_match_results.update(
                    postprocess_service.match_words_to_demand_pool(
                        words=batch,
                        demand_name_set=demand_name_set,
                    )
                )
                _persist_demand_match_updates(batch)
        else:
            # No matching service or an empty demand cache: record every word as unmatched.
            for word in sustained_high_words:
                demand_match_results[word] = {
                    "word": word,
                    "matched": False,
                    "matched_demand": "",
                    "match_list": [],
                    "skip_reason": "empty_demand_cache",
                }
            _persist_demand_match_updates(sustained_high_words)
    # Phase 6: senior-fit candidates are retained words not scored yet.
    # Sustained-high words must also have matched an internal demand.
    senior_fit_candidate_names: list[str] = []
    for item in pending_items:
        retain_reason = item.get("retain_reason")
        name = str(item.get("name") or "").strip()
        if not retain_reason or not name:
            continue
        if _is_senior_fit_attempt_done(item, existing_records.get(name)):
            summary["senior_fit_resumed"] += 1
            continue
        if retain_reason == RETAIN_REASON_SUSTAINED_HIGH:
            if not item.get("is_internal_demand_matched"):
                continue
        senior_fit_candidate_names.append(name)
    senior_fit_results: dict[str, dict[str, Any]] = {}
    def _persist_senior_fit_updates(words: list[str]) -> None:
        # Apply and persist (phase "senior_fit_scored") each word that got a result.
        # Words missing from the results are left untouched.
        for word in words:
            pending_item = pending_by_name.get(word)
            senior_result = senior_fit_results.get(word)
            if pending_item is None or senior_result is None:
                continue
            _apply_senior_fit_to_item(pending_item, senior_result)
            _persist_pending_item_record(
                repository,
                pending_item,
                analyze_ymd=analyze_ymd,
                demand_cache_run_id=demand_cache_run_id,
                phase="senior_fit_scored",
                senior_threshold=senior_threshold,
                sustained_threshold=sustained_threshold,
                spike_days=spike_days,
                spike_ratio=spike_ratio,
                spike_baseline_floor=spike_baseline_floor,
                rising_overall_change_rate=rising_overall_change_rate,
                rising_window_change_rate=rising_window_change_rate,
                rising_adjacent_up_ratio=rising_adjacent_up_ratio,
                dry_run=dry_run,
                skip_db_save=skip_db_save,
                verbose=verbose,
            )
            if verbose and not pending_item.get("senior_fit_passed"):
                print(
                    f"senior fit rejected word={word} "
                    f"score={pending_item.get('senior_fit_score')}"
                )
    # Scoring needs config; without it, candidates stay unscored (is_final_retained remains None).
    if config is not None and senior_fit_candidate_names:
        for batch in _chunk_words(senior_fit_candidate_names):
            summary["senior_fit_batches"] += 1
            if verbose:
                print(f"senior fit batch size={len(batch)} words={batch}")
            senior_fit_results.update(
                score_wxindex_words_senior_fit(
                    words=batch,
                    config=config,
                    senior_threshold=senior_threshold,
                    batch_size=WXINDEX_WORD_LLM_BATCH_SIZE,
                )
            )
            _persist_senior_fit_updates(batch)
    # Phase 7: stats rows for senior-fit-passed words; names already saved count as resumed.
    stats_saved, stats_resumed = _save_senior_fit_passed_stats(
        repository,
        pending_items,
        analyze_ymd=analyze_ymd,
        existing_stats_names=existing_stats_names,
        dry_run=dry_run,
        skip_db_save=skip_db_save,
        verbose=verbose,
    )
    summary["stats_saved"] = stats_saved
    summary["stats_resumed"] = stats_resumed
    # Phase 8: finalize every pending item and build its export row and, when eligible, its hive row.
    for item in pending_items:
        name = item["name"]
        result = item["result"]
        retain_reason = item.get("retain_reason")
        if retain_reason:
            retained_words.append(
                {
                    "name": name,
                    "retain_reason": retain_reason,
                    "data_days": result.get("data_days"),
                    "data_start_ymd": result.get("data_start_ymd"),
                    "data_end_ymd": result.get("data_end_ymd"),
                    "is_internal_demand_matched": item.get("is_internal_demand_matched"),
                    "matched_demand": str(item.get("matched_demand") or ""),
                    "senior_fit_score": item.get("senior_fit_score"),
                    "is_final_retained": bool(item.get("is_final_retained")),
                }
            )
        existing_record = existing_records.get(name)
        if _is_finalized_record(existing_record):
            # Already finalized in an earlier run: reuse its id instead of re-persisting.
            # NOTE(review): existing_record.get() below assumes _is_finalized_record(None)
            # is falsy — confirm.
            summary["finalized_resumed"] += 1
            record_id = int(item.get("record_id") or existing_record.get("id") or 0)
            if verbose:
                print(f"resume finalized word={name}")
        else:
            record_id = _persist_pending_item_record(
                repository,
                item,
                analyze_ymd=analyze_ymd,
                demand_cache_run_id=demand_cache_run_id,
                phase="finalized",
                senior_threshold=senior_threshold,
                sustained_threshold=sustained_threshold,
                spike_days=spike_days,
                spike_ratio=spike_ratio,
                spike_baseline_floor=spike_baseline_floor,
                rising_overall_change_rate=rising_overall_change_rate,
                rising_window_change_rate=rising_window_change_rate,
                rising_adjacent_up_ratio=rising_adjacent_up_ratio,
                dry_run=dry_run,
                skip_db_save=skip_db_save,
                verbose=verbose,
            )
        export_rows.append(
            _build_export_row_from_item(
                item,
                analyze_ymd=analyze_ymd,
                strategy=strategy,
            )
        )
        # Hive rows are built only for final-retained words, and only when a strategy is configured.
        if item.get("is_final_retained") and strategy:
            final_hive_rows.append(
                build_wxindex_word_hive_row(
                    wxindex_word_record_id=record_id,
                    word=name,
                    strategy=strategy,
                    partition_dt=analyze_ymd,
                    max_score=result.get("max_score"),
                    retain_reason=item.get("retain_reason"),
                )
            )
    # Phase 9: ODPS sync.
    if (
        final_hive_rows
        and strategy
        and config is not None
        and not dry_run
        and not skip_odps
    ):
        odps_summary = sync_wxindex_word_rows_to_odps(
            config,
            repository,
            hive_rows=final_hive_rows,
            partition_dt=analyze_ymd,
            strategy=strategy,
        )
        summary["odps_written"] = odps_summary.get("written_count", 0)
        summary["odps_synced"] = odps_summary.get("odps_synced", 0)
        summary["odps_sync"] = odps_summary
    elif dry_run or skip_odps:
        # Nothing is sent. The would-be row count is reported as written/synced.
        summary["odps_written"] = len(final_hive_rows)
        summary["odps_synced"] = len(final_hive_rows)
    _refresh_wxindex_heat_job_summary(
        summary,
        pending_items=pending_items,
        export_rows=export_rows,
    )
    summary["retained_words"] = retained_words
    summary["export_rows"] = export_rows
    return summary
  1487. WXINDEX_HEAT_PATTERN_EXPORT_FIELDS = [
  1488. "analyze_ymd",
  1489. "name",
  1490. "meta_id",
  1491. "fetch_start_ymd",
  1492. "fetch_end_ymd",
  1493. "data_start_ymd",
  1494. "data_end_ymd",
  1495. "data_days",
  1496. "analysis_skipped",
  1497. "skip_reason",
  1498. "is_sustained_high",
  1499. "is_rising",
  1500. "is_spike",
  1501. "retain_reason",
  1502. "is_internal_demand_matched",
  1503. "matched_demand",
  1504. "senior_fit_score",
  1505. "is_final_retained",
  1506. "min_score",
  1507. "max_score",
  1508. "avg_score",
  1509. "demand_id",
  1510. "weight",
  1511. ]
  1512. def write_wxindex_heat_pattern_csv(
  1513. rows: list[dict[str, Any]],
  1514. output_path: str | Path,
  1515. *,
  1516. fieldnames: list[str] | None = None,
  1517. ) -> Path:
  1518. """将热度分析明细写入本地 CSV。"""
  1519. path = Path(output_path).expanduser()
  1520. if not path.is_absolute():
  1521. path = Path.cwd() / path
  1522. path.parent.mkdir(parents=True, exist_ok=True)
  1523. columns = fieldnames or WXINDEX_HEAT_PATTERN_EXPORT_FIELDS
  1524. with path.open("w", encoding="utf-8-sig", newline="") as handle:
  1525. writer = csv.DictWriter(handle, fieldnames=columns, extrasaction="ignore")
  1526. writer.writeheader()
  1527. for row in rows:
  1528. writer.writerow({column: row.get(column, "") for column in columns})
  1529. return path