"""WeChat-index heat-pattern analysis: sustained high heat, sustained rise, sudden spike."""

from __future__ import annotations

import csv
import json
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Any

from app.hot_content.client import JsonApiClient
from app.hot_content.demand_cache_service import DemandCacheService
from app.hot_content.demand_hive_export import WEIGHT_DIVISOR, build_demand_id
from app.hot_content.demand_quality import (
    TYPE_PHRASE,
    llm_score_senior_fit,
    lookup_quality_scores,
)
from app.hot_content.demand_pool_writer import sync_wxindex_word_rows_to_odps
from app.hot_content.exceptions import HotContentFlowError
from app.hot_content.postprocess_service import ContributionPostprocessService
from app.hot_content.repository import HotContentRepository
from app.hot_content.timezone import SHANGHAI_TZ
from app.hot_content.types import FlowConfig
from app.hot_content.wxindex_trend import (
    HEAT_RISING_ADJACENT_UP_RATIO,
    HEAT_RISING_OVERALL_CHANGE_RATE,
    HEAT_RISING_RECENT_DROP_RATE,
    HEAT_RISING_WINDOW_CHANGE_RATE,
    HEAT_SPIKE_BASELINE_FLOOR,
    HEAT_SPIKE_RATIO,
    extract_sorted_scores,
    is_wxindex_heat_rising_scores,
    is_wxindex_spike_scores,
)
from app.hot_content.wxindex_words import filter_scores_in_ymd_window

# Fewer usable daily scores than this in the fetch window -> analysis is
# skipped with reason "insufficient_days" (see prepare_analysis_scores).
WXINDEX_HEAT_MIN_DAYS = 7
# The analysis window is truncated to the most recent this-many score rows.
WXINDEX_HEAT_MAX_DAYS = 14
# "Sustained high" requires every score in the window to be strictly above this.
WXINDEX_SUSTAINED_HIGH_THRESHOLD = 10_000_000.0
# Passed as `spike_days` to is_wxindex_spike_scores; presumably the number of
# most recent days compared against the baseline -- TODO confirm in wxindex_trend.
WXINDEX_SPIKE_LOOKBACK_DAYS = 3
# Re-exported trend thresholds so callers of this module can override them.
WXINDEX_SPIKE_RATIO = HEAT_SPIKE_RATIO
WXINDEX_SPIKE_BASELINE_FLOOR = HEAT_SPIKE_BASELINE_FLOOR
WXINDEX_RISING_OVERALL_CHANGE_RATE = HEAT_RISING_OVERALL_CHANGE_RATE
WXINDEX_RISING_WINDOW_CHANGE_RATE = HEAT_RISING_WINDOW_CHANGE_RATE
WXINDEX_RISING_ADJACENT_UP_RATIO = HEAT_RISING_ADJACENT_UP_RATIO
# NOTE(review): not passed to the rising check in the visible code -- confirm usage.
WXINDEX_RISING_RECENT_DROP_RATE = HEAT_RISING_RECENT_DROP_RATE
# Normalized (0-1) senior-fit threshold; raw LLM scores are divided by 10 and
# must be strictly greater than this value to pass.
WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD = 0.6
# Maximum number of words sent to the LLM in a single senior-fit scoring call.
WXINDEX_WORD_LLM_BATCH_SIZE = 10

# Machine-readable pattern labels stored in result["patterns"] / detail_json.
PATTERN_SUSTAINED_HIGH = "sustained_high"
PATTERN_RISING = "rising"
PATTERN_SPIKE = "spike"

# Human-readable retain reasons persisted to records and the ODPS "extend"
# field: sustained high heat / heat keeps rising / sudden heat spike.
RETAIN_REASON_SUSTAINED_HIGH = "持续高热度"
RETAIN_REASON_RISING = "热度持续上涨"
RETAIN_REASON_SPIKE = "热度突然暴涨"
# Processing phases recorded in a record's detail_json["phase"]; used to
# resume a partially completed daily run.
PHASE_ANALYSIS_SKIPPED = "analysis_skipped"
PHASE_HEAT_ANALYZED = "heat_analyzed"
PHASE_DEMAND_MATCHED = "demand_matched"
PHASE_SENIOR_FIT_SCORED = "senior_fit_scored"
PHASE_FINALIZED = "finalized"

# Any of these phases means heat analysis already ran (or was skipped) for the word.
_HEAT_DONE_PHASES = frozenset(
    {
        PHASE_ANALYSIS_SKIPPED,
        PHASE_HEAT_ANALYZED,
        PHASE_DEMAND_MATCHED,
        PHASE_SENIOR_FIT_SCORED,
        PHASE_FINALIZED,
    }
)

# Analysis columns reset to None when a record is (re)initialized.
_WXINDEX_RECORD_UNSET_ANALYSIS_FIELDS: dict[str, Any] = {
    "data_start_ymd": None,
    "data_end_ymd": None,
    "data_days": None,
    "is_sustained_high": None,
    "is_rising": None,
    "is_spike": None,
    "retain_reason": None,
    "is_internal_demand_matched": None,
    "matched_demand": None,
    "internal_demand_match_json": None,
    "senior_fit_score": None,
    "demand_senior_fit_json": None,
    "is_final_retained": None,
    "min_score": None,
    "max_score": None,
    "avg_score": None,
    "detail_json": None,
}


def _parse_record_detail_json(record: dict[str, Any]) -> dict[str, Any]:
    """Return the record's detail_json as a dict.

    Accepts an already-decoded dict or a JSON string; anything else, an empty
    string, invalid JSON, or JSON that is not an object yields ``{}``.
    """
    detail = record.get("detail_json")
    if isinstance(detail, dict):
        return detail
    if isinstance(detail, str) and detail.strip():
        try:
            parsed = json.loads(detail)
            return parsed if isinstance(parsed, dict) else {}
        except json.JSONDecodeError:
            return {}
    return {}


def _record_phase(record: dict[str, Any]) -> str:
    """Return the stripped detail_json["phase"] of a record, or "" if absent."""
    return str(_parse_record_detail_json(record).get("phase") or "").strip()


def _nullable_bool_from_record(value: Any) -> bool | None:
    """Coerce a DB value to bool while keeping None (unset) as None."""
    if value is None:
        return None
    return bool(value)


def _parse_record_json_field(record: dict[str, Any], field: str) -> Any:
    """Decode a JSON column of a record.

    dict/list values are returned as-is; bytes are UTF-8 decoded first; strings
    are parsed with json.loads (invalid JSON -> None); any other non-None value
    is returned unchanged.
    """
    value = record.get(field)
    if value is None:
        return None
    if isinstance(value, (dict, list)):
        return value
    if isinstance(value, (bytes, bytearray)):
        value = value.decode("utf-8")
    if isinstance(value, str):
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            return None
    return value


def _is_heat_analysis_done(record: dict[str, Any] | None) -> bool:
    """True when the record exists and its phase is one of _HEAT_DONE_PHASES."""
    if not record:
        return False
    return _record_phase(record) in _HEAT_DONE_PHASES


def _is_analysis_skipped_record(record: dict[str, Any]) -> bool:
    """True when the record's phase is PHASE_ANALYSIS_SKIPPED."""
    return _record_phase(record) == PHASE_ANALYSIS_SKIPPED


def _is_senior_fit_needed(
    *,
    retain_reason: str | None,
    is_internal_demand_matched: bool | None,
) -> bool:
    """Decide whether a word still needs senior-fit LLM scoring.

    Words without a retain reason never need it. "Sustained high" words need
    it only when they matched an internal demand; rising/spike words always do.
    """
    if not retain_reason:
        return False
    if retain_reason == RETAIN_REASON_SUSTAINED_HIGH:
        return bool(is_internal_demand_matched)
    return True


def _is_finalized_record(record: dict[str, Any] | None) -> bool:
    """True when the record exists and its phase is PHASE_FINALIZED."""
    if not record:
        return False
    return _record_phase(record) == PHASE_FINALIZED


def _latest_score_ymd(scores: list[dict[str, Any]]) -> str | None:
    """Return the greatest non-empty ymd among score rows, or None.

    Uses string max(); this assumes ymd values are fixed-width YYYYMMDD
    strings -- TODO confirm for the "dt" fallback column.
    NOTE(review): _score_row_ymd is evaluated twice per row here.
    """
    ymds = [_score_row_ymd(row) for row in scores if _score_row_ymd(row)]
    return max(ymds) if ymds else None


def _should_rerun_heat_analysis(
    existing_record: dict[str, Any] | None,
    *,
    scores: list[dict[str, Any]],
    fetch_start_ymd: str,
    fetch_end_ymd: str,
    min_days: int,
    max_days: int,
) -> bool:
    """Decide whether heat analysis must be (re)computed for a word.

    Returns True when:
    - no heat-done phase is recorded yet;
    - newer score data exists than the record's data_end_ymd;
    - the record was skipped earlier but the window now has enough days.
    """
    if not _is_heat_analysis_done(existing_record):
        return True
    # Narrowing only: _is_heat_analysis_done returned True, so the record exists.
    assert existing_record is not None
    latest_ymd = _latest_score_ymd(scores)
    record_end = str(existing_record.get("data_end_ymd") or "").strip()
    if latest_ymd and record_end and latest_ymd > record_end:
        return True
    if _is_analysis_skipped_record(existing_record):
        _, skip_reason = prepare_analysis_scores(
            scores,
            start_ymd=fetch_start_ymd,
            end_ymd=fetch_end_ymd,
            min_days=min_days,
            max_days=max_days,
        )
        if skip_reason is None:
            return True
    return False


def _is_senior_fit_attempt_done(
    item: dict[str, Any],
    existing_record: dict[str, Any] | None,
) -> bool:
    """True if senior-fit scoring was already attempted for this word.

    Either the stored phase is at or past PHASE_SENIOR_FIT_SCORED, or the
    in-memory item already carries a score.
    """
    if existing_record:
        phase = _record_phase(existing_record)
        if phase in (PHASE_SENIOR_FIT_SCORED, PHASE_FINALIZED):
            return True
    return item.get("senior_fit_score") is not None


def _senior_fit_passed_from_score(score: Any, *, senior_threshold: float) -> bool:
    """Return whether a raw score (scaled down by 10) strictly exceeds the
    normalized threshold; None or non-numeric scores fail."""
    if score is None:
        return False
    try:
        return float(score) / 10.0 > senior_threshold
    except (TypeError, ValueError):
        return False


def _rehydrate_result_from_record(record: dict[str, Any]) -> dict[str, Any]:
    """Rebuild a non-skipped heat-analysis result dict from a stored record.

    The raw "scores" list is not stored on the record, so it is not restored.
    """
    detail = _parse_record_detail_json(record)
    retain_reason_raw = record.get("retain_reason")
    # Blank/whitespace-only reasons collapse to None.
    retain_reason = (
        str(retain_reason_raw).strip() if retain_reason_raw is not None else None
    ) or None

    # NOTE(review): same behavior as _nullable_bool_from_record(record.get(key)).
    def _bool_field(key: str) -> bool | None:
        value = record.get(key)
        if value is None:
            return None
        return bool(value)

    return {
        "skipped": False,
        "skip_reason": None,
        "data_days": record.get("data_days"),
        "data_start_ymd": record.get("data_start_ymd"),
        "data_end_ymd": record.get("data_end_ymd"),
        "fetch_start_ymd": record.get("fetch_start_ymd"),
        "fetch_end_ymd": record.get("fetch_end_ymd"),
        "min_score": record.get("min_score"),
        "max_score": record.get("max_score"),
        "avg_score": record.get("avg_score"),
        "is_sustained_high": _bool_field("is_sustained_high"),
        "is_rising": _bool_field("is_rising"),
        "is_spike": _bool_field("is_spike"),
        "retain_reason": retain_reason,
        "patterns": list(detail.get("patterns") or []),
    }


def _rehydrate_pending_item_from_record(
    item: dict[str, Any],
    record: dict[str, Any],
    *,
    senior_threshold: float,
) -> dict[str, Any]:
    """Rebuild an in-memory pending item from a stored record so a resumed run
    can continue with the demand-match / senior-fit / finalize steps.

    Args:
        item: candidate item carrying meta/name/fetch window (and maybe record_id).
        record: stored record row for the same word and analyze day.
        senior_threshold: normalized threshold used to recompute senior_fit_passed.
    """
    result = _rehydrate_result_from_record(record)
    retain_reason = result.get("retain_reason")
    is_internal_demand_matched = _nullable_bool_from_record(
        record.get("is_internal_demand_matched")
    )
    matched_demand_raw = record.get("matched_demand")
    matched_demand = (
        str(matched_demand_raw).strip() if matched_demand_raw is not None else None
    ) or None
    senior_fit_score = record.get("senior_fit_score")
    senior_fit_passed: bool | None = None
    is_final_retained = _nullable_bool_from_record(record.get("is_final_retained"))
    if senior_fit_score is not None:
        senior_fit_passed = _senior_fit_passed_from_score(
            senior_fit_score,
            senior_threshold=senior_threshold,
        )
    elif not _is_senior_fit_needed(
        retain_reason=retain_reason,
        is_internal_demand_matched=is_internal_demand_matched,
    ):
        # No scoring will ever happen for this word, so it cannot pass.
        senior_fit_passed = False
        if is_final_retained is None:
            is_final_retained = False
    return {
        "meta": item["meta"],
        "name": item["name"],
        "fetch_start_ymd": item["fetch_start_ymd"],
        "fetch_end_ymd": item["fetch_end_ymd"],
        "result": result,
        "retain_reason": retain_reason,
        "record_id": int(record.get("id") or item.get("record_id") or 0),
        "is_internal_demand_matched": is_internal_demand_matched,
        "matched_demand": matched_demand,
        "internal_demand_match_json": _parse_record_json_field(
            record,
            "internal_demand_match_json",
        ),
        "senior_fit_score": senior_fit_score,
        "demand_senior_fit_json": _parse_record_json_field(
            record,
            "demand_senior_fit_json",
        ),
        "senior_fit_passed": senior_fit_passed,
        "is_final_retained": is_final_retained,
    }


def _build_skipped_export_row_from_record(
    *,
    record: dict[str, Any],
    meta: dict[str, Any],
    name: str,
    analyze_ymd: str,
    fetch_start_ymd: str,
    fetch_end_ymd: str,
) -> dict[str, Any]:
    """Build an export row for a word whose analysis was skipped earlier today
    (resume path); the skip reason comes from the stored detail_json."""
    detail = _parse_record_detail_json(record)
    return {
        "analyze_ymd": analyze_ymd,
        "name": name,
        "meta_id": meta.get("id"),
        "fetch_start_ymd": fetch_start_ymd,
        "fetch_end_ymd": fetch_end_ymd,
        "analysis_skipped": True,
        "skip_reason": detail.get("skip_reason") or "",
        "data_days": record.get("data_days") or "",
        "retain_reason": "",
        "is_sustained_high": False,
        "is_rising": False,
        "is_spike": False,
        "is_internal_demand_matched": "",
        "matched_demand": "",
        "senior_fit_score": "",
        "is_final_retained": False,
        "min_score": "",
        "max_score": "",
        "avg_score": "",
    }


def _refresh_wxindex_heat_job_summary(
    summary: dict[str, Any],
    *,
    pending_items: list[dict[str, Any]],
    export_rows: list[dict[str, Any]],
) -> None:
    """Recompute the job summary counters in place from the current pending
    items (analyzed words) and export rows (used only for the skipped count)."""
    summary["analyzed"] = len(pending_items)
    summary["skipped"] = sum(1 for row in export_rows if row.get("analysis_skipped"))
    summary["retained"] = sum(1 for item in pending_items if item.get("retain_reason"))
    summary["sustained_high"] = sum(
        1
        for item in pending_items
        if item.get("retain_reason") == RETAIN_REASON_SUSTAINED_HIGH
    )
    summary["rising"] = sum(
        1 for item in pending_items if item.get("retain_reason") == RETAIN_REASON_RISING
    )
    summary["spike"] = sum(
        1 for item in pending_items if item.get("retain_reason") == RETAIN_REASON_SPIKE
    )
    summary["internal_demand_matched"] = sum(
        1 for item in pending_items if item.get("is_internal_demand_matched")
    )
    summary["senior_fit_candidates"] = sum(
        1
        for item in pending_items
        if _is_senior_fit_needed(
            retain_reason=item.get("retain_reason"),
            is_internal_demand_matched=item.get("is_internal_demand_matched"),
        )
    )
    summary["senior_scored"] = sum(
        1 for item in pending_items if item.get("senior_fit_score") is not None
    )
    summary["senior_fit_passed"] = sum(
        1 for item in pending_items if item.get("senior_fit_passed")
    )
    summary["final_retained"] = sum(
        1 for item in pending_items if item.get("is_final_retained")
    )


def resolve_retain_reason(
    *,
    is_sustained_high: bool,
    is_rising: bool,
    is_spike: bool,
) -> str | None:
    """Pick the retain reason by priority 2->3->1 (rising > spike > sustained high).

    Returns None when no pattern matched.
    """
    if is_rising:
        return RETAIN_REASON_RISING
    if is_spike:
        return RETAIN_REASON_SPIKE
    if is_sustained_high:
        return RETAIN_REASON_SUSTAINED_HIGH
    return None


def _chunk_words(words: list[str], batch_size: int = WXINDEX_WORD_LLM_BATCH_SIZE) -> list[list[str]]:
    """Split words into consecutive batches; batch_size is clamped to >= 1."""
    size = max(batch_size, 1)
    return [words[index : index + size] for index in range(0, len(words), size)]


def _empty_senior_fit_result() -> dict[str, Any]:
    """Default per-word senior-fit result: no score, empty JSON, not passed."""
    return {
        "senior_fit_score": None,
        "demand_senior_fit_json": {"source": "", "items": []},
        "passed": False,
    }


def score_wxindex_words_senior_fit(
    *,
    words: list[str],
    config: FlowConfig,
    senior_threshold: float = WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD,
    batch_size: int = WXINDEX_WORD_LLM_BATCH_SIZE,
) -> dict[str, dict[str, Any]]:
    """Run senior-fit LLM scoring for WeChat-index words in batches
    (at most batch_size words per LLM call).

    Words are stripped, blanks dropped, and duplicates removed (first occurrence
    kept). Every cleaned word gets an entry, defaulting to _empty_senior_fit_result().

    Returns:
        Mapping word -> {"senior_fit_score", "demand_senior_fit_json", "passed"}.
    """
    cleaned: list[str] = []
    seen: set[str] = set()
    for raw in words:
        word = str(raw or "").strip()
        if not word or word in seen:
            continue
        seen.add(word)
        cleaned.append(word)
    results: dict[str, dict[str, Any]] = {
        word: _empty_senior_fit_result() for word in cleaned
    }
    for batch in _chunk_words(cleaned, batch_size=batch_size):
        candidates = [{"demand_type": TYPE_PHRASE, "demand_text": word} for word in batch]
        senior_fit_json = llm_score_senior_fit(
            channel_content_id=f"wxindex_words:{','.join(batch[:3])}",
            candidates=candidates,
            model=config.demand_quality_llm_model,
            max_attempts=config.demand_quality_llm_max_attempts,
            retry_sleep_seconds=config.demand_quality_llm_retry_sleep_seconds,
            max_tokens=config.demand_quality_llm_max_tokens,
        )
        # Annotate the batch JSON with the threshold on the raw (x10) score scale.
        senior_fit_json["threshold"] = senior_threshold * 10.0
        for word in batch:
            _, senior_score = lookup_quality_scores(
                demand_type=TYPE_PHRASE,
                demand_text=word,
                event_sense_json=None,
                senior_fit_json=senior_fit_json,
            )
            passed = (
                senior_score is not None
                and senior_score / 10.0 > senior_threshold
            )
            # Every word in the batch shares the same senior_fit_json object.
            results[word] = {
                "senior_fit_score": senior_score,
                "demand_senior_fit_json": senior_fit_json,
                "passed": passed,
            }
    return results


def score_wxindex_word_senior_fit(
    *,
    word: str,
    config: FlowConfig,
    senior_threshold: float = WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD,
) -> dict[str, Any]:
    """Run senior-fit LLM scoring for a single WeChat-index word.

    A blank word returns _empty_senior_fit_result() without calling the LLM.
    """
    target_word = str(word or "").strip()
    if not target_word:
        return _empty_senior_fit_result()
    batch_results = score_wxindex_words_senior_fit(
        words=[target_word],
        config=config,
        senior_threshold=senior_threshold,
        batch_size=1,
    )
    return batch_results.get(target_word, _empty_senior_fit_result())


def build_wxindex_odps_extend(retain_reason: str | None) -> str:
    """Serialize the ODPS "extend" JSON: {"method": retain_reason}, or "{}"."""
    method = str(retain_reason or "").strip()
    if not method:
        return "{}"
    return json.dumps({"method": method}, ensure_ascii=False)


def build_wxindex_word_hive_row(
    *,
    wxindex_word_record_id: int,
    word: str,
    strategy: str,
    partition_dt: str,
    max_score: float | None,
    retain_reason: str | None = None,
) -> dict[str, Any]:
    """Build a Hive export row for a retained word.

    weight = max_score / WEIGHT_DIVISOR, or 0.0 when max_score is None.
    NOTE(review): the ODPS sync row uses None instead of 0.0 for a missing
    max_score -- confirm the difference is intended.
    """
    normalized_name = str(word or "").strip()
    weight = 0.0
    if max_score is not None:
        weight = float(max_score) / WEIGHT_DIVISOR
    return {
        "record_id": wxindex_word_record_id,
        "strategy": strategy,
        "demand_id": build_demand_id(
            strategy=strategy,
            demand_name=normalized_name,
            partition_dt=partition_dt,
        ),
        "demand_name": normalized_name,
        "weight": weight,
        "type": TYPE_PHRASE,
        "video_count": None,
        "video_list": [],
        "extend": build_wxindex_odps_extend(retain_reason),
        "dt": partition_dt,
    }


def build_wxindex_word_odps_sync_row(
    *,
    wxindex_word_record_id: int,
    word: str,
    strategy: str,
    partition_dt: str,
    max_score: float | None,
    retain_reason: str | None = None,
) -> dict[str, Any]:
    """Build an ODPS sync row for a retained word; weight is None when
    max_score is None, otherwise max_score / WEIGHT_DIVISOR."""
    normalized_name = str(word or "").strip()
    weight = None
    if max_score is not None:
        weight = float(max_score) / WEIGHT_DIVISOR
    return {
        "partition_dt": partition_dt,
        "strategy": strategy,
        "demand_id": build_demand_id(
            strategy=strategy,
            demand_name=normalized_name,
            partition_dt=partition_dt,
        ),
        "demand_name": normalized_name,
        "demand_type": TYPE_PHRASE,
        "record_id": wxindex_word_record_id,
        "weight": weight,
        "extend": build_wxindex_odps_extend(retain_reason),
    }


def prepare_analysis_scores(
    scores: list[dict[str, Any]],
    *,
    start_ymd: str,
    end_ymd: str,
    min_days: int = WXINDEX_HEAT_MIN_DAYS,
    max_days: int = WXINDEX_HEAT_MAX_DAYS,
) -> tuple[list[dict[str, Any]], str | None]:
    """Slice the usable scores inside the target window.

    Keeps at most the last max_days rows of the filtered window. Returns
    (window_scores, "insufficient_days") when fewer than min_days remain,
    otherwise (window_scores, None).
    """
    window_scores = filter_scores_in_ymd_window(
        scores,
        start_ymd=start_ymd,
        end_ymd=end_ymd,
    )
    # Assumes filter_scores_in_ymd_window returns rows in ascending date order,
    # so the tail is the most recent -- TODO confirm.
    if len(window_scores) > max_days:
        window_scores = window_scores[-max_days:]
    if len(window_scores) < min_days:
        return window_scores, "insufficient_days"
    return window_scores, None


def _score_row_ymd(row: dict[str, Any]) -> str:
    """Return a score row's date from "ymd", falling back to "dt"; "" if neither."""
    return str(row.get("ymd") or row.get("dt") or "").strip()


def _window_ymd_bounds(
    window_scores: list[dict[str, Any]],
) -> tuple[str | None, str | None]:
    """Return (first row ymd, last row ymd) of the window, None where missing."""
    if not window_scores:
        return None, None
    start_ymd = _score_row_ymd(window_scores[0]) or None
    end_ymd = _score_row_ymd(window_scores[-1]) or None
    return start_ymd, end_ymd


def analyze_wxindex_heat_patterns(
    scores: list[dict[str, Any]],
    *,
    start_ymd: str,
    end_ymd: str,
    sustained_threshold: float = WXINDEX_SUSTAINED_HIGH_THRESHOLD,
    min_days: int = WXINDEX_HEAT_MIN_DAYS,
    max_days: int = WXINDEX_HEAT_MAX_DAYS,
    spike_days: int = WXINDEX_SPIKE_LOOKBACK_DAYS,
    spike_ratio: float = WXINDEX_SPIKE_RATIO,
    spike_baseline_floor: float = WXINDEX_SPIKE_BASELINE_FLOOR,
    rising_overall_change_rate: float = WXINDEX_RISING_OVERALL_CHANGE_RATE,
    rising_window_change_rate: float = WXINDEX_RISING_WINDOW_CHANGE_RATE,
    rising_adjacent_up_ratio: float = WXINDEX_RISING_ADJACENT_UP_RATIO,
) -> dict[str, Any]:
    """Evaluate the three heat patterns over the target-window data.

    Returns a skipped result (skipped=True, skip_reason, data bounds) when the
    window has too few days. Otherwise it returns:
    - pattern flags;
    - the retain reason chosen by resolve_retain_reason, with at most one
      pattern label;
    - min/max/avg score and the window rows.
    """
    window_scores, skip_reason = prepare_analysis_scores(
        scores,
        start_ymd=start_ymd,
        end_ymd=end_ymd,
        min_days=min_days,
        max_days=max_days,
    )
    if skip_reason:
        data_start_ymd, data_end_ymd = _window_ymd_bounds(window_scores)
        return {
            "skipped": True,
            "skip_reason": skip_reason,
            "data_days": len(window_scores) if window_scores else None,
            "data_start_ymd": data_start_ymd,
            "data_end_ymd": data_end_ymd,
            "patterns": [],
        }
    numeric_scores = extract_sorted_scores(window_scores, max_points=max_days)
    data_start_ymd, data_end_ymd = _window_ymd_bounds(window_scores)
    # NOTE(review): all() is True for an empty list, and min()/max()/the average
    # below raise on one; this relies on extract_sorted_scores returning at least
    # one value for a non-skipped window -- confirm.
    is_sustained_high = all(score > sustained_threshold for score in numeric_scores)
    is_rising = is_wxindex_heat_rising_scores(
        numeric_scores,
        min_points=min_days,
        overall_change_rate=rising_overall_change_rate,
        window_change_rate_threshold=rising_window_change_rate,
        adjacent_up_ratio=rising_adjacent_up_ratio,
    )
    is_spike = is_wxindex_spike_scores(
        numeric_scores,
        spike_days=spike_days,
        min_points=min_days,
        spike_ratio=spike_ratio,
        baseline_floor=spike_baseline_floor,
    )
    retain_reason = resolve_retain_reason(
        is_sustained_high=is_sustained_high,
        is_rising=is_rising,
        is_spike=is_spike,
    )
    # Only the winning pattern (per retain-reason priority) is recorded.
    patterns: list[str] = []
    if retain_reason == RETAIN_REASON_SUSTAINED_HIGH:
        patterns.append(PATTERN_SUSTAINED_HIGH)
    elif retain_reason == RETAIN_REASON_RISING:
        patterns.append(PATTERN_RISING)
    elif retain_reason == RETAIN_REASON_SPIKE:
        patterns.append(PATTERN_SPIKE)
    return {
        "skipped": False,
        "skip_reason": None,
        "data_days": len(window_scores),
        "data_start_ymd": data_start_ymd,
        "data_end_ymd": data_end_ymd,
        "fetch_start_ymd": start_ymd,
        "fetch_end_ymd": end_ymd,
        "min_score": min(numeric_scores),
        "max_score": max(numeric_scores),
        "avg_score": sum(numeric_scores) / len(numeric_scores),
        "is_sustained_high": is_sustained_high,
        "is_rising": is_rising,
        "is_spike": is_spike,
        "retain_reason": retain_reason,
        "patterns": patterns,
        "scores": window_scores,
    }


def build_wxindex_word_record_init_payload(
    *,
    meta: dict[str, Any],
    name: str,
    analyze_ymd: str,
    fetch_start_ymd: str,
    fetch_end_ymd: str,
    demand_cache_run_id: int | None = None,
) -> dict[str, Any]:
    """Trace-record payload written before analysis.

    Contains only meta and the fetch window; every analysis field is set to
    None and is filled in by later updates.
    """
    return {
        "name": name,
        "meta_id": meta.get("id"),
        "analyze_ymd": analyze_ymd,
        "fetch_start_ymd": fetch_start_ymd,
        "fetch_end_ymd": fetch_end_ymd,
        "demand_cache_run_id": demand_cache_run_id,
        **_WXINDEX_RECORD_UNSET_ANALYSIS_FIELDS,
    }


def build_wxindex_word_record_skipped_payload(
    *,
    meta: dict[str, Any],
    name: str,
    analyze_ymd: str,
    fetch_start_ymd: str,
    fetch_end_ymd: str,
    result: dict[str, Any],
    demand_cache_run_id: int | None = None,
) -> dict[str, Any]:
    """Trace-record payload used when analysis is skipped.

    Analysis fields are reset to None; only the data bounds, data_days and a
    detail_json carrying the skip phase/reason are filled.
    """
    payload = {
        "name": name,
        "meta_id": meta.get("id"),
        "analyze_ymd": analyze_ymd,
        "fetch_start_ymd": fetch_start_ymd,
        "fetch_end_ymd": fetch_end_ymd,
        "demand_cache_run_id": demand_cache_run_id,
        **_WXINDEX_RECORD_UNSET_ANALYSIS_FIELDS,
    }
    payload.update(
        {
            "data_start_ymd": result.get("data_start_ymd"),
            "data_end_ymd": result.get("data_end_ymd"),
            "data_days": result.get("data_days"),
            "detail_json": {
                # NOTE(review): equals PHASE_ANALYSIS_SKIPPED; consider the constant.
                "phase": "analysis_skipped",
                "skip_reason": result.get("skip_reason"),
                "data_days": result.get("data_days"),
            },
        }
    )
    return payload


def build_wxindex_word_record_payload(
    item: dict[str, Any],
    *,
    analyze_ymd: str,
    demand_cache_run_id: int | None,
    phase: str,
    senior_threshold: float,
    sustained_threshold: float,
    spike_days: int,
    spike_ratio: float,
    spike_baseline_floor: float,
    rising_overall_change_rate: float,
    rising_window_change_rate: float,
    rising_adjacent_up_ratio: float,
) -> dict[str, Any]:
    """Build the records-table payload from the pending item's current state.

    detail_json stores the phase plus every threshold used, so a stored record
    documents how it was computed and supports resuming.
    """
    meta = item["meta"]
    name = item["name"]
    fetch_start_ymd = item["fetch_start_ymd"]
    fetch_end_ymd = item["fetch_end_ymd"]
    result = item["result"]
    retain_reason = item.get("retain_reason")
    return {
        "name": name,
        "meta_id": meta.get("id"),
        "analyze_ymd": analyze_ymd,
        "fetch_start_ymd": fetch_start_ymd,
        "fetch_end_ymd": fetch_end_ymd,
        "data_start_ymd": result.get("data_start_ymd"),
        "data_end_ymd": result.get("data_end_ymd"),
        "data_days": result.get("data_days"),
        "is_sustained_high": result.get("is_sustained_high"),
        "is_rising": result.get("is_rising"),
        "is_spike": result.get("is_spike"),
        "retain_reason": retain_reason,
        "is_internal_demand_matched": item.get("is_internal_demand_matched"),
        "matched_demand": item.get("matched_demand"),
        "demand_cache_run_id": demand_cache_run_id,
        "internal_demand_match_json": item.get("internal_demand_match_json"),
        "senior_fit_score": item.get("senior_fit_score"),
        "demand_senior_fit_json": item.get("demand_senior_fit_json"),
        "is_final_retained": item.get("is_final_retained"),
        "min_score": result.get("min_score"),
        "max_score": result.get("max_score"),
        "avg_score": result.get("avg_score"),
        "detail_json": {
            "phase": phase,
            "patterns": list(result.get("patterns") or []),
            "retain_reason": retain_reason,
            "retain_reason_priority": "2->3->1",
            "senior_fit_threshold": senior_threshold,
            "senior_fit_threshold_score": senior_threshold * 10.0,
            "is_final_retained": item.get("is_final_retained"),
            "sustained_threshold": sustained_threshold,
            "spike_days": spike_days,
            "spike_ratio": spike_ratio,
            "spike_baseline_floor": spike_baseline_floor,
            "rising_overall_change_rate": rising_overall_change_rate,
            "rising_window_change_rate": rising_window_change_rate,
            "rising_adjacent_up_ratio": rising_adjacent_up_ratio,
        },
    }


def _persist_wxindex_word_record(
    repository: HotContentRepository,
    payload: dict[str, Any],
    *,
    dry_run: bool,
    skip_db_save: bool,
    verbose: bool,
    action: str,
) -> int:
    """Save a record payload and return the repository's result (the record id).

    In dry-run / skip-db-save mode nothing is written (optionally logged) and
    0 is returned.
    """
    name = str(payload.get("name") or "").strip()
    if dry_run or skip_db_save:
        if verbose:
            label = "dry-run" if dry_run else "skip-db-save"
            print(f"[{label}] would {action} wxindex word record word={name}")
        return 0
    return repository.save_wxindex_word_record(payload)


def _persist_pending_item_record(
    repository: HotContentRepository,
    item: dict[str, Any],
    *,
    analyze_ymd: str,
    demand_cache_run_id: int | None,
    phase: str,
    senior_threshold: float,
    sustained_threshold: float,
    spike_days: int,
    spike_ratio: float,
    spike_baseline_floor: float,
    rising_overall_change_rate: float,
    rising_window_change_rate: float,
    rising_adjacent_up_ratio: float,
    dry_run: bool,
    skip_db_save: bool,
    verbose: bool,
) -> int:
    """Persist a pending item at the given phase.

    A non-zero saved id is written back to item["record_id"]. Returns the
    item's record id (0 if none).
    """
    payload = build_wxindex_word_record_payload(
        item,
        analyze_ymd=analyze_ymd,
        demand_cache_run_id=demand_cache_run_id,
        phase=phase,
        senior_threshold=senior_threshold,
        sustained_threshold=sustained_threshold,
        spike_days=spike_days,
        spike_ratio=spike_ratio,
        spike_baseline_floor=spike_baseline_floor,
        rising_overall_change_rate=rising_overall_change_rate,
        rising_window_change_rate=rising_window_change_rate,
        rising_adjacent_up_ratio=rising_adjacent_up_ratio,
    )
    record_id = _persist_wxindex_word_record(
        repository,
        payload,
        dry_run=dry_run,
        skip_db_save=skip_db_save,
        verbose=verbose,
        action=f"{phase} word={item['name']}",
    )
    if record_id:
        item["record_id"] = record_id
    return int(item.get("record_id") or 0)


def _filter_candidates_awaiting_yesterday_score(
    repository: HotContentRepository,
    candidate_items: list[dict[str, Any]],
    *,
    yesterday_ymd: str,
    existing_records: dict[str, dict[str, Any]],
    verbose: bool,
) -> tuple[list[dict[str, Any]], int]:
    """Run after record initialization: defer words to the next run when their
    heat analysis is not done and yesterday's index score is missing.

    Returns (items ready to process, number of deferred items).
    """
    names_to_check = [
        item["name"]
        for item in candidate_items
        if not _is_heat_analysis_done(existing_records.get(item["name"]))
    ]
    if not names_to_check:
        return candidate_items, 0
    names_with_yesterday = repository.list_wxindex_word_names_with_dt(
        names_to_check,
        dt=yesterday_ymd,
    )
    ready_items: list[dict[str, Any]] = []
    awaiting_count = 0
    for item in candidate_items:
        name = item["name"]
        # Words already analyzed today proceed regardless of yesterday's data.
        if _is_heat_analysis_done(existing_records.get(name)):
            ready_items.append(item)
            continue
        if name in names_with_yesterday:
            ready_items.append(item)
            continue
        awaiting_count += 1
        if verbose:
            print(
                f"await yesterday score word={name} dt={yesterday_ymd}, skip this run"
            )
    return ready_items, awaiting_count


def _init_candidate_wxindex_word_records(
    repository: HotContentRepository,
    candidate_items: list[dict[str, Any]],
    *,
    analyze_ymd: str,
    demand_cache_run_id: int | None,
    dry_run: bool,
    skip_db_save: bool,
    verbose: bool,
) -> None:
    """Batch-initialize records for all candidates.

    Each candidate's "record_id" is set from the name -> id mapping that
    repository.init_wxindex_word_records returns (0 when absent). Nothing is
    written in dry-run / skip-db-save mode.
    """
    if not candidate_items:
        return
    if dry_run or skip_db_save:
        if verbose:
            label = "dry-run" if dry_run else "skip-db-save"
            print(
                f"[{label}] would batch init wxindex word records "
                f"count={len(candidate_items)} analyze_ymd={analyze_ymd}"
            )
        return
    init_payloads = [
        build_wxindex_word_record_init_payload(
            meta=item["meta"],
            name=item["name"],
            analyze_ymd=analyze_ymd,
            fetch_start_ymd=item["fetch_start_ymd"],
            fetch_end_ymd=item["fetch_end_ymd"],
            demand_cache_run_id=demand_cache_run_id,
        )
        for item in candidate_items
    ]
    record_id_map = repository.init_wxindex_word_records(init_payloads)
    for item in candidate_items:
        item["record_id"] = int(record_id_map.get(item["name"]) or 0)
    if verbose:
        print(
            f"init wxindex word records count={len(candidate_items)} "
            f"analyze_ymd={analyze_ymd}"
        )


def _apply_demand_match_to_item(
    item: dict[str, Any],
    match_result: dict[str, Any],
) -> None:
    """Copy an internal-demand match result onto the item in place; unmatched
    items are marked as not finally retained."""
    item["internal_demand_match_json"] = match_result
    item["is_internal_demand_matched"] = bool(match_result.get("matched"))
    matched_demand = str(match_result.get("matched_demand") or "").strip()
    item["matched_demand"] = matched_demand or None
    if not item.get("is_internal_demand_matched"):
        item["is_final_retained"] = False


def _apply_senior_fit_to_item(
    item: dict[str, Any],
    senior_result: dict[str, Any],
) -> None:
    """Copy a senior-fit scoring result onto the item in place; final retention
    equals whether senior fit passed."""
    passed = bool(senior_result.get("passed"))
    item["senior_fit_score"] = senior_result.get("senior_fit_score")
    item["demand_senior_fit_json"] = senior_result.get("demand_senior_fit_json")
    item["senior_fit_passed"] = passed
    item["is_final_retained"] = passed


def build_wxindex_word_stats_payload(
    item: dict[str, Any],
    *,
    analyze_ymd: str,
) -> dict[str, Any]:
    """Build the stats payload for a word that passed heat and senior-fit screening."""
    meta = item["meta"]
    result = item["result"]
    return {
        "name": item["name"],
        "meta_id": meta.get("id"),
        "analyze_ymd": analyze_ymd,
        "wxindex_word_record_id": item.get("record_id"),
        "retain_reason": item.get("retain_reason"),
        "senior_fit_score": item.get("senior_fit_score"),
        "data_start_ymd": result.get("data_start_ymd"),
        "data_end_ymd": result.get("data_end_ymd"),
        "data_days": result.get("data_days"),
        "min_score": result.get("min_score"),
        "max_score": result.get("max_score"),
        "avg_score": result.get("avg_score"),
        "detail_json": {
            "demand_senior_fit_json": item.get("demand_senior_fit_json"),
            "is_sustained_high": result.get("is_sustained_high"),
            "is_rising": result.get("is_rising"),
            "is_spike": result.get("is_spike"),
        },
    }


def _save_senior_fit_passed_stats(
    repository: HotContentRepository,
    items: list[dict[str, Any]],
    *,
    analyze_ymd: str,
    existing_stats_names: set[str] | None = None,
    dry_run: bool,
    skip_db_save: bool,
    verbose: bool,
) -> tuple[int, int]:
    """Save stats rows for senior-fit-passed items not already stored today.

    Returns (saved_or_would_save_count, resumed_count). resumed_count is the
    number of passed items whose name is already in existing_stats_names.
    """
    existing = existing_stats_names or set()
    all_payloads = [
        build_wxindex_word_stats_payload(item, analyze_ymd=analyze_ymd)
        for item in items
        if item.get("senior_fit_passed")
    ]
    resumed = sum(1 for payload in all_payloads if payload["name"] in existing)
    payloads = [payload for payload in all_payloads if payload["name"] not in existing]
    if not payloads:
        return 0, resumed
    if dry_run or skip_db_save:
        if verbose:
            label = "dry-run" if dry_run else "skip-db-save"
            print(f"[{label}] would save wxindex word stats count={len(payloads)}")
        return len(payloads), resumed
    saved = repository.save_wxindex_word_stats_batch(payloads)
    if verbose:
        print(f"saved wxindex word stats count={saved} analyze_ymd={analyze_ymd}")
    return saved, resumed


def _build_export_row_from_item(
    item: dict[str, Any],
    *,
    analyze_ymd: str,
    strategy: str,
) -> dict[str, Any]:
    """Build an export row for an analyzed (non-skipped) item.

    Missing optional values become "". demand_id and weight are filled only for
    finally retained items (demand_id additionally requires a strategy).
    """
    meta = item["meta"]
    name = item["name"]
    result = item["result"]
    retain_reason = item.get("retain_reason")
    is_internal_demand_matched = item.get("is_internal_demand_matched")
    matched_demand = str(item.get("matched_demand") or "")
    senior_fit_score = item.get("senior_fit_score")
    is_final_retained = bool(item.get("is_final_retained"))
    return {
        "analyze_ymd": analyze_ymd,
        "name": name,
        "meta_id": meta.get("id"),
        "fetch_start_ymd": item["fetch_start_ymd"],
        "fetch_end_ymd": item["fetch_end_ymd"],
        "data_start_ymd": result.get("data_start_ymd"),
        "data_end_ymd": result.get("data_end_ymd"),
        "data_days": result.get("data_days"),
        "analysis_skipped": False,
        "skip_reason": "",
        "is_sustained_high": bool(result.get("is_sustained_high")),
        "is_rising": bool(result.get("is_rising")),
        "is_spike": bool(result.get("is_spike")),
        "retain_reason": retain_reason or "",
        "is_internal_demand_matched": (
            "" if is_internal_demand_matched is None else is_internal_demand_matched
        ),
        "matched_demand": matched_demand,
        "senior_fit_score": senior_fit_score if senior_fit_score is not None else "",
        "is_final_retained": is_final_retained,
        "min_score": result.get("min_score"),
        "max_score": result.get("max_score"),
        "avg_score": result.get("avg_score"),
        "demand_id": (
            build_demand_id(
                strategy=strategy,
                demand_name=name,
                partition_dt=analyze_ymd,
            )
            if is_final_retained and strategy
            else ""
        ),
        "weight": (
            float(result.get("max_score") or 0) / WEIGHT_DIVISOR
            if is_final_retained and result.get("max_score") is not None
            else ""
        ),
    }


def run_wxindex_heat_pattern_daily_job(
    repository: HotContentRepository,
    *,
    config: FlowConfig | None = None,
    api_client: JsonApiClient | None = None,
    today: date | None = None,
    sustained_threshold: float = WXINDEX_SUSTAINED_HIGH_THRESHOLD,
    min_days: int = WXINDEX_HEAT_MIN_DAYS,
    max_days: int = WXINDEX_HEAT_MAX_DAYS,
    spike_days: int = WXINDEX_SPIKE_LOOKBACK_DAYS,
    spike_ratio: float = WXINDEX_SPIKE_RATIO,
    spike_baseline_floor: float = WXINDEX_SPIKE_BASELINE_FLOOR,
    rising_overall_change_rate: float = WXINDEX_RISING_OVERALL_CHANGE_RATE,
    rising_window_change_rate: float = WXINDEX_RISING_WINDOW_CHANGE_RATE,
    rising_adjacent_up_ratio: float = WXINDEX_RISING_ADJACENT_UP_RATIO,
    dry_run: bool = False,
    skip_odps: bool = False,
    skip_db_save: bool = False,
    verbose: bool = False,
) -> dict[str, Any]:
    """Scheduled job: analyze words still inside their fetch window and write
    the heat-pattern results table."""
    current = today or datetime.now(SHANGHAI_TZ).date()
    analyze_ymd = current.strftime("%Y%m%d")
    meta_rows = repository.list_active_wxindex_word_meta(today=current)
    demand_name_set: list[str] = []
    demand_cache_run_id: int | None = None
    demand_cache_error: str | None = None
    postprocess_service: ContributionPostprocessService | None = None
    if config is not None:
        try:
            cache = DemandCacheService(config, repository).get_or_create_current_hour_cache()
            demand_name_set = list(cache.get("demand_name_set") or [])
            demand_cache_run_id = int(cache["id"])
            if demand_name_set:
                postprocess_service = ContributionPostprocessService(
                    config,
                    repository,
                    api_client
                    or JsonApiClient(
                        timeout_seconds=config.request_timeout_seconds,
                        verify_ssl=config.https_verify_ssl,
                    ),
                )
        except HotContentFlowError as exc:
            # Demand-cache failures are non-fatal: demand matching is skipped.
            demand_cache_error = str(exc)
            if verbose:
                print(f"demand cache unavailable, skip demand match: {exc}")
    summary: dict[str, Any] = {
        "analyze_ymd": analyze_ymd,
        "meta_count": len(meta_rows),
        "analyzed": 0,
        "skipped": 0,
        "retained": 0,
        "sustained_high": 0,
        "rising": 0,
        "spike": 0,
        "internal_demand_matched": 0,
        "senior_scored": 0,
        "senior_fit_candidates": 0,
        "senior_fit_passed": 0,
        "stats_saved": 0,
        "final_retained": 0,
        "odps_synced": 0,
        "odps_written": 0,
        "demand_cache_run_id": demand_cache_run_id,
        "demand_cache_error": demand_cache_error,
        "demand_name_count": len(demand_name_set),
        "llm_batch_size": WXINDEX_WORD_LLM_BATCH_SIZE,
        "demand_match_batches": 0,
        "senior_fit_batches": 0,
        "records_initialized": 0,
        "awaiting_yesterday_score": 0,
        "heat_resumed": 0,
        "demand_match_resumed": 0,
        "senior_fit_resumed": 0,
        "stats_resumed": 0,
        "finalized_resumed": 0,
        "dry_run": dry_run,
        "skip_odps": skip_odps,
        "skip_db_save": skip_db_save,
    }
    retained_words: list[dict[str, Any]] = []
    export_rows: list[dict[str, Any]] = []
    final_hive_rows: list[dict[str, Any]] = []
    senior_threshold = WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD
    strategy = str(config.hot_demand_pool_strategy or "").strip() if config else ""
    pending_items: list[dict[str, Any]] = []
    candidate_items: list[dict[str, Any]] = []
    # Meta rows missing a name or fetch window are counted as skipped.
    for meta in meta_rows:
        name = str(meta.get("name") or "").strip()
        fetch_start_ymd = str(meta.get("fetch_start_ymd") or "").strip()
        fetch_end_ymd = str(meta.get("fetch_end_ymd") or "").strip()
        if not name or not fetch_start_ymd or not fetch_end_ymd:
            summary["skipped"] += 1
            continue
        candidate_items.append(
            {
                "meta": meta,
                "name": name,
                "fetch_start_ymd": fetch_start_ymd,
                "fetch_end_ymd": fetch_end_ymd,
            }
        )
    _init_candidate_wxindex_word_records(
        repository,
        candidate_items,
        analyze_ymd=analyze_ymd,
        demand_cache_run_id=demand_cache_run_id,
        dry_run=dry_run,
        skip_db_save=skip_db_save,
        verbose=verbose,
    )
    summary["records_initialized"] = len(candidate_items)
    # Load today's existing records/stats so an interrupted run can resume.
    existing_records: dict[str, dict[str, Any]] = {}
    existing_stats_names: set[str] = set()
    if not dry_run and not skip_db_save:
        candidate_names = [item["name"] for item in candidate_items]
        existing_records = repository.list_wxindex_word_records_by_analyze_ymd(
            analyze_ymd=analyze_ymd,
            names=candidate_names,
        )
        existing_stats_names = repository.list_wxindex_word_stats_names(
            analyze_ymd=analyze_ymd,
            names=candidate_names,
        )
    yesterday_ymd = (current - timedelta(days=1)).strftime("%Y%m%d")
    candidate_items, awaiting_yesterday = _filter_candidates_awaiting_yesterday_score(
        repository,
        candidate_items,
        yesterday_ymd=yesterday_ymd,
        existing_records=existing_records,
        verbose=verbose,
    )
    summary["awaiting_yesterday_score"] = awaiting_yesterday
    # Backfill record ids from existing records when initialization gave none.
    for item in candidate_items:
        if not item.get("record_id"):
            existing = existing_records.get(item["name"])
            if existing:
                item["record_id"] = int(existing.get("id") or 0)
    for item in candidate_items:
        meta = item["meta"]
        name = item["name"]
        fetch_start_ymd = item["fetch_start_ymd"]
        fetch_end_ymd = item["fetch_end_ymd"]
        existing_record = existing_records.get(name)
        scores = repository.list_wxindex_word_scores_in_range(
            name,
            start_ymd=fetch_start_ymd,
            end_ymd=fetch_end_ymd,
        )
        # Resume path: reuse the stored heat result when it is still current.
        if (
            _is_heat_analysis_done(existing_record)
            and not _should_rerun_heat_analysis(
                existing_record,
                scores=scores,
                fetch_start_ymd=fetch_start_ymd,
                fetch_end_ymd=fetch_end_ymd,
                min_days=min_days,
                max_days=max_days,
            )
        ):
            summary["heat_resumed"] += 1
            if _is_analysis_skipped_record(existing_record):
                if verbose:
                    detail = _parse_record_detail_json(existing_record)
                    print(
                        f"resume skip word={name} reason={detail.get('skip_reason')} "
                        f"days={existing_record.get('data_days')}"
                    )
                export_rows.append(
                    _build_skipped_export_row_from_record(
                        record=existing_record,
                        meta=meta,
                        name=name,
                        analyze_ymd=analyze_ymd,
                        fetch_start_ymd=fetch_start_ymd,
                        fetch_end_ymd=fetch_end_ymd,
                    )
                )
                continue
            pending_item = _rehydrate_pending_item_from_record(
                item,
                existing_record,
                senior_threshold=senior_threshold,
            )
            pending_items.append(pending_item)
            if verbose:
                print(
                    f"resume heat analyzed word={name} "
                    f"retain_reason={pending_item.get('retain_reason') or ''} "
                    f"data_days={pending_item['result'].get('data_days')}"
                )
            continue
        result = analyze_wxindex_heat_patterns(
            scores,
            start_ymd=fetch_start_ymd,
            end_ymd=fetch_end_ymd,
            sustained_threshold=sustained_threshold,
            min_days=min_days,
            max_days=max_days,
            spike_days=spike_days,
            spike_ratio=spike_ratio,
            spike_baseline_floor=spike_baseline_floor,
            rising_overall_change_rate=rising_overall_change_rate,
            rising_window_change_rate=rising_window_change_rate,
            rising_adjacent_up_ratio=rising_adjacent_up_ratio,
        )
        if result.get("skipped"):
            if verbose:
                print(
                    f"skip word={name} reason={result.get('skip_reason')} "
                    f"days={result.get('data_days')}"
                )
            skipped_payload = build_wxindex_word_record_skipped_payload(
                meta=meta,
                name=name,
                analyze_ymd=analyze_ymd,
                fetch_start_ymd=fetch_start_ymd,
                fetch_end_ymd=fetch_end_ymd,
                result=result,
                demand_cache_run_id=demand_cache_run_id,
            )
            _persist_wxindex_word_record(
                repository,
                skipped_payload,
                dry_run=dry_run,
                skip_db_save=skip_db_save,
                verbose=verbose,
                action="update skipped",
            )
            export_rows.append(
                {
                    "analyze_ymd": analyze_ymd,
                    "name": name,
                    "meta_id": meta.get("id"),
                    "fetch_start_ymd": fetch_start_ymd,
                    "fetch_end_ymd": fetch_end_ymd,
                    "analysis_skipped": True,
                    "skip_reason": result.get("skip_reason"),
                    "data_days": result.get("data_days"),
                    "retain_reason": "",
                    "is_sustained_high": False,
                    "is_rising": False,
                    "is_spike": False,
                    "is_internal_demand_matched": "",
                    "matched_demand": "",
                    "senior_fit_score": "",
                    "is_final_retained": False,
                    "min_score": "",
                    "max_score": "",
                    "avg_score": "",
                }
            )
            continue
        retain_reason = str(result.get("retain_reason") or "").strip() or None
        # Fresh pending item: downstream fields stay None until their step runs.
        pending_item = {
            "meta": meta,
            "name": name,
            "fetch_start_ymd": fetch_start_ymd,
            "fetch_end_ymd": fetch_end_ymd,
            "result": result,
            "retain_reason": retain_reason,
            "record_id": item.get("record_id", 0),
            "is_internal_demand_matched": None,
            "matched_demand": None,
            "internal_demand_match_json": None,
            "senior_fit_score": None,
            "demand_senior_fit_json": None,
            "senior_fit_passed": None,
            "is_final_retained": None,
        }
        pending_items.append(pending_item)
        _persist_pending_item_record(
            repository,
            pending_item,
            analyze_ymd=analyze_ymd,
            demand_cache_run_id=demand_cache_run_id,
            phase="heat_analyzed",
            senior_threshold=senior_threshold,
            sustained_threshold=sustained_threshold,
            spike_days=spike_days,
            spike_ratio=spike_ratio,
            spike_baseline_floor=spike_baseline_floor,
            rising_overall_change_rate=rising_overall_change_rate,
            rising_window_change_rate=rising_window_change_rate,
            rising_adjacent_up_ratio=rising_adjacent_up_ratio,
            dry_run=dry_run,
            skip_db_save=skip_db_save,
            verbose=verbose,
        )
        if verbose:
            print(
                f"heat analyzed word={name} retain_reason={retain_reason or ''} "
                f"data_days={result.get('data_days')}"
            )
    demand_match_results: dict[str, dict[str, Any]] = {}
    pending_by_name = {item["name"]: item for item in pending_items}
    sustained_high_words = [
item["name"] for item in pending_items if item.get("retain_reason") == RETAIN_REASON_SUSTAINED_HIGH and item.get("is_internal_demand_matched") is None ] for item in pending_items: if item.get("retain_reason") != RETAIN_REASON_SUSTAINED_HIGH: continue if item.get("is_internal_demand_matched") is not None: summary["demand_match_resumed"] += 1 def _persist_demand_match_updates(words: list[str]) -> None: for word in words: pending_item = pending_by_name.get(word) if pending_item is None: continue match_result = demand_match_results.get(word) or { "word": word, "matched": False, "matched_demand": "", "match_list": [], } _apply_demand_match_to_item(pending_item, match_result) _persist_pending_item_record( repository, pending_item, analyze_ymd=analyze_ymd, demand_cache_run_id=demand_cache_run_id, phase="demand_matched", senior_threshold=senior_threshold, sustained_threshold=sustained_threshold, spike_days=spike_days, spike_ratio=spike_ratio, spike_baseline_floor=spike_baseline_floor, rising_overall_change_rate=rising_overall_change_rate, rising_window_change_rate=rising_window_change_rate, rising_adjacent_up_ratio=rising_adjacent_up_ratio, dry_run=dry_run, skip_db_save=skip_db_save, verbose=verbose, ) if sustained_high_words: if postprocess_service is not None and demand_name_set: for batch in _chunk_words(sustained_high_words): summary["demand_match_batches"] += 1 if verbose: print(f"demand match batch size={len(batch)} words={batch}") demand_match_results.update( postprocess_service.match_words_to_demand_pool( words=batch, demand_name_set=demand_name_set, ) ) _persist_demand_match_updates(batch) else: for word in sustained_high_words: demand_match_results[word] = { "word": word, "matched": False, "matched_demand": "", "match_list": [], "skip_reason": "empty_demand_cache", } _persist_demand_match_updates(sustained_high_words) senior_fit_candidate_names: list[str] = [] for item in pending_items: retain_reason = item.get("retain_reason") name = str(item.get("name") or 
"").strip() if not retain_reason or not name: continue if _is_senior_fit_attempt_done(item, existing_records.get(name)): summary["senior_fit_resumed"] += 1 continue if retain_reason == RETAIN_REASON_SUSTAINED_HIGH: if not item.get("is_internal_demand_matched"): continue senior_fit_candidate_names.append(name) senior_fit_results: dict[str, dict[str, Any]] = {} def _persist_senior_fit_updates(words: list[str]) -> None: for word in words: pending_item = pending_by_name.get(word) senior_result = senior_fit_results.get(word) if pending_item is None or senior_result is None: continue _apply_senior_fit_to_item(pending_item, senior_result) _persist_pending_item_record( repository, pending_item, analyze_ymd=analyze_ymd, demand_cache_run_id=demand_cache_run_id, phase="senior_fit_scored", senior_threshold=senior_threshold, sustained_threshold=sustained_threshold, spike_days=spike_days, spike_ratio=spike_ratio, spike_baseline_floor=spike_baseline_floor, rising_overall_change_rate=rising_overall_change_rate, rising_window_change_rate=rising_window_change_rate, rising_adjacent_up_ratio=rising_adjacent_up_ratio, dry_run=dry_run, skip_db_save=skip_db_save, verbose=verbose, ) if verbose and not pending_item.get("senior_fit_passed"): print( f"senior fit rejected word={word} " f"score={pending_item.get('senior_fit_score')}" ) if config is not None and senior_fit_candidate_names: for batch in _chunk_words(senior_fit_candidate_names): summary["senior_fit_batches"] += 1 if verbose: print(f"senior fit batch size={len(batch)} words={batch}") senior_fit_results.update( score_wxindex_words_senior_fit( words=batch, config=config, senior_threshold=senior_threshold, batch_size=WXINDEX_WORD_LLM_BATCH_SIZE, ) ) _persist_senior_fit_updates(batch) stats_saved, stats_resumed = _save_senior_fit_passed_stats( repository, pending_items, analyze_ymd=analyze_ymd, existing_stats_names=existing_stats_names, dry_run=dry_run, skip_db_save=skip_db_save, verbose=verbose, ) summary["stats_saved"] = stats_saved 
summary["stats_resumed"] = stats_resumed for item in pending_items: name = item["name"] result = item["result"] retain_reason = item.get("retain_reason") if retain_reason: retained_words.append( { "name": name, "retain_reason": retain_reason, "data_days": result.get("data_days"), "data_start_ymd": result.get("data_start_ymd"), "data_end_ymd": result.get("data_end_ymd"), "is_internal_demand_matched": item.get("is_internal_demand_matched"), "matched_demand": str(item.get("matched_demand") or ""), "senior_fit_score": item.get("senior_fit_score"), "is_final_retained": bool(item.get("is_final_retained")), } ) existing_record = existing_records.get(name) if _is_finalized_record(existing_record): summary["finalized_resumed"] += 1 record_id = int(item.get("record_id") or existing_record.get("id") or 0) if verbose: print(f"resume finalized word={name}") else: record_id = _persist_pending_item_record( repository, item, analyze_ymd=analyze_ymd, demand_cache_run_id=demand_cache_run_id, phase="finalized", senior_threshold=senior_threshold, sustained_threshold=sustained_threshold, spike_days=spike_days, spike_ratio=spike_ratio, spike_baseline_floor=spike_baseline_floor, rising_overall_change_rate=rising_overall_change_rate, rising_window_change_rate=rising_window_change_rate, rising_adjacent_up_ratio=rising_adjacent_up_ratio, dry_run=dry_run, skip_db_save=skip_db_save, verbose=verbose, ) export_rows.append( _build_export_row_from_item( item, analyze_ymd=analyze_ymd, strategy=strategy, ) ) if item.get("is_final_retained") and strategy: final_hive_rows.append( build_wxindex_word_hive_row( wxindex_word_record_id=record_id, word=name, strategy=strategy, partition_dt=analyze_ymd, max_score=result.get("max_score"), retain_reason=item.get("retain_reason"), ) ) if ( final_hive_rows and strategy and config is not None and not dry_run and not skip_odps ): odps_summary = sync_wxindex_word_rows_to_odps( config, repository, hive_rows=final_hive_rows, partition_dt=analyze_ymd, 
strategy=strategy, ) summary["odps_written"] = odps_summary.get("written_count", 0) summary["odps_synced"] = odps_summary.get("odps_synced", 0) summary["odps_sync"] = odps_summary elif dry_run or skip_odps: summary["odps_written"] = len(final_hive_rows) summary["odps_synced"] = len(final_hive_rows) _refresh_wxindex_heat_job_summary( summary, pending_items=pending_items, export_rows=export_rows, ) summary["retained_words"] = retained_words summary["export_rows"] = export_rows return summary WXINDEX_HEAT_PATTERN_EXPORT_FIELDS = [ "analyze_ymd", "name", "meta_id", "fetch_start_ymd", "fetch_end_ymd", "data_start_ymd", "data_end_ymd", "data_days", "analysis_skipped", "skip_reason", "is_sustained_high", "is_rising", "is_spike", "retain_reason", "is_internal_demand_matched", "matched_demand", "senior_fit_score", "is_final_retained", "min_score", "max_score", "avg_score", "demand_id", "weight", ] def write_wxindex_heat_pattern_csv( rows: list[dict[str, Any]], output_path: str | Path, *, fieldnames: list[str] | None = None, ) -> Path: """将热度分析明细写入本地 CSV。""" path = Path(output_path).expanduser() if not path.is_absolute(): path = Path.cwd() / path path.parent.mkdir(parents=True, exist_ok=True) columns = fieldnames or WXINDEX_HEAT_PATTERN_EXPORT_FIELDS with path.open("w", encoding="utf-8-sig", newline="") as handle: writer = csv.DictWriter(handle, fieldnames=columns, extrasaction="ignore") writer.writeheader() for row in rows: writer.writerow({column: row.get(column, "") for column in columns}) return path