| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665 |
- """微信指数热度模式分析:持续高热、持续上涨、突然暴涨。"""
- from __future__ import annotations
- import csv
- import json
- from datetime import date, datetime, timedelta
- from pathlib import Path
- from typing import Any
- from app.hot_content.client import JsonApiClient
- from app.hot_content.demand_cache_service import DemandCacheService
- from app.hot_content.demand_hive_export import WEIGHT_DIVISOR, build_demand_id
- from app.hot_content.demand_quality import (
- TYPE_PHRASE,
- llm_score_senior_fit,
- lookup_quality_scores,
- )
- from app.hot_content.demand_pool_writer import sync_wxindex_word_rows_to_odps
- from app.hot_content.exceptions import HotContentFlowError
- from app.hot_content.postprocess_service import ContributionPostprocessService
- from app.hot_content.repository import HotContentRepository
- from app.hot_content.timezone import SHANGHAI_TZ
- from app.hot_content.types import FlowConfig
- from app.hot_content.wxindex_trend import (
- HEAT_RISING_ADJACENT_UP_RATIO,
- HEAT_RISING_OVERALL_CHANGE_RATE,
- HEAT_RISING_WINDOW_CHANGE_RATE,
- HEAT_SPIKE_BASELINE_FLOOR,
- HEAT_SPIKE_RATIO,
- extract_sorted_scores,
- is_wxindex_heat_rising_scores,
- is_wxindex_spike_scores,
- )
- from app.hot_content.wxindex_words import filter_scores_in_ymd_window
# --- Analysis window size (number of daily data points) ---------------------
WXINDEX_HEAT_MIN_DAYS = 7
WXINDEX_HEAT_MAX_DAYS = 14

# --- Pattern thresholds ------------------------------------------------------
# "Sustained high" requires every score in the window to exceed this value.
WXINDEX_SUSTAINED_HIGH_THRESHOLD = 10_000_000.0
WXINDEX_SPIKE_LOOKBACK_DAYS = 3
# The remaining thresholds are re-exported from app.hot_content.wxindex_trend.
WXINDEX_SPIKE_RATIO = HEAT_SPIKE_RATIO
WXINDEX_SPIKE_BASELINE_FLOOR = HEAT_SPIKE_BASELINE_FLOOR
WXINDEX_RISING_OVERALL_CHANGE_RATE = HEAT_RISING_OVERALL_CHANGE_RATE
WXINDEX_RISING_WINDOW_CHANGE_RATE = HEAT_RISING_WINDOW_CHANGE_RATE
WXINDEX_RISING_ADJACENT_UP_RATIO = HEAT_RISING_ADJACENT_UP_RATIO

# --- Senior-fit LLM scoring --------------------------------------------------
# Scores are divided by 10 before being compared with this threshold.
WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD = 0.6
WXINDEX_WORD_LLM_BATCH_SIZE = 10

# --- Pattern identifiers stored in detail_json["patterns"] -------------------
PATTERN_SUSTAINED_HIGH = "sustained_high"
PATTERN_RISING = "rising"
PATTERN_SPIKE = "spike"

# --- Retain reasons (runtime values persisted as-is) -------------------------
RETAIN_REASON_SUSTAINED_HIGH = "持续高热度"
RETAIN_REASON_RISING = "热度持续上涨"
RETAIN_REASON_SPIKE = "热度突然暴涨"

# --- Processing phases stored in detail_json["phase"] ------------------------
PHASE_ANALYSIS_SKIPPED = "analysis_skipped"
PHASE_HEAT_ANALYZED = "heat_analyzed"
PHASE_DEMAND_MATCHED = "demand_matched"
PHASE_SENIOR_FIT_SCORED = "senior_fit_scored"
PHASE_FINALIZED = "finalized"

# Any of these phases means the heat analysis step has already run.
_HEAT_DONE_PHASES = frozenset(
    (
        PHASE_ANALYSIS_SKIPPED,
        PHASE_HEAT_ANALYZED,
        PHASE_DEMAND_MATCHED,
        PHASE_SENIOR_FIT_SCORED,
        PHASE_FINALIZED,
    )
)

# Analysis columns that are reset to None when a record is (re)initialised.
_WXINDEX_RECORD_UNSET_ANALYSIS_FIELDS: dict[str, Any] = dict.fromkeys(
    (
        "data_start_ymd",
        "data_end_ymd",
        "data_days",
        "is_sustained_high",
        "is_rising",
        "is_spike",
        "retain_reason",
        "is_internal_demand_matched",
        "matched_demand",
        "internal_demand_match_json",
        "senior_fit_score",
        "demand_senior_fit_json",
        "is_final_retained",
        "min_score",
        "max_score",
        "avg_score",
        "detail_json",
    ),
    None,
)
- def _parse_record_detail_json(record: dict[str, Any]) -> dict[str, Any]:
- detail = record.get("detail_json")
- if isinstance(detail, dict):
- return detail
- if isinstance(detail, str) and detail.strip():
- try:
- parsed = json.loads(detail)
- return parsed if isinstance(parsed, dict) else {}
- except json.JSONDecodeError:
- return {}
- return {}
def _record_phase(record: dict[str, Any]) -> str:
    """Return the stripped ``phase`` value from the record's detail JSON ("" if absent)."""
    detail = _parse_record_detail_json(record)
    phase = detail.get("phase") or ""
    return str(phase).strip()
- def _nullable_bool_from_record(value: Any) -> bool | None:
- if value is None:
- return None
- return bool(value)
- def _parse_record_json_field(record: dict[str, Any], field: str) -> Any:
- value = record.get(field)
- if value is None:
- return None
- if isinstance(value, (dict, list)):
- return value
- if isinstance(value, (bytes, bytearray)):
- value = value.decode("utf-8")
- if isinstance(value, str):
- try:
- return json.loads(value)
- except json.JSONDecodeError:
- return None
- return value
def _is_heat_analysis_done(record: dict[str, Any] | None) -> bool:
    """Tell whether ``record`` exists and its phase is one of the heat-done phases."""
    return bool(record) and _record_phase(record) in _HEAT_DONE_PHASES
def _is_analysis_skipped_record(record: dict[str, Any]) -> bool:
    """Tell whether the record's phase marks the analysis as skipped."""
    phase = _record_phase(record)
    return phase == PHASE_ANALYSIS_SKIPPED
def _is_senior_fit_needed(
    *,
    retain_reason: str | None,
    is_internal_demand_matched: bool | None,
) -> bool:
    """Decide whether a word should go through senior-fit scoring.

    Words without a retain reason never need scoring. "Sustained high" words
    need it only when they matched an internal demand; every other retain
    reason always needs it.
    """
    if retain_reason and retain_reason != RETAIN_REASON_SUSTAINED_HIGH:
        return True
    return bool(retain_reason) and bool(is_internal_demand_matched)
def _is_finalized_record(record: dict[str, Any] | None) -> bool:
    """Tell whether ``record`` exists and has reached the finalized phase."""
    return bool(record) and _record_phase(record) == PHASE_FINALIZED
def _latest_score_ymd(scores: list[dict[str, Any]]) -> str | None:
    """Return the greatest non-empty date string among ``scores`` (None if none).

    Dates are compared as plain strings.
    """
    latest: str | None = None
    for row in scores:
        ymd = _score_row_ymd(row)
        if ymd and (latest is None or ymd > latest):
            latest = ymd
    return latest
def _should_rerun_heat_analysis(
    existing_record: dict[str, Any] | None,
    *,
    scores: list[dict[str, Any]],
    fetch_start_ymd: str,
    fetch_end_ymd: str,
    min_days: int,
    max_days: int,
) -> bool:
    """Decide whether the heat analysis must be (re)computed for a word.

    A rerun is required when:
      * no finished heat analysis exists for the record;
      * the scores contain a date later than the record's ``data_end_ymd``;
      * the record was previously skipped but the current scores would no
        longer be skipped by ``prepare_analysis_scores``.
    """
    if not _is_heat_analysis_done(existing_record):
        return True
    # A "done" record is necessarily truthy; this only narrows the type.
    assert existing_record is not None

    newest_ymd = _latest_score_ymd(scores)
    recorded_end = str(existing_record.get("data_end_ymd") or "").strip()
    if newest_ymd and recorded_end and newest_ymd > recorded_end:
        return True

    if not _is_analysis_skipped_record(existing_record):
        return False

    _, skip_reason = prepare_analysis_scores(
        scores,
        start_ymd=fetch_start_ymd,
        end_ymd=fetch_end_ymd,
        min_days=min_days,
        max_days=max_days,
    )
    return skip_reason is None
def _is_senior_fit_attempt_done(
    item: dict[str, Any],
    existing_record: dict[str, Any] | None,
) -> bool:
    """Tell whether senior-fit scoring was already attempted for this word.

    True when the stored record is in the senior-fit-scored or finalized
    phase, or when the in-memory item already carries a senior fit score.
    """
    scored_phases = (PHASE_SENIOR_FIT_SCORED, PHASE_FINALIZED)
    if existing_record and _record_phase(existing_record) in scored_phases:
        return True
    return item.get("senior_fit_score") is not None
- def _senior_fit_passed_from_score(score: Any, *, senior_threshold: float) -> bool:
- if score is None:
- return False
- try:
- return float(score) / 10.0 > senior_threshold
- except (TypeError, ValueError):
- return False
def _rehydrate_result_from_record(record: dict[str, Any]) -> dict[str, Any]:
    """Rebuild an analysis-result dict from a persisted record.

    The result is always marked as not skipped. Scalar columns are copied
    verbatim, the three pattern flags keep ``None`` when unset, a blank retain
    reason becomes ``None`` and ``patterns`` comes from the record's detail
    JSON.
    """
    detail = _parse_record_detail_json(record)

    raw_reason = record.get("retain_reason")
    retain_reason = None
    if raw_reason is not None:
        retain_reason = str(raw_reason).strip() or None

    result: dict[str, Any] = {"skipped": False, "skip_reason": None}
    copied_columns = (
        "data_days",
        "data_start_ymd",
        "data_end_ymd",
        "fetch_start_ymd",
        "fetch_end_ymd",
        "min_score",
        "max_score",
        "avg_score",
    )
    for column in copied_columns:
        result[column] = record.get(column)
    for flag in ("is_sustained_high", "is_rising", "is_spike"):
        result[flag] = _nullable_bool_from_record(record.get(flag))
    result["retain_reason"] = retain_reason
    result["patterns"] = list(detail.get("patterns") or [])
    return result
def _rehydrate_pending_item_from_record(
    item: dict[str, Any],
    record: dict[str, Any],
    *,
    senior_threshold: float,
) -> dict[str, Any]:
    """Rebuild a pending item from a persisted record.

    Identity fields (meta, name, fetch window) come from ``item``; analysis,
    demand-match and senior-fit state come from ``record``. When no senior fit
    score is stored and scoring is not needed, the item is marked as not
    passed and an unset final-retained flag defaults to ``False``.
    """
    result = _rehydrate_result_from_record(record)
    retain_reason = result.get("retain_reason")
    demand_matched = _nullable_bool_from_record(
        record.get("is_internal_demand_matched")
    )

    raw_demand = record.get("matched_demand")
    matched_demand = None
    if raw_demand is not None:
        matched_demand = str(raw_demand).strip() or None

    stored_score = record.get("senior_fit_score")
    final_retained = _nullable_bool_from_record(record.get("is_final_retained"))
    passed: bool | None = None
    if stored_score is not None:
        passed = _senior_fit_passed_from_score(
            stored_score,
            senior_threshold=senior_threshold,
        )
    elif not _is_senior_fit_needed(
        retain_reason=retain_reason,
        is_internal_demand_matched=demand_matched,
    ):
        passed = False
        final_retained = False if final_retained is None else final_retained

    # Prefer the persisted id, then the item's id, then 0.
    record_id = int(record.get("id") or item.get("record_id") or 0)
    return {
        "meta": item["meta"],
        "name": item["name"],
        "fetch_start_ymd": item["fetch_start_ymd"],
        "fetch_end_ymd": item["fetch_end_ymd"],
        "result": result,
        "retain_reason": retain_reason,
        "record_id": record_id,
        "is_internal_demand_matched": demand_matched,
        "matched_demand": matched_demand,
        "internal_demand_match_json": _parse_record_json_field(
            record, "internal_demand_match_json"
        ),
        "senior_fit_score": stored_score,
        "demand_senior_fit_json": _parse_record_json_field(
            record, "demand_senior_fit_json"
        ),
        "senior_fit_passed": passed,
        "is_final_retained": final_retained,
    }
def _build_skipped_export_row_from_record(
    *,
    record: dict[str, Any],
    meta: dict[str, Any],
    name: str,
    analyze_ymd: str,
    fetch_start_ymd: str,
    fetch_end_ymd: str,
) -> dict[str, Any]:
    """Build the export row for a word whose analysis was skipped.

    Only the skip reason (from detail JSON) and ``data_days`` are taken from
    ``record``; pattern flags are ``False`` and the other analysis columns are
    empty strings.
    """
    skip_reason = _parse_record_detail_json(record).get("skip_reason") or ""
    row: dict[str, Any] = {
        "analyze_ymd": analyze_ymd,
        "name": name,
        "meta_id": meta.get("id"),
        "fetch_start_ymd": fetch_start_ymd,
        "fetch_end_ymd": fetch_end_ymd,
        "analysis_skipped": True,
        "skip_reason": skip_reason,
        "data_days": record.get("data_days") or "",
        "retain_reason": "",
    }
    row.update(dict.fromkeys(("is_sustained_high", "is_rising", "is_spike"), False))
    row.update(
        dict.fromkeys(
            ("is_internal_demand_matched", "matched_demand", "senior_fit_score"),
            "",
        )
    )
    row["is_final_retained"] = False
    row.update(dict.fromkeys(("min_score", "max_score", "avg_score"), ""))
    return row
def _refresh_wxindex_heat_job_summary(
    summary: dict[str, Any],
    *,
    pending_items: list[dict[str, Any]],
    export_rows: list[dict[str, Any]],
) -> None:
    """Recompute the job counters in ``summary`` in place.

    ``skipped`` is counted from ``export_rows``; every other counter is
    derived from ``pending_items``.
    """

    def count_items(predicate: Any) -> int:
        return sum(1 for item in pending_items if predicate(item))

    def reason_is(expected: str) -> Any:
        return lambda item: item.get("retain_reason") == expected

    summary["analyzed"] = len(pending_items)
    summary["skipped"] = sum(1 for row in export_rows if row.get("analysis_skipped"))

    # (summary key, predicate over a pending item), in assignment order.
    item_counters = (
        ("retained", lambda item: item.get("retain_reason")),
        ("sustained_high", reason_is(RETAIN_REASON_SUSTAINED_HIGH)),
        ("rising", reason_is(RETAIN_REASON_RISING)),
        ("spike", reason_is(RETAIN_REASON_SPIKE)),
        (
            "internal_demand_matched",
            lambda item: item.get("is_internal_demand_matched"),
        ),
        (
            "senior_fit_candidates",
            lambda item: _is_senior_fit_needed(
                retain_reason=item.get("retain_reason"),
                is_internal_demand_matched=item.get("is_internal_demand_matched"),
            ),
        ),
        ("senior_scored", lambda item: item.get("senior_fit_score") is not None),
        ("senior_fit_passed", lambda item: item.get("senior_fit_passed")),
        ("final_retained", lambda item: item.get("is_final_retained")),
    )
    for key, predicate in item_counters:
        summary[key] = count_items(predicate)
def resolve_retain_reason(
    *,
    is_sustained_high: bool,
    is_rising: bool,
    is_spike: bool,
) -> str | None:
    """Pick the retain reason using the 2->3->1 priority.

    Rising wins over spike, which wins over sustained high; ``None`` is
    returned when no pattern matched.
    """
    by_priority = (
        (is_rising, RETAIN_REASON_RISING),
        (is_spike, RETAIN_REASON_SPIKE),
        (is_sustained_high, RETAIN_REASON_SUSTAINED_HIGH),
    )
    for matched, reason in by_priority:
        if matched:
            return reason
    return None
def _chunk_words(
    words: list[str],
    batch_size: int = WXINDEX_WORD_LLM_BATCH_SIZE,
) -> list[list[str]]:
    """Split ``words`` into consecutive chunks of at most ``batch_size`` items.

    A batch size below 1 is treated as 1.
    """
    step = batch_size if batch_size > 1 else 1
    chunks: list[list[str]] = []
    for start in range(0, len(words), step):
        chunks.append(words[start : start + step])
    return chunks
- def _empty_senior_fit_result() -> dict[str, Any]:
- return {
- "senior_fit_score": None,
- "demand_senior_fit_json": {"source": "", "items": []},
- "passed": False,
- }
def score_wxindex_words_senior_fit(
    *,
    words: list[str],
    config: FlowConfig,
    senior_threshold: float = WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD,
    batch_size: int = WXINDEX_WORD_LLM_BATCH_SIZE,
) -> dict[str, dict[str, Any]]:
    """Score WeChat-index hot words for senior fit via the LLM, in batches.

    Words are stripped, blanks dropped and duplicates removed (first
    occurrence wins). Each batch of at most ``batch_size`` words is sent in a
    single ``llm_score_senior_fit`` call. Every word of a batch stores that
    batch's whole response as ``demand_senior_fit_json``.

    Returns:
        Mapping of word -> ``{"senior_fit_score", "demand_senior_fit_json",
        "passed"}``. ``passed`` is true when ``score / 10`` is strictly above
        ``senior_threshold``.
    """
    stripped_words = (str(raw or "").strip() for raw in words)
    unique_words = list(dict.fromkeys(word for word in stripped_words if word))

    results: dict[str, dict[str, Any]] = {}
    for word in unique_words:
        results[word] = _empty_senior_fit_result()

    for batch in _chunk_words(unique_words, batch_size=batch_size):
        senior_fit_json = llm_score_senior_fit(
            # Only the first three words are used to label the request.
            channel_content_id="wxindex_words:" + ",".join(batch[:3]),
            candidates=[
                {"demand_type": TYPE_PHRASE, "demand_text": word} for word in batch
            ],
            model=config.demand_quality_llm_model,
            max_attempts=config.demand_quality_llm_max_attempts,
            retry_sleep_seconds=config.demand_quality_llm_retry_sleep_seconds,
            max_tokens=config.demand_quality_llm_max_tokens,
        )
        # Record the threshold on the 0-10 score scale alongside the response.
        senior_fit_json["threshold"] = senior_threshold * 10.0

        for word in batch:
            _, senior_score = lookup_quality_scores(
                demand_type=TYPE_PHRASE,
                demand_text=word,
                event_sense_json=None,
                senior_fit_json=senior_fit_json,
            )
            results[word] = {
                "senior_fit_score": senior_score,
                "demand_senior_fit_json": senior_fit_json,
                "passed": (
                    senior_score is not None
                    and senior_score / 10.0 > senior_threshold
                ),
            }
    return results
def score_wxindex_word_senior_fit(
    *,
    word: str,
    config: FlowConfig,
    senior_threshold: float = WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD,
) -> dict[str, Any]:
    """Score a single WeChat-index hot word for senior fit via the LLM.

    A blank word short-circuits to the empty result without calling the LLM.
    """
    target = str(word or "").strip()
    if target:
        scored = score_wxindex_words_senior_fit(
            words=[target],
            config=config,
            senior_threshold=senior_threshold,
            batch_size=1,
        )
        return scored.get(target, _empty_senior_fit_result())
    return _empty_senior_fit_result()
def build_wxindex_odps_extend(retain_reason: str | None) -> str:
    """Serialise the retain reason as the ``extend`` JSON string.

    Returns ``"{}"`` for a missing or blank reason, otherwise
    ``{"method": <reason>}`` with non-ASCII characters kept as-is.
    """
    method = str(retain_reason or "").strip()
    return json.dumps({"method": method}, ensure_ascii=False) if method else "{}"
def build_wxindex_word_hive_row(
    *,
    wxindex_word_record_id: int,
    word: str,
    strategy: str,
    partition_dt: str,
    max_score: float | None,
    retain_reason: str | None = None,
) -> dict[str, Any]:
    """Build the Hive export row for a retained hot word.

    ``weight`` is ``max_score / WEIGHT_DIVISOR``, or ``0.0`` when no max score
    is available.
    """
    demand_name = str(word or "").strip()
    weight = 0.0 if max_score is None else float(max_score) / WEIGHT_DIVISOR
    demand_id = build_demand_id(
        strategy=strategy,
        demand_name=demand_name,
        partition_dt=partition_dt,
    )
    return {
        "record_id": wxindex_word_record_id,
        "strategy": strategy,
        "demand_id": demand_id,
        "demand_name": demand_name,
        "weight": weight,
        "type": TYPE_PHRASE,
        "video_count": None,
        "video_list": [],
        "extend": build_wxindex_odps_extend(retain_reason),
        "dt": partition_dt,
    }
def build_wxindex_word_odps_sync_row(
    *,
    wxindex_word_record_id: int,
    word: str,
    strategy: str,
    partition_dt: str,
    max_score: float | None,
    retain_reason: str | None = None,
) -> dict[str, Any]:
    """Build the ODPS sync row for a retained hot word.

    ``weight`` is ``max_score / WEIGHT_DIVISOR``, or ``None`` when no max
    score is available.
    """
    demand_name = str(word or "").strip()
    weight = None if max_score is None else float(max_score) / WEIGHT_DIVISOR
    demand_id = build_demand_id(
        strategy=strategy,
        demand_name=demand_name,
        partition_dt=partition_dt,
    )
    return {
        "partition_dt": partition_dt,
        "strategy": strategy,
        "demand_id": demand_id,
        "demand_name": demand_name,
        "demand_type": TYPE_PHRASE,
        "record_id": wxindex_word_record_id,
        "weight": weight,
        "extend": build_wxindex_odps_extend(retain_reason),
    }
def prepare_analysis_scores(
    scores: list[dict[str, Any]],
    *,
    start_ymd: str,
    end_ymd: str,
    min_days: int = WXINDEX_HEAT_MIN_DAYS,
    max_days: int = WXINDEX_HEAT_MAX_DAYS,
) -> tuple[list[dict[str, Any]], str | None]:
    """Select the usable scores inside the target window.

    The rows returned by ``filter_scores_in_ymd_window`` are cut down to the
    last ``max_days`` entries. When fewer than ``min_days`` rows remain, the
    skip reason ``"insufficient_days"`` is returned with them.

    NOTE(review): keeping the *last* rows presumably relies on the filter
    returning rows in ascending date order; confirm against that helper.
    """
    in_window = filter_scores_in_ymd_window(
        scores,
        start_ymd=start_ymd,
        end_ymd=end_ymd,
    )
    trimmed = in_window[-max_days:] if len(in_window) > max_days else in_window
    skip_reason = "insufficient_days" if len(trimmed) < min_days else None
    return trimmed, skip_reason
- def _score_row_ymd(row: dict[str, Any]) -> str:
- return str(row.get("ymd") or row.get("dt") or "").strip()
def _window_ymd_bounds(
    window_scores: list[dict[str, Any]],
) -> tuple[str | None, str | None]:
    """Return the dates of the first and last rows, ``None`` where unavailable."""
    if window_scores:
        first_row, last_row = window_scores[0], window_scores[-1]
        return _score_row_ymd(first_row) or None, _score_row_ymd(last_row) or None
    return None, None
def analyze_wxindex_heat_patterns(
    scores: list[dict[str, Any]],
    *,
    start_ymd: str,
    end_ymd: str,
    sustained_threshold: float = WXINDEX_SUSTAINED_HIGH_THRESHOLD,
    min_days: int = WXINDEX_HEAT_MIN_DAYS,
    max_days: int = WXINDEX_HEAT_MAX_DAYS,
    spike_days: int = WXINDEX_SPIKE_LOOKBACK_DAYS,
    spike_ratio: float = WXINDEX_SPIKE_RATIO,
    spike_baseline_floor: float = WXINDEX_SPIKE_BASELINE_FLOOR,
    rising_overall_change_rate: float = WXINDEX_RISING_OVERALL_CHANGE_RATE,
    rising_window_change_rate: float = WXINDEX_RISING_WINDOW_CHANGE_RATE,
    rising_adjacent_up_ratio: float = WXINDEX_RISING_ADJACENT_UP_RATIO,
) -> dict[str, Any]:
    """Evaluate the three heat patterns on the scores of the target window.

    Returns a dict with ``skipped=True`` plus the skip reason when
    ``prepare_analysis_scores`` rejects the window. Otherwise it holds the
    window bounds, min/max/avg score, one flag per pattern, the resolved
    retain reason, the single pattern matching that reason, and the window
    rows themselves.

    NOTE(review): if ``extract_sorted_scores`` can return an empty list for a
    non-skipped window, ``min``/``max`` below would raise; confirm upstream.
    """
    window_scores, skip_reason = prepare_analysis_scores(
        scores,
        start_ymd=start_ymd,
        end_ymd=end_ymd,
        min_days=min_days,
        max_days=max_days,
    )
    data_start_ymd, data_end_ymd = _window_ymd_bounds(window_scores)
    if skip_reason:
        return {
            "skipped": True,
            "skip_reason": skip_reason,
            "data_days": len(window_scores) or None,
            "data_start_ymd": data_start_ymd,
            "data_end_ymd": data_end_ymd,
            "patterns": [],
        }

    numeric_scores = extract_sorted_scores(window_scores, max_points=max_days)

    # Sustained high: every data point must be strictly above the threshold.
    is_sustained_high = all(value > sustained_threshold for value in numeric_scores)
    is_rising = is_wxindex_heat_rising_scores(
        numeric_scores,
        min_points=min_days,
        overall_change_rate=rising_overall_change_rate,
        window_change_rate_threshold=rising_window_change_rate,
        adjacent_up_ratio=rising_adjacent_up_ratio,
    )
    is_spike = is_wxindex_spike_scores(
        numeric_scores,
        spike_days=spike_days,
        min_points=min_days,
        spike_ratio=spike_ratio,
        baseline_floor=spike_baseline_floor,
    )
    retain_reason = resolve_retain_reason(
        is_sustained_high=is_sustained_high,
        is_rising=is_rising,
        is_spike=is_spike,
    )

    # Only the pattern that won the priority resolution is reported.
    pattern_by_reason = {
        RETAIN_REASON_SUSTAINED_HIGH: PATTERN_SUSTAINED_HIGH,
        RETAIN_REASON_RISING: PATTERN_RISING,
        RETAIN_REASON_SPIKE: PATTERN_SPIKE,
    }
    patterns: list[str] = []
    if retain_reason in pattern_by_reason:
        patterns.append(pattern_by_reason[retain_reason])

    return {
        "skipped": False,
        "skip_reason": None,
        "data_days": len(window_scores),
        "data_start_ymd": data_start_ymd,
        "data_end_ymd": data_end_ymd,
        "fetch_start_ymd": start_ymd,
        "fetch_end_ymd": end_ymd,
        "min_score": min(numeric_scores),
        "max_score": max(numeric_scores),
        "avg_score": sum(numeric_scores) / len(numeric_scores),
        "is_sustained_high": is_sustained_high,
        "is_rising": is_rising,
        "is_spike": is_spike,
        "retain_reason": retain_reason,
        "patterns": patterns,
        "scores": window_scores,
    }
def build_wxindex_word_record_init_payload(
    *,
    meta: dict[str, Any],
    name: str,
    analyze_ymd: str,
    fetch_start_ymd: str,
    fetch_end_ymd: str,
    demand_cache_run_id: int | None = None,
) -> dict[str, Any]:
    """Build the trace record written before analysis starts.

    Only the meta id and fetch window are filled in; every analysis field is
    ``None`` until later phases update the record.
    """
    payload: dict[str, Any] = {
        "name": name,
        "meta_id": meta.get("id"),
        "analyze_ymd": analyze_ymd,
        "fetch_start_ymd": fetch_start_ymd,
        "fetch_end_ymd": fetch_end_ymd,
        "demand_cache_run_id": demand_cache_run_id,
    }
    payload.update(_WXINDEX_RECORD_UNSET_ANALYSIS_FIELDS)
    return payload
def build_wxindex_word_record_skipped_payload(
    *,
    meta: dict[str, Any],
    name: str,
    analyze_ymd: str,
    fetch_start_ymd: str,
    fetch_end_ymd: str,
    result: dict[str, Any],
    demand_cache_run_id: int | None = None,
) -> dict[str, Any]:
    """Build the trace-record payload for a word whose analysis was skipped.

    All analysis fields are reset to ``None`` except the data window
    (``data_start_ymd`` / ``data_end_ymd`` / ``data_days``) taken from
    ``result``. ``detail_json`` carries the skipped phase, the skip reason and
    the number of data days.
    """
    payload: dict[str, Any] = {
        "name": name,
        "meta_id": meta.get("id"),
        "analyze_ymd": analyze_ymd,
        "fetch_start_ymd": fetch_start_ymd,
        "fetch_end_ymd": fetch_end_ymd,
        "demand_cache_run_id": demand_cache_run_id,
        **_WXINDEX_RECORD_UNSET_ANALYSIS_FIELDS,
    }
    data_days = result.get("data_days")
    payload.update(
        {
            "data_start_ymd": result.get("data_start_ymd"),
            "data_end_ymd": result.get("data_end_ymd"),
            "data_days": data_days,
            "detail_json": {
                # Use the shared constant so the written phase cannot drift
                # from the value the phase checks compare against.
                "phase": PHASE_ANALYSIS_SKIPPED,
                "skip_reason": result.get("skip_reason"),
                "data_days": data_days,
            },
        }
    )
    return payload
def build_wxindex_word_record_payload(
    item: dict[str, Any],
    *,
    analyze_ymd: str,
    demand_cache_run_id: int | None,
    phase: str,
    senior_threshold: float,
    sustained_threshold: float,
    spike_days: int,
    spike_ratio: float,
    spike_baseline_floor: float,
    rising_overall_change_rate: float,
    rising_window_change_rate: float,
    rising_adjacent_up_ratio: float,
) -> dict[str, Any]:
    """Build the records-table payload from a pending item's current state.

    Window and score columns come from ``item["result"]``; demand-match and
    senior-fit columns come from ``item``. ``detail_json`` records the phase
    together with every threshold used for the analysis.
    """
    meta = item["meta"]
    name = item["name"]
    fetch_start_ymd = item["fetch_start_ymd"]
    fetch_end_ymd = item["fetch_end_ymd"]
    result = item["result"]
    retain_reason = item.get("retain_reason")
    final_retained = item.get("is_final_retained")

    detail_json = {
        "phase": phase,
        "patterns": list(result.get("patterns") or []),
        "retain_reason": retain_reason,
        "retain_reason_priority": "2->3->1",
        "senior_fit_threshold": senior_threshold,
        # Same threshold expressed on the 0-10 score scale.
        "senior_fit_threshold_score": senior_threshold * 10.0,
        "is_final_retained": final_retained,
        "sustained_threshold": sustained_threshold,
        "spike_days": spike_days,
        "spike_ratio": spike_ratio,
        "spike_baseline_floor": spike_baseline_floor,
        "rising_overall_change_rate": rising_overall_change_rate,
        "rising_window_change_rate": rising_window_change_rate,
        "rising_adjacent_up_ratio": rising_adjacent_up_ratio,
    }

    payload: dict[str, Any] = {
        "name": name,
        "meta_id": meta.get("id"),
        "analyze_ymd": analyze_ymd,
        "fetch_start_ymd": fetch_start_ymd,
        "fetch_end_ymd": fetch_end_ymd,
    }
    result_columns = (
        "data_start_ymd",
        "data_end_ymd",
        "data_days",
        "is_sustained_high",
        "is_rising",
        "is_spike",
    )
    for column in result_columns:
        payload[column] = result.get(column)
    payload["retain_reason"] = retain_reason
    payload["is_internal_demand_matched"] = item.get("is_internal_demand_matched")
    payload["matched_demand"] = item.get("matched_demand")
    payload["demand_cache_run_id"] = demand_cache_run_id
    payload["internal_demand_match_json"] = item.get("internal_demand_match_json")
    payload["senior_fit_score"] = item.get("senior_fit_score")
    payload["demand_senior_fit_json"] = item.get("demand_senior_fit_json")
    payload["is_final_retained"] = final_retained
    for column in ("min_score", "max_score", "avg_score"):
        payload[column] = result.get(column)
    payload["detail_json"] = detail_json
    return payload
- def _persist_wxindex_word_record(
- repository: HotContentRepository,
- payload: dict[str, Any],
- *,
- dry_run: bool,
- skip_db_save: bool,
- verbose: bool,
- action: str,
- ) -> int:
- name = str(payload.get("name") or "").strip()
- if dry_run or skip_db_save:
- if verbose:
- label = "dry-run" if dry_run else "skip-db-save"
- print(f"[{label}] would {action} wxindex word record word={name}")
- return 0
- return repository.save_wxindex_word_record(payload)
def _persist_pending_item_record(
    repository: HotContentRepository,
    item: dict[str, Any],
    *,
    analyze_ymd: str,
    demand_cache_run_id: int | None,
    phase: str,
    senior_threshold: float,
    sustained_threshold: float,
    spike_days: int,
    spike_ratio: float,
    spike_baseline_floor: float,
    rising_overall_change_rate: float,
    rising_window_change_rate: float,
    rising_adjacent_up_ratio: float,
    dry_run: bool,
    skip_db_save: bool,
    verbose: bool,
) -> int:
    """Persist the pending item's current state for ``phase``.

    A truthy id returned by the save is stored in ``item["record_id"]``. The
    item's record id is returned, or ``0`` when it has none.
    """
    saved_id = _persist_wxindex_word_record(
        repository,
        build_wxindex_word_record_payload(
            item,
            analyze_ymd=analyze_ymd,
            demand_cache_run_id=demand_cache_run_id,
            phase=phase,
            senior_threshold=senior_threshold,
            sustained_threshold=sustained_threshold,
            spike_days=spike_days,
            spike_ratio=spike_ratio,
            spike_baseline_floor=spike_baseline_floor,
            rising_overall_change_rate=rising_overall_change_rate,
            rising_window_change_rate=rising_window_change_rate,
            rising_adjacent_up_ratio=rising_adjacent_up_ratio,
        ),
        dry_run=dry_run,
        skip_db_save=skip_db_save,
        verbose=verbose,
        action=f"{phase} word={item['name']}",
    )
    if saved_id:
        item["record_id"] = saved_id
    # Dry runs return 0 above, so fall back to whatever id the item already has.
    return int(item.get("record_id") or 0)
def _filter_candidates_awaiting_yesterday_score(
    repository: HotContentRepository,
    candidate_items: list[dict[str, Any]],
    *,
    yesterday_ymd: str,
    existing_records: dict[str, dict[str, Any]],
    verbose: bool,
) -> tuple[list[dict[str, Any]], int]:
    """Drop candidates that still wait for yesterday's index score.

    Words whose heat analysis is already done always stay. The others stay
    only when the repository reports a score row for ``yesterday_ymd``;
    otherwise they are left for the next run.

    Returns:
        ``(ready_items, awaiting_count)``.
    """
    names_to_check = [
        candidate["name"]
        for candidate in candidate_items
        if not _is_heat_analysis_done(existing_records.get(candidate["name"]))
    ]
    if not names_to_check:
        return candidate_items, 0

    names_with_yesterday = repository.list_wxindex_word_names_with_dt(
        names_to_check,
        dt=yesterday_ymd,
    )
    unanalyzed_names = set(names_to_check)

    ready_items: list[dict[str, Any]] = []
    awaiting_count = 0
    for candidate in candidate_items:
        word = candidate["name"]
        if word not in unanalyzed_names or word in names_with_yesterday:
            ready_items.append(candidate)
            continue
        awaiting_count += 1
        if verbose:
            print(
                f"await yesterday score word={word} dt={yesterday_ymd}, skip this run"
            )
    return ready_items, awaiting_count
- def _init_candidate_wxindex_word_records(
- repository: HotContentRepository,
- candidate_items: list[dict[str, Any]],
- *,
- analyze_ymd: str,
- demand_cache_run_id: int | None,
- dry_run: bool,
- skip_db_save: bool,
- verbose: bool,
- ) -> None:
- if not candidate_items:
- return
- if dry_run or skip_db_save:
- if verbose:
- label = "dry-run" if dry_run else "skip-db-save"
- print(
- f"[{label}] would batch init wxindex word records "
- f"count={len(candidate_items)} analyze_ymd={analyze_ymd}"
- )
- return
- init_payloads = [
- build_wxindex_word_record_init_payload(
- meta=item["meta"],
- name=item["name"],
- analyze_ymd=analyze_ymd,
- fetch_start_ymd=item["fetch_start_ymd"],
- fetch_end_ymd=item["fetch_end_ymd"],
- demand_cache_run_id=demand_cache_run_id,
- )
- for item in candidate_items
- ]
- record_id_map = repository.init_wxindex_word_records(init_payloads)
- for item in candidate_items:
- item["record_id"] = int(record_id_map.get(item["name"]) or 0)
- if verbose:
- print(
- f"init wxindex word records count={len(candidate_items)} "
- f"analyze_ymd={analyze_ymd}"
- )
- def _apply_demand_match_to_item(
- item: dict[str, Any],
- match_result: dict[str, Any],
- ) -> None:
- item["internal_demand_match_json"] = match_result
- item["is_internal_demand_matched"] = bool(match_result.get("matched"))
- matched_demand = str(match_result.get("matched_demand") or "").strip()
- item["matched_demand"] = matched_demand or None
- if not item.get("is_internal_demand_matched"):
- item["is_final_retained"] = False
- def _apply_senior_fit_to_item(
- item: dict[str, Any],
- senior_result: dict[str, Any],
- ) -> None:
- passed = bool(senior_result.get("passed"))
- item["senior_fit_score"] = senior_result.get("senior_fit_score")
- item["demand_senior_fit_json"] = senior_result.get("demand_senior_fit_json")
- item["senior_fit_passed"] = passed
- item["is_final_retained"] = passed
def build_wxindex_word_stats_payload(
    item: dict[str, Any],
    *,
    analyze_ymd: str,
) -> dict[str, Any]:
    """Build the stats payload for a word that passed the heat and senior-fit filters.

    Args:
        item: Pending item dict. ``meta``, ``result`` and ``name`` are
            required keys; the remaining fields are read with ``.get``.
        analyze_ymd: Analysis date stored on the payload.

    Returns:
        A new dict with identity fields, the data range and score summary
        taken from ``item["result"]``, and a ``detail_json`` sub-dict holding
        the senior-fit JSON plus the three heat-pattern flags.
    """
    meta = item["meta"]
    result = item["result"]

    payload: dict[str, Any] = {
        "name": item["name"],
        "meta_id": meta.get("id"),
        "analyze_ymd": analyze_ymd,
        "wxindex_word_record_id": item.get("record_id"),
        "retain_reason": item.get("retain_reason"),
        "senior_fit_score": item.get("senior_fit_score"),
    }
    # These fields are copied straight from the heat-analysis result.
    for field in (
        "data_start_ymd",
        "data_end_ymd",
        "data_days",
        "min_score",
        "max_score",
        "avg_score",
    ):
        payload[field] = result.get(field)

    detail: dict[str, Any] = {
        "demand_senior_fit_json": item.get("demand_senior_fit_json"),
    }
    for flag in ("is_sustained_high", "is_rising", "is_spike"):
        detail[flag] = result.get(flag)
    payload["detail_json"] = detail
    return payload
- def _save_senior_fit_passed_stats(
- repository: HotContentRepository,
- items: list[dict[str, Any]],
- *,
- analyze_ymd: str,
- existing_stats_names: set[str] | None = None,
- dry_run: bool,
- skip_db_save: bool,
- verbose: bool,
- ) -> tuple[int, int]:
- existing = existing_stats_names or set()
- all_payloads = [
- build_wxindex_word_stats_payload(item, analyze_ymd=analyze_ymd)
- for item in items
- if item.get("senior_fit_passed")
- ]
- resumed = sum(1 for payload in all_payloads if payload["name"] in existing)
- payloads = [payload for payload in all_payloads if payload["name"] not in existing]
- if not payloads:
- return 0, resumed
- if dry_run or skip_db_save:
- if verbose:
- label = "dry-run" if dry_run else "skip-db-save"
- print(f"[{label}] would save wxindex word stats count={len(payloads)}")
- return len(payloads), resumed
- saved = repository.save_wxindex_word_stats_batch(payloads)
- if verbose:
- print(f"saved wxindex word stats count={saved} analyze_ymd={analyze_ymd}")
- return saved, resumed
- def _build_export_row_from_item(
- item: dict[str, Any],
- *,
- analyze_ymd: str,
- strategy: str,
- ) -> dict[str, Any]:
- meta = item["meta"]
- name = item["name"]
- result = item["result"]
- retain_reason = item.get("retain_reason")
- is_internal_demand_matched = item.get("is_internal_demand_matched")
- matched_demand = str(item.get("matched_demand") or "")
- senior_fit_score = item.get("senior_fit_score")
- is_final_retained = bool(item.get("is_final_retained"))
- return {
- "analyze_ymd": analyze_ymd,
- "name": name,
- "meta_id": meta.get("id"),
- "fetch_start_ymd": item["fetch_start_ymd"],
- "fetch_end_ymd": item["fetch_end_ymd"],
- "data_start_ymd": result.get("data_start_ymd"),
- "data_end_ymd": result.get("data_end_ymd"),
- "data_days": result.get("data_days"),
- "analysis_skipped": False,
- "skip_reason": "",
- "is_sustained_high": bool(result.get("is_sustained_high")),
- "is_rising": bool(result.get("is_rising")),
- "is_spike": bool(result.get("is_spike")),
- "retain_reason": retain_reason or "",
- "is_internal_demand_matched": (
- "" if is_internal_demand_matched is None else is_internal_demand_matched
- ),
- "matched_demand": matched_demand,
- "senior_fit_score": senior_fit_score if senior_fit_score is not None else "",
- "is_final_retained": is_final_retained,
- "min_score": result.get("min_score"),
- "max_score": result.get("max_score"),
- "avg_score": result.get("avg_score"),
- "demand_id": (
- build_demand_id(
- strategy=strategy,
- demand_name=name,
- partition_dt=analyze_ymd,
- )
- if is_final_retained and strategy
- else ""
- ),
- "weight": (
- float(result.get("max_score") or 0) / WEIGHT_DIVISOR
- if is_final_retained and result.get("max_score") is not None
- else ""
- ),
- }
- def run_wxindex_heat_pattern_daily_job(
- repository: HotContentRepository,
- *,
- config: FlowConfig | None = None,
- api_client: JsonApiClient | None = None,
- today: date | None = None,
- sustained_threshold: float = WXINDEX_SUSTAINED_HIGH_THRESHOLD,
- min_days: int = WXINDEX_HEAT_MIN_DAYS,
- max_days: int = WXINDEX_HEAT_MAX_DAYS,
- spike_days: int = WXINDEX_SPIKE_LOOKBACK_DAYS,
- spike_ratio: float = WXINDEX_SPIKE_RATIO,
- spike_baseline_floor: float = WXINDEX_SPIKE_BASELINE_FLOOR,
- rising_overall_change_rate: float = WXINDEX_RISING_OVERALL_CHANGE_RATE,
- rising_window_change_rate: float = WXINDEX_RISING_WINDOW_CHANGE_RATE,
- rising_adjacent_up_ratio: float = WXINDEX_RISING_ADJACENT_UP_RATIO,
- dry_run: bool = False,
- skip_odps: bool = False,
- skip_db_save: bool = False,
- verbose: bool = False,
- ) -> dict[str, Any]:
- """Scheduled job: analyze words still inside their fetch window and write the results to the heat-pattern result table."""
# NOTE(review): this listing has lost its indentation, so any nesting described
# in the comments below is inferred from the control flow -- confirm against
# the properly indented file before relying on it.
#
# Phases, in the order they appear below:
#   1. resolve the analysis date and load the active word metas;
#   2. when a config is given, load the demand cache and build the
#      post-process service;
#   3. build candidate items, initialize their records, load existing
#      records/stats and drop candidates still awaiting yesterday's data;
#   4. run (or resume) heat analysis per candidate;
#   5. match sustained-high words against the demand pool, in batches;
#   6. score senior fit for the remaining candidates, in batches;
#   7. save stats, finalize records, build export rows and hive rows;
#   8. optionally sync the hive rows to ODPS and refresh the summary.
#
# Parameters:
#   repository: data access object used for every read and write below.
#   config: enables the demand cache, senior-fit scoring and ODPS sync; with
#       None those steps are skipped.
#   api_client: used by the post-process service; a JsonApiClient is built
#       from config when this is falsy.
#   today: overrides the analysis date.
#   sustained_threshold, spike_*, rising_*: forwarded to
#       analyze_wxindex_heat_patterns and _persist_pending_item_record.
#   min_days, max_days: forwarded to analyze_wxindex_heat_patterns and
#       _should_rerun_heat_analysis.
#   dry_run, skip_db_save: forwarded to the persistence helpers; either one
#       also disables loading of existing records and stats names.
#   skip_odps: disables the ODPS sync call.
#   verbose: enables the print() progress lines.
#
# Returns the summary dict, with "retained_words" and "export_rows" attached.
#
# Use the injected date when given; otherwise today's date in SHANGHAI_TZ.
- current = today or datetime.now(SHANGHAI_TZ).date()
- analyze_ymd = current.strftime("%Y%m%d")
- meta_rows = repository.list_active_wxindex_word_meta(today=current)
- demand_name_set: list[str] = []
- demand_cache_run_id: int | None = None
- demand_cache_error: str | None = None
- postprocess_service: ContributionPostprocessService | None = None
# The demand cache is only consulted when a config is supplied. The
# post-process service is built only when the cached demand list is non-empty.
# A HotContentFlowError is recorded in demand_cache_error instead of being
# raised, so the job continues without demand matching.
- if config is not None:
- try:
- cache = DemandCacheService(config, repository).get_or_create_current_hour_cache()
- demand_name_set = list(cache.get("demand_name_set") or [])
- demand_cache_run_id = int(cache["id"])
- if demand_name_set:
- postprocess_service = ContributionPostprocessService(
- config,
- repository,
- api_client
- or JsonApiClient(
- timeout_seconds=config.request_timeout_seconds,
- verify_ssl=config.https_verify_ssl,
- ),
- )
- except HotContentFlowError as exc:
- demand_cache_error = str(exc)
- if verbose:
- print(f"demand cache unavailable, skip demand match: {exc}")
# Counters start at zero and are filled in as the phases run;
# _refresh_wxindex_heat_job_summary is called on this dict near the end.
- summary: dict[str, Any] = {
- "analyze_ymd": analyze_ymd,
- "meta_count": len(meta_rows),
- "analyzed": 0,
- "skipped": 0,
- "retained": 0,
- "sustained_high": 0,
- "rising": 0,
- "spike": 0,
- "internal_demand_matched": 0,
- "senior_scored": 0,
- "senior_fit_candidates": 0,
- "senior_fit_passed": 0,
- "stats_saved": 0,
- "final_retained": 0,
- "odps_synced": 0,
- "odps_written": 0,
- "demand_cache_run_id": demand_cache_run_id,
- "demand_cache_error": demand_cache_error,
- "demand_name_count": len(demand_name_set),
- "llm_batch_size": WXINDEX_WORD_LLM_BATCH_SIZE,
- "demand_match_batches": 0,
- "senior_fit_batches": 0,
- "records_initialized": 0,
- "awaiting_yesterday_score": 0,
- "heat_resumed": 0,
- "demand_match_resumed": 0,
- "senior_fit_resumed": 0,
- "stats_resumed": 0,
- "finalized_resumed": 0,
- "dry_run": dry_run,
- "skip_odps": skip_odps,
- "skip_db_save": skip_db_save,
- }
- retained_words: list[dict[str, Any]] = []
- export_rows: list[dict[str, Any]] = []
- final_hive_rows: list[dict[str, Any]] = []
- senior_threshold = WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD
# Without a config the strategy is the empty string, which later disables
# demand_id generation and hive row creation.
- strategy = str(config.hot_demand_pool_strategy or "").strip() if config else ""
- pending_items: list[dict[str, Any]] = []
- candidate_items: list[dict[str, Any]] = []
# Build one candidate per meta row. Rows missing a name or either fetch bound
# are counted as skipped and dropped.
- for meta in meta_rows:
- name = str(meta.get("name") or "").strip()
- fetch_start_ymd = str(meta.get("fetch_start_ymd") or "").strip()
- fetch_end_ymd = str(meta.get("fetch_end_ymd") or "").strip()
- if not name or not fetch_start_ymd or not fetch_end_ymd:
- summary["skipped"] += 1
- continue
- candidate_items.append(
- {
- "meta": meta,
- "name": name,
- "fetch_start_ymd": fetch_start_ymd,
- "fetch_end_ymd": fetch_end_ymd,
- }
- )
- _init_candidate_wxindex_word_records(
- repository,
- candidate_items,
- analyze_ymd=analyze_ymd,
- demand_cache_run_id=demand_cache_run_id,
- dry_run=dry_run,
- skip_db_save=skip_db_save,
- verbose=verbose,
- )
# This is the candidate count, recorded even when dry_run/skip_db_save
# prevented the actual initialization write.
- summary["records_initialized"] = len(candidate_items)
- existing_records: dict[str, dict[str, Any]] = {}
- existing_stats_names: set[str] = set()
# Existing records and stats names drive the resume logic below. They are
# only loaded when writes are enabled, so a dry run never resumes.
- if not dry_run and not skip_db_save:
- candidate_names = [item["name"] for item in candidate_items]
- existing_records = repository.list_wxindex_word_records_by_analyze_ymd(
- analyze_ymd=analyze_ymd,
- names=candidate_names,
- )
- existing_stats_names = repository.list_wxindex_word_stats_names(
- analyze_ymd=analyze_ymd,
- names=candidate_names,
- )
# Drop candidates that are not yet analyzed and have no data for yesterday.
- yesterday_ymd = (current - timedelta(days=1)).strftime("%Y%m%d")
- candidate_items, awaiting_yesterday = _filter_candidates_awaiting_yesterday_score(
- repository,
- candidate_items,
- yesterday_ymd=yesterday_ymd,
- existing_records=existing_records,
- verbose=verbose,
- )
- summary["awaiting_yesterday_score"] = awaiting_yesterday
# Backfill a missing/zero record_id from the existing record's id.
- for item in candidate_items:
- if not item.get("record_id"):
- existing = existing_records.get(item["name"])
- if existing:
- item["record_id"] = int(existing.get("id") or 0)
# Heat analysis phase: one repository score query per candidate.
- for item in candidate_items:
- meta = item["meta"]
- name = item["name"]
- fetch_start_ymd = item["fetch_start_ymd"]
- fetch_end_ymd = item["fetch_end_ymd"]
- existing_record = existing_records.get(name)
- scores = repository.list_wxindex_word_scores_in_range(
- name,
- start_ymd=fetch_start_ymd,
- end_ymd=fetch_end_ymd,
- )
# Resume path: the stored analysis is reused when it is done and
# _should_rerun_heat_analysis does not ask for a rerun.
- if (
- _is_heat_analysis_done(existing_record)
- and not _should_rerun_heat_analysis(
- existing_record,
- scores=scores,
- fetch_start_ymd=fetch_start_ymd,
- fetch_end_ymd=fetch_end_ymd,
- min_days=min_days,
- max_days=max_days,
- )
- ):
- summary["heat_resumed"] += 1
# A record stored as "analysis skipped" only yields an export row and
# never becomes a pending item.
- if _is_analysis_skipped_record(existing_record):
- if verbose:
- detail = _parse_record_detail_json(existing_record)
- print(
- f"resume skip word={name} reason={detail.get('skip_reason')} "
- f"days={existing_record.get('data_days')}"
- )
- export_rows.append(
- _build_skipped_export_row_from_record(
- record=existing_record,
- meta=meta,
- name=name,
- analyze_ymd=analyze_ymd,
- fetch_start_ymd=fetch_start_ymd,
- fetch_end_ymd=fetch_end_ymd,
- )
- )
- continue
# Otherwise rebuild the pending item from the stored record.
- pending_item = _rehydrate_pending_item_from_record(
- item,
- existing_record,
- senior_threshold=senior_threshold,
- )
- pending_items.append(pending_item)
- if verbose:
- print(
- f"resume heat analyzed word={name} "
- f"retain_reason={pending_item.get('retain_reason') or ''} "
- f"data_days={pending_item['result'].get('data_days')}"
- )
- continue
# Fresh analysis path.
- result = analyze_wxindex_heat_patterns(
- scores,
- start_ymd=fetch_start_ymd,
- end_ymd=fetch_end_ymd,
- sustained_threshold=sustained_threshold,
- min_days=min_days,
- max_days=max_days,
- spike_days=spike_days,
- spike_ratio=spike_ratio,
- spike_baseline_floor=spike_baseline_floor,
- rising_overall_change_rate=rising_overall_change_rate,
- rising_window_change_rate=rising_window_change_rate,
- rising_adjacent_up_ratio=rising_adjacent_up_ratio,
- )
# The analyzer skipped this word: persist a "skipped" record, emit a blank
# export row and move on to the next candidate.
- if result.get("skipped"):
- if verbose:
- print(
- f"skip word={name} reason={result.get('skip_reason')} "
- f"days={result.get('data_days')}"
- )
- skipped_payload = build_wxindex_word_record_skipped_payload(
- meta=meta,
- name=name,
- analyze_ymd=analyze_ymd,
- fetch_start_ymd=fetch_start_ymd,
- fetch_end_ymd=fetch_end_ymd,
- result=result,
- demand_cache_run_id=demand_cache_run_id,
- )
- _persist_wxindex_word_record(
- repository,
- skipped_payload,
- dry_run=dry_run,
- skip_db_save=skip_db_save,
- verbose=verbose,
- action="update skipped",
- )
- export_rows.append(
- {
- "analyze_ymd": analyze_ymd,
- "name": name,
- "meta_id": meta.get("id"),
- "fetch_start_ymd": fetch_start_ymd,
- "fetch_end_ymd": fetch_end_ymd,
- "analysis_skipped": True,
- "skip_reason": result.get("skip_reason"),
- "data_days": result.get("data_days"),
- "retain_reason": "",
- "is_sustained_high": False,
- "is_rising": False,
- "is_spike": False,
- "is_internal_demand_matched": "",
- "matched_demand": "",
- "senior_fit_score": "",
- "is_final_retained": False,
- "min_score": "",
- "max_score": "",
- "avg_score": "",
- }
- )
- continue
# An empty or whitespace-only retain reason is normalized to None.
- retain_reason = str(result.get("retain_reason") or "").strip() or None
# The downstream decision fields start as None, meaning "not decided yet";
# later phases test for None to tell fresh items from resumed ones.
- pending_item = {
- "meta": meta,
- "name": name,
- "fetch_start_ymd": fetch_start_ymd,
- "fetch_end_ymd": fetch_end_ymd,
- "result": result,
- "retain_reason": retain_reason,
- "record_id": item.get("record_id", 0),
- "is_internal_demand_matched": None,
- "matched_demand": None,
- "internal_demand_match_json": None,
- "senior_fit_score": None,
- "demand_senior_fit_json": None,
- "senior_fit_passed": None,
- "is_final_retained": None,
- }
- pending_items.append(pending_item)
- _persist_pending_item_record(
- repository,
- pending_item,
- analyze_ymd=analyze_ymd,
- demand_cache_run_id=demand_cache_run_id,
- phase="heat_analyzed",
- senior_threshold=senior_threshold,
- sustained_threshold=sustained_threshold,
- spike_days=spike_days,
- spike_ratio=spike_ratio,
- spike_baseline_floor=spike_baseline_floor,
- rising_overall_change_rate=rising_overall_change_rate,
- rising_window_change_rate=rising_window_change_rate,
- rising_adjacent_up_ratio=rising_adjacent_up_ratio,
- dry_run=dry_run,
- skip_db_save=skip_db_save,
- verbose=verbose,
- )
- if verbose:
- print(
- f"heat analyzed word={name} retain_reason={retain_reason or ''} "
- f"data_days={result.get('data_days')}"
- )
# Demand match phase. Only sustained-high words with no match decision yet
# are sent for matching.
# NOTE(review): pending_by_name keeps one item per name, so duplicate names
# in pending_items would collapse to the last one -- confirm names are unique.
- demand_match_results: dict[str, dict[str, Any]] = {}
- pending_by_name = {item["name"]: item for item in pending_items}
- sustained_high_words = [
- item["name"]
- for item in pending_items
- if item.get("retain_reason") == RETAIN_REASON_SUSTAINED_HIGH
- and item.get("is_internal_demand_matched") is None
- ]
# Sustained-high items that already carry a match decision count as resumed.
- for item in pending_items:
- if item.get("retain_reason") != RETAIN_REASON_SUSTAINED_HIGH:
- continue
- if item.get("is_internal_demand_matched") is not None:
- summary["demand_match_resumed"] += 1
# Nested helper (it reads pending_by_name, demand_match_results and the job
# arguments from the enclosing scope): apply and persist the match result for
# each word. A word with no result is treated as unmatched.
- def _persist_demand_match_updates(words: list[str]) -> None:
- for word in words:
- pending_item = pending_by_name.get(word)
- if pending_item is None:
- continue
- match_result = demand_match_results.get(word) or {
- "word": word,
- "matched": False,
- "matched_demand": "",
- "match_list": [],
- }
- _apply_demand_match_to_item(pending_item, match_result)
- _persist_pending_item_record(
- repository,
- pending_item,
- analyze_ymd=analyze_ymd,
- demand_cache_run_id=demand_cache_run_id,
- phase="demand_matched",
- senior_threshold=senior_threshold,
- sustained_threshold=sustained_threshold,
- spike_days=spike_days,
- spike_ratio=spike_ratio,
- spike_baseline_floor=spike_baseline_floor,
- rising_overall_change_rate=rising_overall_change_rate,
- rising_window_change_rate=rising_window_change_rate,
- rising_adjacent_up_ratio=rising_adjacent_up_ratio,
- dry_run=dry_run,
- skip_db_save=skip_db_save,
- verbose=verbose,
- )
# With a service and a non-empty demand list, match in batches and persist
# after each batch. Otherwise every word is recorded as unmatched with
# skip_reason "empty_demand_cache".
- if sustained_high_words:
- if postprocess_service is not None and demand_name_set:
- for batch in _chunk_words(sustained_high_words):
- summary["demand_match_batches"] += 1
- if verbose:
- print(f"demand match batch size={len(batch)} words={batch}")
- demand_match_results.update(
- postprocess_service.match_words_to_demand_pool(
- words=batch,
- demand_name_set=demand_name_set,
- )
- )
- _persist_demand_match_updates(batch)
- else:
- for word in sustained_high_words:
- demand_match_results[word] = {
- "word": word,
- "matched": False,
- "matched_demand": "",
- "match_list": [],
- "skip_reason": "empty_demand_cache",
- }
- _persist_demand_match_updates(sustained_high_words)
# Senior fit phase. Candidates are items with a retain reason whose senior-fit
# attempt is not already done; sustained-high items additionally need a
# truthy demand match.
- senior_fit_candidate_names: list[str] = []
- for item in pending_items:
- retain_reason = item.get("retain_reason")
- name = str(item.get("name") or "").strip()
- if not retain_reason or not name:
- continue
- if _is_senior_fit_attempt_done(item, existing_records.get(name)):
- summary["senior_fit_resumed"] += 1
- continue
- if retain_reason == RETAIN_REASON_SUSTAINED_HIGH:
- if not item.get("is_internal_demand_matched"):
- continue
- senior_fit_candidate_names.append(name)
- senior_fit_results: dict[str, dict[str, Any]] = {}
# Nested helper: apply and persist the senior-fit result for each word.
# Unlike the demand-match helper, a word with no result is left untouched.
- def _persist_senior_fit_updates(words: list[str]) -> None:
- for word in words:
- pending_item = pending_by_name.get(word)
- senior_result = senior_fit_results.get(word)
- if pending_item is None or senior_result is None:
- continue
- _apply_senior_fit_to_item(pending_item, senior_result)
- _persist_pending_item_record(
- repository,
- pending_item,
- analyze_ymd=analyze_ymd,
- demand_cache_run_id=demand_cache_run_id,
- phase="senior_fit_scored",
- senior_threshold=senior_threshold,
- sustained_threshold=sustained_threshold,
- spike_days=spike_days,
- spike_ratio=spike_ratio,
- spike_baseline_floor=spike_baseline_floor,
- rising_overall_change_rate=rising_overall_change_rate,
- rising_window_change_rate=rising_window_change_rate,
- rising_adjacent_up_ratio=rising_adjacent_up_ratio,
- dry_run=dry_run,
- skip_db_save=skip_db_save,
- verbose=verbose,
- )
- if verbose and not pending_item.get("senior_fit_passed"):
- print(
- f"senior fit rejected word={word} "
- f"score={pending_item.get('senior_fit_score')}"
- )
# Scoring needs a config; without one the candidates keep their None
# senior-fit fields.
- if config is not None and senior_fit_candidate_names:
- for batch in _chunk_words(senior_fit_candidate_names):
- summary["senior_fit_batches"] += 1
- if verbose:
- print(f"senior fit batch size={len(batch)} words={batch}")
- senior_fit_results.update(
- score_wxindex_words_senior_fit(
- words=batch,
- config=config,
- senior_threshold=senior_threshold,
- batch_size=WXINDEX_WORD_LLM_BATCH_SIZE,
- )
- )
- _persist_senior_fit_updates(batch)
# Save stats rows for items that passed senior fit and are not stored yet.
- stats_saved, stats_resumed = _save_senior_fit_passed_stats(
- repository,
- pending_items,
- analyze_ymd=analyze_ymd,
- existing_stats_names=existing_stats_names,
- dry_run=dry_run,
- skip_db_save=skip_db_save,
- verbose=verbose,
- )
- summary["stats_saved"] = stats_saved
- summary["stats_resumed"] = stats_resumed
# Finalize phase: collect retained words, persist or resume the finalized
# record, and build the export and hive rows.
# NOTE(review): export_rows.append below takes every pending item (the row
# builder handles an empty retain_reason), so everything after the
# retained_words.append call presumably runs for all items, not only those
# with a retain reason -- confirm against the indented source.
- for item in pending_items:
- name = item["name"]
- result = item["result"]
- retain_reason = item.get("retain_reason")
- if retain_reason:
- retained_words.append(
- {
- "name": name,
- "retain_reason": retain_reason,
- "data_days": result.get("data_days"),
- "data_start_ymd": result.get("data_start_ymd"),
- "data_end_ymd": result.get("data_end_ymd"),
- "is_internal_demand_matched": item.get("is_internal_demand_matched"),
- "matched_demand": str(item.get("matched_demand") or ""),
- "senior_fit_score": item.get("senior_fit_score"),
- "is_final_retained": bool(item.get("is_final_retained")),
- }
- )
- existing_record = existing_records.get(name)
# An already finalized record is not rewritten; its id is reused.
# NOTE(review): existing_record.get("id") assumes _is_finalized_record
# returns False for None -- confirm.
- if _is_finalized_record(existing_record):
- summary["finalized_resumed"] += 1
- record_id = int(item.get("record_id") or existing_record.get("id") or 0)
- if verbose:
- print(f"resume finalized word={name}")
- else:
- record_id = _persist_pending_item_record(
- repository,
- item,
- analyze_ymd=analyze_ymd,
- demand_cache_run_id=demand_cache_run_id,
- phase="finalized",
- senior_threshold=senior_threshold,
- sustained_threshold=sustained_threshold,
- spike_days=spike_days,
- spike_ratio=spike_ratio,
- spike_baseline_floor=spike_baseline_floor,
- rising_overall_change_rate=rising_overall_change_rate,
- rising_window_change_rate=rising_window_change_rate,
- rising_adjacent_up_ratio=rising_adjacent_up_ratio,
- dry_run=dry_run,
- skip_db_save=skip_db_save,
- verbose=verbose,
- )
- export_rows.append(
- _build_export_row_from_item(
- item,
- analyze_ymd=analyze_ymd,
- strategy=strategy,
- )
- )
# Hive rows are only built for finally-retained items, and only when a
# strategy is configured.
- if item.get("is_final_retained") and strategy:
- final_hive_rows.append(
- build_wxindex_word_hive_row(
- wxindex_word_record_id=record_id,
- word=name,
- strategy=strategy,
- partition_dt=analyze_ymd,
- max_score=result.get("max_score"),
- retain_reason=item.get("retain_reason"),
- )
- )
# ODPS sync runs only with rows, a strategy and a config, and when neither
# dry_run nor skip_odps is set.
- if (
- final_hive_rows
- and strategy
- and config is not None
- and not dry_run
- and not skip_odps
- ):
- odps_summary = sync_wxindex_word_rows_to_odps(
- config,
- repository,
- hive_rows=final_hive_rows,
- partition_dt=analyze_ymd,
- strategy=strategy,
- )
- summary["odps_written"] = odps_summary.get("written_count", 0)
- summary["odps_synced"] = odps_summary.get("odps_synced", 0)
- summary["odps_sync"] = odps_summary
# When the sync is suppressed, both counters report the number of hive rows
# that were built rather than rows actually written.
- elif dry_run or skip_odps:
- summary["odps_written"] = len(final_hive_rows)
- summary["odps_synced"] = len(final_hive_rows)
- _refresh_wxindex_heat_job_summary(
- summary,
- pending_items=pending_items,
- export_rows=export_rows,
- )
- summary["retained_words"] = retained_words
- summary["export_rows"] = export_rows
- return summary
# Default column order for the heat-pattern CSV export; used by
# write_wxindex_heat_pattern_csv when no explicit fieldnames are given.
WXINDEX_HEAT_PATTERN_EXPORT_FIELDS = [
    # Identity and fetch window
    "analyze_ymd",
    "name",
    "meta_id",
    "fetch_start_ymd",
    "fetch_end_ymd",
    # Data range covered by the analysis
    "data_start_ymd",
    "data_end_ymd",
    "data_days",
    # Skip status
    "analysis_skipped",
    "skip_reason",
    # Heat-pattern flags and the resulting retain reason
    "is_sustained_high",
    "is_rising",
    "is_spike",
    "retain_reason",
    # Demand match and senior-fit outcome
    "is_internal_demand_matched",
    "matched_demand",
    "senior_fit_score",
    "is_final_retained",
    # Score summary
    "min_score",
    "max_score",
    "avg_score",
    # Filled only for finally-retained rows
    "demand_id",
    "weight",
]
- def write_wxindex_heat_pattern_csv(
- rows: list[dict[str, Any]],
- output_path: str | Path,
- *,
- fieldnames: list[str] | None = None,
- ) -> Path:
- """将热度分析明细写入本地 CSV。"""
- path = Path(output_path).expanduser()
- if not path.is_absolute():
- path = Path.cwd() / path
- path.parent.mkdir(parents=True, exist_ok=True)
- columns = fieldnames or WXINDEX_HEAT_PATTERN_EXPORT_FIELDS
- with path.open("w", encoding="utf-8-sig", newline="") as handle:
- writer = csv.DictWriter(handle, fieldnames=columns, extrasaction="ignore")
- writer.writeheader()
- for row in rows:
- writer.writerow({column: row.get(column, "") for column in columns})
- return path
|