"""微信指数趋势计算工具。""" from __future__ import annotations import math from typing import Any MIN_TREND_POINTS = 4 MAX_TREND_POINTS = 7 UP_FIT_CHANGE_RATE = 0.04 UP_WINDOW_CHANGE_RATE = 0.02 DOWN_FIT_CHANGE_RATE = -0.04 DOWN_WINDOW_CHANGE_RATE = -0.02 HEAT_RISING_OVERALL_CHANGE_RATE = 0.15 HEAT_RISING_WINDOW_CHANGE_RATE = 0.12 HEAT_RISING_ADJACENT_UP_RATIO = 0.65 HEAT_RISING_RECENT_DROP_RATE = -0.15 HEAT_SPIKE_BASELINE_FLOOR = 50_000.0 HEAT_SPIKE_RATIO = 2.5 def _median(values: list[float]) -> float: if not values: return 0.0 ordered = sorted(values) mid = len(ordered) // 2 if len(ordered) % 2: return ordered[mid] return (ordered[mid - 1] + ordered[mid]) / 2 def _extract_recent_scores( series: list[dict[str, Any]], *, max_points: int = MAX_TREND_POINTS, ) -> list[float]: scored_rows: list[tuple[str, int, float]] = [] for index, row in enumerate(series): if not isinstance(row, dict): continue try: score = float(row.get("total_score")) except (TypeError, ValueError): continue if math.isnan(score) or score < 0: continue ymd = str(row.get("ymd") or "").strip() scored_rows.append((ymd, index, score)) scored_rows.sort(key=lambda item: (item[0], item[1])) point_limit = max(max_points, 1) return [score for _, _, score in scored_rows[-point_limit:]] def _theil_sen_slope(values: list[float]) -> float: slopes: list[float] = [] for start_index, start_value in enumerate(values): for end_index in range(start_index + 1, len(values)): slopes.append((values[end_index] - start_value) / (end_index - start_index)) return _median(slopes) def extract_sorted_scores( series: list[dict[str, Any]], *, max_points: int | None = None, ) -> list[float]: point_limit = ( max_points if max_points is not None and max_points > 0 else MAX_TREND_POINTS ) return _extract_recent_scores(series, max_points=point_limit) def is_wxindex_rising_scores( scores: list[float], *, min_points: int = MIN_TREND_POINTS, ) -> bool: """判断热度序列是否呈持续上涨趋势(原流程 Theil-Sen 规则)。""" if len(scores) < min_points: return False log_scores = [math.log1p(score) for score in scores] slope = _theil_sen_slope(log_scores) fit_change_rate = math.expm1(slope * (len(log_scores) - 1)) early_count = min(3, len(scores)) late_count = min(3, len(scores)) early_avg = sum(scores[:early_count]) / early_count late_avg = sum(scores[-late_count:]) / late_count window_change_rate = (late_avg - early_avg) / max(early_avg, 1.0) return ( fit_change_rate >= UP_FIT_CHANGE_RATE and window_change_rate >= UP_WINDOW_CHANGE_RATE ) def _has_significant_recent_drop( scores: list[float], *, drop_rate_threshold: float = HEAT_RISING_RECENT_DROP_RATE, ) -> bool: """最近一天热度相较昨天或前天是否出现大幅度下降。""" if len(scores) < 2: return False latest = scores[-1] prior_scores = [scores[-2]] if len(scores) >= 3: prior_scores.append(scores[-3]) for prior in prior_scores: change_rate = (latest - prior) / max(prior, 1.0) if change_rate <= drop_rate_threshold: return True return False def is_wxindex_heat_rising_scores( scores: list[float], *, min_points: int = 7, overall_change_rate: float = HEAT_RISING_OVERALL_CHANGE_RATE, window_change_rate_threshold: float = HEAT_RISING_WINDOW_CHANGE_RATE, adjacent_up_ratio: float = HEAT_RISING_ADJACENT_UP_RATIO, recent_drop_rate_threshold: float = HEAT_RISING_RECENT_DROP_RATE, ) -> bool: """热度模式任务:判断区间内是否持续上涨。""" if len(scores) < min_points: return False first_score = scores[0] last_score = scores[-1] overall_change_rate_value = (last_score - first_score) / max(first_score, 1.0) if overall_change_rate_value < overall_change_rate: return False early_avg = sum(scores[:3]) / 3 late_avg = sum(scores[-3:]) / 3 window_change_rate = (late_avg - early_avg) / max(early_avg, 1.0) if window_change_rate < window_change_rate_threshold: return False if last_score <= first_score: return False if _has_significant_recent_drop( scores, drop_rate_threshold=recent_drop_rate_threshold, ): return False adjacent_total = len(scores) - 1 if adjacent_total <= 0: return False adjacent_up_count = sum( 1 for index in range(adjacent_total) if scores[index + 1] > scores[index] ) return adjacent_up_count / adjacent_total >= adjacent_up_ratio def is_wxindex_spike_scores( scores: list[float], *, spike_days: int = 3, min_points: int = 7, spike_ratio: float = HEAT_SPIKE_RATIO, baseline_floor: float = HEAT_SPIKE_BASELINE_FLOOR, ) -> bool: """判断最近 N 天热度是否相对前期突然暴涨。""" if len(scores) < min_points or spike_days <= 0 or len(scores) <= spike_days: return False baseline = scores[:-spike_days] recent = scores[-spike_days:] baseline_avg = sum(baseline) / len(baseline) recent_avg = sum(recent) / len(recent) effective_baseline = max(baseline_avg, baseline_floor) if recent_avg / effective_baseline < spike_ratio: return False baseline_max = max(baseline) return recent[-1] > baseline_max and recent_avg > baseline_avg def calc_wxindex_trend(series: list[dict[str, Any]]) -> str: """按最近 7 天整体走势计算趋势,避免被最后一天波动误导。""" scores = _extract_recent_scores(series) if len(scores) < MIN_TREND_POINTS: return "未知" if is_wxindex_rising_scores(scores, min_points=MIN_TREND_POINTS): return "上升" log_scores = [math.log1p(score) for score in scores] slope = _theil_sen_slope(log_scores) fit_change_rate = math.expm1(slope * (len(log_scores) - 1)) early_avg = sum(scores[:3]) / 3 late_avg = sum(scores[-3:]) / 3 window_change_rate = (late_avg - early_avg) / max(early_avg, 1.0) if ( fit_change_rate <= DOWN_FIT_CHANGE_RATE and window_change_rate <= DOWN_WINDOW_CHANGE_RATE ): return "下降" return "平稳"