| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184 |
- """近期热点需求写入 Hive 的行构建逻辑。"""
- from __future__ import annotations
- import hashlib
- from typing import Any
- from app.hot_content.demand_export import ITEM_TYPE_ELEMENT, ITEM_TYPE_PHRASE
- TITLE_RETAIN_POINT_CATEGORIES = frozenset({"灵感点", "目的点"})  # point_category values that, together with a non-empty matched_demand, let a record be retained
- TYPE_FEATURE_POINT = "特征点"  # `type` value of the single row built from a record's joined element texts
- TYPE_PHRASE = "短语"  # `type` value of each per-phrase row
- WEIGHT_DIVISOR = 1_000_000.0  # a record's highest wxindex score is divided by this to produce the row `weight`
- def _normalize_demand_key(value: str) -> str:
- return "".join(value.split())
def build_demand_id(*, strategy: str, demand_name: str, partition_dt: str) -> str:
    """Return the hex MD5 digest identifying a demand within a partition.

    The digest is taken over the UTF-8 encoding of ``strategy``,
    ``demand_name`` and ``partition_dt`` concatenated in that order with no
    separator between them.
    """
    payload = "{}{}{}".format(strategy, demand_name, partition_dt)
    digest = hashlib.md5()
    digest.update(payload.encode("utf-8"))
    return digest.hexdigest()
def _dedupe_texts(texts: list[str]) -> list[str]:
    """Return the stripped, non-empty entries of *texts* without duplicates.

    An entry counts as a duplicate when either its stripped form or its
    whitespace-free form (see ``_normalize_demand_key``) has already been
    seen. The first occurrence wins and input order is preserved.
    """
    known_keys: set[str] = set()
    unique: list[str] = []
    for candidate in (str(item).strip() for item in texts):
        if not candidate:
            continue
        candidate_keys = {candidate, _normalize_demand_key(candidate)}
        # Keep the entry only if neither of its keys was registered before.
        if known_keys.isdisjoint(candidate_keys):
            known_keys |= candidate_keys
            unique.append(candidate)
    return unique
- def _record_wxindex_score(export_rows: list[dict[str, Any]]) -> float:
- scores: list[float] = []
- for row in export_rows:
- try:
- scores.append(float(row.get("wxindex_latest_score") or 0))
- except (TypeError, ValueError):
- continue
- return max(scores) if scores else 0.0
def _should_retain_title(
    export_rows: list[dict[str, Any]],
    *,
    wxindex_threshold: float,
) -> bool:
    """Decide whether a record's rows qualify for export.

    The record is retained only when its score from
    ``_record_wxindex_score`` is not below *wxindex_threshold* and at least
    one row passes ``_has_inspiration_or_goal_demand_match``.
    """
    top_score = _record_wxindex_score(export_rows)
    if top_score < wxindex_threshold:
        return False
    for row in export_rows:
        if _has_inspiration_or_goal_demand_match(row):
            return True
    return False
def _has_inspiration_or_goal_demand_match(row: dict[str, Any]) -> bool:
    """Return True when *row* is in a retained point category and has a demand.

    The row's stripped ``point_category`` must be a member of
    ``TITLE_RETAIN_POINT_CATEGORIES`` and its ``matched_demand`` must be
    non-blank.
    """
    category = str(row.get("point_category") or "").strip()
    return category in TITLE_RETAIN_POINT_CATEGORIES and _has_matched_demand(row)
- def _has_matched_demand(row: dict[str, Any]) -> bool:
- return bool(str(row.get("matched_demand") or "").strip())
def build_hive_rows_for_record(
    export_rows: list[dict[str, Any]],
    *,
    record_id: int,
    strategy: str,
    partition_dt: str,
    wxindex_threshold: float,
) -> list[dict[str, Any]]:
    """Build the Hive rows for one record from its export rows.

    Nothing is produced when ``_should_retain_title`` rejects the record.
    Otherwise the output holds, in this order:

    * at most one ``TYPE_FEATURE_POINT`` row whose name is the space-joined,
      de-duplicated texts of all matched ``ITEM_TYPE_ELEMENT`` rows;
    * one ``TYPE_PHRASE`` row per de-duplicated matched ``ITEM_TYPE_PHRASE``
      text.

    Every row shares the same weight: the record's wxindex score divided by
    ``WEIGHT_DIVISOR``.
    """
    if not _should_retain_title(export_rows, wxindex_threshold=wxindex_threshold):
        return []

    def matched_texts(item_type: str) -> list[str]:
        # Stripped, de-duplicated item texts of one item type with a matched demand.
        candidates: list[str] = []
        for row in export_rows:
            if str(row.get("item_type") or "") == item_type and _has_matched_demand(row):
                candidates.append(str(row.get("item_text") or "").strip())
        return _dedupe_texts(candidates)

    weight = _record_wxindex_score(export_rows) / WEIGHT_DIVISOR

    def make_row(demand_name: str, demand_type: str) -> dict[str, Any]:
        # All rows of a record share id, strategy, weight and partition.
        return _build_hive_row(
            record_id=record_id,
            strategy=strategy,
            demand_name=demand_name,
            weight=weight,
            demand_type=demand_type,
            partition_dt=partition_dt,
        )

    elements = matched_texts(ITEM_TYPE_ELEMENT)
    phrases = matched_texts(ITEM_TYPE_PHRASE)

    result: list[dict[str, Any]] = []
    if elements:
        # All element texts collapse into a single feature-point demand.
        result.append(make_row(" ".join(elements), TYPE_FEATURE_POINT))
    result.extend(make_row(phrase, TYPE_PHRASE) for phrase in phrases)
    return result
def build_hive_rows_from_export_groups(
    export_groups: list[dict[str, Any]],
    *,
    strategy: str,
    partition_dt: str,
    wxindex_threshold: float,
) -> list[dict[str, Any]]:
    """Build Hive rows for every export group, one row per ``demand_id``.

    A group is skipped when its ``export_rows`` is not a list, or when its
    ``record_id`` is missing, not convertible to ``int``, or not positive.
    A malformed ``record_id`` is skipped rather than raised so that a single
    bad group cannot abort the whole export, matching how the other invalid
    groups are handled.

    Args:
        export_groups: Dicts carrying ``record_id`` and ``export_rows``.
        strategy: Strategy name passed through to each row.
        partition_dt: Partition date passed through to each row.
        wxindex_threshold: Minimum record score, forwarded to
            ``build_hive_rows_for_record``.

    Returns:
        Rows in group order. When several rows share a ``demand_id``, only
        the first one encountered is kept.
    """
    rows: list[dict[str, Any]] = []
    seen_demand_ids: set[str] = set()
    for group in export_groups:
        export_rows = group.get("export_rows") or []
        if not isinstance(export_rows, list):
            continue
        try:
            record_id = int(group.get("record_id") or 0)
        except (TypeError, ValueError, OverflowError):
            # Non-numeric / non-finite id: treat like any other invalid group.
            continue
        if record_id <= 0:
            continue
        for hive_row in build_hive_rows_for_record(
            export_rows,
            record_id=record_id,
            strategy=strategy,
            partition_dt=partition_dt,
            wxindex_threshold=wxindex_threshold,
        ):
            demand_id = str(hive_row["demand_id"])
            if demand_id in seen_demand_ids:
                continue
            seen_demand_ids.add(demand_id)
            rows.append(hive_row)
    return rows
def _build_hive_row(
    *,
    record_id: int,
    strategy: str,
    demand_name: str,
    weight: float,
    demand_type: str,
    partition_dt: str,
) -> dict[str, Any]:
    """Assemble a single Hive row dict for one demand.

    ``demand_name`` is stripped before it is stored and before it is fed to
    ``build_demand_id``. ``video_count``, ``video_list`` and ``extend`` are
    always emitted with fixed placeholder values (``None``, an empty list and
    the string ``"{}"``).
    """
    name = demand_name.strip()
    demand_id = build_demand_id(
        strategy=strategy,
        demand_name=name,
        partition_dt=partition_dt,
    )
    row: dict[str, Any] = {
        "record_id": record_id,
        "strategy": strategy,
        "demand_id": demand_id,
        "demand_name": name,
        "weight": weight,
        "type": demand_type,
        # Fixed placeholders; this function never fills them in.
        "video_count": None,
        "video_list": [],
        "extend": "{}",
        "dt": partition_dt,
    }
    return row
|