"""新热事件需求词来源:按 sync_log 定位热点标题并返回单条展示数据。""" from __future__ import annotations import hashlib import json import math from datetime import datetime from typing import Any from sqlalchemy import text from app.core.config import settings from app.db.hot_content_mysql import HotContentSessionLocal TYPE_FEATURE_POINT = "特征点" TYPE_PHRASE = "短语" ITEM_TYPE_ELEMENT = "元素" ITEM_TYPE_PHRASE = "短语" TITLE_RETAIN_POINT_CATEGORIES = frozenset({"灵感点", "目的点"}) WEIGHT_DIVISOR = 1_000_000.0 DEFAULT_WXINDEX_THRESHOLD = 1_000_000.0 DEFAULT_HOT_STRATEGY = "新热事件" def _hot_strategy_name() -> str: return str( getattr(settings, "hot_demand_pool_strategy", None) or DEFAULT_HOT_STRATEGY ).strip() or DEFAULT_HOT_STRATEGY def _wxindex_threshold() -> float: raw = getattr(settings, "hot_content_wxindex_threshold", None) if raw is None: return DEFAULT_WXINDEX_THRESHOLD try: return float(raw) except (TypeError, ValueError): return DEFAULT_WXINDEX_THRESHOLD def _normalize_date(date_value: str | None) -> str: if not date_value: return "" normalized = str(date_value).replace("-", "").strip() return normalized if len(normalized) == 8 and normalized.isdigit() else "" def _normalize_text(value: str) -> str: return "".join(str(value or "").split()) def _load_json(value: Any) -> dict[str, Any]: if value is None: return {} if isinstance(value, dict): return value if isinstance(value, (bytes, bytearray)): value = value.decode("utf-8") if isinstance(value, str): text_value = value.strip() if not text_value: return {} try: parsed = json.loads(text_value) except json.JSONDecodeError: return {"__parse_error__": text_value} return parsed if isinstance(parsed, dict) else {"value": parsed} return {"value": value} def _fmt_datetime(value: Any) -> str: if value is None: return "-" if isinstance(value, datetime): return value.strftime("%Y-%m-%d %H:%M:%S") return str(value) def build_demand_id(*, strategy: str, demand_name: str, partition_dt: str) -> str: raw = f"{strategy}{demand_name.strip()}{partition_dt}" return hashlib.md5(raw.encode("utf-8")).hexdigest() def _record_wxindex_score(export_rows: list[dict[str, Any]]) -> float: scores: list[float] = [] for row in export_rows: try: scores.append(float(row.get("wxindex_latest_score") or 0)) except (TypeError, ValueError): continue return max(scores) if scores else 0.0 def _has_inspiration_or_goal_demand_match(row: dict[str, Any]) -> bool: point_category = str(row.get("point_category") or "").strip() if point_category not in TITLE_RETAIN_POINT_CATEGORIES: return False return bool(str(row.get("matched_demand") or "").strip()) def _export_row_contributes_to_demand( row: dict[str, Any], *, demand_name: str, demand_type: str, ) -> bool: normalized_name = _normalize_text(demand_name) item_type = str(row.get("item_type") or "") item_text = str(row.get("item_text") or "") matched = str(row.get("matched_demand") or "").strip() if not matched: return False if demand_type == TYPE_PHRASE: return item_type == ITEM_TYPE_PHRASE and _normalize_text(item_text) == normalized_name if demand_type == TYPE_FEATURE_POINT: return item_type == ITEM_TYPE_ELEMENT and _normalize_text(item_text) in normalized_name return False def _hive_weight_for_record( export_rows: list[dict[str, Any]], *, wxindex_threshold: float, ) -> float | None: if _record_wxindex_score(export_rows) < wxindex_threshold: return None if not any(_has_inspiration_or_goal_demand_match(row) for row in export_rows): return None return _record_wxindex_score(export_rows) / WEIGHT_DIVISOR def fetch_sync_log_row( *, demand_name: str, demand_type: str, partition_dt: str, strategy: str | None = None, ) -> dict[str, Any] | None: """按需求名称、需求类型、分区 dt 查询 hot_content_odps_sync_log 唯一记录。""" name = (demand_name or "").strip() dtype = (demand_type or "").strip() dt = _normalize_date(partition_dt) if not name or not dtype or not dt: raise ValueError("demand_name、demand_type、dt 均为必填") strategy_value = (strategy or _hot_strategy_name()).strip() sql = text( """ SELECT id, partition_dt, strategy, demand_id, demand_name, demand_type, record_id, synced_at FROM hot_content_odps_sync_log WHERE demand_name = :demand_name AND demand_type = :demand_type AND partition_dt = :partition_dt AND strategy = :strategy LIMIT 2 """ ) params = { "demand_name": name, "demand_type": dtype, "partition_dt": dt, "strategy": strategy_value, } with HotContentSessionLocal() as session: rows = session.execute(sql, params).mappings().all() if not rows: return None if len(rows) > 1: raise ValueError("匹配到多条同步记录,请检查查询条件") return dict(rows[0]) def fetch_hot_content_source_detail( *, demand_name: str, demand_type: str, partition_dt: str, strategy: str | None = None, ) -> dict[str, Any]: sync_row = fetch_sync_log_row( demand_name=demand_name, demand_type=demand_type, partition_dt=partition_dt, strategy=strategy, ) if sync_row is None: raise LookupError("未找到对应的同步记录") record_id = int(sync_row.get("record_id") or 0) if record_id <= 0: raise LookupError("同步记录未关联热点内容") wxindex_threshold = _wxindex_threshold() strategy_value = str(sync_row.get("strategy") or _hot_strategy_name()) partition_dt = str(sync_row.get("partition_dt") or "") sync_demand_name = str(sync_row.get("demand_name") or "") sync_demand_type = str(sync_row.get("demand_type") or "") demand_id = str(sync_row.get("demand_id") or "") record_sql = text( """ SELECT id, source, title, article_title, article_body, hot_rank, execution_status, postprocess_status, created_at, contribution_demand_match_json, wxindex_trend_json FROM hot_content_records WHERE id = :record_id LIMIT 1 """ ) export_sql = text( """ SELECT id, item_type, item_text, point_category, matched_demand, contribution_score, wxindex_keyword, all_hot_keywords, wxindex_latest_score, wxindex_trend FROM hot_content_demand_exports WHERE record_id = :record_id ORDER BY id ASC """ ) with HotContentSessionLocal() as session: record_row = session.execute(record_sql, {"record_id": record_id}).mappings().first() export_rows_raw = session.execute(export_sql, {"record_id": record_id}).mappings().all() if record_row is None: raise LookupError("未找到关联热点内容") export_dicts: list[dict[str, Any]] = [] export_items: list[dict[str, Any]] = [] for row in export_rows_raw: item = dict(row) contributes = _export_row_contributes_to_demand( item, demand_name=sync_demand_name, demand_type=sync_demand_type, ) export_dicts.append( { "item_type": item.get("item_type"), "item_text": item.get("item_text"), "point_category": item.get("point_category"), "matched_demand": item.get("matched_demand"), "wxindex_latest_score": item.get("wxindex_latest_score"), } ) export_items.append( { "id": int(item["id"]), "item_type": str(item.get("item_type") or ""), "item_text": str(item.get("item_text") or ""), "point_category": str(item.get("point_category") or ""), "matched_demand": str(item.get("matched_demand") or ""), "contribution_score": float(item["contribution_score"]) if item.get("contribution_score") is not None else None, "wxindex_keyword": str(item.get("wxindex_keyword") or ""), "all_hot_keywords": str(item.get("all_hot_keywords") or ""), "wxindex_latest_score": float(item.get("wxindex_latest_score") or 0), "wxindex_trend": str(item.get("wxindex_trend") or ""), "contributes_to_sync": contributes, } ) max_wxindex = _record_wxindex_score(export_dicts) hive_weight = _hive_weight_for_record( export_dicts, wxindex_threshold=wxindex_threshold, ) expected_id = build_demand_id( strategy=strategy_value, demand_name=sync_demand_name, partition_dt=partition_dt, ) contribution = _load_json(record_row.get("contribution_demand_match_json")) wxindex = _load_json(record_row.get("wxindex_trend_json")) return { "wxindex_threshold": wxindex_threshold, "sync_log": { "id": int(sync_row["id"]), "partition_dt": partition_dt, "strategy": strategy_value, "demand_id": demand_id, "demand_name": sync_demand_name, "demand_type": sync_demand_type, "record_id": record_id, "synced_at": _fmt_datetime(sync_row.get("synced_at")), "demand_id_verified": expected_id == demand_id, "hive_weight": hive_weight, }, "record": { "id": int(record_row["id"]), "source": str(record_row.get("source") or ""), "title": str(record_row.get("title") or ""), "article_title": str(record_row.get("article_title") or ""), "article_body": str(record_row.get("article_body") or ""), "hot_rank": int(record_row["hot_rank"]) if record_row.get("hot_rank") is not None else None, "created_at": _fmt_datetime(record_row.get("created_at")), "contribution": contribution, "wxindex": wxindex, "max_wxindex_score": max_wxindex, "passes_wxindex_gate": max_wxindex >= wxindex_threshold, "passes_point_gate": any( _has_inspiration_or_goal_demand_match(row) for row in export_dicts ), }, "export_rows": export_items, } def format_number(value: Any) -> str: try: number = float(value) except (TypeError, ValueError): return "-" if math.isnan(number): return "-" if abs(number) >= 10000: return f"{number / 10000:.1f}万" return f"{number:.0f}"