| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- """微信指数趋势计算工具。"""
- from __future__ import annotations
- import math
- from typing import Any
- MIN_TREND_POINTS = 4
- MAX_TREND_POINTS = 7
- UP_FIT_CHANGE_RATE = 0.04
- UP_WINDOW_CHANGE_RATE = 0.02
- DOWN_FIT_CHANGE_RATE = -0.04
- DOWN_WINDOW_CHANGE_RATE = -0.02
- HEAT_RISING_OVERALL_CHANGE_RATE = 0.15
- HEAT_RISING_WINDOW_CHANGE_RATE = 0.12
- HEAT_RISING_ADJACENT_UP_RATIO = 0.65
- HEAT_RISING_RECENT_DROP_RATE = -0.15
- HEAT_SPIKE_BASELINE_FLOOR = 50_000.0
- HEAT_SPIKE_RATIO = 2.5
- def _median(values: list[float]) -> float:
- if not values:
- return 0.0
- ordered = sorted(values)
- mid = len(ordered) // 2
- if len(ordered) % 2:
- return ordered[mid]
- return (ordered[mid - 1] + ordered[mid]) / 2
- def _extract_recent_scores(
- series: list[dict[str, Any]],
- *,
- max_points: int = MAX_TREND_POINTS,
- ) -> list[float]:
- scored_rows: list[tuple[str, int, float]] = []
- for index, row in enumerate(series):
- if not isinstance(row, dict):
- continue
- try:
- score = float(row.get("total_score"))
- except (TypeError, ValueError):
- continue
- if math.isnan(score) or score < 0:
- continue
- ymd = str(row.get("ymd") or "").strip()
- scored_rows.append((ymd, index, score))
- scored_rows.sort(key=lambda item: (item[0], item[1]))
- point_limit = max(max_points, 1)
- return [score for _, _, score in scored_rows[-point_limit:]]
- def _theil_sen_slope(values: list[float]) -> float:
- slopes: list[float] = []
- for start_index, start_value in enumerate(values):
- for end_index in range(start_index + 1, len(values)):
- slopes.append((values[end_index] - start_value) / (end_index - start_index))
- return _median(slopes)
- def extract_sorted_scores(
- series: list[dict[str, Any]],
- *,
- max_points: int | None = None,
- ) -> list[float]:
- point_limit = (
- max_points if max_points is not None and max_points > 0 else MAX_TREND_POINTS
- )
- return _extract_recent_scores(series, max_points=point_limit)
- def is_wxindex_rising_scores(
- scores: list[float],
- *,
- min_points: int = MIN_TREND_POINTS,
- ) -> bool:
- """判断热度序列是否呈持续上涨趋势(原流程 Theil-Sen 规则)。"""
- if len(scores) < min_points:
- return False
- log_scores = [math.log1p(score) for score in scores]
- slope = _theil_sen_slope(log_scores)
- fit_change_rate = math.expm1(slope * (len(log_scores) - 1))
- early_count = min(3, len(scores))
- late_count = min(3, len(scores))
- early_avg = sum(scores[:early_count]) / early_count
- late_avg = sum(scores[-late_count:]) / late_count
- window_change_rate = (late_avg - early_avg) / max(early_avg, 1.0)
- return (
- fit_change_rate >= UP_FIT_CHANGE_RATE
- and window_change_rate >= UP_WINDOW_CHANGE_RATE
- )
- def _has_significant_recent_drop(
- scores: list[float],
- *,
- drop_rate_threshold: float = HEAT_RISING_RECENT_DROP_RATE,
- ) -> bool:
- """最近一天热度相较昨天或前天是否出现大幅度下降。"""
- if len(scores) < 2:
- return False
- latest = scores[-1]
- prior_scores = [scores[-2]]
- if len(scores) >= 3:
- prior_scores.append(scores[-3])
- for prior in prior_scores:
- change_rate = (latest - prior) / max(prior, 1.0)
- if change_rate <= drop_rate_threshold:
- return True
- return False
- def is_wxindex_heat_rising_scores(
- scores: list[float],
- *,
- min_points: int = 7,
- overall_change_rate: float = HEAT_RISING_OVERALL_CHANGE_RATE,
- window_change_rate_threshold: float = HEAT_RISING_WINDOW_CHANGE_RATE,
- adjacent_up_ratio: float = HEAT_RISING_ADJACENT_UP_RATIO,
- recent_drop_rate_threshold: float = HEAT_RISING_RECENT_DROP_RATE,
- ) -> bool:
- """热度模式任务:判断区间内是否持续上涨。"""
- if len(scores) < min_points:
- return False
- first_score = scores[0]
- last_score = scores[-1]
- overall_change_rate_value = (last_score - first_score) / max(first_score, 1.0)
- if overall_change_rate_value < overall_change_rate:
- return False
- early_avg = sum(scores[:3]) / 3
- late_avg = sum(scores[-3:]) / 3
- window_change_rate = (late_avg - early_avg) / max(early_avg, 1.0)
- if window_change_rate < window_change_rate_threshold:
- return False
- if last_score <= first_score:
- return False
- if _has_significant_recent_drop(
- scores,
- drop_rate_threshold=recent_drop_rate_threshold,
- ):
- return False
- adjacent_total = len(scores) - 1
- if adjacent_total <= 0:
- return False
- adjacent_up_count = sum(
- 1 for index in range(adjacent_total) if scores[index + 1] > scores[index]
- )
- return adjacent_up_count / adjacent_total >= adjacent_up_ratio
- def is_wxindex_spike_scores(
- scores: list[float],
- *,
- spike_days: int = 3,
- min_points: int = 7,
- spike_ratio: float = HEAT_SPIKE_RATIO,
- baseline_floor: float = HEAT_SPIKE_BASELINE_FLOOR,
- ) -> bool:
- """判断最近 N 天热度是否相对前期突然暴涨。"""
- if len(scores) < min_points or spike_days <= 0 or len(scores) <= spike_days:
- return False
- baseline = scores[:-spike_days]
- recent = scores[-spike_days:]
- baseline_avg = sum(baseline) / len(baseline)
- recent_avg = sum(recent) / len(recent)
- effective_baseline = max(baseline_avg, baseline_floor)
- if recent_avg / effective_baseline < spike_ratio:
- return False
- baseline_max = max(baseline)
- return recent[-1] > baseline_max and recent_avg > baseline_avg
- def calc_wxindex_trend(series: list[dict[str, Any]]) -> str:
- """按最近 7 天整体走势计算趋势,避免被最后一天波动误导。"""
- scores = _extract_recent_scores(series)
- if len(scores) < MIN_TREND_POINTS:
- return "未知"
- if is_wxindex_rising_scores(scores, min_points=MIN_TREND_POINTS):
- return "上升"
- log_scores = [math.log1p(score) for score in scores]
- slope = _theil_sen_slope(log_scores)
- fit_change_rate = math.expm1(slope * (len(log_scores) - 1))
- early_avg = sum(scores[:3]) / 3
- late_avg = sum(scores[-3:]) / 3
- window_change_rate = (late_avg - early_avg) / max(early_avg, 1.0)
- if (
- fit_change_rate <= DOWN_FIT_CHANGE_RATE
- and window_change_rate <= DOWN_WINDOW_CHANGE_RATE
- ):
- return "下降"
- return "平稳"
|