|
|
@@ -0,0 +1,347 @@
|
|
|
+"""新热事件需求词来源:按 sync_log 定位热点标题并返回单条展示数据。"""
|
|
|
+
|
|
|
+from __future__ import annotations
|
|
|
+
|
|
|
+import hashlib
|
|
|
+import json
|
|
|
+import math
|
|
|
+from datetime import datetime
|
|
|
+from typing import Any
|
|
|
+
|
|
|
+from sqlalchemy import text
|
|
|
+
|
|
|
+from app.core.config import settings
|
|
|
+from app.db.hot_content_mysql import HotContentSessionLocal
|
|
|
+
|
|
|
# demand_type values stored on sync-log rows ("feature point" / "phrase").
TYPE_FEATURE_POINT = "特征点"
TYPE_PHRASE = "短语"

# item_type values found on export rows ("element" / "phrase").
ITEM_TYPE_ELEMENT = "元素"
ITEM_TYPE_PHRASE = "短语"

# point_category values ("inspiration point", "purpose point") that let a
# record pass the point gate when the row also carries a matched demand.
TITLE_RETAIN_POINT_CATEGORIES = frozenset(("灵感点", "目的点"))

# The record's maximum wxindex score is divided by this to get the hive weight.
WEIGHT_DIVISOR = 1e6

# Fallbacks used when the corresponding settings attribute is missing/blank.
DEFAULT_WXINDEX_THRESHOLD = 1e6
DEFAULT_HOT_STRATEGY = "新热事件"
|
|
|
+
|
|
|
+
|
|
|
def _hot_strategy_name() -> str:
    """Return the configured strategy name, or the default when unset/blank.

    Reads ``settings.hot_demand_pool_strategy``; a missing attribute, a falsy
    value, or a value that is empty after stripping all resolve to
    ``DEFAULT_HOT_STRATEGY``.
    """
    configured = getattr(settings, "hot_demand_pool_strategy", None)
    candidate = str(configured or DEFAULT_HOT_STRATEGY).strip()
    if candidate:
        return candidate
    return DEFAULT_HOT_STRATEGY
|
|
|
+
|
|
|
+
|
|
|
def _wxindex_threshold() -> float:
    """Return the wxindex gate threshold from settings, with a safe fallback.

    Reads ``settings.hot_content_wxindex_threshold`` and converts it to a
    float. ``DEFAULT_WXINDEX_THRESHOLD`` is returned when the attribute is
    missing or ``None``, when it cannot be converted, or when it converts to
    NaN.
    """
    raw = getattr(settings, "hot_content_wxindex_threshold", None)
    if raw is None:
        return DEFAULT_WXINDEX_THRESHOLD
    try:
        threshold = float(raw)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: an int too large to be represented as a float.
        return DEFAULT_WXINDEX_THRESHOLD
    # Every ordering comparison against NaN is False, so a NaN threshold would
    # let `score < threshold` never reject while `score >= threshold` never
    # accepts; fall back instead of propagating that inconsistency.
    if math.isnan(threshold):
        return DEFAULT_WXINDEX_THRESHOLD
    return threshold
|
|
|
+
|
|
|
+
|
|
|
+def _normalize_date(date_value: str | None) -> str:
|
|
|
+ if not date_value:
|
|
|
+ return ""
|
|
|
+ normalized = str(date_value).replace("-", "").strip()
|
|
|
+ return normalized if len(normalized) == 8 and normalized.isdigit() else ""
|
|
|
+
|
|
|
+
|
|
|
+def _normalize_text(value: str) -> str:
|
|
|
+ return "".join(str(value or "").split())
|
|
|
+
|
|
|
+
|
|
|
+def _load_json(value: Any) -> dict[str, Any]:
|
|
|
+ if value is None:
|
|
|
+ return {}
|
|
|
+ if isinstance(value, dict):
|
|
|
+ return value
|
|
|
+ if isinstance(value, (bytes, bytearray)):
|
|
|
+ value = value.decode("utf-8")
|
|
|
+ if isinstance(value, str):
|
|
|
+ text_value = value.strip()
|
|
|
+ if not text_value:
|
|
|
+ return {}
|
|
|
+ try:
|
|
|
+ parsed = json.loads(text_value)
|
|
|
+ except json.JSONDecodeError:
|
|
|
+ return {"__parse_error__": text_value}
|
|
|
+ return parsed if isinstance(parsed, dict) else {"value": parsed}
|
|
|
+ return {"value": value}
|
|
|
+
|
|
|
+
|
|
|
+def _fmt_datetime(value: Any) -> str:
|
|
|
+ if value is None:
|
|
|
+ return "-"
|
|
|
+ if isinstance(value, datetime):
|
|
|
+ return value.strftime("%Y-%m-%d %H:%M:%S")
|
|
|
+ return str(value)
|
|
|
+
|
|
|
+
|
|
|
def build_demand_id(*, strategy: str, demand_name: str, partition_dt: str) -> str:
    """Build the demand id: the MD5 hex digest of strategy + name + partition.

    The three parts are concatenated without a separator; only the demand
    name is stripped of surrounding whitespace. The text is UTF-8 encoded
    before hashing.
    """
    raw = "{}{}{}".format(strategy, demand_name.strip(), partition_dt)
    digest = hashlib.md5()
    digest.update(raw.encode("utf-8"))
    return digest.hexdigest()
|
|
|
+
|
|
|
+
|
|
|
+def _record_wxindex_score(export_rows: list[dict[str, Any]]) -> float:
|
|
|
+ scores: list[float] = []
|
|
|
+ for row in export_rows:
|
|
|
+ try:
|
|
|
+ scores.append(float(row.get("wxindex_latest_score") or 0))
|
|
|
+ except (TypeError, ValueError):
|
|
|
+ continue
|
|
|
+ return max(scores) if scores else 0.0
|
|
|
+
|
|
|
+
|
|
|
def _has_inspiration_or_goal_demand_match(row: dict[str, Any]) -> bool:
    """Tell whether an export row satisfies the point gate.

    True only when the row's stripped ``point_category`` is one of
    ``TITLE_RETAIN_POINT_CATEGORIES`` and its ``matched_demand`` is non-blank.
    """
    category = str(row.get("point_category") or "").strip()
    return category in TITLE_RETAIN_POINT_CATEGORIES and bool(
        str(row.get("matched_demand") or "").strip()
    )
|
|
|
+
|
|
|
+
|
|
|
def _export_row_contributes_to_demand(
    row: dict[str, Any],
    *,
    demand_name: str,
    demand_type: str,
) -> bool:
    """Decide whether an export row contributed to the given synced demand.

    A row with a blank ``matched_demand`` never contributes. Otherwise:

    * phrase demands: the row must be a phrase item whose whitespace-free
      text equals the whitespace-free demand name;
    * feature-point demands: the row must be an element item whose
      non-empty whitespace-free text occurs inside the whitespace-free
      demand name.

    Any other ``demand_type`` yields ``False``.
    """
    normalized_name = _normalize_text(demand_name)
    item_type = str(row.get("item_type") or "")
    normalized_item = _normalize_text(str(row.get("item_text") or ""))
    matched = str(row.get("matched_demand") or "").strip()
    if not matched:
        return False
    if demand_type == TYPE_PHRASE:
        return item_type == ITEM_TYPE_PHRASE and normalized_item == normalized_name
    if demand_type == TYPE_FEATURE_POINT:
        # An empty string is a substring of every string, so a blank element
        # text must be rejected explicitly or it would match any demand name.
        return (
            item_type == ITEM_TYPE_ELEMENT
            and bool(normalized_item)
            and normalized_item in normalized_name
        )
    return False
|
|
|
+
|
|
|
+
|
|
|
def _hive_weight_for_record(
    export_rows: list[dict[str, Any]],
    *,
    wxindex_threshold: float,
) -> float | None:
    """Compute the hive weight for a record, or ``None`` if it fails a gate.

    The record must (1) have a maximum wxindex score that is not below
    ``wxindex_threshold`` and (2) contain at least one row accepted by
    ``_has_inspiration_or_goal_demand_match``. When both hold, the weight is
    the maximum score divided by ``WEIGHT_DIVISOR``.
    """
    top_score = _record_wxindex_score(export_rows)
    if top_score < wxindex_threshold:
        return None
    for candidate in export_rows:
        if _has_inspiration_or_goal_demand_match(candidate):
            return top_score / WEIGHT_DIVISOR
    return None
|
|
|
+
|
|
|
+
|
|
|
def fetch_sync_log_row(
    *,
    demand_name: str,
    demand_type: str,
    partition_dt: str,
    strategy: str | None = None,
) -> dict[str, Any] | None:
    """Look up the single hot_content_odps_sync_log row for a demand.

    Args:
        demand_name: Demand name; surrounding whitespace is stripped.
        demand_type: Demand type; surrounding whitespace is stripped.
        partition_dt: Partition date, normalized by ``_normalize_date``.
        strategy: Strategy name. ``None``, empty, or whitespace-only values
            fall back to ``_hot_strategy_name()``.

    Returns:
        The matching row as a plain dict, or ``None`` when nothing matches.

    Raises:
        ValueError: If a required argument is blank/invalid after
            normalization, or if more than one row matches.
    """
    name = (demand_name or "").strip()
    dtype = (demand_type or "").strip()
    dt = _normalize_date(partition_dt)
    if not name or not dtype or not dt:
        raise ValueError("demand_name、demand_type、dt 均为必填")
    # Strip before falling back: a whitespace-only strategy is truthy and
    # would otherwise be queried as the empty string instead of the default.
    strategy_value = (strategy or "").strip() or _hot_strategy_name()
    sql = text(
        """
        SELECT
            id,
            partition_dt,
            strategy,
            demand_id,
            demand_name,
            demand_type,
            record_id,
            synced_at
        FROM hot_content_odps_sync_log
        WHERE demand_name = :demand_name
          AND demand_type = :demand_type
          AND partition_dt = :partition_dt
          AND strategy = :strategy
        LIMIT 2
        """
    )
    params = {
        "demand_name": name,
        "demand_type": dtype,
        "partition_dt": dt,
        "strategy": strategy_value,
    }
    with HotContentSessionLocal() as session:
        rows = session.execute(sql, params).mappings().all()
    if not rows:
        return None
    # LIMIT 2 exists only so that a duplicate can be detected cheaply.
    if len(rows) > 1:
        raise ValueError("匹配到多条同步记录,请检查查询条件")
    return dict(rows[0])
|
|
|
+
|
|
|
+
|
|
|
def fetch_hot_content_source_detail(
    *,
    demand_name: str,
    demand_type: str,
    partition_dt: str,
    strategy: str | None = None,
) -> dict[str, Any]:
    """Assemble the display payload for one synced demand and its hot record.

    Resolves the sync-log row via ``fetch_sync_log_row``, loads the linked
    ``hot_content_records`` row plus its ``hot_content_demand_exports`` rows,
    and returns a dict with the keys ``wxindex_threshold``, ``sync_log``,
    ``record`` and ``export_rows``.

    Raises:
        LookupError: If no sync-log row matches, the row has no positive
            ``record_id``, or the linked record does not exist.
        ValueError: Propagated from ``fetch_sync_log_row``.
    """
    log_entry = fetch_sync_log_row(
        demand_name=demand_name,
        demand_type=demand_type,
        partition_dt=partition_dt,
        strategy=strategy,
    )
    if log_entry is None:
        raise LookupError("未找到对应的同步记录")

    linked_record_id = int(log_entry.get("record_id") or 0)
    if linked_record_id <= 0:
        raise LookupError("同步记录未关联热点内容")

    threshold = _wxindex_threshold()
    # From here on, use the values stored on the sync-log row rather than the
    # caller-supplied arguments.
    log_strategy = str(log_entry.get("strategy") or _hot_strategy_name())
    log_partition = str(log_entry.get("partition_dt") or "")
    log_demand_name = str(log_entry.get("demand_name") or "")
    log_demand_type = str(log_entry.get("demand_type") or "")
    stored_demand_id = str(log_entry.get("demand_id") or "")

    record_query = text(
        """
        SELECT
            id,
            source,
            title,
            article_title,
            article_body,
            hot_rank,
            execution_status,
            postprocess_status,
            created_at,
            contribution_demand_match_json,
            wxindex_trend_json
        FROM hot_content_records
        WHERE id = :record_id
        LIMIT 1
        """
    )
    exports_query = text(
        """
        SELECT
            id,
            item_type,
            item_text,
            point_category,
            matched_demand,
            contribution_score,
            wxindex_keyword,
            all_hot_keywords,
            wxindex_latest_score,
            wxindex_trend
        FROM hot_content_demand_exports
        WHERE record_id = :record_id
        ORDER BY id ASC
        """
    )
    with HotContentSessionLocal() as session:
        record = session.execute(
            record_query, {"record_id": linked_record_id}
        ).mappings().first()
        raw_exports = session.execute(
            exports_query, {"record_id": linked_record_id}
        ).mappings().all()

    if record is None:
        raise LookupError("未找到关联热点内容")

    gate_rows: list[dict[str, Any]] = []  # minimal rows fed to the gate helpers
    display_rows: list[dict[str, Any]] = []  # typed rows returned to the caller
    for raw_export in raw_exports:
        export = dict(raw_export)
        is_contributor = _export_row_contributes_to_demand(
            export,
            demand_name=log_demand_name,
            demand_type=log_demand_type,
        )
        gate_rows.append(
            {
                "item_type": export.get("item_type"),
                "item_text": export.get("item_text"),
                "point_category": export.get("point_category"),
                "matched_demand": export.get("matched_demand"),
                "wxindex_latest_score": export.get("wxindex_latest_score"),
            }
        )
        raw_contribution = export.get("contribution_score")
        display_rows.append(
            {
                "id": int(export["id"]),
                "item_type": str(export.get("item_type") or ""),
                "item_text": str(export.get("item_text") or ""),
                "point_category": str(export.get("point_category") or ""),
                "matched_demand": str(export.get("matched_demand") or ""),
                "contribution_score": None
                if raw_contribution is None
                else float(raw_contribution),
                "wxindex_keyword": str(export.get("wxindex_keyword") or ""),
                "all_hot_keywords": str(export.get("all_hot_keywords") or ""),
                "wxindex_latest_score": float(export.get("wxindex_latest_score") or 0),
                "wxindex_trend": str(export.get("wxindex_trend") or ""),
                "contributes_to_sync": is_contributor,
            }
        )

    peak_score = _record_wxindex_score(gate_rows)
    weight = _hive_weight_for_record(
        gate_rows,
        wxindex_threshold=threshold,
    )
    # Recompute the id from the stored fields so the payload can report
    # whether the stored demand_id agrees with it.
    recomputed_id = build_demand_id(
        strategy=log_strategy,
        demand_name=log_demand_name,
        partition_dt=log_partition,
    )

    contribution_payload = _load_json(record.get("contribution_demand_match_json"))
    trend_payload = _load_json(record.get("wxindex_trend_json"))

    sync_log_section = {
        "id": int(log_entry["id"]),
        "partition_dt": log_partition,
        "strategy": log_strategy,
        "demand_id": stored_demand_id,
        "demand_name": log_demand_name,
        "demand_type": log_demand_type,
        "record_id": linked_record_id,
        "synced_at": _fmt_datetime(log_entry.get("synced_at")),
        "demand_id_verified": recomputed_id == stored_demand_id,
        "hive_weight": weight,
    }
    raw_rank = record.get("hot_rank")
    record_section = {
        "id": int(record["id"]),
        "source": str(record.get("source") or ""),
        "title": str(record.get("title") or ""),
        "article_title": str(record.get("article_title") or ""),
        "article_body": str(record.get("article_body") or ""),
        "hot_rank": None if raw_rank is None else int(record["hot_rank"]),
        "created_at": _fmt_datetime(record.get("created_at")),
        "contribution": contribution_payload,
        "wxindex": trend_payload,
        "max_wxindex_score": peak_score,
        "passes_wxindex_gate": peak_score >= threshold,
        "passes_point_gate": any(
            _has_inspiration_or_goal_demand_match(gate_row) for gate_row in gate_rows
        ),
    }
    return {
        "wxindex_threshold": threshold,
        "sync_log": sync_log_section,
        "record": record_section,
        "export_rows": display_rows,
    }
|
|
|
+
|
|
|
+
|
|
|
def format_number(value: Any) -> str:
    """Format a numeric value for display.

    Values whose magnitude is at least 10,000 are shown in units of 万
    (ten thousand) with one decimal place; smaller values are rounded to a
    whole number. Returns ``"-"`` when the value cannot be converted to a
    float or is NaN/infinite.
    """
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: an int too large to be represented as a float.
        return "-"
    # isfinite() rejects NaN and +/-infinity; the latter would otherwise be
    # rendered as "inf万".
    if not math.isfinite(number):
        return "-"
    if abs(number) >= 10000:
        return f"{number / 10000:.1f}万"
    return f"{number:.0f}"
|