| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654 |
- """微信指数热度模式分析:持续高热、持续上涨、突然暴涨。"""
- from __future__ import annotations
- import csv
- import json
- from datetime import date, datetime, timedelta
- from pathlib import Path
- from typing import Any
- from app.hot_content.client import JsonApiClient
- from app.hot_content.demand_cache_service import DemandCacheService
- from app.hot_content.demand_hive_export import WEIGHT_DIVISOR, build_demand_id
- from app.hot_content.demand_quality import (
- TYPE_PHRASE,
- llm_score_senior_fit,
- lookup_quality_scores,
- )
- from app.hot_content.demand_pool_writer import sync_wxindex_word_rows_to_odps
- from app.hot_content.exceptions import HotContentFlowError
- from app.hot_content.postprocess_service import ContributionPostprocessService
- from app.hot_content.repository import HotContentRepository
- from app.hot_content.timezone import SHANGHAI_TZ
- from app.hot_content.types import FlowConfig
- from app.hot_content.wxindex_trend import (
- HEAT_RISING_ADJACENT_UP_RATIO,
- HEAT_RISING_OVERALL_CHANGE_RATE,
- HEAT_RISING_WINDOW_CHANGE_RATE,
- HEAT_SPIKE_BASELINE_FLOOR,
- HEAT_SPIKE_RATIO,
- extract_sorted_scores,
- is_wxindex_heat_rising_scores,
- is_wxindex_spike_scores,
- )
- from app.hot_content.wxindex_words import filter_scores_in_ymd_window
# Minimum / maximum number of daily data points used by the heat analysis window.
WXINDEX_HEAT_MIN_DAYS = 7
WXINDEX_HEAT_MAX_DAYS = 14
# Every score in the window must exceed this value for "sustained high".
WXINDEX_SUSTAINED_HIGH_THRESHOLD = 10_000_000.0
WXINDEX_SPIKE_LOOKBACK_DAYS = 3
# Spike / rising thresholds are aliases of the defaults imported from wxindex_trend.
WXINDEX_SPIKE_RATIO = HEAT_SPIKE_RATIO
WXINDEX_SPIKE_BASELINE_FLOOR = HEAT_SPIKE_BASELINE_FLOOR
WXINDEX_RISING_OVERALL_CHANGE_RATE = HEAT_RISING_OVERALL_CHANGE_RATE
WXINDEX_RISING_WINDOW_CHANGE_RATE = HEAT_RISING_WINDOW_CHANGE_RATE
WXINDEX_RISING_ADJACENT_UP_RATIO = HEAT_RISING_ADJACENT_UP_RATIO
# Senior-fit scores are divided by 10 before being compared with this threshold.
WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD = 0.6
WXINDEX_WORD_LLM_BATCH_SIZE = 10

# Pattern identifiers stored in detail_json["patterns"].
PATTERN_SUSTAINED_HIGH = "sustained_high"
PATTERN_RISING = "rising"
PATTERN_SPIKE = "spike"

# Retain reasons persisted with each record (runtime strings, kept verbatim).
RETAIN_REASON_SUSTAINED_HIGH = "持续高热度"
RETAIN_REASON_RISING = "热度持续上涨"
RETAIN_REASON_SPIKE = "热度突然暴涨"

# Processing phases written to detail_json["phase"].
PHASE_ANALYSIS_SKIPPED = "analysis_skipped"
PHASE_HEAT_ANALYZED = "heat_analyzed"
PHASE_DEMAND_MATCHED = "demand_matched"
PHASE_SENIOR_FIT_SCORED = "senior_fit_scored"
PHASE_FINALIZED = "finalized"

# Any of these phases means the heat analysis step has already run.
_HEAT_DONE_PHASES = frozenset(
    (
        PHASE_ANALYSIS_SKIPPED,
        PHASE_HEAT_ANALYZED,
        PHASE_DEMAND_MATCHED,
        PHASE_SENIOR_FIT_SCORED,
        PHASE_FINALIZED,
    )
)

# Analysis columns that start out as None until a later phase fills them in.
_WXINDEX_RECORD_UNSET_ANALYSIS_FIELDS: dict[str, Any] = dict.fromkeys(
    (
        "data_start_ymd",
        "data_end_ymd",
        "data_days",
        "is_sustained_high",
        "is_rising",
        "is_spike",
        "retain_reason",
        "is_internal_demand_matched",
        "matched_demand",
        "internal_demand_match_json",
        "senior_fit_score",
        "demand_senior_fit_json",
        "is_final_retained",
        "min_score",
        "max_score",
        "avg_score",
        "detail_json",
    ),
    None,
)
- def _parse_record_detail_json(record: dict[str, Any]) -> dict[str, Any]:
- detail = record.get("detail_json")
- if isinstance(detail, dict):
- return detail
- if isinstance(detail, str) and detail.strip():
- try:
- parsed = json.loads(detail)
- return parsed if isinstance(parsed, dict) else {}
- except json.JSONDecodeError:
- return {}
- return {}
def _record_phase(record: dict[str, Any]) -> str:
    """Return the stripped ``phase`` from the record's detail JSON ("" if unset)."""
    detail = _parse_record_detail_json(record)
    phase = detail.get("phase") or ""
    return str(phase).strip()
- def _nullable_bool_from_record(value: Any) -> bool | None:
- if value is None:
- return None
- return bool(value)
- def _parse_record_json_field(record: dict[str, Any], field: str) -> Any:
- value = record.get(field)
- if value is None:
- return None
- if isinstance(value, (dict, list)):
- return value
- if isinstance(value, (bytes, bytearray)):
- value = value.decode("utf-8")
- if isinstance(value, str):
- try:
- return json.loads(value)
- except json.JSONDecodeError:
- return None
- return value
- def _is_heat_analysis_done(record: dict[str, Any] | None) -> bool:
- if not record:
- return False
- return _record_phase(record) in _HEAT_DONE_PHASES
def _is_analysis_skipped_record(record: dict[str, Any]) -> bool:
    """Tell whether the record's phase is ``PHASE_ANALYSIS_SKIPPED``."""
    phase = _record_phase(record)
    return phase == PHASE_ANALYSIS_SKIPPED
- def _is_senior_fit_needed(
- *,
- retain_reason: str | None,
- is_internal_demand_matched: bool | None,
- ) -> bool:
- if not retain_reason:
- return False
- if retain_reason == RETAIN_REASON_SUSTAINED_HIGH:
- return bool(is_internal_demand_matched)
- return True
- def _is_finalized_record(record: dict[str, Any] | None) -> bool:
- if not record:
- return False
- return _record_phase(record) == PHASE_FINALIZED
- def _latest_score_ymd(scores: list[dict[str, Any]]) -> str | None:
- ymds = [_score_row_ymd(row) for row in scores if _score_row_ymd(row)]
- return max(ymds) if ymds else None
def _should_rerun_heat_analysis(
    existing_record: dict[str, Any] | None,
    *,
    scores: list[dict[str, Any]],
    fetch_start_ymd: str,
    fetch_end_ymd: str,
    min_days: int,
    max_days: int,
) -> bool:
    """Decide whether the heat analysis must run (again) for a word.

    Returns ``True`` when:
      * no finished heat analysis is recorded yet, or
      * *scores* contain a date later than the record's ``data_end_ymd``, or
      * the record was skipped earlier but the current scores no longer
        produce a skip reason.
    Otherwise returns ``False``.
    """
    if not _is_heat_analysis_done(existing_record):
        return True
    # The check above returns early for falsy records; this narrows the type.
    assert existing_record is not None

    newest_ymd = _latest_score_ymd(scores)
    recorded_end = str(existing_record.get("data_end_ymd") or "").strip()
    if newest_ymd and recorded_end and newest_ymd > recorded_end:
        return True

    if not _is_analysis_skipped_record(existing_record):
        return False

    # A previously skipped word is retried once enough data is available.
    _, skip_reason = prepare_analysis_scores(
        scores,
        start_ymd=fetch_start_ymd,
        end_ymd=fetch_end_ymd,
        min_days=min_days,
        max_days=max_days,
    )
    return skip_reason is None
- def _is_senior_fit_attempt_done(
- item: dict[str, Any],
- existing_record: dict[str, Any] | None,
- ) -> bool:
- if existing_record:
- phase = _record_phase(existing_record)
- if phase in (PHASE_SENIOR_FIT_SCORED, PHASE_FINALIZED):
- return True
- return item.get("senior_fit_score") is not None
- def _senior_fit_passed_from_score(score: Any, *, senior_threshold: float) -> bool:
- if score is None:
- return False
- try:
- return float(score) / 10.0 > senior_threshold
- except (TypeError, ValueError):
- return False
def _rehydrate_result_from_record(record: dict[str, Any]) -> dict[str, Any]:
    """Rebuild an analysis-result dict from a stored record.

    The result is always marked as not skipped. Window, score and date columns
    are copied verbatim, the three pattern flags are coerced to ``bool`` (with
    ``None`` preserved), ``retain_reason`` is stripped (blank becomes
    ``None``), and ``patterns`` comes from the record's detail JSON.
    """
    detail = _parse_record_detail_json(record)

    raw_reason = record.get("retain_reason")
    retain_reason: str | None = None
    if raw_reason is not None:
        retain_reason = str(raw_reason).strip() or None

    result: dict[str, Any] = {"skipped": False, "skip_reason": None}
    for column in (
        "data_days",
        "data_start_ymd",
        "data_end_ymd",
        "fetch_start_ymd",
        "fetch_end_ymd",
        "min_score",
        "max_score",
        "avg_score",
    ):
        result[column] = record.get(column)
    for flag in ("is_sustained_high", "is_rising", "is_spike"):
        stored = record.get(flag)
        result[flag] = None if stored is None else bool(stored)
    result["retain_reason"] = retain_reason
    result["patterns"] = list(detail.get("patterns") or [])
    return result
def _rehydrate_pending_item_from_record(
    item: dict[str, Any],
    record: dict[str, Any],
    *,
    senior_threshold: float,
) -> dict[str, Any]:
    """Rebuild a pending work item from a previously stored record.

    Identity fields (``meta``, ``name``, fetch window) come from *item*; the
    analysis result, demand match and senior-fit state come from *record*.
    ``senior_fit_passed`` is recomputed from the stored score when one exists,
    is ``False`` when scoring is not needed, and stays ``None`` otherwise.
    """
    result = _rehydrate_result_from_record(record)
    retain_reason = result.get("retain_reason")
    demand_matched = _nullable_bool_from_record(
        record.get("is_internal_demand_matched")
    )

    raw_demand = record.get("matched_demand")
    matched_demand: str | None = None
    if raw_demand is not None:
        matched_demand = str(raw_demand).strip() or None

    score = record.get("senior_fit_score")
    final_retained = _nullable_bool_from_record(record.get("is_final_retained"))

    passed: bool | None = None
    if score is not None:
        passed = _senior_fit_passed_from_score(
            score,
            senior_threshold=senior_threshold,
        )
    elif not _is_senior_fit_needed(
        retain_reason=retain_reason,
        is_internal_demand_matched=demand_matched,
    ):
        passed = False
        # NOTE(review): indentation was lost in the source this was recovered
        # from; this default is assumed to belong to the "scoring not needed"
        # branch -- confirm against the canonical file.
        if final_retained is None:
            final_retained = False

    return {
        "meta": item["meta"],
        "name": item["name"],
        "fetch_start_ymd": item["fetch_start_ymd"],
        "fetch_end_ymd": item["fetch_end_ymd"],
        "result": result,
        "retain_reason": retain_reason,
        "record_id": int(record.get("id") or item.get("record_id") or 0),
        "is_internal_demand_matched": demand_matched,
        "matched_demand": matched_demand,
        "internal_demand_match_json": _parse_record_json_field(
            record,
            "internal_demand_match_json",
        ),
        "senior_fit_score": score,
        "demand_senior_fit_json": _parse_record_json_field(
            record,
            "demand_senior_fit_json",
        ),
        "senior_fit_passed": passed,
        "is_final_retained": final_retained,
    }
def _build_skipped_export_row_from_record(
    *,
    record: dict[str, Any],
    meta: dict[str, Any],
    name: str,
    analyze_ymd: str,
    fetch_start_ymd: str,
    fetch_end_ymd: str,
) -> dict[str, Any]:
    """Build the export row for a word whose analysis was skipped.

    Only the skip reason (from the detail JSON) and ``data_days`` come from
    *record*; pattern flags are ``False`` and the remaining analysis columns
    are empty strings.
    """
    detail = _parse_record_detail_json(record)
    row: dict[str, Any] = {
        "analyze_ymd": analyze_ymd,
        "name": name,
        "meta_id": meta.get("id"),
        "fetch_start_ymd": fetch_start_ymd,
        "fetch_end_ymd": fetch_end_ymd,
        "analysis_skipped": True,
        "skip_reason": detail.get("skip_reason") or "",
        "data_days": record.get("data_days") or "",
        "retain_reason": "",
    }
    # The remaining columns are fixed placeholders, added in export order.
    row.update(dict.fromkeys(("is_sustained_high", "is_rising", "is_spike"), False))
    row.update(
        dict.fromkeys(
            ("is_internal_demand_matched", "matched_demand", "senior_fit_score"),
            "",
        )
    )
    row["is_final_retained"] = False
    row.update(dict.fromkeys(("min_score", "max_score", "avg_score"), ""))
    return row
- def _refresh_wxindex_heat_job_summary(
- summary: dict[str, Any],
- *,
- pending_items: list[dict[str, Any]],
- export_rows: list[dict[str, Any]],
- ) -> None:
- summary["analyzed"] = len(pending_items)
- summary["skipped"] = sum(1 for row in export_rows if row.get("analysis_skipped"))
- summary["retained"] = sum(1 for item in pending_items if item.get("retain_reason"))
- summary["sustained_high"] = sum(
- 1
- for item in pending_items
- if item.get("retain_reason") == RETAIN_REASON_SUSTAINED_HIGH
- )
- summary["rising"] = sum(
- 1 for item in pending_items if item.get("retain_reason") == RETAIN_REASON_RISING
- )
- summary["spike"] = sum(
- 1 for item in pending_items if item.get("retain_reason") == RETAIN_REASON_SPIKE
- )
- summary["internal_demand_matched"] = sum(
- 1 for item in pending_items if item.get("is_internal_demand_matched")
- )
- summary["senior_fit_candidates"] = sum(
- 1
- for item in pending_items
- if _is_senior_fit_needed(
- retain_reason=item.get("retain_reason"),
- is_internal_demand_matched=item.get("is_internal_demand_matched"),
- )
- )
- summary["senior_scored"] = sum(
- 1 for item in pending_items if item.get("senior_fit_score") is not None
- )
- summary["senior_fit_passed"] = sum(
- 1 for item in pending_items if item.get("senior_fit_passed")
- )
- summary["final_retained"] = sum(
- 1 for item in pending_items if item.get("is_final_retained")
- )
def resolve_retain_reason(
    *,
    is_sustained_high: bool,
    is_rising: bool,
    is_spike: bool,
) -> str | None:
    """Pick the retain reason by 2->3->1 priority (rising > spike > sustained high).

    Returns ``None`` when no pattern flag is set.
    """
    if not (is_rising or is_spike or is_sustained_high):
        return None
    if is_rising:
        return RETAIN_REASON_RISING
    return RETAIN_REASON_SPIKE if is_spike else RETAIN_REASON_SUSTAINED_HIGH
def _chunk_words(words: list[str], batch_size: int = WXINDEX_WORD_LLM_BATCH_SIZE) -> list[list[str]]:
    """Split *words* into consecutive slices of at most *batch_size* items.

    A batch size below 1 is treated as 1.
    """
    step = batch_size if batch_size > 1 else 1
    chunks: list[list[str]] = []
    for start in range(0, len(words), step):
        chunks.append(words[start : start + step])
    return chunks
- def _empty_senior_fit_result() -> dict[str, Any]:
- return {
- "senior_fit_score": None,
- "demand_senior_fit_json": {"source": "", "items": []},
- "passed": False,
- }
def score_wxindex_words_senior_fit(
    *,
    words: list[str],
    config: FlowConfig,
    senior_threshold: float = WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD,
    batch_size: int = WXINDEX_WORD_LLM_BATCH_SIZE,
) -> dict[str, dict[str, Any]]:
    """Score WeChat-index hot words for senior fit via the LLM, in batches.

    Words are stripped and de-duplicated (first occurrence wins); blank
    entries are dropped. Each batch holds at most *batch_size* words.

    Returns:
        Mapping of word to a dict with ``senior_fit_score``,
        ``demand_senior_fit_json`` (the batch's LLM payload, shared by all
        words of that batch) and ``passed`` (score / 10 strictly greater than
        *senior_threshold*).
    """
    stripped = (str(raw or "").strip() for raw in words)
    # dict.fromkeys keeps first-seen order while removing duplicates.
    unique_words = list(dict.fromkeys(word for word in stripped if word))

    results: dict[str, dict[str, Any]] = {}
    for word in unique_words:
        results[word] = _empty_senior_fit_result()

    for batch in _chunk_words(unique_words, batch_size=batch_size):
        preview = ",".join(batch[:3])
        batch_json = llm_score_senior_fit(
            channel_content_id=f"wxindex_words:{preview}",
            candidates=[
                {"demand_type": TYPE_PHRASE, "demand_text": word} for word in batch
            ],
            model=config.demand_quality_llm_model,
            max_attempts=config.demand_quality_llm_max_attempts,
            retry_sleep_seconds=config.demand_quality_llm_retry_sleep_seconds,
            max_tokens=config.demand_quality_llm_max_tokens,
        )
        # Stored on the 0-10 scale, i.e. the normalized threshold times ten.
        batch_json["threshold"] = senior_threshold * 10.0
        for word in batch:
            _, score = lookup_quality_scores(
                demand_type=TYPE_PHRASE,
                demand_text=word,
                event_sense_json=None,
                senior_fit_json=batch_json,
            )
            if score is None:
                passed = False
            else:
                passed = score / 10.0 > senior_threshold
            results[word] = {
                "senior_fit_score": score,
                "demand_senior_fit_json": batch_json,
                "passed": passed,
            }
    return results
def score_wxindex_word_senior_fit(
    *,
    word: str,
    config: FlowConfig,
    senior_threshold: float = WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD,
) -> dict[str, Any]:
    """Score a single WeChat-index hot word for senior fit via the LLM.

    A blank word yields the empty result without calling the batch scorer.
    """
    target = str(word or "").strip()
    if not target:
        return _empty_senior_fit_result()
    scored = score_wxindex_words_senior_fit(
        words=[target],
        config=config,
        senior_threshold=senior_threshold,
        batch_size=1,
    )
    fallback = _empty_senior_fit_result()
    return scored[target] if target in scored else fallback
def build_wxindex_word_hive_row(
    *,
    wxindex_word_record_id: int,
    word: str,
    strategy: str,
    partition_dt: str,
    max_score: float | None,
) -> dict[str, Any]:
    """Build the Hive export row for one retained word.

    ``weight`` is ``max_score / WEIGHT_DIVISOR`` and falls back to ``0.0``
    when no max score is available.
    """
    demand_name = str(word or "").strip()
    weight = 0.0 if max_score is None else float(max_score) / WEIGHT_DIVISOR
    demand_id = build_demand_id(
        strategy=strategy,
        demand_name=demand_name,
        partition_dt=partition_dt,
    )
    return {
        "record_id": wxindex_word_record_id,
        "strategy": strategy,
        "demand_id": demand_id,
        "demand_name": demand_name,
        "weight": weight,
        "type": TYPE_PHRASE,
        "video_count": None,
        "video_list": [],
        "extend": "{}",
        "dt": partition_dt,
    }
def build_wxindex_word_odps_sync_row(
    *,
    wxindex_word_record_id: int,
    word: str,
    strategy: str,
    partition_dt: str,
    max_score: float | None,
) -> dict[str, Any]:
    """Build the ODPS sync row for one retained word.

    ``weight`` is ``max_score / WEIGHT_DIVISOR`` and stays ``None`` when no
    max score is available.
    """
    demand_name = str(word or "").strip()
    weight = None if max_score is None else float(max_score) / WEIGHT_DIVISOR
    demand_id = build_demand_id(
        strategy=strategy,
        demand_name=demand_name,
        partition_dt=partition_dt,
    )
    return {
        "partition_dt": partition_dt,
        "strategy": strategy,
        "demand_id": demand_id,
        "demand_name": demand_name,
        "demand_type": TYPE_PHRASE,
        "record_id": wxindex_word_record_id,
        "weight": weight,
    }
def prepare_analysis_scores(
    scores: list[dict[str, Any]],
    *,
    start_ymd: str,
    end_ymd: str,
    min_days: int = WXINDEX_HEAT_MIN_DAYS,
    max_days: int = WXINDEX_HEAT_MAX_DAYS,
) -> tuple[list[dict[str, Any]], str | None]:
    """Cut the usable data out of the target window.

    Keeps at most the last *max_days* rows of the window. The second tuple
    element is ``"insufficient_days"`` when fewer than *min_days* rows remain,
    otherwise ``None``.
    """
    in_window = filter_scores_in_ymd_window(
        scores,
        start_ymd=start_ymd,
        end_ymd=end_ymd,
    )
    trimmed = in_window[-max_days:] if len(in_window) > max_days else in_window
    skip_reason = "insufficient_days" if len(trimmed) < min_days else None
    return trimmed, skip_reason
- def _score_row_ymd(row: dict[str, Any]) -> str:
- return str(row.get("ymd") or row.get("dt") or "").strip()
- def _window_ymd_bounds(
- window_scores: list[dict[str, Any]],
- ) -> tuple[str | None, str | None]:
- if not window_scores:
- return None, None
- start_ymd = _score_row_ymd(window_scores[0]) or None
- end_ymd = _score_row_ymd(window_scores[-1]) or None
- return start_ymd, end_ymd
def analyze_wxindex_heat_patterns(
    scores: list[dict[str, Any]],
    *,
    start_ymd: str,
    end_ymd: str,
    sustained_threshold: float = WXINDEX_SUSTAINED_HIGH_THRESHOLD,
    min_days: int = WXINDEX_HEAT_MIN_DAYS,
    max_days: int = WXINDEX_HEAT_MAX_DAYS,
    spike_days: int = WXINDEX_SPIKE_LOOKBACK_DAYS,
    spike_ratio: float = WXINDEX_SPIKE_RATIO,
    spike_baseline_floor: float = WXINDEX_SPIKE_BASELINE_FLOOR,
    rising_overall_change_rate: float = WXINDEX_RISING_OVERALL_CHANGE_RATE,
    rising_window_change_rate: float = WXINDEX_RISING_WINDOW_CHANGE_RATE,
    rising_adjacent_up_ratio: float = WXINDEX_RISING_ADJACENT_UP_RATIO,
) -> dict[str, Any]:
    """Evaluate the three heat patterns on the data inside the target window.

    Returns a dict. When the window is skipped it holds ``skipped=True``, the
    skip reason, the covered date range and an empty ``patterns`` list.
    Otherwise it holds the window statistics, the three pattern flags, the
    resolved ``retain_reason``, the pattern id matching that reason (at most
    one), and the window rows under ``scores``.
    """
    window_scores, skip_reason = prepare_analysis_scores(
        scores,
        start_ymd=start_ymd,
        end_ymd=end_ymd,
        min_days=min_days,
        max_days=max_days,
    )
    data_start_ymd, data_end_ymd = _window_ymd_bounds(window_scores)

    if skip_reason:
        return {
            "skipped": True,
            "skip_reason": skip_reason,
            "data_days": len(window_scores) if window_scores else None,
            "data_start_ymd": data_start_ymd,
            "data_end_ymd": data_end_ymd,
            "patterns": [],
        }

    numeric_scores = extract_sorted_scores(window_scores, max_points=max_days)

    # Sustained high requires every score to exceed the threshold.
    is_sustained_high = all(value > sustained_threshold for value in numeric_scores)
    is_rising = is_wxindex_heat_rising_scores(
        numeric_scores,
        min_points=min_days,
        overall_change_rate=rising_overall_change_rate,
        window_change_rate_threshold=rising_window_change_rate,
        adjacent_up_ratio=rising_adjacent_up_ratio,
    )
    is_spike = is_wxindex_spike_scores(
        numeric_scores,
        spike_days=spike_days,
        min_points=min_days,
        spike_ratio=spike_ratio,
        baseline_floor=spike_baseline_floor,
    )
    retain_reason = resolve_retain_reason(
        is_sustained_high=is_sustained_high,
        is_rising=is_rising,
        is_spike=is_spike,
    )

    # Only the pattern matching the winning retain reason is reported.
    pattern_by_reason = {
        RETAIN_REASON_SUSTAINED_HIGH: PATTERN_SUSTAINED_HIGH,
        RETAIN_REASON_RISING: PATTERN_RISING,
        RETAIN_REASON_SPIKE: PATTERN_SPIKE,
    }
    patterns: list[str] = []
    if retain_reason in pattern_by_reason:
        patterns.append(pattern_by_reason[retain_reason])

    return {
        "skipped": False,
        "skip_reason": None,
        "data_days": len(window_scores),
        "data_start_ymd": data_start_ymd,
        "data_end_ymd": data_end_ymd,
        "fetch_start_ymd": start_ymd,
        "fetch_end_ymd": end_ymd,
        "min_score": min(numeric_scores),
        "max_score": max(numeric_scores),
        "avg_score": sum(numeric_scores) / len(numeric_scores),
        "is_sustained_high": is_sustained_high,
        "is_rising": is_rising,
        "is_spike": is_spike,
        "retain_reason": retain_reason,
        "patterns": patterns,
        "scores": window_scores,
    }
def build_wxindex_word_record_init_payload(
    *,
    meta: dict[str, Any],
    name: str,
    analyze_ymd: str,
    fetch_start_ymd: str,
    fetch_end_ymd: str,
    demand_cache_run_id: int | None = None,
) -> dict[str, Any]:
    """Build the trace record written before analysis starts.

    It holds only the meta reference and the fetch window; every analysis
    field is ``None`` and is filled in by later updates.
    """
    payload: dict[str, Any] = dict(
        name=name,
        meta_id=meta.get("id"),
        analyze_ymd=analyze_ymd,
        fetch_start_ymd=fetch_start_ymd,
        fetch_end_ymd=fetch_end_ymd,
        demand_cache_run_id=demand_cache_run_id,
    )
    payload.update(_WXINDEX_RECORD_UNSET_ANALYSIS_FIELDS)
    return payload
def build_wxindex_word_record_skipped_payload(
    *,
    meta: dict[str, Any],
    name: str,
    analyze_ymd: str,
    fetch_start_ymd: str,
    fetch_end_ymd: str,
    result: dict[str, Any],
    demand_cache_run_id: int | None = None,
) -> dict[str, Any]:
    """Build the trace-record update for a word whose analysis was skipped.

    Starts from the init payload (all analysis fields ``None``) and fills in
    the data range plus a ``detail_json`` carrying the skipped phase, the skip
    reason and the number of available days.

    Args:
        meta: Word meta row; only ``id`` is read.
        name: The word.
        analyze_ymd: Analysis date written to the record.
        fetch_start_ymd: Start of the fetch window.
        fetch_end_ymd: End of the fetch window.
        result: Skipped analysis result; ``data_start_ymd``, ``data_end_ymd``,
            ``data_days`` and ``skip_reason`` are read.
        demand_cache_run_id: Optional demand-cache run reference.

    Returns:
        The record payload dict.
    """
    # Reuse the init builder so both payloads always share the same base shape.
    payload = build_wxindex_word_record_init_payload(
        meta=meta,
        name=name,
        analyze_ymd=analyze_ymd,
        fetch_start_ymd=fetch_start_ymd,
        fetch_end_ymd=fetch_end_ymd,
        demand_cache_run_id=demand_cache_run_id,
    )
    data_days = result.get("data_days")
    payload.update(
        {
            "data_start_ymd": result.get("data_start_ymd"),
            "data_end_ymd": result.get("data_end_ymd"),
            "data_days": data_days,
            "detail_json": {
                # Same constant that _is_analysis_skipped_record compares against.
                "phase": PHASE_ANALYSIS_SKIPPED,
                "skip_reason": result.get("skip_reason"),
                "data_days": data_days,
            },
        }
    )
    return payload
def build_wxindex_word_record_payload(
    item: dict[str, Any],
    *,
    analyze_ymd: str,
    demand_cache_run_id: int | None,
    phase: str,
    senior_threshold: float,
    sustained_threshold: float,
    spike_days: int,
    spike_ratio: float,
    spike_baseline_floor: float,
    rising_overall_change_rate: float,
    rising_window_change_rate: float,
    rising_adjacent_up_ratio: float,
) -> dict[str, Any]:
    """Build the records-table payload from a pending item's current state.

    Column values come from the item and its ``result``; ``detail_json``
    records the phase together with the thresholds used for this run.
    """
    meta = item["meta"]
    name = item["name"]
    fetch_start_ymd = item["fetch_start_ymd"]
    fetch_end_ymd = item["fetch_end_ymd"]
    result = item["result"]
    retain_reason = item.get("retain_reason")

    payload: dict[str, Any] = {
        "name": name,
        "meta_id": meta.get("id"),
        "analyze_ymd": analyze_ymd,
        "fetch_start_ymd": fetch_start_ymd,
        "fetch_end_ymd": fetch_end_ymd,
    }
    for column in (
        "data_start_ymd",
        "data_end_ymd",
        "data_days",
        "is_sustained_high",
        "is_rising",
        "is_spike",
    ):
        payload[column] = result.get(column)
    payload["retain_reason"] = retain_reason
    payload["is_internal_demand_matched"] = item.get("is_internal_demand_matched")
    payload["matched_demand"] = item.get("matched_demand")
    payload["demand_cache_run_id"] = demand_cache_run_id
    for column in (
        "internal_demand_match_json",
        "senior_fit_score",
        "demand_senior_fit_json",
        "is_final_retained",
    ):
        payload[column] = item.get(column)
    for column in ("min_score", "max_score", "avg_score"):
        payload[column] = result.get(column)
    payload["detail_json"] = {
        "phase": phase,
        "patterns": list(result.get("patterns") or []),
        "retain_reason": retain_reason,
        "retain_reason_priority": "2->3->1",
        "senior_fit_threshold": senior_threshold,
        # Same threshold expressed on the 0-10 score scale.
        "senior_fit_threshold_score": senior_threshold * 10.0,
        "is_final_retained": item.get("is_final_retained"),
        "sustained_threshold": sustained_threshold,
        "spike_days": spike_days,
        "spike_ratio": spike_ratio,
        "spike_baseline_floor": spike_baseline_floor,
        "rising_overall_change_rate": rising_overall_change_rate,
        "rising_window_change_rate": rising_window_change_rate,
        "rising_adjacent_up_ratio": rising_adjacent_up_ratio,
    }
    return payload
- def _persist_wxindex_word_record(
- repository: HotContentRepository,
- payload: dict[str, Any],
- *,
- dry_run: bool,
- skip_db_save: bool,
- verbose: bool,
- action: str,
- ) -> int:
- name = str(payload.get("name") or "").strip()
- if dry_run or skip_db_save:
- if verbose:
- label = "dry-run" if dry_run else "skip-db-save"
- print(f"[{label}] would {action} wxindex word record word={name}")
- return 0
- return repository.save_wxindex_word_record(payload)
def _persist_pending_item_record(
    repository: HotContentRepository,
    item: dict[str, Any],
    *,
    analyze_ymd: str,
    demand_cache_run_id: int | None,
    phase: str,
    senior_threshold: float,
    sustained_threshold: float,
    spike_days: int,
    spike_ratio: float,
    spike_baseline_floor: float,
    rising_overall_change_rate: float,
    rising_window_change_rate: float,
    rising_adjacent_up_ratio: float,
    dry_run: bool,
    skip_db_save: bool,
    verbose: bool,
) -> int:
    """Persist the current state of a pending item for the given phase.

    A truthy id returned by the save is stored on ``item["record_id"]``. The
    function returns the item's record id as an int (``0`` when unknown).
    """
    threshold_settings = {
        "senior_threshold": senior_threshold,
        "sustained_threshold": sustained_threshold,
        "spike_days": spike_days,
        "spike_ratio": spike_ratio,
        "spike_baseline_floor": spike_baseline_floor,
        "rising_overall_change_rate": rising_overall_change_rate,
        "rising_window_change_rate": rising_window_change_rate,
        "rising_adjacent_up_ratio": rising_adjacent_up_ratio,
    }
    payload = build_wxindex_word_record_payload(
        item,
        analyze_ymd=analyze_ymd,
        demand_cache_run_id=demand_cache_run_id,
        phase=phase,
        **threshold_settings,
    )
    saved_id = _persist_wxindex_word_record(
        repository,
        payload,
        dry_run=dry_run,
        skip_db_save=skip_db_save,
        verbose=verbose,
        action=f"{phase} word={item['name']}",
    )
    if saved_id:
        item["record_id"] = saved_id
    # Keeps an id assigned earlier when the save was skipped.
    return int(item.get("record_id") or 0)
- def _filter_candidates_awaiting_yesterday_score(
- repository: HotContentRepository,
- candidate_items: list[dict[str, Any]],
- *,
- yesterday_ymd: str,
- existing_records: dict[str, dict[str, Any]],
- verbose: bool,
- ) -> tuple[list[dict[str, Any]], int]:
- """初始化完成后:未完成热度分析且缺少昨日指数数据的词留待下次执行。"""
- names_to_check = [
- item["name"]
- for item in candidate_items
- if not _is_heat_analysis_done(existing_records.get(item["name"]))
- ]
- if not names_to_check:
- return candidate_items, 0
- names_with_yesterday = repository.list_wxindex_word_names_with_dt(
- names_to_check,
- dt=yesterday_ymd,
- )
- ready_items: list[dict[str, Any]] = []
- awaiting_count = 0
- for item in candidate_items:
- name = item["name"]
- if _is_heat_analysis_done(existing_records.get(name)):
- ready_items.append(item)
- continue
- if name in names_with_yesterday:
- ready_items.append(item)
- continue
- awaiting_count += 1
- if verbose:
- print(
- f"await yesterday score word={name} dt={yesterday_ymd}, skip this run"
- )
- return ready_items, awaiting_count
- def _init_candidate_wxindex_word_records(
- repository: HotContentRepository,
- candidate_items: list[dict[str, Any]],
- *,
- analyze_ymd: str,
- demand_cache_run_id: int | None,
- dry_run: bool,
- skip_db_save: bool,
- verbose: bool,
- ) -> None:
- if not candidate_items:
- return
- if dry_run or skip_db_save:
- if verbose:
- label = "dry-run" if dry_run else "skip-db-save"
- print(
- f"[{label}] would batch init wxindex word records "
- f"count={len(candidate_items)} analyze_ymd={analyze_ymd}"
- )
- return
- init_payloads = [
- build_wxindex_word_record_init_payload(
- meta=item["meta"],
- name=item["name"],
- analyze_ymd=analyze_ymd,
- fetch_start_ymd=item["fetch_start_ymd"],
- fetch_end_ymd=item["fetch_end_ymd"],
- demand_cache_run_id=demand_cache_run_id,
- )
- for item in candidate_items
- ]
- record_id_map = repository.init_wxindex_word_records(init_payloads)
- for item in candidate_items:
- item["record_id"] = int(record_id_map.get(item["name"]) or 0)
- if verbose:
- print(
- f"init wxindex word records count={len(candidate_items)} "
- f"analyze_ymd={analyze_ymd}"
- )
- def _apply_demand_match_to_item(
- item: dict[str, Any],
- match_result: dict[str, Any],
- ) -> None:
- item["internal_demand_match_json"] = match_result
- item["is_internal_demand_matched"] = bool(match_result.get("matched"))
- matched_demand = str(match_result.get("matched_demand") or "").strip()
- item["matched_demand"] = matched_demand or None
- if not item.get("is_internal_demand_matched"):
- item["is_final_retained"] = False
- def _apply_senior_fit_to_item(
- item: dict[str, Any],
- senior_result: dict[str, Any],
- ) -> None:
- passed = bool(senior_result.get("passed"))
- item["senior_fit_score"] = senior_result.get("senior_fit_score")
- item["demand_senior_fit_json"] = senior_result.get("demand_senior_fit_json")
- item["senior_fit_passed"] = passed
- item["is_final_retained"] = passed
def build_wxindex_word_stats_payload(
    item: dict[str, Any],
    *,
    analyze_ymd: str,
) -> dict[str, Any]:
    """Build the stats payload for a word that passed the heat and senior-fit filters.

    ``item`` must carry ``name``, ``meta`` and ``result``; every other field
    is read with ``.get`` and is ``None`` in the payload when absent. The
    pattern flags and the senior-fit detail are nested under
    ``detail_json``.
    """
    meta = item["meta"]
    result = item["result"]

    payload: dict[str, Any] = {
        "name": item["name"],
        "meta_id": meta.get("id"),
        "analyze_ymd": analyze_ymd,
        "wxindex_word_record_id": item.get("record_id"),
        "retain_reason": item.get("retain_reason"),
        "senior_fit_score": item.get("senior_fit_score"),
    }

    # Fields copied straight from the heat-analysis result, in payload order.
    copied_fields = (
        "data_start_ymd",
        "data_end_ymd",
        "data_days",
        "min_score",
        "max_score",
        "avg_score",
    )
    payload.update((field, result.get(field)) for field in copied_fields)

    detail: dict[str, Any] = {
        "demand_senior_fit_json": item.get("demand_senior_fit_json"),
    }
    detail.update(
        (flag, result.get(flag))
        for flag in ("is_sustained_high", "is_rising", "is_spike")
    )
    payload["detail_json"] = detail
    return payload
- def _save_senior_fit_passed_stats(
- repository: HotContentRepository,
- items: list[dict[str, Any]],
- *,
- analyze_ymd: str,
- existing_stats_names: set[str] | None = None,
- dry_run: bool,
- skip_db_save: bool,
- verbose: bool,
- ) -> tuple[int, int]:
- existing = existing_stats_names or set()
- all_payloads = [
- build_wxindex_word_stats_payload(item, analyze_ymd=analyze_ymd)
- for item in items
- if item.get("senior_fit_passed")
- ]
- resumed = sum(1 for payload in all_payloads if payload["name"] in existing)
- payloads = [payload for payload in all_payloads if payload["name"] not in existing]
- if not payloads:
- return 0, resumed
- if dry_run or skip_db_save:
- if verbose:
- label = "dry-run" if dry_run else "skip-db-save"
- print(f"[{label}] would save wxindex word stats count={len(payloads)}")
- return len(payloads), resumed
- saved = repository.save_wxindex_word_stats_batch(payloads)
- if verbose:
- print(f"saved wxindex word stats count={saved} analyze_ymd={analyze_ymd}")
- return saved, resumed
- def _build_export_row_from_item(
- item: dict[str, Any],
- *,
- analyze_ymd: str,
- strategy: str,
- ) -> dict[str, Any]:
- meta = item["meta"]
- name = item["name"]
- result = item["result"]
- retain_reason = item.get("retain_reason")
- is_internal_demand_matched = item.get("is_internal_demand_matched")
- matched_demand = str(item.get("matched_demand") or "")
- senior_fit_score = item.get("senior_fit_score")
- is_final_retained = bool(item.get("is_final_retained"))
- return {
- "analyze_ymd": analyze_ymd,
- "name": name,
- "meta_id": meta.get("id"),
- "fetch_start_ymd": item["fetch_start_ymd"],
- "fetch_end_ymd": item["fetch_end_ymd"],
- "data_start_ymd": result.get("data_start_ymd"),
- "data_end_ymd": result.get("data_end_ymd"),
- "data_days": result.get("data_days"),
- "analysis_skipped": False,
- "skip_reason": "",
- "is_sustained_high": bool(result.get("is_sustained_high")),
- "is_rising": bool(result.get("is_rising")),
- "is_spike": bool(result.get("is_spike")),
- "retain_reason": retain_reason or "",
- "is_internal_demand_matched": (
- "" if is_internal_demand_matched is None else is_internal_demand_matched
- ),
- "matched_demand": matched_demand,
- "senior_fit_score": senior_fit_score if senior_fit_score is not None else "",
- "is_final_retained": is_final_retained,
- "min_score": result.get("min_score"),
- "max_score": result.get("max_score"),
- "avg_score": result.get("avg_score"),
- "demand_id": (
- build_demand_id(
- strategy=strategy,
- demand_name=name,
- partition_dt=analyze_ymd,
- )
- if is_final_retained and strategy
- else ""
- ),
- "weight": (
- float(result.get("max_score") or 0) / WEIGHT_DIVISOR
- if is_final_retained and result.get("max_score") is not None
- else ""
- ),
- }
def run_wxindex_heat_pattern_daily_job(
    repository: HotContentRepository,
    *,
    config: FlowConfig | None = None,
    api_client: JsonApiClient | None = None,
    today: date | None = None,
    sustained_threshold: float = WXINDEX_SUSTAINED_HIGH_THRESHOLD,
    min_days: int = WXINDEX_HEAT_MIN_DAYS,
    max_days: int = WXINDEX_HEAT_MAX_DAYS,
    spike_days: int = WXINDEX_SPIKE_LOOKBACK_DAYS,
    spike_ratio: float = WXINDEX_SPIKE_RATIO,
    spike_baseline_floor: float = WXINDEX_SPIKE_BASELINE_FLOOR,
    rising_overall_change_rate: float = WXINDEX_RISING_OVERALL_CHANGE_RATE,
    rising_window_change_rate: float = WXINDEX_RISING_WINDOW_CHANGE_RATE,
    rising_adjacent_up_ratio: float = WXINDEX_RISING_ADJACENT_UP_RATIO,
    dry_run: bool = False,
    skip_odps: bool = False,
    skip_db_save: bool = False,
    verbose: bool = False,
) -> dict[str, Any]:
    """Scheduled job: analyse words still inside their fetch window and write the heat-pattern result table.

    Stages, in order:

    1. Load the active word metadata and, when ``config`` is given, the
       current-hour demand cache.
    2. Initialise one record per candidate word and load any records/stats
       already stored for this ``analyze_ymd`` so finished work is resumed
       rather than repeated.
    3. Set aside candidates reported as still awaiting yesterday's score by
       ``_filter_candidates_awaiting_yesterday_score``.
    4. Run (or resume) the heat-pattern analysis per word.
    5. Match sustained-high words against the demand pool, in batches.
    6. Score senior fit for the remaining retained words, in batches.
    7. Save stats for words that passed senior fit, finalise every record
       and build the export / hive rows.
    8. Sync the final hive rows to ODPS unless disabled.

    Args:
        repository: Storage facade used for every read and write.
        config: Flow configuration. Without it no demand cache is loaded,
            no senior-fit scoring runs and nothing is synced to ODPS.
        api_client: Client passed to the post-process service; a
            ``JsonApiClient`` is built from ``config`` when omitted.
        today: Analysis date; defaults to the current date in
            ``SHANGHAI_TZ``.
        sustained_threshold, min_days, max_days, spike_days, spike_ratio,
        spike_baseline_floor, rising_overall_change_rate,
        rising_window_change_rate, rising_adjacent_up_ratio: Tuning knobs
            forwarded to ``analyze_wxindex_heat_patterns`` and to the record
            persistence helper.
        dry_run: Forwarded to the persistence helpers; also disables the
            existing-record lookup and the ODPS sync.
        skip_odps: Skip only the ODPS sync.
        skip_db_save: Forwarded to the persistence helpers; also disables
            the existing-record lookup.
        verbose: Print progress lines.

    Returns:
        The summary dict: run counters and flags plus ``retained_words``
        and ``export_rows`` (and ``odps_sync`` when a sync ran).
    """
    current = today or datetime.now(SHANGHAI_TZ).date()
    analyze_ymd = current.strftime("%Y%m%d")
    meta_rows = repository.list_active_wxindex_word_meta(today=current)

    # --- Stage 1: demand cache (optional, needs config) ---
    demand_name_set: list[str] = []
    demand_cache_run_id: int | None = None
    demand_cache_error: str | None = None
    postprocess_service: ContributionPostprocessService | None = None
    if config is not None:
        try:
            cache = DemandCacheService(config, repository).get_or_create_current_hour_cache()
            demand_name_set = list(cache.get("demand_name_set") or [])
            demand_cache_run_id = int(cache["id"])
            # The matching service is only built when there is something to match against.
            if demand_name_set:
                postprocess_service = ContributionPostprocessService(
                    config,
                    repository,
                    api_client
                    or JsonApiClient(
                        timeout_seconds=config.request_timeout_seconds,
                        verify_ssl=config.https_verify_ssl,
                    ),
                )
        except HotContentFlowError as exc:
            # Best effort: the job continues without demand matching.
            demand_cache_error = str(exc)
            if verbose:
                print(f"demand cache unavailable, skip demand match: {exc}")

    # Counters start at zero; several are overwritten or incremented below.
    summary: dict[str, Any] = {
        "analyze_ymd": analyze_ymd,
        "meta_count": len(meta_rows),
        "analyzed": 0,
        "skipped": 0,
        "retained": 0,
        "sustained_high": 0,
        "rising": 0,
        "spike": 0,
        "internal_demand_matched": 0,
        "senior_scored": 0,
        "senior_fit_candidates": 0,
        "senior_fit_passed": 0,
        "stats_saved": 0,
        "final_retained": 0,
        "odps_synced": 0,
        "odps_written": 0,
        "demand_cache_run_id": demand_cache_run_id,
        "demand_cache_error": demand_cache_error,
        "demand_name_count": len(demand_name_set),
        "llm_batch_size": WXINDEX_WORD_LLM_BATCH_SIZE,
        "demand_match_batches": 0,
        "senior_fit_batches": 0,
        "records_initialized": 0,
        "awaiting_yesterday_score": 0,
        "heat_resumed": 0,
        "demand_match_resumed": 0,
        "senior_fit_resumed": 0,
        "stats_resumed": 0,
        "finalized_resumed": 0,
        "dry_run": dry_run,
        "skip_odps": skip_odps,
        "skip_db_save": skip_db_save,
    }
    retained_words: list[dict[str, Any]] = []
    export_rows: list[dict[str, Any]] = []
    final_hive_rows: list[dict[str, Any]] = []
    senior_threshold = WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD
    # Empty strategy (or no config) means no demand_id / hive rows are produced.
    strategy = str(config.hot_demand_pool_strategy or "").strip() if config else ""
    pending_items: list[dict[str, Any]] = []
    candidate_items: list[dict[str, Any]] = []

    # --- Stage 2: build candidates; rows missing name or fetch window are skipped ---
    for meta in meta_rows:
        name = str(meta.get("name") or "").strip()
        fetch_start_ymd = str(meta.get("fetch_start_ymd") or "").strip()
        fetch_end_ymd = str(meta.get("fetch_end_ymd") or "").strip()
        if not name or not fetch_start_ymd or not fetch_end_ymd:
            summary["skipped"] += 1
            continue
        candidate_items.append(
            {
                "meta": meta,
                "name": name,
                "fetch_start_ymd": fetch_start_ymd,
                "fetch_end_ymd": fetch_end_ymd,
            }
        )
    _init_candidate_wxindex_word_records(
        repository,
        candidate_items,
        analyze_ymd=analyze_ymd,
        demand_cache_run_id=demand_cache_run_id,
        dry_run=dry_run,
        skip_db_save=skip_db_save,
        verbose=verbose,
    )
    # Counts candidates, including in dry-run / skip-db-save mode where nothing was written.
    summary["records_initialized"] = len(candidate_items)

    # Previously stored state for this analyze_ymd; left empty when DB access is disabled.
    existing_records: dict[str, dict[str, Any]] = {}
    existing_stats_names: set[str] = set()
    if not dry_run and not skip_db_save:
        candidate_names = [item["name"] for item in candidate_items]
        existing_records = repository.list_wxindex_word_records_by_analyze_ymd(
            analyze_ymd=analyze_ymd,
            names=candidate_names,
        )
        existing_stats_names = repository.list_wxindex_word_stats_names(
            analyze_ymd=analyze_ymd,
            names=candidate_names,
        )

    # --- Stage 3: drop candidates that the filter reports as awaiting yesterday's score ---
    yesterday_ymd = (current - timedelta(days=1)).strftime("%Y%m%d")
    candidate_items, awaiting_yesterday = _filter_candidates_awaiting_yesterday_score(
        repository,
        candidate_items,
        yesterday_ymd=yesterday_ymd,
        existing_records=existing_records,
        verbose=verbose,
    )
    summary["awaiting_yesterday_score"] = awaiting_yesterday

    # Fall back to the stored record id when initialisation did not supply one.
    for item in candidate_items:
        if not item.get("record_id"):
            existing = existing_records.get(item["name"])
            if existing:
                item["record_id"] = int(existing.get("id") or 0)

    # --- Stage 4: heat analysis per word (resume when already done) ---
    for item in candidate_items:
        meta = item["meta"]
        name = item["name"]
        fetch_start_ymd = item["fetch_start_ymd"]
        fetch_end_ymd = item["fetch_end_ymd"]
        existing_record = existing_records.get(name)
        scores = repository.list_wxindex_word_scores_in_range(
            name,
            start_ymd=fetch_start_ymd,
            end_ymd=fetch_end_ymd,
        )
        # Resume path: a stored analysis exists and no rerun is required.
        if (
            _is_heat_analysis_done(existing_record)
            and not _should_rerun_heat_analysis(
                existing_record,
                scores=scores,
                fetch_start_ymd=fetch_start_ymd,
                fetch_end_ymd=fetch_end_ymd,
                min_days=min_days,
                max_days=max_days,
            )
        ):
            summary["heat_resumed"] += 1
            if _is_analysis_skipped_record(existing_record):
                if verbose:
                    detail = _parse_record_detail_json(existing_record)
                    print(
                        f"resume skip word={name} reason={detail.get('skip_reason')} "
                        f"days={existing_record.get('data_days')}"
                    )
                export_rows.append(
                    _build_skipped_export_row_from_record(
                        record=existing_record,
                        meta=meta,
                        name=name,
                        analyze_ymd=analyze_ymd,
                        fetch_start_ymd=fetch_start_ymd,
                        fetch_end_ymd=fetch_end_ymd,
                    )
                )
                continue
            pending_item = _rehydrate_pending_item_from_record(
                item,
                existing_record,
                senior_threshold=senior_threshold,
            )
            pending_items.append(pending_item)
            if verbose:
                print(
                    f"resume heat analyzed word={name} "
                    f"retain_reason={pending_item.get('retain_reason') or ''} "
                    f"data_days={pending_item['result'].get('data_days')}"
                )
            continue

        # Fresh analysis path.
        result = analyze_wxindex_heat_patterns(
            scores,
            start_ymd=fetch_start_ymd,
            end_ymd=fetch_end_ymd,
            sustained_threshold=sustained_threshold,
            min_days=min_days,
            max_days=max_days,
            spike_days=spike_days,
            spike_ratio=spike_ratio,
            spike_baseline_floor=spike_baseline_floor,
            rising_overall_change_rate=rising_overall_change_rate,
            rising_window_change_rate=rising_window_change_rate,
            rising_adjacent_up_ratio=rising_adjacent_up_ratio,
        )
        if result.get("skipped"):
            # The analysis declined this word: record the skip and export a blank row.
            if verbose:
                print(
                    f"skip word={name} reason={result.get('skip_reason')} "
                    f"days={result.get('data_days')}"
                )
            skipped_payload = build_wxindex_word_record_skipped_payload(
                meta=meta,
                name=name,
                analyze_ymd=analyze_ymd,
                fetch_start_ymd=fetch_start_ymd,
                fetch_end_ymd=fetch_end_ymd,
                result=result,
                demand_cache_run_id=demand_cache_run_id,
            )
            _persist_wxindex_word_record(
                repository,
                skipped_payload,
                dry_run=dry_run,
                skip_db_save=skip_db_save,
                verbose=verbose,
                action="update skipped",
            )
            export_rows.append(
                {
                    "analyze_ymd": analyze_ymd,
                    "name": name,
                    "meta_id": meta.get("id"),
                    "fetch_start_ymd": fetch_start_ymd,
                    "fetch_end_ymd": fetch_end_ymd,
                    "analysis_skipped": True,
                    "skip_reason": result.get("skip_reason"),
                    "data_days": result.get("data_days"),
                    "retain_reason": "",
                    "is_sustained_high": False,
                    "is_rising": False,
                    "is_spike": False,
                    "is_internal_demand_matched": "",
                    "matched_demand": "",
                    "senior_fit_score": "",
                    "is_final_retained": False,
                    "min_score": "",
                    "max_score": "",
                    "avg_score": "",
                }
            )
            continue
        retain_reason = str(result.get("retain_reason") or "").strip() or None
        # Downstream fields start as None, meaning "not evaluated yet".
        pending_item = {
            "meta": meta,
            "name": name,
            "fetch_start_ymd": fetch_start_ymd,
            "fetch_end_ymd": fetch_end_ymd,
            "result": result,
            "retain_reason": retain_reason,
            "record_id": item.get("record_id", 0),
            "is_internal_demand_matched": None,
            "matched_demand": None,
            "internal_demand_match_json": None,
            "senior_fit_score": None,
            "demand_senior_fit_json": None,
            "senior_fit_passed": None,
            "is_final_retained": None,
        }
        pending_items.append(pending_item)
        _persist_pending_item_record(
            repository,
            pending_item,
            analyze_ymd=analyze_ymd,
            demand_cache_run_id=demand_cache_run_id,
            phase="heat_analyzed",
            senior_threshold=senior_threshold,
            sustained_threshold=sustained_threshold,
            spike_days=spike_days,
            spike_ratio=spike_ratio,
            spike_baseline_floor=spike_baseline_floor,
            rising_overall_change_rate=rising_overall_change_rate,
            rising_window_change_rate=rising_window_change_rate,
            rising_adjacent_up_ratio=rising_adjacent_up_ratio,
            dry_run=dry_run,
            skip_db_save=skip_db_save,
            verbose=verbose,
        )
        if verbose:
            print(
                f"heat analyzed word={name} retain_reason={retain_reason or ''} "
                f"data_days={result.get('data_days')}"
            )

    # --- Stage 5: demand-pool matching for sustained-high words ---
    demand_match_results: dict[str, dict[str, Any]] = {}
    pending_by_name = {item["name"]: item for item in pending_items}
    # Only words without a match verdict yet need matching.
    sustained_high_words = [
        item["name"]
        for item in pending_items
        if item.get("retain_reason") == RETAIN_REASON_SUSTAINED_HIGH
        and item.get("is_internal_demand_matched") is None
    ]
    # Sustained-high words that already carry a verdict count as resumed.
    for item in pending_items:
        if item.get("retain_reason") != RETAIN_REASON_SUSTAINED_HIGH:
            continue
        if item.get("is_internal_demand_matched") is not None:
            summary["demand_match_resumed"] += 1

    def _persist_demand_match_updates(words: list[str]) -> None:
        # Apply each word's match result to its pending item and persist it.
        for word in words:
            pending_item = pending_by_name.get(word)
            if pending_item is None:
                continue
            # A word with no (or an empty) result is recorded as unmatched.
            match_result = demand_match_results.get(word) or {
                "word": word,
                "matched": False,
                "matched_demand": "",
                "match_list": [],
            }
            _apply_demand_match_to_item(pending_item, match_result)
            _persist_pending_item_record(
                repository,
                pending_item,
                analyze_ymd=analyze_ymd,
                demand_cache_run_id=demand_cache_run_id,
                phase="demand_matched",
                senior_threshold=senior_threshold,
                sustained_threshold=sustained_threshold,
                spike_days=spike_days,
                spike_ratio=spike_ratio,
                spike_baseline_floor=spike_baseline_floor,
                rising_overall_change_rate=rising_overall_change_rate,
                rising_window_change_rate=rising_window_change_rate,
                rising_adjacent_up_ratio=rising_adjacent_up_ratio,
                dry_run=dry_run,
                skip_db_save=skip_db_save,
                verbose=verbose,
            )

    if sustained_high_words:
        if postprocess_service is not None and demand_name_set:
            # Persist after every batch so progress survives an interruption.
            for batch in _chunk_words(sustained_high_words):
                summary["demand_match_batches"] += 1
                if verbose:
                    print(f"demand match batch size={len(batch)} words={batch}")
                demand_match_results.update(
                    postprocess_service.match_words_to_demand_pool(
                        words=batch,
                        demand_name_set=demand_name_set,
                    )
                )
                _persist_demand_match_updates(batch)
        else:
            # No service or no demand names: mark every word unmatched with a reason.
            for word in sustained_high_words:
                demand_match_results[word] = {
                    "word": word,
                    "matched": False,
                    "matched_demand": "",
                    "match_list": [],
                    "skip_reason": "empty_demand_cache",
                }
            _persist_demand_match_updates(sustained_high_words)

    # --- Stage 6: senior-fit scoring ---
    senior_fit_candidate_names: list[str] = []
    for item in pending_items:
        retain_reason = item.get("retain_reason")
        name = str(item.get("name") or "").strip()
        if not retain_reason or not name:
            continue
        if _is_senior_fit_attempt_done(item, existing_records.get(name)):
            summary["senior_fit_resumed"] += 1
            continue
        # Sustained-high words must have matched the demand pool to be scored.
        if retain_reason == RETAIN_REASON_SUSTAINED_HIGH:
            if not item.get("is_internal_demand_matched"):
                continue
        senior_fit_candidate_names.append(name)
    senior_fit_results: dict[str, dict[str, Any]] = {}

    def _persist_senior_fit_updates(words: list[str]) -> None:
        # Apply each word's senior-fit result to its pending item and persist it.
        for word in words:
            pending_item = pending_by_name.get(word)
            senior_result = senior_fit_results.get(word)
            # Words without a result are left untouched.
            if pending_item is None or senior_result is None:
                continue
            _apply_senior_fit_to_item(pending_item, senior_result)
            _persist_pending_item_record(
                repository,
                pending_item,
                analyze_ymd=analyze_ymd,
                demand_cache_run_id=demand_cache_run_id,
                phase="senior_fit_scored",
                senior_threshold=senior_threshold,
                sustained_threshold=sustained_threshold,
                spike_days=spike_days,
                spike_ratio=spike_ratio,
                spike_baseline_floor=spike_baseline_floor,
                rising_overall_change_rate=rising_overall_change_rate,
                rising_window_change_rate=rising_window_change_rate,
                rising_adjacent_up_ratio=rising_adjacent_up_ratio,
                dry_run=dry_run,
                skip_db_save=skip_db_save,
                verbose=verbose,
            )
            if verbose and not pending_item.get("senior_fit_passed"):
                print(
                    f"senior fit rejected word={word} "
                    f"score={pending_item.get('senior_fit_score')}"
                )

    # Scoring needs config; without it candidates stay unscored.
    if config is not None and senior_fit_candidate_names:
        for batch in _chunk_words(senior_fit_candidate_names):
            summary["senior_fit_batches"] += 1
            if verbose:
                print(f"senior fit batch size={len(batch)} words={batch}")
            senior_fit_results.update(
                score_wxindex_words_senior_fit(
                    words=batch,
                    config=config,
                    senior_threshold=senior_threshold,
                    batch_size=WXINDEX_WORD_LLM_BATCH_SIZE,
                )
            )
            _persist_senior_fit_updates(batch)

    # --- Stage 7: stats, finalisation, export and hive rows ---
    stats_saved, stats_resumed = _save_senior_fit_passed_stats(
        repository,
        pending_items,
        analyze_ymd=analyze_ymd,
        existing_stats_names=existing_stats_names,
        dry_run=dry_run,
        skip_db_save=skip_db_save,
        verbose=verbose,
    )
    summary["stats_saved"] = stats_saved
    summary["stats_resumed"] = stats_resumed
    for item in pending_items:
        name = item["name"]
        result = item["result"]
        retain_reason = item.get("retain_reason")
        if retain_reason:
            retained_words.append(
                {
                    "name": name,
                    "retain_reason": retain_reason,
                    "data_days": result.get("data_days"),
                    "data_start_ymd": result.get("data_start_ymd"),
                    "data_end_ymd": result.get("data_end_ymd"),
                    "is_internal_demand_matched": item.get("is_internal_demand_matched"),
                    "matched_demand": str(item.get("matched_demand") or ""),
                    "senior_fit_score": item.get("senior_fit_score"),
                    "is_final_retained": bool(item.get("is_final_retained")),
                }
            )
        existing_record = existing_records.get(name)
        if _is_finalized_record(existing_record):
            # Already finalised: reuse the known id instead of writing again.
            # NOTE(review): relies on _is_finalized_record returning False for None - confirm.
            summary["finalized_resumed"] += 1
            record_id = int(item.get("record_id") or existing_record.get("id") or 0)
            if verbose:
                print(f"resume finalized word={name}")
        else:
            record_id = _persist_pending_item_record(
                repository,
                item,
                analyze_ymd=analyze_ymd,
                demand_cache_run_id=demand_cache_run_id,
                phase="finalized",
                senior_threshold=senior_threshold,
                sustained_threshold=sustained_threshold,
                spike_days=spike_days,
                spike_ratio=spike_ratio,
                spike_baseline_floor=spike_baseline_floor,
                rising_overall_change_rate=rising_overall_change_rate,
                rising_window_change_rate=rising_window_change_rate,
                rising_adjacent_up_ratio=rising_adjacent_up_ratio,
                dry_run=dry_run,
                skip_db_save=skip_db_save,
                verbose=verbose,
            )
        export_rows.append(
            _build_export_row_from_item(
                item,
                analyze_ymd=analyze_ymd,
                strategy=strategy,
            )
        )
        # Only finally retained words with a strategy are sent to hive.
        if item.get("is_final_retained") and strategy:
            final_hive_rows.append(
                build_wxindex_word_hive_row(
                    wxindex_word_record_id=record_id,
                    word=name,
                    strategy=strategy,
                    partition_dt=analyze_ymd,
                    max_score=result.get("max_score"),
                )
            )

    # --- Stage 8: ODPS sync ---
    if (
        final_hive_rows
        and strategy
        and config is not None
        and not dry_run
        and not skip_odps
    ):
        odps_summary = sync_wxindex_word_rows_to_odps(
            config,
            repository,
            hive_rows=final_hive_rows,
            partition_dt=analyze_ymd,
            strategy=strategy,
        )
        summary["odps_written"] = odps_summary.get("written_count", 0)
        summary["odps_synced"] = odps_summary.get("odps_synced", 0)
        summary["odps_sync"] = odps_summary
    elif dry_run or skip_odps:
        # No sync performed: report the number of rows that were prepared.
        summary["odps_written"] = len(final_hive_rows)
        summary["odps_synced"] = len(final_hive_rows)

    # Helper defined elsewhere; presumably recomputes the counters from the final lists - verify.
    _refresh_wxindex_heat_job_summary(
        summary,
        pending_items=pending_items,
        export_rows=export_rows,
    )
    summary["retained_words"] = retained_words
    summary["export_rows"] = export_rows
    return summary
# Column names, in output order, for the heat-pattern CSV export. Used as the
# default ``fieldnames`` by write_wxindex_heat_pattern_csv.
WXINDEX_HEAT_PATTERN_EXPORT_FIELDS = [
    "analyze_ymd",
    "name",
    "meta_id",
    "fetch_start_ymd",
    "fetch_end_ymd",
    "data_start_ymd",
    "data_end_ymd",
    "data_days",
    "analysis_skipped",
    "skip_reason",
    "is_sustained_high",
    "is_rising",
    "is_spike",
    "retain_reason",
    "is_internal_demand_matched",
    "matched_demand",
    "senior_fit_score",
    "is_final_retained",
    "min_score",
    "max_score",
    "avg_score",
    "demand_id",
    "weight",
]
- def write_wxindex_heat_pattern_csv(
- rows: list[dict[str, Any]],
- output_path: str | Path,
- *,
- fieldnames: list[str] | None = None,
- ) -> Path:
- """将热度分析明细写入本地 CSV。"""
- path = Path(output_path).expanduser()
- if not path.is_absolute():
- path = Path.cwd() / path
- path.parent.mkdir(parents=True, exist_ok=True)
- columns = fieldnames or WXINDEX_HEAT_PATTERN_EXPORT_FIELDS
- with path.open("w", encoding="utf-8-sig", newline="") as handle:
- writer = csv.DictWriter(handle, fieldnames=columns, extrasaction="ignore")
- writer.writeheader()
- for row in rows:
- writer.writerow({column: row.get(column, "") for column in columns})
- return path
|