| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667 |
- """微信指数热度模式分析:持续高热、持续上涨、突然暴涨。"""
- from __future__ import annotations
- import csv
- import json
- from datetime import date, datetime, timedelta
- from pathlib import Path
- from typing import Any
- from app.hot_content.client import JsonApiClient
- from app.hot_content.demand_cache_service import DemandCacheService
- from app.hot_content.demand_hive_export import WEIGHT_DIVISOR, build_demand_id
- from app.hot_content.demand_quality import (
- TYPE_PHRASE,
- llm_score_senior_fit,
- lookup_quality_scores,
- )
- from app.hot_content.demand_pool_writer import sync_wxindex_word_rows_to_odps
- from app.hot_content.exceptions import HotContentFlowError
- from app.hot_content.postprocess_service import ContributionPostprocessService
- from app.hot_content.repository import HotContentRepository
- from app.hot_content.timezone import SHANGHAI_TZ
- from app.hot_content.types import FlowConfig
- from app.hot_content.wxindex_trend import (
- HEAT_RISING_ADJACENT_UP_RATIO,
- HEAT_RISING_OVERALL_CHANGE_RATE,
- HEAT_RISING_RECENT_DROP_RATE,
- HEAT_RISING_WINDOW_CHANGE_RATE,
- HEAT_SPIKE_BASELINE_FLOOR,
- HEAT_SPIKE_RATIO,
- extract_sorted_scores,
- is_wxindex_heat_rising_scores,
- is_wxindex_spike_scores,
- )
- from app.hot_content.wxindex_words import filter_scores_in_ymd_window
# Default bounds on the number of daily points in the analysis window
# (defaults of `min_days` / `max_days` in prepare_analysis_scores and
# analyze_wxindex_heat_patterns).
WXINDEX_HEAT_MIN_DAYS = 7
WXINDEX_HEAT_MAX_DAYS = 14
# Default "sustained high" bar: every score in the window must exceed it.
WXINDEX_SUSTAINED_HIGH_THRESHOLD = 10_000_000.0
# Default `spike_days` handed to is_wxindex_spike_scores.
WXINDEX_SPIKE_LOOKBACK_DAYS = 3
# Local aliases of the thresholds imported from app.hot_content.wxindex_trend.
WXINDEX_SPIKE_RATIO = HEAT_SPIKE_RATIO
WXINDEX_SPIKE_BASELINE_FLOOR = HEAT_SPIKE_BASELINE_FLOOR
WXINDEX_RISING_OVERALL_CHANGE_RATE = HEAT_RISING_OVERALL_CHANGE_RATE
WXINDEX_RISING_WINDOW_CHANGE_RATE = HEAT_RISING_WINDOW_CHANGE_RATE
WXINDEX_RISING_ADJACENT_UP_RATIO = HEAT_RISING_ADJACENT_UP_RATIO
WXINDEX_RISING_RECENT_DROP_RATE = HEAT_RISING_RECENT_DROP_RATE
# Senior-fit pass bar on the normalized scale: a word passes when
# score / 10 is strictly greater than this value.
WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD = 0.6
# Default number of words sent to the LLM scorer per call.
WXINDEX_WORD_LLM_BATCH_SIZE = 10
# Machine-readable pattern tags placed in a result's "patterns" list.
PATTERN_SUSTAINED_HIGH = "sustained_high"
PATTERN_RISING = "rising"
PATTERN_SPIKE = "spike"
# Retain-reason labels stored on records (runtime strings; do not translate).
RETAIN_REASON_SUSTAINED_HIGH = "持续高热度"
RETAIN_REASON_RISING = "热度持续上涨"
RETAIN_REASON_SPIKE = "热度突然暴涨"
# Processing phases written to detail_json["phase"] of a word record.
PHASE_ANALYSIS_SKIPPED = "analysis_skipped"
PHASE_HEAT_ANALYZED = "heat_analyzed"
PHASE_DEMAND_MATCHED = "demand_matched"
PHASE_SENIOR_FIT_SCORED = "senior_fit_scored"
PHASE_FINALIZED = "finalized"
# Phases for which _is_heat_analysis_done treats heat analysis as completed.
_HEAT_DONE_PHASES = frozenset(
    {
        PHASE_ANALYSIS_SKIPPED,
        PHASE_HEAT_ANALYZED,
        PHASE_DEMAND_MATCHED,
        PHASE_SENIOR_FIT_SCORED,
        PHASE_FINALIZED,
    }
)
# Analysis columns of a word record, all set to None. Spread into the init and
# skipped payloads so that every analysis field is explicitly present (and
# empty) until a later step fills it in.
_WXINDEX_RECORD_UNSET_ANALYSIS_FIELDS: dict[str, Any] = {
    "data_start_ymd": None,
    "data_end_ymd": None,
    "data_days": None,
    "is_sustained_high": None,
    "is_rising": None,
    "is_spike": None,
    "retain_reason": None,
    "is_internal_demand_matched": None,
    "matched_demand": None,
    "internal_demand_match_json": None,
    "senior_fit_score": None,
    "demand_senior_fit_json": None,
    "is_final_retained": None,
    "min_score": None,
    "max_score": None,
    "avg_score": None,
    "detail_json": None,
}
- def _parse_record_detail_json(record: dict[str, Any]) -> dict[str, Any]:
- detail = record.get("detail_json")
- if isinstance(detail, dict):
- return detail
- if isinstance(detail, str) and detail.strip():
- try:
- parsed = json.loads(detail)
- return parsed if isinstance(parsed, dict) else {}
- except json.JSONDecodeError:
- return {}
- return {}
def _record_phase(record: dict[str, Any]) -> str:
    """Return the trimmed ``phase`` stored in the record's detail JSON ("" if absent)."""
    detail = _parse_record_detail_json(record)
    phase_value = detail.get("phase") or ""
    return str(phase_value).strip()
- def _nullable_bool_from_record(value: Any) -> bool | None:
- if value is None:
- return None
- return bool(value)
- def _parse_record_json_field(record: dict[str, Any], field: str) -> Any:
- value = record.get(field)
- if value is None:
- return None
- if isinstance(value, (dict, list)):
- return value
- if isinstance(value, (bytes, bytearray)):
- value = value.decode("utf-8")
- if isinstance(value, str):
- try:
- return json.loads(value)
- except json.JSONDecodeError:
- return None
- return value
def _is_heat_analysis_done(record: dict[str, Any] | None) -> bool:
    """Return True when a record exists and its phase is one of ``_HEAT_DONE_PHASES``."""
    return bool(record) and _record_phase(record) in _HEAT_DONE_PHASES
def _is_analysis_skipped_record(record: dict[str, Any]) -> bool:
    """Return True when the record's stored phase is ``PHASE_ANALYSIS_SKIPPED``."""
    phase = _record_phase(record)
    return phase == PHASE_ANALYSIS_SKIPPED
def _is_senior_fit_needed(
    *,
    retain_reason: str | None,
    is_internal_demand_matched: bool | None,
) -> bool:
    """Decide whether a word still requires senior-fit scoring.

    Words without a retain reason never need scoring. Sustained-high words
    need it only when they matched an internal demand; every other retain
    reason always needs it.
    """
    if not retain_reason:
        return False
    if retain_reason != RETAIN_REASON_SUSTAINED_HIGH:
        return True
    return bool(is_internal_demand_matched)
def _is_finalized_record(record: dict[str, Any] | None) -> bool:
    """Return True when a record exists and its stored phase is ``PHASE_FINALIZED``."""
    return bool(record) and _record_phase(record) == PHASE_FINALIZED
def _latest_score_ymd(scores: list[dict[str, Any]]) -> str | None:
    """Return the greatest non-empty date string among the score rows, or None."""
    latest: str | None = None
    for row in scores:
        ymd = _score_row_ymd(row)
        # Rows without a date are ignored; dates are compared as strings.
        if ymd and (latest is None or ymd > latest):
            latest = ymd
    return latest
def _should_rerun_heat_analysis(
    existing_record: dict[str, Any] | None,
    *,
    scores: list[dict[str, Any]],
    fetch_start_ymd: str,
    fetch_end_ymd: str,
    min_days: int,
    max_days: int,
) -> bool:
    """Decide whether heat analysis must be (re)run for a word.

    A rerun is needed when:
      * no completed heat analysis is recorded yet, or
      * the scores contain a date later than the record's ``data_end_ymd``, or
      * the record was previously skipped but the current scores would no
        longer be skipped by ``prepare_analysis_scores``.
    """
    if not _is_heat_analysis_done(existing_record):
        return True
    assert existing_record is not None

    newest_ymd = _latest_score_ymd(scores)
    analyzed_until = str(existing_record.get("data_end_ymd") or "").strip()
    has_newer_data = (
        bool(newest_ymd) and bool(analyzed_until) and newest_ymd > analyzed_until
    )
    if has_newer_data:
        return True

    if not _is_analysis_skipped_record(existing_record):
        return False

    # Previously skipped: rerun only if the window now has enough data.
    _, skip_reason = prepare_analysis_scores(
        scores,
        start_ymd=fetch_start_ymd,
        end_ymd=fetch_end_ymd,
        min_days=min_days,
        max_days=max_days,
    )
    return skip_reason is None
def _is_senior_fit_attempt_done(
    item: dict[str, Any],
    existing_record: dict[str, Any] | None,
) -> bool:
    """Return True when senior-fit scoring was already attempted.

    That is the case when the stored record is in the senior-fit-scored or
    finalized phase, or when the in-memory item already carries a score.
    """
    recorded_as_scored = bool(existing_record) and _record_phase(
        existing_record
    ) in (PHASE_SENIOR_FIT_SCORED, PHASE_FINALIZED)
    if recorded_as_scored:
        return True
    return item.get("senior_fit_score") is not None
- def _senior_fit_passed_from_score(score: Any, *, senior_threshold: float) -> bool:
- if score is None:
- return False
- try:
- return float(score) / 10.0 > senior_threshold
- except (TypeError, ValueError):
- return False
def _rehydrate_result_from_record(record: dict[str, Any]) -> dict[str, Any]:
    """Rebuild an analysis-result dict from a stored word record.

    The result is always marked as not skipped. Boolean flags stay ``None``
    when the record has no value for them, ``retain_reason`` is trimmed and
    blank values become ``None``, and ``patterns`` comes from the record's
    detail JSON.
    """
    detail = _parse_record_detail_json(record)

    retain_reason: str | None = None
    raw_reason = record.get("retain_reason")
    if raw_reason is not None:
        retain_reason = str(raw_reason).strip() or None

    flags: dict[str, bool | None] = {}
    for flag_name in ("is_sustained_high", "is_rising", "is_spike"):
        stored = record.get(flag_name)
        flags[flag_name] = None if stored is None else bool(stored)

    result: dict[str, Any] = {"skipped": False, "skip_reason": None}
    for column in (
        "data_days",
        "data_start_ymd",
        "data_end_ymd",
        "fetch_start_ymd",
        "fetch_end_ymd",
        "min_score",
        "max_score",
        "avg_score",
    ):
        result[column] = record.get(column)
    result.update(flags)
    result["retain_reason"] = retain_reason
    result["patterns"] = list(detail.get("patterns") or [])
    return result
def _rehydrate_pending_item_from_record(
    item: dict[str, Any],
    record: dict[str, Any],
    *,
    senior_threshold: float,
) -> dict[str, Any]:
    """Rebuild a pending item from its stored record.

    Identity fields (meta, name, fetch window) come from ``item``; analysis,
    demand-match and senior-fit state come from ``record``.
    ``senior_fit_passed`` is recomputed from the stored score when there is
    one, is False when no senior-fit scoring is needed, and is None otherwise.
    """
    result = _rehydrate_result_from_record(record)
    retain_reason = result.get("retain_reason")

    demand_matched = _nullable_bool_from_record(
        record.get("is_internal_demand_matched")
    )
    matched_demand: str | None = None
    raw_matched_demand = record.get("matched_demand")
    if raw_matched_demand is not None:
        matched_demand = str(raw_matched_demand).strip() or None

    stored_score = record.get("senior_fit_score")
    final_retained = _nullable_bool_from_record(record.get("is_final_retained"))

    passed: bool | None = None
    if stored_score is not None:
        passed = _senior_fit_passed_from_score(
            stored_score,
            senior_threshold=senior_threshold,
        )
    elif not _is_senior_fit_needed(
        retain_reason=retain_reason,
        is_internal_demand_matched=demand_matched,
    ):
        passed = False
        # NOTE(review): nesting reconstructed from a source that lost its
        # indentation -- confirm this default applies only on this branch.
        if final_retained is None:
            final_retained = False

    record_id = int(record.get("id") or item.get("record_id") or 0)
    demand_match_json = _parse_record_json_field(record, "internal_demand_match_json")
    senior_fit_json = _parse_record_json_field(record, "demand_senior_fit_json")
    return {
        "meta": item["meta"],
        "name": item["name"],
        "fetch_start_ymd": item["fetch_start_ymd"],
        "fetch_end_ymd": item["fetch_end_ymd"],
        "result": result,
        "retain_reason": retain_reason,
        "record_id": record_id,
        "is_internal_demand_matched": demand_matched,
        "matched_demand": matched_demand,
        "internal_demand_match_json": demand_match_json,
        "senior_fit_score": stored_score,
        "demand_senior_fit_json": senior_fit_json,
        "senior_fit_passed": passed,
        "is_final_retained": final_retained,
    }
def _build_skipped_export_row_from_record(
    *,
    record: dict[str, Any],
    meta: dict[str, Any],
    name: str,
    analyze_ymd: str,
    fetch_start_ymd: str,
    fetch_end_ymd: str,
) -> dict[str, Any]:
    """Build the export row for a word whose analysis was skipped.

    Only the skip reason and ``data_days`` are taken from the stored record;
    pattern and retention flags are False and all remaining analysis columns
    are empty strings.
    """
    skip_reason = _parse_record_detail_json(record).get("skip_reason") or ""
    data_days = record.get("data_days") or ""

    row: dict[str, Any] = {
        "analyze_ymd": analyze_ymd,
        "name": name,
        "meta_id": meta.get("id"),
        "fetch_start_ymd": fetch_start_ymd,
        "fetch_end_ymd": fetch_end_ymd,
        "analysis_skipped": True,
        "skip_reason": skip_reason,
        "data_days": data_days,
        "retain_reason": "",
    }
    row.update(dict.fromkeys(("is_sustained_high", "is_rising", "is_spike"), False))
    row.update(
        dict.fromkeys(
            ("is_internal_demand_matched", "matched_demand", "senior_fit_score"),
            "",
        )
    )
    row["is_final_retained"] = False
    row.update(dict.fromkeys(("min_score", "max_score", "avg_score"), ""))
    return row
def _refresh_wxindex_heat_job_summary(
    summary: dict[str, Any],
    *,
    pending_items: list[dict[str, Any]],
    export_rows: list[dict[str, Any]],
) -> None:
    """Recompute the job counters in ``summary`` in place.

    ``skipped`` is counted from ``export_rows``; every other counter is
    derived from ``pending_items``.
    """

    def _count(predicate) -> int:
        # Number of pending items for which the predicate is truthy.
        return sum(1 for entry in pending_items if predicate(entry))

    def _reason_is(reason: str):
        return lambda entry: entry.get("retain_reason") == reason

    def _needs_senior_fit(entry: dict[str, Any]) -> bool:
        return _is_senior_fit_needed(
            retain_reason=entry.get("retain_reason"),
            is_internal_demand_matched=entry.get("is_internal_demand_matched"),
        )

    summary["analyzed"] = len(pending_items)
    summary["skipped"] = len(
        [row for row in export_rows if row.get("analysis_skipped")]
    )
    summary["retained"] = _count(lambda entry: entry.get("retain_reason"))
    summary["sustained_high"] = _count(_reason_is(RETAIN_REASON_SUSTAINED_HIGH))
    summary["rising"] = _count(_reason_is(RETAIN_REASON_RISING))
    summary["spike"] = _count(_reason_is(RETAIN_REASON_SPIKE))
    summary["internal_demand_matched"] = _count(
        lambda entry: entry.get("is_internal_demand_matched")
    )
    summary["senior_fit_candidates"] = _count(_needs_senior_fit)
    summary["senior_scored"] = _count(
        lambda entry: entry.get("senior_fit_score") is not None
    )
    summary["senior_fit_passed"] = _count(
        lambda entry: entry.get("senior_fit_passed")
    )
    summary["final_retained"] = _count(lambda entry: entry.get("is_final_retained"))
def resolve_retain_reason(
    *,
    is_sustained_high: bool,
    is_rising: bool,
    is_spike: bool,
) -> str | None:
    """Pick the retain reason by the 2->3->1 priority (rising > spike > sustained high).

    Returns None when no pattern flag is set.
    """
    by_priority = (
        (is_rising, RETAIN_REASON_RISING),
        (is_spike, RETAIN_REASON_SPIKE),
        (is_sustained_high, RETAIN_REASON_SUSTAINED_HIGH),
    )
    for flag, reason in by_priority:
        if flag:
            return reason
    return None
def _chunk_words(
    words: list[str],
    batch_size: int = WXINDEX_WORD_LLM_BATCH_SIZE,
) -> list[list[str]]:
    """Split ``words`` into consecutive slices of at most ``batch_size`` items.

    A batch size below 1 is treated as 1.
    """
    step = batch_size if batch_size >= 1 else 1
    chunks: list[list[str]] = []
    for start in range(0, len(words), step):
        chunks.append(words[start : start + step])
    return chunks
- def _empty_senior_fit_result() -> dict[str, Any]:
- return {
- "senior_fit_score": None,
- "demand_senior_fit_json": {"source": "", "items": []},
- "passed": False,
- }
def score_wxindex_words_senior_fit(
    *,
    words: list[str],
    config: FlowConfig,
    senior_threshold: float = WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD,
    batch_size: int = WXINDEX_WORD_LLM_BATCH_SIZE,
) -> dict[str, dict[str, Any]]:
    """Run senior-fit LLM scoring for WeChat-index words in batches.

    Words are stripped and de-duplicated (first occurrence wins) and sent to
    ``llm_score_senior_fit`` in batches of at most ``batch_size``.

    Args:
        words: Raw words; blank entries are ignored.
        config: Flow configuration providing the LLM model and retry settings.
        senior_threshold: Pass bar on the normalized scale; a word passes
            when ``score / 10`` is strictly greater than it.
        batch_size: Maximum number of words per LLM call.

    Returns:
        A mapping from each cleaned word to a dict with ``senior_fit_score``,
        ``demand_senior_fit_json`` (the batch response, shared by all words
        of that batch) and ``passed``.
    """
    # dict.fromkeys keeps first-seen order while dropping duplicates.
    cleaned = [
        word
        for word in dict.fromkeys(str(raw or "").strip() for raw in words)
        if word
    ]
    results: dict[str, dict[str, Any]] = {
        word: _empty_senior_fit_result() for word in cleaned
    }
    for batch in _chunk_words(cleaned, batch_size=batch_size):
        candidates = [
            {"demand_type": TYPE_PHRASE, "demand_text": word} for word in batch
        ]
        senior_fit_json = llm_score_senior_fit(
            channel_content_id=f"wxindex_words:{','.join(batch[:3])}",
            candidates=candidates,
            model=config.demand_quality_llm_model,
            max_attempts=config.demand_quality_llm_max_attempts,
            retry_sleep_seconds=config.demand_quality_llm_retry_sleep_seconds,
            max_tokens=config.demand_quality_llm_max_tokens,
        )
        # Record the bar on the raw score scale alongside the response.
        senior_fit_json["threshold"] = senior_threshold * 10.0
        for word in batch:
            _, senior_score = lookup_quality_scores(
                demand_type=TYPE_PHRASE,
                demand_text=word,
                event_sense_json=None,
                senior_fit_json=senior_fit_json,
            )
            results[word] = {
                "senior_fit_score": senior_score,
                "demand_senior_fit_json": senior_fit_json,
                # Shared helper keeps the pass rule in one place and treats a
                # non-numeric score as "not passed" instead of raising.
                "passed": _senior_fit_passed_from_score(
                    senior_score,
                    senior_threshold=senior_threshold,
                ),
            }
    return results
def score_wxindex_word_senior_fit(
    *,
    word: str,
    config: FlowConfig,
    senior_threshold: float = WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD,
) -> dict[str, Any]:
    """Run senior-fit LLM scoring for a single WeChat-index word.

    A blank word is not sent to the scorer and gets the empty result.
    """
    normalized = str(word or "").strip()
    if normalized:
        scored = score_wxindex_words_senior_fit(
            words=[normalized],
            config=config,
            senior_threshold=senior_threshold,
            batch_size=1,
        )
        if normalized in scored:
            return scored[normalized]
    return _empty_senior_fit_result()
- def build_wxindex_odps_extend(retain_reason: str | None) -> str:
- method = str(retain_reason or "").strip()
- if not method:
- return "{}"
- return json.dumps({"method": method}, ensure_ascii=False)
def build_wxindex_word_hive_row(
    *,
    wxindex_word_record_id: int,
    word: str,
    strategy: str,
    partition_dt: str,
    max_score: float | None,
    retain_reason: str | None = None,
) -> dict[str, Any]:
    """Build the Hive export row for one WeChat-index word.

    ``weight`` is ``max_score / WEIGHT_DIVISOR`` and is 0.0 when there is no
    max score.
    """
    demand_name = str(word or "").strip()
    weight = 0.0 if max_score is None else float(max_score) / WEIGHT_DIVISOR
    demand_id = build_demand_id(
        strategy=strategy,
        demand_name=demand_name,
        partition_dt=partition_dt,
    )
    extend = build_wxindex_odps_extend(retain_reason)
    return {
        "record_id": wxindex_word_record_id,
        "strategy": strategy,
        "demand_id": demand_id,
        "demand_name": demand_name,
        "weight": weight,
        "type": TYPE_PHRASE,
        "video_count": None,
        "video_list": [],
        "extend": extend,
        "dt": partition_dt,
    }
def build_wxindex_word_odps_sync_row(
    *,
    wxindex_word_record_id: int,
    word: str,
    strategy: str,
    partition_dt: str,
    max_score: float | None,
    retain_reason: str | None = None,
) -> dict[str, Any]:
    """Build the ODPS sync row for one WeChat-index word.

    ``weight`` is ``max_score / WEIGHT_DIVISOR`` and stays None when there is
    no max score.
    """
    demand_name = str(word or "").strip()
    weight = None if max_score is None else float(max_score) / WEIGHT_DIVISOR
    demand_id = build_demand_id(
        strategy=strategy,
        demand_name=demand_name,
        partition_dt=partition_dt,
    )
    extend = build_wxindex_odps_extend(retain_reason)
    return {
        "partition_dt": partition_dt,
        "strategy": strategy,
        "demand_id": demand_id,
        "demand_name": demand_name,
        "demand_type": TYPE_PHRASE,
        "record_id": wxindex_word_record_id,
        "weight": weight,
        "extend": extend,
    }
def prepare_analysis_scores(
    scores: list[dict[str, Any]],
    *,
    start_ymd: str,
    end_ymd: str,
    min_days: int = WXINDEX_HEAT_MIN_DAYS,
    max_days: int = WXINDEX_HEAT_MAX_DAYS,
) -> tuple[list[dict[str, Any]], str | None]:
    """Select the usable scores inside the target window.

    Rows are filtered to [start_ymd, end_ymd] and only the last ``max_days``
    rows are kept. The second element is ``"insufficient_days"`` when fewer
    than ``min_days`` rows remain, otherwise None.
    """
    in_window = filter_scores_in_ymd_window(
        scores,
        start_ymd=start_ymd,
        end_ymd=end_ymd,
    )
    usable = in_window[-max_days:] if len(in_window) > max_days else in_window
    skip_reason = "insufficient_days" if len(usable) < min_days else None
    return usable, skip_reason
- def _score_row_ymd(row: dict[str, Any]) -> str:
- return str(row.get("ymd") or row.get("dt") or "").strip()
def _window_ymd_bounds(
    window_scores: list[dict[str, Any]],
) -> tuple[str | None, str | None]:
    """Return the dates of the first and last window rows.

    Each bound is None when the window is empty or the row has no date.
    """
    if not window_scores:
        return None, None
    first_row, last_row = window_scores[0], window_scores[-1]
    return (_score_row_ymd(first_row) or None, _score_row_ymd(last_row) or None)
def analyze_wxindex_heat_patterns(
    scores: list[dict[str, Any]],
    *,
    start_ymd: str,
    end_ymd: str,
    sustained_threshold: float = WXINDEX_SUSTAINED_HIGH_THRESHOLD,
    min_days: int = WXINDEX_HEAT_MIN_DAYS,
    max_days: int = WXINDEX_HEAT_MAX_DAYS,
    spike_days: int = WXINDEX_SPIKE_LOOKBACK_DAYS,
    spike_ratio: float = WXINDEX_SPIKE_RATIO,
    spike_baseline_floor: float = WXINDEX_SPIKE_BASELINE_FLOOR,
    rising_overall_change_rate: float = WXINDEX_RISING_OVERALL_CHANGE_RATE,
    rising_window_change_rate: float = WXINDEX_RISING_WINDOW_CHANGE_RATE,
    rising_adjacent_up_ratio: float = WXINDEX_RISING_ADJACENT_UP_RATIO,
) -> dict[str, Any]:
    """Evaluate the three heat patterns on the scores inside the target window.

    Args:
        scores: Daily score rows for one word.
        start_ymd / end_ymd: Inclusive target window.
        sustained_threshold: Every score must exceed this for "sustained high".
        min_days / max_days: Window size bounds, see ``prepare_analysis_scores``.
        spike_*: Parameters forwarded to ``is_wxindex_spike_scores``.
        rising_*: Parameters forwarded to ``is_wxindex_heat_rising_scores``.

    Returns:
        A skipped result (``skipped=True``, ``skip_reason``, data bounds and
        an empty ``patterns`` list) when the window has too few rows or
        yields no numeric scores. Otherwise a full result with the pattern
        flags, the retain reason chosen by ``resolve_retain_reason``, score
        statistics, and the window rows under ``scores``.
    """
    window_scores, skip_reason = prepare_analysis_scores(
        scores,
        start_ymd=start_ymd,
        end_ymd=end_ymd,
        min_days=min_days,
        max_days=max_days,
    )
    data_start_ymd, data_end_ymd = _window_ymd_bounds(window_scores)
    if skip_reason:
        return {
            "skipped": True,
            "skip_reason": skip_reason,
            "data_days": len(window_scores) if window_scores else None,
            "data_start_ymd": data_start_ymd,
            "data_end_ymd": data_end_ymd,
            "patterns": [],
        }

    numeric_scores = extract_sorted_scores(window_scores, max_points=max_days)
    if len(numeric_scores) == 0:
        # With no numeric points, all() below would be vacuously True (a
        # false "sustained high") and min()/max()/the average would raise.
        # Report the word as skipped instead.
        return {
            "skipped": True,
            "skip_reason": "no_numeric_scores",
            "data_days": len(window_scores),
            "data_start_ymd": data_start_ymd,
            "data_end_ymd": data_end_ymd,
            "patterns": [],
        }

    is_sustained_high = all(score > sustained_threshold for score in numeric_scores)
    is_rising = is_wxindex_heat_rising_scores(
        numeric_scores,
        min_points=min_days,
        overall_change_rate=rising_overall_change_rate,
        window_change_rate_threshold=rising_window_change_rate,
        adjacent_up_ratio=rising_adjacent_up_ratio,
    )
    is_spike = is_wxindex_spike_scores(
        numeric_scores,
        spike_days=spike_days,
        min_points=min_days,
        spike_ratio=spike_ratio,
        baseline_floor=spike_baseline_floor,
    )
    retain_reason = resolve_retain_reason(
        is_sustained_high=is_sustained_high,
        is_rising=is_rising,
        is_spike=is_spike,
    )

    # Only the pattern matching the winning retain reason is reported.
    pattern_by_reason = {
        RETAIN_REASON_SUSTAINED_HIGH: PATTERN_SUSTAINED_HIGH,
        RETAIN_REASON_RISING: PATTERN_RISING,
        RETAIN_REASON_SPIKE: PATTERN_SPIKE,
    }
    patterns: list[str] = (
        [pattern_by_reason[retain_reason]] if retain_reason in pattern_by_reason else []
    )

    return {
        "skipped": False,
        "skip_reason": None,
        "data_days": len(window_scores),
        "data_start_ymd": data_start_ymd,
        "data_end_ymd": data_end_ymd,
        "fetch_start_ymd": start_ymd,
        "fetch_end_ymd": end_ymd,
        "min_score": min(numeric_scores),
        "max_score": max(numeric_scores),
        "avg_score": sum(numeric_scores) / len(numeric_scores),
        "is_sustained_high": is_sustained_high,
        "is_rising": is_rising,
        "is_spike": is_spike,
        "retain_reason": retain_reason,
        "patterns": patterns,
        "scores": window_scores,
    }
def build_wxindex_word_record_init_payload(
    *,
    meta: dict[str, Any],
    name: str,
    analyze_ymd: str,
    fetch_start_ymd: str,
    fetch_end_ymd: str,
    demand_cache_run_id: int | None = None,
) -> dict[str, Any]:
    """Build the trace record written before analysis.

    Only the meta reference and the fetch window are filled in; every
    analysis field is present but None, to be updated by later steps.
    """
    payload: dict[str, Any] = dict(
        name=name,
        meta_id=meta.get("id"),
        analyze_ymd=analyze_ymd,
        fetch_start_ymd=fetch_start_ymd,
        fetch_end_ymd=fetch_end_ymd,
        demand_cache_run_id=demand_cache_run_id,
    )
    payload.update(_WXINDEX_RECORD_UNSET_ANALYSIS_FIELDS)
    return payload
def build_wxindex_word_record_skipped_payload(
    *,
    meta: dict[str, Any],
    name: str,
    analyze_ymd: str,
    fetch_start_ymd: str,
    fetch_end_ymd: str,
    result: dict[str, Any],
    demand_cache_run_id: int | None = None,
) -> dict[str, Any]:
    """Build the trace-record payload for a word whose analysis was skipped.

    Starts from the init payload (all analysis fields None) and fills in the
    data bounds, ``data_days`` and a ``detail_json`` carrying the skipped
    phase and the skip reason taken from ``result``.
    """
    payload = build_wxindex_word_record_init_payload(
        meta=meta,
        name=name,
        analyze_ymd=analyze_ymd,
        fetch_start_ymd=fetch_start_ymd,
        fetch_end_ymd=fetch_end_ymd,
        demand_cache_run_id=demand_cache_run_id,
    )
    data_days = result.get("data_days")
    payload.update(
        {
            "data_start_ymd": result.get("data_start_ymd"),
            "data_end_ymd": result.get("data_end_ymd"),
            "data_days": data_days,
            "detail_json": {
                # Use the shared constant so the stored phase cannot drift
                # from what _is_analysis_skipped_record compares against.
                "phase": PHASE_ANALYSIS_SKIPPED,
                "skip_reason": result.get("skip_reason"),
                "data_days": data_days,
            },
        }
    )
    return payload
- def build_wxindex_word_record_payload(
- item: dict[str, Any],
- *,
- analyze_ymd: str,
- demand_cache_run_id: int | None,
- phase: str,
- senior_threshold: float,
- sustained_threshold: float,
- spike_days: int,
- spike_ratio: float,
- spike_baseline_floor: float,
- rising_overall_change_rate: float,
- rising_window_change_rate: float,
- rising_adjacent_up_ratio: float,
- ) -> dict[str, Any]:
- """根据 pending item 当前状态构建 records 表 payload。"""
- meta = item["meta"]
- name = item["name"]
- fetch_start_ymd = item["fetch_start_ymd"]
- fetch_end_ymd = item["fetch_end_ymd"]
- result = item["result"]
- retain_reason = item.get("retain_reason")
- return {
- "name": name,
- "meta_id": meta.get("id"),
- "analyze_ymd": analyze_ymd,
- "fetch_start_ymd": fetch_start_ymd,
- "fetch_end_ymd": fetch_end_ymd,
- "data_start_ymd": result.get("data_start_ymd"),
- "data_end_ymd": result.get("data_end_ymd"),
- "data_days": result.get("data_days"),
- "is_sustained_high": result.get("is_sustained_high"),
- "is_rising": result.get("is_rising"),
- "is_spike": result.get("is_spike"),
- "retain_reason": retain_reason,
- "is_internal_demand_matched": item.get("is_internal_demand_matched"),
- "matched_demand": item.get("matched_demand"),
- "demand_cache_run_id": demand_cache_run_id,
- "internal_demand_match_json": item.get("internal_demand_match_json"),
- "senior_fit_score": item.get("senior_fit_score"),
- "demand_senior_fit_json": item.get("demand_senior_fit_json"),
- "is_final_retained": item.get("is_final_retained"),
- "min_score": result.get("min_score"),
- "max_score": result.get("max_score"),
- "avg_score": result.get("avg_score"),
- "detail_json": {
- "phase": phase,
- "patterns": list(result.get("patterns") or []),
- "retain_reason": retain_reason,
- "retain_reason_priority": "2->3->1",
- "senior_fit_threshold": senior_threshold,
- "senior_fit_threshold_score": senior_threshold * 10.0,
- "is_final_retained": item.get("is_final_retained"),
- "sustained_threshold": sustained_threshold,
- "spike_days": spike_days,
- "spike_ratio": spike_ratio,
- "spike_baseline_floor": spike_baseline_floor,
- "rising_overall_change_rate": rising_overall_change_rate,
- "rising_window_change_rate": rising_window_change_rate,
- "rising_adjacent_up_ratio": rising_adjacent_up_ratio,
- },
- }
- def _persist_wxindex_word_record(
- repository: HotContentRepository,
- payload: dict[str, Any],
- *,
- dry_run: bool,
- skip_db_save: bool,
- verbose: bool,
- action: str,
- ) -> int:
- name = str(payload.get("name") or "").strip()
- if dry_run or skip_db_save:
- if verbose:
- label = "dry-run" if dry_run else "skip-db-save"
- print(f"[{label}] would {action} wxindex word record word={name}")
- return 0
- return repository.save_wxindex_word_record(payload)
def _persist_pending_item_record(
    repository: HotContentRepository,
    item: dict[str, Any],
    *,
    analyze_ymd: str,
    demand_cache_run_id: int | None,
    phase: str,
    senior_threshold: float,
    sustained_threshold: float,
    spike_days: int,
    spike_ratio: float,
    spike_baseline_floor: float,
    rising_overall_change_rate: float,
    rising_window_change_rate: float,
    rising_adjacent_up_ratio: float,
    dry_run: bool,
    skip_db_save: bool,
    verbose: bool,
) -> int:
    """Persist the pending item's current state for the given phase.

    A truthy id returned by the save is stored back on ``item["record_id"]``.
    Returns the item's record id afterwards (0 when it has none).
    """
    threshold_kwargs = dict(
        senior_threshold=senior_threshold,
        sustained_threshold=sustained_threshold,
        spike_days=spike_days,
        spike_ratio=spike_ratio,
        spike_baseline_floor=spike_baseline_floor,
        rising_overall_change_rate=rising_overall_change_rate,
        rising_window_change_rate=rising_window_change_rate,
        rising_adjacent_up_ratio=rising_adjacent_up_ratio,
    )
    payload = build_wxindex_word_record_payload(
        item,
        analyze_ymd=analyze_ymd,
        demand_cache_run_id=demand_cache_run_id,
        phase=phase,
        **threshold_kwargs,
    )
    saved_id = _persist_wxindex_word_record(
        repository,
        payload,
        dry_run=dry_run,
        skip_db_save=skip_db_save,
        verbose=verbose,
        action=f"{phase} word={item['name']}",
    )
    if saved_id:
        item["record_id"] = saved_id
    return int(item.get("record_id") or 0)
def _filter_candidates_awaiting_yesterday_score(
    repository: HotContentRepository,
    candidate_items: list[dict[str, Any]],
    *,
    yesterday_ymd: str,
    existing_records: dict[str, dict[str, Any]],
    verbose: bool,
) -> tuple[list[dict[str, Any]], int]:
    """Hold back words that still lack yesterday's index data.

    Words whose heat analysis is already done always pass. The rest pass only
    if the repository reports a score row for ``yesterday_ymd``; the others
    are left for the next run.

    Returns:
        ``(ready_items, awaiting_count)``.
    """
    unanalyzed_names = [
        candidate["name"]
        for candidate in candidate_items
        if not _is_heat_analysis_done(existing_records.get(candidate["name"]))
    ]
    if not unanalyzed_names:
        return candidate_items, 0

    names_with_yesterday = repository.list_wxindex_word_names_with_dt(
        unanalyzed_names,
        dt=yesterday_ymd,
    )
    unanalyzed = set(unanalyzed_names)

    ready: list[dict[str, Any]] = []
    awaiting = 0
    for candidate in candidate_items:
        word = candidate["name"]
        if word not in unanalyzed or word in names_with_yesterday:
            ready.append(candidate)
            continue
        awaiting += 1
        if verbose:
            print(
                f"await yesterday score word={word} dt={yesterday_ymd}, skip this run"
            )
    return ready, awaiting
def _init_candidate_wxindex_word_records(
    repository: HotContentRepository,
    candidate_items: list[dict[str, Any]],
    *,
    analyze_ymd: str,
    demand_cache_run_id: int | None,
    dry_run: bool,
    skip_db_save: bool,
    verbose: bool,
) -> None:
    """Batch-create the initial trace records for all candidate words.

    Each item's ``record_id`` is set from the id map returned by the
    repository (0 when the word is missing from it). Nothing is written in
    dry-run or skip-db-save mode.
    """
    if not candidate_items:
        return

    total = len(candidate_items)
    if dry_run or skip_db_save:
        if verbose:
            label = "dry-run" if dry_run else "skip-db-save"
            print(
                f"[{label}] would batch init wxindex word records "
                f"count={total} analyze_ymd={analyze_ymd}"
            )
        return

    init_payloads = []
    for candidate in candidate_items:
        init_payloads.append(
            build_wxindex_word_record_init_payload(
                meta=candidate["meta"],
                name=candidate["name"],
                analyze_ymd=analyze_ymd,
                fetch_start_ymd=candidate["fetch_start_ymd"],
                fetch_end_ymd=candidate["fetch_end_ymd"],
                demand_cache_run_id=demand_cache_run_id,
            )
        )
    ids_by_name = repository.init_wxindex_word_records(init_payloads)
    for candidate in candidate_items:
        candidate["record_id"] = int(ids_by_name.get(candidate["name"]) or 0)

    if verbose:
        print(
            f"init wxindex word records count={total} "
            f"analyze_ymd={analyze_ymd}"
        )
- def _apply_demand_match_to_item(
- item: dict[str, Any],
- match_result: dict[str, Any],
- ) -> None:
- item["internal_demand_match_json"] = match_result
- item["is_internal_demand_matched"] = bool(match_result.get("matched"))
- matched_demand = str(match_result.get("matched_demand") or "").strip()
- item["matched_demand"] = matched_demand or None
- if not item.get("is_internal_demand_matched"):
- item["is_final_retained"] = False
- def _apply_senior_fit_to_item(
- item: dict[str, Any],
- senior_result: dict[str, Any],
- ) -> None:
- passed = bool(senior_result.get("passed"))
- item["senior_fit_score"] = senior_result.get("senior_fit_score")
- item["demand_senior_fit_json"] = senior_result.get("demand_senior_fit_json")
- item["senior_fit_passed"] = passed
- item["is_final_retained"] = passed
def build_wxindex_word_stats_payload(
    item: dict[str, Any],
    *,
    analyze_ymd: str,
) -> dict[str, Any]:
    """Build the stats payload for a word that passed the heat and senior-fit filters.

    Args:
        item: Pending item; must contain ``"meta"``, ``"result"`` and ``"name"``.
        analyze_ymd: Analysis date string stored on the payload.

    Returns:
        A dict with identification fields, the score range and day count taken
        from the heat result, and a ``detail_json`` sub-dict holding the
        senior-fit JSON plus the three heat-pattern flags.
    """
    meta = item["meta"]
    heat = item["result"]

    detail: dict[str, Any] = {
        "demand_senior_fit_json": item.get("demand_senior_fit_json"),
    }
    for flag in ("is_sustained_high", "is_rising", "is_spike"):
        detail[flag] = heat.get(flag)

    payload: dict[str, Any] = {
        "name": item["name"],
        "meta_id": meta.get("id"),
        "analyze_ymd": analyze_ymd,
        "wxindex_word_record_id": item.get("record_id"),
        "retain_reason": item.get("retain_reason"),
        "senior_fit_score": item.get("senior_fit_score"),
    }
    # These columns are copied straight from the heat-analysis result.
    for column in (
        "data_start_ymd",
        "data_end_ymd",
        "data_days",
        "min_score",
        "max_score",
        "avg_score",
    ):
        payload[column] = heat.get(column)
    payload["detail_json"] = detail
    return payload
- def _save_senior_fit_passed_stats(
- repository: HotContentRepository,
- items: list[dict[str, Any]],
- *,
- analyze_ymd: str,
- existing_stats_names: set[str] | None = None,
- dry_run: bool,
- skip_db_save: bool,
- verbose: bool,
- ) -> tuple[int, int]:
- existing = existing_stats_names or set()
- all_payloads = [
- build_wxindex_word_stats_payload(item, analyze_ymd=analyze_ymd)
- for item in items
- if item.get("senior_fit_passed")
- ]
- resumed = sum(1 for payload in all_payloads if payload["name"] in existing)
- payloads = [payload for payload in all_payloads if payload["name"] not in existing]
- if not payloads:
- return 0, resumed
- if dry_run or skip_db_save:
- if verbose:
- label = "dry-run" if dry_run else "skip-db-save"
- print(f"[{label}] would save wxindex word stats count={len(payloads)}")
- return len(payloads), resumed
- saved = repository.save_wxindex_word_stats_batch(payloads)
- if verbose:
- print(f"saved wxindex word stats count={saved} analyze_ymd={analyze_ymd}")
- return saved, resumed
- def _build_export_row_from_item(
- item: dict[str, Any],
- *,
- analyze_ymd: str,
- strategy: str,
- ) -> dict[str, Any]:
- meta = item["meta"]
- name = item["name"]
- result = item["result"]
- retain_reason = item.get("retain_reason")
- is_internal_demand_matched = item.get("is_internal_demand_matched")
- matched_demand = str(item.get("matched_demand") or "")
- senior_fit_score = item.get("senior_fit_score")
- is_final_retained = bool(item.get("is_final_retained"))
- return {
- "analyze_ymd": analyze_ymd,
- "name": name,
- "meta_id": meta.get("id"),
- "fetch_start_ymd": item["fetch_start_ymd"],
- "fetch_end_ymd": item["fetch_end_ymd"],
- "data_start_ymd": result.get("data_start_ymd"),
- "data_end_ymd": result.get("data_end_ymd"),
- "data_days": result.get("data_days"),
- "analysis_skipped": False,
- "skip_reason": "",
- "is_sustained_high": bool(result.get("is_sustained_high")),
- "is_rising": bool(result.get("is_rising")),
- "is_spike": bool(result.get("is_spike")),
- "retain_reason": retain_reason or "",
- "is_internal_demand_matched": (
- "" if is_internal_demand_matched is None else is_internal_demand_matched
- ),
- "matched_demand": matched_demand,
- "senior_fit_score": senior_fit_score if senior_fit_score is not None else "",
- "is_final_retained": is_final_retained,
- "min_score": result.get("min_score"),
- "max_score": result.get("max_score"),
- "avg_score": result.get("avg_score"),
- "demand_id": (
- build_demand_id(
- strategy=strategy,
- demand_name=name,
- partition_dt=analyze_ymd,
- )
- if is_final_retained and strategy
- else ""
- ),
- "weight": (
- float(result.get("max_score") or 0) / WEIGHT_DIVISOR
- if is_final_retained and result.get("max_score") is not None
- else ""
- ),
- }
def run_wxindex_heat_pattern_daily_job(
    repository: HotContentRepository,
    *,
    config: FlowConfig | None = None,
    api_client: JsonApiClient | None = None,
    today: date | None = None,
    sustained_threshold: float = WXINDEX_SUSTAINED_HIGH_THRESHOLD,
    min_days: int = WXINDEX_HEAT_MIN_DAYS,
    max_days: int = WXINDEX_HEAT_MAX_DAYS,
    spike_days: int = WXINDEX_SPIKE_LOOKBACK_DAYS,
    spike_ratio: float = WXINDEX_SPIKE_RATIO,
    spike_baseline_floor: float = WXINDEX_SPIKE_BASELINE_FLOOR,
    rising_overall_change_rate: float = WXINDEX_RISING_OVERALL_CHANGE_RATE,
    rising_window_change_rate: float = WXINDEX_RISING_WINDOW_CHANGE_RATE,
    rising_adjacent_up_ratio: float = WXINDEX_RISING_ADJACENT_UP_RATIO,
    dry_run: bool = False,
    skip_odps: bool = False,
    skip_db_save: bool = False,
    verbose: bool = False,
) -> dict[str, Any]:
    """Scheduled job: analyze words still inside their fetch window and store the heat-pattern results.

    The job works in stages and persists each word's record after every stage
    (phases ``heat_analyzed``, ``demand_matched``, ``senior_fit_scored`` and
    ``finalized``). Records already stored for the same ``analyze_ymd`` are
    loaded up front and used to skip stages that were completed earlier; the
    ``*_resumed`` counters in the summary report how often that happened.

    Stages, in order:
        1. Load the demand cache (only when ``config`` is given).
        2. Build candidates from the active word meta rows and initialize
           their records.
        3. Drop candidates that are still waiting for yesterday's score.
        4. Run (or resume) the heat analysis for every remaining candidate.
        5. Match words retained as ``RETAIN_REASON_SUSTAINED_HIGH`` against
           the demand pool.
        6. Score senior fit for retained words.
        7. Save stats, finalize records, build export rows and hive rows.
        8. Sync the hive rows to ODPS.

    Args:
        repository: Data access object used for every read and write.
        config: Flow configuration. When ``None`` the demand cache, senior-fit
            scoring and ODPS sync are all skipped.
        api_client: Client handed to the post-process service; a
            ``JsonApiClient`` is built from ``config`` when omitted.
        today: Date treated as "today"; defaults to the current date in
            ``SHANGHAI_TZ``.
        sustained_threshold, spike_days, spike_ratio, spike_baseline_floor,
        rising_overall_change_rate, rising_window_change_rate,
        rising_adjacent_up_ratio: Forwarded unchanged to
            ``analyze_wxindex_heat_patterns`` and to the record persistence
            helper.
        min_days, max_days: Forwarded to ``analyze_wxindex_heat_patterns`` and
            to the rerun check.
        dry_run: Forwarded to the persistence helpers; also disables loading
            existing records and the ODPS sync.
        skip_odps: Disable the ODPS sync only.
        skip_db_save: Forwarded to the persistence helpers; also disables
            loading existing records.
        verbose: Print progress lines to stdout.

    Returns:
        The summary dict: run counters and flags plus ``retained_words`` and
        ``export_rows``. ``odps_sync`` is present only when a real ODPS sync
        ran.
    """
    current = today or datetime.now(SHANGHAI_TZ).date()
    analyze_ymd = current.strftime("%Y%m%d")
    meta_rows = repository.list_active_wxindex_word_meta(today=current)

    # --- Stage 1: demand cache -------------------------------------------
    demand_name_set: list[str] = []
    demand_cache_run_id: int | None = None
    demand_cache_error: str | None = None
    postprocess_service: ContributionPostprocessService | None = None
    if config is not None:
        try:
            cache = DemandCacheService(config, repository).get_or_create_current_hour_cache()
            demand_name_set = list(cache.get("demand_name_set") or [])
            demand_cache_run_id = int(cache["id"])
            # The matching service is only needed when there is something to match against.
            if demand_name_set:
                postprocess_service = ContributionPostprocessService(
                    config,
                    repository,
                    api_client
                    or JsonApiClient(
                        timeout_seconds=config.request_timeout_seconds,
                        verify_ssl=config.https_verify_ssl,
                    ),
                )
        except HotContentFlowError as exc:
            # Best effort: the job continues without demand matching and the
            # error text is reported in the summary.
            demand_cache_error = str(exc)
            if verbose:
                print(f"demand cache unavailable, skip demand match: {exc}")

    summary: dict[str, Any] = {
        "analyze_ymd": analyze_ymd,
        "meta_count": len(meta_rows),
        "analyzed": 0,
        "skipped": 0,
        "retained": 0,
        "sustained_high": 0,
        "rising": 0,
        "spike": 0,
        "internal_demand_matched": 0,
        "senior_scored": 0,
        "senior_fit_candidates": 0,
        "senior_fit_passed": 0,
        "stats_saved": 0,
        "final_retained": 0,
        "odps_synced": 0,
        "odps_written": 0,
        "demand_cache_run_id": demand_cache_run_id,
        "demand_cache_error": demand_cache_error,
        "demand_name_count": len(demand_name_set),
        "llm_batch_size": WXINDEX_WORD_LLM_BATCH_SIZE,
        "demand_match_batches": 0,
        "senior_fit_batches": 0,
        "records_initialized": 0,
        "awaiting_yesterday_score": 0,
        "heat_resumed": 0,
        "demand_match_resumed": 0,
        "senior_fit_resumed": 0,
        "stats_resumed": 0,
        "finalized_resumed": 0,
        "dry_run": dry_run,
        "skip_odps": skip_odps,
        "skip_db_save": skip_db_save,
    }
    retained_words: list[dict[str, Any]] = []
    export_rows: list[dict[str, Any]] = []
    final_hive_rows: list[dict[str, Any]] = []
    senior_threshold = WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD
    # An empty strategy disables hive-row building and the ODPS sync below.
    strategy = str(config.hot_demand_pool_strategy or "").strip() if config else ""
    pending_items: list[dict[str, Any]] = []

    # --- Stage 2: candidates and record initialization --------------------
    candidate_items: list[dict[str, Any]] = []
    for meta in meta_rows:
        name = str(meta.get("name") or "").strip()
        fetch_start_ymd = str(meta.get("fetch_start_ymd") or "").strip()
        fetch_end_ymd = str(meta.get("fetch_end_ymd") or "").strip()
        # Meta rows without a name or a complete fetch window cannot be analyzed.
        if not name or not fetch_start_ymd or not fetch_end_ymd:
            summary["skipped"] += 1
            continue
        candidate_items.append(
            {
                "meta": meta,
                "name": name,
                "fetch_start_ymd": fetch_start_ymd,
                "fetch_end_ymd": fetch_end_ymd,
            }
        )
    _init_candidate_wxindex_word_records(
        repository,
        candidate_items,
        analyze_ymd=analyze_ymd,
        demand_cache_run_id=demand_cache_run_id,
        dry_run=dry_run,
        skip_db_save=skip_db_save,
        verbose=verbose,
    )
    # Counts the candidates, also in dry-run / skip-db-save mode where nothing was written.
    summary["records_initialized"] = len(candidate_items)

    # Existing state is only loaded when the run really persists; otherwise
    # both collections stay empty and nothing is treated as resumed.
    existing_records: dict[str, dict[str, Any]] = {}
    existing_stats_names: set[str] = set()
    if not dry_run and not skip_db_save:
        candidate_names = [item["name"] for item in candidate_items]
        existing_records = repository.list_wxindex_word_records_by_analyze_ymd(
            analyze_ymd=analyze_ymd,
            names=candidate_names,
        )
        existing_stats_names = repository.list_wxindex_word_stats_names(
            analyze_ymd=analyze_ymd,
            names=candidate_names,
        )

    # --- Stage 3: hold back words that still lack yesterday's score -------
    yesterday_ymd = (current - timedelta(days=1)).strftime("%Y%m%d")
    candidate_items, awaiting_yesterday = _filter_candidates_awaiting_yesterday_score(
        repository,
        candidate_items,
        yesterday_ymd=yesterday_ymd,
        existing_records=existing_records,
        verbose=verbose,
    )
    summary["awaiting_yesterday_score"] = awaiting_yesterday

    # Fall back to the stored record id when initialization did not supply one.
    for item in candidate_items:
        if not item.get("record_id"):
            existing = existing_records.get(item["name"])
            if existing:
                item["record_id"] = int(existing.get("id") or 0)

    # --- Stage 4: heat analysis (or resume from the stored record) --------
    for item in candidate_items:
        meta = item["meta"]
        name = item["name"]
        fetch_start_ymd = item["fetch_start_ymd"]
        fetch_end_ymd = item["fetch_end_ymd"]
        existing_record = existing_records.get(name)
        scores = repository.list_wxindex_word_scores_in_range(
            name,
            start_ymd=fetch_start_ymd,
            end_ymd=fetch_end_ymd,
        )
        # Resume path: the stored analysis is reused unless the rerun check
        # asks for a fresh one (its criteria live in _should_rerun_heat_analysis,
        # which is not visible here).
        if (
            _is_heat_analysis_done(existing_record)
            and not _should_rerun_heat_analysis(
                existing_record,
                scores=scores,
                fetch_start_ymd=fetch_start_ymd,
                fetch_end_ymd=fetch_end_ymd,
                min_days=min_days,
                max_days=max_days,
            )
        ):
            summary["heat_resumed"] += 1
            if _is_analysis_skipped_record(existing_record):
                # Previously skipped word: only re-emit its export row.
                if verbose:
                    detail = _parse_record_detail_json(existing_record)
                    print(
                        f"resume skip word={name} reason={detail.get('skip_reason')} "
                        f"days={existing_record.get('data_days')}"
                    )
                export_rows.append(
                    _build_skipped_export_row_from_record(
                        record=existing_record,
                        meta=meta,
                        name=name,
                        analyze_ymd=analyze_ymd,
                        fetch_start_ymd=fetch_start_ymd,
                        fetch_end_ymd=fetch_end_ymd,
                    )
                )
                continue
            # Previously analyzed word: rebuild the pending item from the record.
            pending_item = _rehydrate_pending_item_from_record(
                item,
                existing_record,
                senior_threshold=senior_threshold,
            )
            pending_items.append(pending_item)
            if verbose:
                print(
                    f"resume heat analyzed word={name} "
                    f"retain_reason={pending_item.get('retain_reason') or ''} "
                    f"data_days={pending_item['result'].get('data_days')}"
                )
            continue

        # Fresh analysis path.
        result = analyze_wxindex_heat_patterns(
            scores,
            start_ymd=fetch_start_ymd,
            end_ymd=fetch_end_ymd,
            sustained_threshold=sustained_threshold,
            min_days=min_days,
            max_days=max_days,
            spike_days=spike_days,
            spike_ratio=spike_ratio,
            spike_baseline_floor=spike_baseline_floor,
            rising_overall_change_rate=rising_overall_change_rate,
            rising_window_change_rate=rising_window_change_rate,
            rising_adjacent_up_ratio=rising_adjacent_up_ratio,
        )
        if result.get("skipped"):
            # The analyzer declined this word: persist the skip and export a
            # row with empty analysis columns.
            if verbose:
                print(
                    f"skip word={name} reason={result.get('skip_reason')} "
                    f"days={result.get('data_days')}"
                )
            skipped_payload = build_wxindex_word_record_skipped_payload(
                meta=meta,
                name=name,
                analyze_ymd=analyze_ymd,
                fetch_start_ymd=fetch_start_ymd,
                fetch_end_ymd=fetch_end_ymd,
                result=result,
                demand_cache_run_id=demand_cache_run_id,
            )
            _persist_wxindex_word_record(
                repository,
                skipped_payload,
                dry_run=dry_run,
                skip_db_save=skip_db_save,
                verbose=verbose,
                action="update skipped",
            )
            export_rows.append(
                {
                    "analyze_ymd": analyze_ymd,
                    "name": name,
                    "meta_id": meta.get("id"),
                    "fetch_start_ymd": fetch_start_ymd,
                    "fetch_end_ymd": fetch_end_ymd,
                    "analysis_skipped": True,
                    "skip_reason": result.get("skip_reason"),
                    "data_days": result.get("data_days"),
                    "retain_reason": "",
                    "is_sustained_high": False,
                    "is_rising": False,
                    "is_spike": False,
                    "is_internal_demand_matched": "",
                    "matched_demand": "",
                    "senior_fit_score": "",
                    "is_final_retained": False,
                    "min_score": "",
                    "max_score": "",
                    "avg_score": "",
                }
            )
            continue
        retain_reason = str(result.get("retain_reason") or "").strip() or None
        # Later-stage fields start as None, meaning "not attempted yet".
        pending_item = {
            "meta": meta,
            "name": name,
            "fetch_start_ymd": fetch_start_ymd,
            "fetch_end_ymd": fetch_end_ymd,
            "result": result,
            "retain_reason": retain_reason,
            "record_id": item.get("record_id", 0),
            "is_internal_demand_matched": None,
            "matched_demand": None,
            "internal_demand_match_json": None,
            "senior_fit_score": None,
            "demand_senior_fit_json": None,
            "senior_fit_passed": None,
            "is_final_retained": None,
        }
        pending_items.append(pending_item)
        _persist_pending_item_record(
            repository,
            pending_item,
            analyze_ymd=analyze_ymd,
            demand_cache_run_id=demand_cache_run_id,
            phase="heat_analyzed",
            senior_threshold=senior_threshold,
            sustained_threshold=sustained_threshold,
            spike_days=spike_days,
            spike_ratio=spike_ratio,
            spike_baseline_floor=spike_baseline_floor,
            rising_overall_change_rate=rising_overall_change_rate,
            rising_window_change_rate=rising_window_change_rate,
            rising_adjacent_up_ratio=rising_adjacent_up_ratio,
            dry_run=dry_run,
            skip_db_save=skip_db_save,
            verbose=verbose,
        )
        if verbose:
            print(
                f"heat analyzed word={name} retain_reason={retain_reason or ''} "
                f"data_days={result.get('data_days')}"
            )

    # --- Stage 5: demand matching for sustained-high words ----------------
    demand_match_results: dict[str, dict[str, Any]] = {}
    pending_by_name = {item["name"]: item for item in pending_items}
    # Only sustained-high words without a match decision still need matching.
    sustained_high_words = [
        item["name"]
        for item in pending_items
        if item.get("retain_reason") == RETAIN_REASON_SUSTAINED_HIGH
        and item.get("is_internal_demand_matched") is None
    ]
    # Sustained-high words that already carry a decision count as resumed.
    for item in pending_items:
        if item.get("retain_reason") != RETAIN_REASON_SUSTAINED_HIGH:
            continue
        if item.get("is_internal_demand_matched") is not None:
            summary["demand_match_resumed"] += 1

    # Applies the match result of each word to its pending item and persists
    # it; a word without a result is treated as unmatched.
    def _persist_demand_match_updates(words: list[str]) -> None:
        for word in words:
            pending_item = pending_by_name.get(word)
            if pending_item is None:
                continue
            match_result = demand_match_results.get(word) or {
                "word": word,
                "matched": False,
                "matched_demand": "",
                "match_list": [],
            }
            _apply_demand_match_to_item(pending_item, match_result)
            _persist_pending_item_record(
                repository,
                pending_item,
                analyze_ymd=analyze_ymd,
                demand_cache_run_id=demand_cache_run_id,
                phase="demand_matched",
                senior_threshold=senior_threshold,
                sustained_threshold=sustained_threshold,
                spike_days=spike_days,
                spike_ratio=spike_ratio,
                spike_baseline_floor=spike_baseline_floor,
                rising_overall_change_rate=rising_overall_change_rate,
                rising_window_change_rate=rising_window_change_rate,
                rising_adjacent_up_ratio=rising_adjacent_up_ratio,
                dry_run=dry_run,
                skip_db_save=skip_db_save,
                verbose=verbose,
            )

    if sustained_high_words:
        if postprocess_service is not None and demand_name_set:
            # Results are persisted batch by batch, so a failure in a later
            # batch does not lose the earlier ones.
            for batch in _chunk_words(sustained_high_words):
                summary["demand_match_batches"] += 1
                if verbose:
                    print(f"demand match batch size={len(batch)} words={batch}")
                demand_match_results.update(
                    postprocess_service.match_words_to_demand_pool(
                        words=batch,
                        demand_name_set=demand_name_set,
                    )
                )
                _persist_demand_match_updates(batch)
        else:
            # No usable demand cache: record every word as unmatched, with the reason.
            for word in sustained_high_words:
                demand_match_results[word] = {
                    "word": word,
                    "matched": False,
                    "matched_demand": "",
                    "match_list": [],
                    "skip_reason": "empty_demand_cache",
                }
            _persist_demand_match_updates(sustained_high_words)

    # --- Stage 6: senior-fit scoring ---------------------------------------
    senior_fit_candidate_names: list[str] = []
    for item in pending_items:
        retain_reason = item.get("retain_reason")
        name = str(item.get("name") or "").strip()
        # Words without a retain reason are never scored.
        if not retain_reason or not name:
            continue
        if _is_senior_fit_attempt_done(item, existing_records.get(name)):
            summary["senior_fit_resumed"] += 1
            continue
        # Sustained-high words must also have matched the demand pool.
        if retain_reason == RETAIN_REASON_SUSTAINED_HIGH:
            if not item.get("is_internal_demand_matched"):
                continue
        senior_fit_candidate_names.append(name)
    senior_fit_results: dict[str, dict[str, Any]] = {}

    # Applies the senior-fit result of each word to its pending item and
    # persists it; words without a result are left untouched.
    def _persist_senior_fit_updates(words: list[str]) -> None:
        for word in words:
            pending_item = pending_by_name.get(word)
            senior_result = senior_fit_results.get(word)
            if pending_item is None or senior_result is None:
                continue
            _apply_senior_fit_to_item(pending_item, senior_result)
            _persist_pending_item_record(
                repository,
                pending_item,
                analyze_ymd=analyze_ymd,
                demand_cache_run_id=demand_cache_run_id,
                phase="senior_fit_scored",
                senior_threshold=senior_threshold,
                sustained_threshold=sustained_threshold,
                spike_days=spike_days,
                spike_ratio=spike_ratio,
                spike_baseline_floor=spike_baseline_floor,
                rising_overall_change_rate=rising_overall_change_rate,
                rising_window_change_rate=rising_window_change_rate,
                rising_adjacent_up_ratio=rising_adjacent_up_ratio,
                dry_run=dry_run,
                skip_db_save=skip_db_save,
                verbose=verbose,
            )
            if verbose and not pending_item.get("senior_fit_passed"):
                print(
                    f"senior fit rejected word={word} "
                    f"score={pending_item.get('senior_fit_score')}"
                )

    # Scoring needs a config; without one the candidates stay unscored.
    if config is not None and senior_fit_candidate_names:
        for batch in _chunk_words(senior_fit_candidate_names):
            summary["senior_fit_batches"] += 1
            if verbose:
                print(f"senior fit batch size={len(batch)} words={batch}")
            senior_fit_results.update(
                score_wxindex_words_senior_fit(
                    words=batch,
                    config=config,
                    senior_threshold=senior_threshold,
                    batch_size=WXINDEX_WORD_LLM_BATCH_SIZE,
                )
            )
            _persist_senior_fit_updates(batch)

    # --- Stage 7: stats, finalization, export and hive rows ----------------
    stats_saved, stats_resumed = _save_senior_fit_passed_stats(
        repository,
        pending_items,
        analyze_ymd=analyze_ymd,
        existing_stats_names=existing_stats_names,
        dry_run=dry_run,
        skip_db_save=skip_db_save,
        verbose=verbose,
    )
    summary["stats_saved"] = stats_saved
    summary["stats_resumed"] = stats_resumed
    for item in pending_items:
        name = item["name"]
        result = item["result"]
        retain_reason = item.get("retain_reason")
        if retain_reason:
            retained_words.append(
                {
                    "name": name,
                    "retain_reason": retain_reason,
                    "data_days": result.get("data_days"),
                    "data_start_ymd": result.get("data_start_ymd"),
                    "data_end_ymd": result.get("data_end_ymd"),
                    "is_internal_demand_matched": item.get("is_internal_demand_matched"),
                    "matched_demand": str(item.get("matched_demand") or ""),
                    "senior_fit_score": item.get("senior_fit_score"),
                    "is_final_retained": bool(item.get("is_final_retained")),
                }
            )
        existing_record = existing_records.get(name)
        if _is_finalized_record(existing_record):
            # Already finalized earlier: reuse the known record id, write nothing.
            summary["finalized_resumed"] += 1
            record_id = int(item.get("record_id") or existing_record.get("id") or 0)
            if verbose:
                print(f"resume finalized word={name}")
        else:
            record_id = _persist_pending_item_record(
                repository,
                item,
                analyze_ymd=analyze_ymd,
                demand_cache_run_id=demand_cache_run_id,
                phase="finalized",
                senior_threshold=senior_threshold,
                sustained_threshold=sustained_threshold,
                spike_days=spike_days,
                spike_ratio=spike_ratio,
                spike_baseline_floor=spike_baseline_floor,
                rising_overall_change_rate=rising_overall_change_rate,
                rising_window_change_rate=rising_window_change_rate,
                rising_adjacent_up_ratio=rising_adjacent_up_ratio,
                dry_run=dry_run,
                skip_db_save=skip_db_save,
                verbose=verbose,
            )
        export_rows.append(
            _build_export_row_from_item(
                item,
                analyze_ymd=analyze_ymd,
                strategy=strategy,
            )
        )
        # Only finally retained words are exported to hive, and only when a
        # strategy is configured.
        if item.get("is_final_retained") and strategy:
            final_hive_rows.append(
                build_wxindex_word_hive_row(
                    wxindex_word_record_id=record_id,
                    word=name,
                    strategy=strategy,
                    partition_dt=analyze_ymd,
                    max_score=result.get("max_score"),
                    retain_reason=item.get("retain_reason"),
                )
            )

    # --- Stage 8: ODPS sync ------------------------------------------------
    if (
        final_hive_rows
        and strategy
        and config is not None
        and not dry_run
        and not skip_odps
    ):
        odps_summary = sync_wxindex_word_rows_to_odps(
            config,
            repository,
            hive_rows=final_hive_rows,
            partition_dt=analyze_ymd,
            strategy=strategy,
        )
        summary["odps_written"] = odps_summary.get("written_count", 0)
        summary["odps_synced"] = odps_summary.get("odps_synced", 0)
        summary["odps_sync"] = odps_summary
    elif dry_run or skip_odps:
        # No real sync: report the number of rows that would have been sent.
        summary["odps_written"] = len(final_hive_rows)
        summary["odps_synced"] = len(final_hive_rows)

    # The helper updates summary in place from the collected items and rows
    # (presumably the aggregate counters initialized to 0 above; confirm
    # against _refresh_wxindex_heat_job_summary).
    _refresh_wxindex_heat_job_summary(
        summary,
        pending_items=pending_items,
        export_rows=export_rows,
    )
    summary["retained_words"] = retained_words
    summary["export_rows"] = export_rows
    return summary
# Column names, in output order, of the heat-pattern CSV export. Used as the
# default header by write_wxindex_heat_pattern_csv.
WXINDEX_HEAT_PATTERN_EXPORT_FIELDS = [
    "analyze_ymd",
    "name",
    "meta_id",
    "fetch_start_ymd",
    "fetch_end_ymd",
    "data_start_ymd",
    "data_end_ymd",
    "data_days",
    "analysis_skipped",
    "skip_reason",
    "is_sustained_high",
    "is_rising",
    "is_spike",
    "retain_reason",
    "is_internal_demand_matched",
    "matched_demand",
    "senior_fit_score",
    "is_final_retained",
    "min_score",
    "max_score",
    "avg_score",
    "demand_id",
    "weight",
]
def write_wxindex_heat_pattern_csv(
    rows: list[dict[str, Any]],
    output_path: str | Path,
    *,
    fieldnames: list[str] | None = None,
) -> Path:
    """Write heat-analysis detail rows to a local CSV file.

    Args:
        rows: Row dicts; keys outside the header are dropped and missing keys
            are written as empty strings.
        output_path: Target file. ``~`` is expanded and a relative path is
            resolved against the current working directory. Missing parent
            directories are created.
        fieldnames: Header columns; defaults to
            ``WXINDEX_HEAT_PATTERN_EXPORT_FIELDS``.

    Returns:
        The absolute path that was written. The file is encoded as UTF-8 with
        a BOM (``utf-8-sig``).
    """
    target = Path(output_path).expanduser()
    if not target.is_absolute():
        target = Path.cwd() / target
    target.parent.mkdir(parents=True, exist_ok=True)

    header = fieldnames or WXINDEX_HEAT_PATTERN_EXPORT_FIELDS
    with target.open("w", encoding="utf-8-sig", newline="") as stream:
        csv_writer = csv.DictWriter(stream, fieldnames=header, extrasaction="ignore")
        csv_writer.writeheader()
        csv_writer.writerows(
            {key: record.get(key, "") for key in header} for record in rows
        )
    return target
|