| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347 |
- """新热事件需求词来源:按 sync_log 定位热点标题并返回单条展示数据。"""
- from __future__ import annotations
- import hashlib
- import json
- import math
- from datetime import datetime
- from typing import Any
- from sqlalchemy import text
- from app.core.config import settings
- from app.db.hot_content_mysql import HotContentSessionLocal
# Demand-type labels, compared against hot_content_odps_sync_log.demand_type.
TYPE_FEATURE_POINT = "特征点"
TYPE_PHRASE = "短语"

# Item-type labels, compared against hot_content_demand_exports.item_type.
ITEM_TYPE_ELEMENT = "元素"
ITEM_TYPE_PHRASE = "短语"

# point_category values that satisfy the "point gate" of the hive weight.
TITLE_RETAIN_POINT_CATEGORIES = frozenset(("灵感点", "目的点"))

# The hive weight is the record's max wxindex score divided by this value.
WEIGHT_DIVISOR = 1e6
# Used when settings.hot_content_wxindex_threshold is unset or not numeric.
DEFAULT_WXINDEX_THRESHOLD = 1e6
# Used when settings.hot_demand_pool_strategy is unset or blank.
DEFAULT_HOT_STRATEGY = "新热事件"
def _hot_strategy_name() -> str:
    """Return the configured hot-demand strategy name.

    Reads ``settings.hot_demand_pool_strategy``. A missing, falsy, or
    whitespace-only setting falls back to ``DEFAULT_HOT_STRATEGY``.
    """
    configured = getattr(settings, "hot_demand_pool_strategy", None)
    candidate = str(configured if configured else DEFAULT_HOT_STRATEGY).strip()
    if candidate:
        return candidate
    return DEFAULT_HOT_STRATEGY
def _wxindex_threshold() -> float:
    """Return the wxindex score threshold used by the hive-weight gate.

    Reads ``settings.hot_content_wxindex_threshold``. Falls back to
    ``DEFAULT_WXINDEX_THRESHOLD`` when the setting is missing, cannot be
    converted with ``float()``, or converts to NaN.
    """
    raw = getattr(settings, "hot_content_wxindex_threshold", None)
    if raw is None:
        return DEFAULT_WXINDEX_THRESHOLD
    try:
        threshold = float(raw)
    except (TypeError, ValueError):
        return DEFAULT_WXINDEX_THRESHOLD
    # With a NaN threshold, `score < threshold` is never true (the gate never
    # rejects) while `score >= threshold` is never true either, so the two
    # gate checks built on this value would contradict each other.
    if math.isnan(threshold):
        return DEFAULT_WXINDEX_THRESHOLD
    return threshold
- def _normalize_date(date_value: str | None) -> str:
- if not date_value:
- return ""
- normalized = str(date_value).replace("-", "").strip()
- return normalized if len(normalized) == 8 and normalized.isdigit() else ""
def _normalize_text(value: str) -> str:
    """Return *value* as text with every whitespace character removed.

    Falsy inputs (``None``, ``""``, ``0``) yield ``""``.
    """
    as_text = str(value) if value else ""
    return "".join(as_text.split())
def _load_json(value: Any) -> dict[str, Any]:
    """Coerce a JSON column value into a dict for display.

    - ``None`` or empty/whitespace-only text -> ``{}``
    - a ``dict`` is returned unchanged
    - ``bytes``/``bytearray`` are decoded as UTF-8; undecodable bytes become
      U+FFFD instead of raising
    - text that is not valid JSON -> ``{"__parse_error__": <stripped text>}``
    - valid JSON that is not an object, or any other input type ->
      ``{"value": ...}``
    """
    if value is None:
        return {}
    if isinstance(value, dict):
        return value
    if isinstance(value, (bytes, bytearray)):
        # errors="replace" keeps one corrupt blob from failing the whole
        # response; if the result is not valid JSON it is reported through
        # "__parse_error__" like any other malformed text.
        value = value.decode("utf-8", errors="replace")
    if isinstance(value, str):
        text_value = value.strip()
        if not text_value:
            return {}
        try:
            parsed = json.loads(text_value)
        except json.JSONDecodeError:
            return {"__parse_error__": text_value}
        return parsed if isinstance(parsed, dict) else {"value": parsed}
    return {"value": value}
def _fmt_datetime(value: Any) -> str:
    """Render a timestamp-like value for display.

    ``datetime`` instances become ``YYYY-MM-DD HH:MM:SS``; ``None`` becomes
    ``"-"``; anything else is passed through ``str()``.
    """
    if isinstance(value, datetime):
        return value.strftime("%Y-%m-%d %H:%M:%S")
    return "-" if value is None else str(value)
def build_demand_id(*, strategy: str, demand_name: str, partition_dt: str) -> str:
    """Return the md5 hex digest that identifies a demand in a partition.

    The digest input is ``strategy``, ``demand_name.strip()`` and
    ``partition_dt`` concatenated with no separator, encoded as UTF-8.
    Keep this format stable: stored ``demand_id`` values are verified by
    recomputing it.
    """
    digest_input = "{0}{1}{2}".format(strategy, demand_name.strip(), partition_dt)
    return hashlib.md5(digest_input.encode("utf-8")).hexdigest()
def _record_wxindex_score(export_rows: list[dict[str, Any]]) -> float:
    """Return the highest ``wxindex_latest_score`` across *export_rows*.

    Missing or falsy scores count as 0. Values ``float()`` cannot parse are
    skipped, and NaN values are skipped too. Returns ``0.0`` when no row
    yields a usable score.
    """
    scores: list[float] = []
    for row in export_rows:
        try:
            score = float(row.get("wxindex_latest_score") or 0)
        except (TypeError, ValueError):
            continue
        # With NaN in the list, max() returns NaN or a real number depending
        # on row order, so NaN would make the result order-dependent.
        if not math.isnan(score):
            scores.append(score)
    return max(scores, default=0.0)
def _has_inspiration_or_goal_demand_match(row: dict[str, Any]) -> bool:
    """Return True when *row* passes the point gate.

    The row qualifies when its stripped ``point_category`` is one of
    ``TITLE_RETAIN_POINT_CATEGORIES`` and its stripped ``matched_demand``
    is non-empty.
    """
    category = str(row.get("point_category") or "").strip()
    return category in TITLE_RETAIN_POINT_CATEGORIES and bool(
        str(row.get("matched_demand") or "").strip()
    )
def _export_row_contributes_to_demand(
    row: dict[str, Any],
    *,
    demand_name: str,
    demand_type: str,
) -> bool:
    """Return True when an export row contributed to the given synced demand.

    The row must have a non-blank ``matched_demand``. Then, comparing
    whitespace-free text (see ``_normalize_text``):

    - ``TYPE_PHRASE`` demands: the row is an ``ITEM_TYPE_PHRASE`` item whose
      text equals the demand name;
    - ``TYPE_FEATURE_POINT`` demands: the row is an ``ITEM_TYPE_ELEMENT``
      item whose non-empty text occurs inside the demand name.

    Any other demand type returns False.
    """
    matched = str(row.get("matched_demand") or "").strip()
    if not matched:
        return False
    item_type = str(row.get("item_type") or "")
    normalized_item = _normalize_text(str(row.get("item_text") or ""))
    normalized_name = _normalize_text(demand_name)
    if demand_type == TYPE_PHRASE:
        return item_type == ITEM_TYPE_PHRASE and normalized_item == normalized_name
    if demand_type == TYPE_FEATURE_POINT:
        # "" is a substring of every string, so without the emptiness check a
        # blank element row would count toward every feature-point demand.
        return (
            item_type == ITEM_TYPE_ELEMENT
            and bool(normalized_item)
            and normalized_item in normalized_name
        )
    return False
def _hive_weight_for_record(
    export_rows: list[dict[str, Any]],
    *,
    wxindex_threshold: float,
) -> float | None:
    """Return the hive weight of a record, or None if it fails a gate.

    Gates, checked in order:

    1. the record's max wxindex score (``_record_wxindex_score``) must be at
       least *wxindex_threshold*;
    2. at least one export row must satisfy
       ``_has_inspiration_or_goal_demand_match``.

    The weight is the max wxindex score divided by ``WEIGHT_DIVISOR``.
    """
    # Computed once and reused for both the gate and the weight.
    max_score = _record_wxindex_score(export_rows)
    if max_score < wxindex_threshold:
        return None
    if not any(_has_inspiration_or_goal_demand_match(row) for row in export_rows):
        return None
    return max_score / WEIGHT_DIVISOR
def fetch_sync_log_row(
    *,
    demand_name: str,
    demand_type: str,
    partition_dt: str,
    strategy: str | None = None,
) -> dict[str, Any] | None:
    """Look up the single hot_content_odps_sync_log row for a demand.

    Args:
        demand_name: Demand name; surrounding whitespace is stripped.
        demand_type: Demand type; surrounding whitespace is stripped.
        partition_dt: Partition date, ``YYYYMMDD`` or ``YYYY-MM-DD``.
        strategy: Strategy name. ``None``, empty, or whitespace-only falls
            back to ``_hot_strategy_name()``.

    Returns:
        The matching row as a plain dict, or ``None`` when nothing matches.

    Raises:
        ValueError: If name, type, or date is blank/invalid, or if more than
            one row matches.
    """
    name = (demand_name or "").strip()
    dtype = (demand_type or "").strip()
    dt = _normalize_date(partition_dt)
    if not name or not dtype or not dt:
        raise ValueError("demand_name、demand_type、dt 均为必填")
    # Fall back on a blank value too: "   ".strip() would otherwise query
    # strategy = '' and silently match nothing.
    strategy_value = (strategy or "").strip() or _hot_strategy_name()
    sql = text(
        """
        SELECT
            id,
            partition_dt,
            strategy,
            demand_id,
            demand_name,
            demand_type,
            record_id,
            synced_at
        FROM hot_content_odps_sync_log
        WHERE demand_name = :demand_name
          AND demand_type = :demand_type
          AND partition_dt = :partition_dt
          AND strategy = :strategy
        LIMIT 2
        """
    )
    params = {
        "demand_name": name,
        "demand_type": dtype,
        "partition_dt": dt,
        "strategy": strategy_value,
    }
    with HotContentSessionLocal() as session:
        rows = session.execute(sql, params).mappings().all()
    if not rows:
        return None
    # LIMIT 2 is enough to detect an ambiguous match without fetching all duplicates.
    if len(rows) > 1:
        raise ValueError("匹配到多条同步记录,请检查查询条件")
    return dict(rows[0])
def _coerce_float_or_default(value: Any, default: float | None) -> float | None:
    """Return ``float(value)``, or *default* when ``float()`` rejects it."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _export_row_payload(item: dict[str, Any], *, contributes: bool) -> dict[str, Any]:
    """Shape one hot_content_demand_exports row for the detail response."""
    return {
        "id": int(item["id"]),
        "item_type": str(item.get("item_type") or ""),
        "item_text": str(item.get("item_text") or ""),
        "point_category": str(item.get("point_category") or ""),
        "matched_demand": str(item.get("matched_demand") or ""),
        # Malformed numbers degrade (None / 0.0) instead of failing the whole
        # response, matching how _record_wxindex_score skips them.
        # float(None) raises TypeError, so a missing score also maps to None.
        "contribution_score": _coerce_float_or_default(item.get("contribution_score"), None),
        "wxindex_keyword": str(item.get("wxindex_keyword") or ""),
        "all_hot_keywords": str(item.get("all_hot_keywords") or ""),
        "wxindex_latest_score": _coerce_float_or_default(
            item.get("wxindex_latest_score") or 0, 0.0
        ),
        "wxindex_trend": str(item.get("wxindex_trend") or ""),
        "contributes_to_sync": contributes,
    }


def fetch_hot_content_source_detail(
    *,
    demand_name: str,
    demand_type: str,
    partition_dt: str,
    strategy: str | None = None,
) -> dict[str, Any]:
    """Build the display payload for one synced hot-event demand.

    Locates the sync-log row with ``fetch_sync_log_row``, then loads the
    linked ``hot_content_records`` row and all of its
    ``hot_content_demand_exports`` rows.

    Returns:
        A dict with keys ``wxindex_threshold``, ``sync_log`` (which includes
        ``demand_id_verified`` and ``hive_weight``), ``record`` (which
        includes ``max_wxindex_score`` and the two gate flags) and
        ``export_rows``.

    Raises:
        ValueError: Propagated from ``fetch_sync_log_row`` (invalid input or
            ambiguous match).
        LookupError: If no sync-log row matches, the row has no positive
            ``record_id``, or the linked record does not exist.
    """
    sync_row = fetch_sync_log_row(
        demand_name=demand_name,
        demand_type=demand_type,
        partition_dt=partition_dt,
        strategy=strategy,
    )
    if sync_row is None:
        raise LookupError("未找到对应的同步记录")
    record_id = int(sync_row.get("record_id") or 0)
    if record_id <= 0:
        raise LookupError("同步记录未关联热点内容")

    wxindex_threshold = _wxindex_threshold()
    # Everything below describes the stored sync row, not the caller's input.
    strategy_value = str(sync_row.get("strategy") or _hot_strategy_name())
    sync_partition_dt = str(sync_row.get("partition_dt") or "")
    sync_demand_name = str(sync_row.get("demand_name") or "")
    sync_demand_type = str(sync_row.get("demand_type") or "")
    demand_id = str(sync_row.get("demand_id") or "")

    record_sql = text(
        """
        SELECT
            id,
            source,
            title,
            article_title,
            article_body,
            hot_rank,
            execution_status,
            postprocess_status,
            created_at,
            contribution_demand_match_json,
            wxindex_trend_json
        FROM hot_content_records
        WHERE id = :record_id
        LIMIT 1
        """
    )
    export_sql = text(
        """
        SELECT
            id,
            item_type,
            item_text,
            point_category,
            matched_demand,
            contribution_score,
            wxindex_keyword,
            all_hot_keywords,
            wxindex_latest_score,
            wxindex_trend
        FROM hot_content_demand_exports
        WHERE record_id = :record_id
        ORDER BY id ASC
        """
    )
    with HotContentSessionLocal() as session:
        record_row = session.execute(record_sql, {"record_id": record_id}).mappings().first()
        export_rows_raw = session.execute(export_sql, {"record_id": record_id}).mappings().all()
    if record_row is None:
        raise LookupError("未找到关联热点内容")

    # gate_rows: the raw fields the gate helpers read; export_items: display rows.
    gate_rows: list[dict[str, Any]] = []
    export_items: list[dict[str, Any]] = []
    for row in export_rows_raw:
        item = dict(row)
        contributes = _export_row_contributes_to_demand(
            item,
            demand_name=sync_demand_name,
            demand_type=sync_demand_type,
        )
        gate_rows.append(
            {
                "item_type": item.get("item_type"),
                "item_text": item.get("item_text"),
                "point_category": item.get("point_category"),
                "matched_demand": item.get("matched_demand"),
                "wxindex_latest_score": item.get("wxindex_latest_score"),
            }
        )
        export_items.append(_export_row_payload(item, contributes=contributes))

    max_wxindex = _record_wxindex_score(gate_rows)
    hive_weight = _hive_weight_for_record(gate_rows, wxindex_threshold=wxindex_threshold)
    passes_point_gate = any(_has_inspiration_or_goal_demand_match(row) for row in gate_rows)
    expected_id = build_demand_id(
        strategy=strategy_value,
        demand_name=sync_demand_name,
        partition_dt=sync_partition_dt,
    )
    contribution = _load_json(record_row.get("contribution_demand_match_json"))
    wxindex = _load_json(record_row.get("wxindex_trend_json"))
    hot_rank = record_row.get("hot_rank")

    return {
        "wxindex_threshold": wxindex_threshold,
        "sync_log": {
            "id": int(sync_row["id"]),
            "partition_dt": sync_partition_dt,
            "strategy": strategy_value,
            "demand_id": demand_id,
            "demand_name": sync_demand_name,
            "demand_type": sync_demand_type,
            "record_id": record_id,
            "synced_at": _fmt_datetime(sync_row.get("synced_at")),
            # True when the stored id matches a recomputation from the row.
            "demand_id_verified": expected_id == demand_id,
            "hive_weight": hive_weight,
        },
        "record": {
            "id": int(record_row["id"]),
            "source": str(record_row.get("source") or ""),
            "title": str(record_row.get("title") or ""),
            "article_title": str(record_row.get("article_title") or ""),
            "article_body": str(record_row.get("article_body") or ""),
            "hot_rank": int(hot_rank) if hot_rank is not None else None,
            "created_at": _fmt_datetime(record_row.get("created_at")),
            "contribution": contribution,
            "wxindex": wxindex,
            "max_wxindex_score": max_wxindex,
            "passes_wxindex_gate": max_wxindex >= wxindex_threshold,
            "passes_point_gate": passes_point_gate,
        },
        "export_rows": export_items,
    }
def format_number(value: Any) -> str:
    """Format a number for compact display.

    Returns ``"-"`` for values ``float()`` cannot convert and for NaN/±inf.
    Values whose rounded magnitude is at least 10000 are shown in units of
    万 (10^4) with one decimal (``12345`` -> ``"1.2万"``). Smaller values are
    rounded to an integer (``42.4`` -> ``"42"``).
    """
    try:
        number = float(value)
    except (TypeError, ValueError):
        return "-"
    # NaN and infinities have no meaningful display form (inf would print "inf万").
    if not math.isfinite(number):
        return "-"
    # Decide on the rounded value: otherwise 9999.6 would print as "10000"
    # instead of switching to the 万 unit.
    if abs(round(number)) >= 10000:
        return f"{number / 10000:.1f}万"
    return f"{number:.0f}"
|