"""Row-building logic for writing recent hot-content demands to Hive."""

from __future__ import annotations

import hashlib
import math
from typing import Any

from app.hot_content.demand_export import ITEM_TYPE_ELEMENT, ITEM_TYPE_PHRASE

# Point categories ("inspiration point", "goal point") whose matched demands
# qualify a record for retention (see _should_retain_title).
TITLE_RETAIN_POINT_CATEGORIES = frozenset({"灵感点", "目的点"})
# Hive `type` for the single row built by joining a record's element texts
# ("feature point").
TYPE_FEATURE_POINT = "特征点"
# Hive `type` for rows built from individual phrase texts ("phrase").
TYPE_PHRASE = "短语"
# A record's wxindex score is divided by this to produce the Hive `weight`.
WEIGHT_DIVISOR = 1_000_000.0


def _normalize_demand_key(value: str) -> str:
    """Return *value* with all whitespace removed (used as a dedupe key)."""
    return "".join(value.split())


def build_demand_id(*, strategy: str, demand_name: str, partition_dt: str) -> str:
    """Return the MD5 hex digest identifying a demand within a partition.

    The id depends only on ``strategy``, ``demand_name`` and ``partition_dt``,
    so the same demand name under the same strategy and partition always maps
    to the same id, regardless of which record or row type produced it.
    """
    # NOTE(review): the parts are concatenated without a separator, so e.g.
    # ("ab", "c") and ("a", "bc") hash identically. Left as-is because
    # changing the hashed input would change ids of rows already in Hive.
    raw = f"{strategy}{demand_name}{partition_dt}"
    return hashlib.md5(raw.encode("utf-8")).hexdigest()


def _dedupe_texts(texts: list[str]) -> list[str]:
    """Strip *texts*, drop blanks and de-duplicate, keeping first-seen order.

    Two texts count as duplicates when they are equal after all whitespace is
    removed (see ``_normalize_demand_key``); the first spelling is kept.
    """
    deduped: list[str] = []
    seen: set[str] = set()
    for raw in texts:
        text = str(raw).strip()
        if not text:
            continue
        keys = {text, _normalize_demand_key(text)}
        if keys & seen:
            continue
        seen.update(keys)
        deduped.append(text)
    return deduped


def _record_wxindex_score(export_rows: list[dict[str, Any]]) -> float:
    """Return the highest ``wxindex_latest_score`` across *export_rows*.

    Missing or falsy scores count as 0. Values that cannot be converted to
    ``float``, and non-finite values (NaN, +/-inf), are skipped. Returns 0.0
    when no usable score exists.
    """
    scores: list[float] = []
    for row in export_rows:
        try:
            score = float(row.get("wxindex_latest_score") or 0)
        except (TypeError, ValueError):
            continue
        # NaN compares False against everything: it would make max()
        # order-dependent, slip past the threshold check (nan < x is False)
        # and end up as the Hive weight. Infinities are equally meaningless.
        if math.isfinite(score):
            scores.append(score)
    return max(scores) if scores else 0.0


def _has_matched_demand(row: dict[str, Any]) -> bool:
    """Return True if *row* has a non-blank ``matched_demand``."""
    return bool(str(row.get("matched_demand") or "").strip())


def _has_inspiration_or_goal_demand_match(row: dict[str, Any]) -> bool:
    """Return True if *row* is in a retaining point category and has a matched demand."""
    point_category = str(row.get("point_category") or "").strip()
    if point_category not in TITLE_RETAIN_POINT_CATEGORIES:
        return False
    return _has_matched_demand(row)


def _should_retain_title(
    export_rows: list[dict[str, Any]],
    *,
    wxindex_threshold: float,
) -> bool:
    """Decide whether a record's rows should be written to Hive at all.

    A record is retained when its best wxindex score is at least
    ``wxindex_threshold`` and at least one row in an inspiration/goal point
    category has a matched demand.
    """
    if _record_wxindex_score(export_rows) < wxindex_threshold:
        return False
    return any(_has_inspiration_or_goal_demand_match(row) for row in export_rows)


def _collect_matched_item_texts(
    export_rows: list[dict[str, Any]],
    item_type: Any,
) -> list[str]:
    """Return de-duplicated ``item_text`` values of matched rows of *item_type*."""
    return _dedupe_texts(
        [
            str(row.get("item_text") or "").strip()
            for row in export_rows
            if str(row.get("item_type") or "") == item_type and _has_matched_demand(row)
        ]
    )


def build_hive_rows_for_record(
    export_rows: list[dict[str, Any]],
    *,
    record_id: int,
    strategy: str,
    partition_dt: str,
    wxindex_threshold: float,
) -> list[dict[str, Any]]:
    """Build the Hive rows for a single record.

    Returns an empty list unless ``_should_retain_title`` accepts the record.
    Otherwise the result holds:

    * at most one ``TYPE_FEATURE_POINT`` row, whose name is the space-joined,
      de-duplicated texts of matched element rows, followed by
    * one ``TYPE_PHRASE`` row per distinct text of matched phrase rows.

    Every row carries the same weight: the record's wxindex score divided by
    ``WEIGHT_DIVISOR``.

    Args:
        export_rows: The record's export rows (dicts read via ``.get``).
        record_id: Id copied into every output row.
        strategy: Strategy name; part of each ``demand_id``.
        partition_dt: Partition date; part of each ``demand_id`` and stored as ``dt``.
        wxindex_threshold: Minimum score required to retain the record.

    Returns:
        A list of row dicts as produced by ``_build_hive_row``.
    """
    if not _should_retain_title(export_rows, wxindex_threshold=wxindex_threshold):
        return []

    weight = _record_wxindex_score(export_rows) / WEIGHT_DIVISOR
    element_texts = _collect_matched_item_texts(export_rows, ITEM_TYPE_ELEMENT)
    phrase_texts = _collect_matched_item_texts(export_rows, ITEM_TYPE_PHRASE)

    hive_rows: list[dict[str, Any]] = []
    if element_texts:
        # All element texts of the record collapse into a single demand row.
        hive_rows.append(
            _build_hive_row(
                record_id=record_id,
                strategy=strategy,
                demand_name=" ".join(element_texts),
                weight=weight,
                demand_type=TYPE_FEATURE_POINT,
                partition_dt=partition_dt,
            )
        )
    for phrase_text in phrase_texts:
        hive_rows.append(
            _build_hive_row(
                record_id=record_id,
                strategy=strategy,
                demand_name=phrase_text,
                weight=weight,
                demand_type=TYPE_PHRASE,
                partition_dt=partition_dt,
            )
        )
    return hive_rows


def _parse_record_id(value: Any) -> int:
    """Return *value* as an int record id, or 0 if it is missing or not numeric."""
    try:
        return int(value or 0)
    except (TypeError, ValueError, OverflowError):
        return 0


def build_hive_rows_from_export_groups(
    export_groups: list[dict[str, Any]],
    *,
    strategy: str,
    partition_dt: str,
    wxindex_threshold: float,
) -> list[dict[str, Any]]:
    """Build Hive rows for all export groups, de-duplicated by ``demand_id``.

    Each group is read via ``group.get("export_rows")`` and
    ``group.get("record_id")``. A group is skipped when:

    * its ``export_rows`` is not a list, or
    * its ``record_id`` is missing, non-numeric or not positive.

    Because ``demand_id`` does not depend on the record or the row type, only
    the first row emitted for a given demand name is kept.
    """
    rows: list[dict[str, Any]] = []
    seen_demand_ids: set[str] = set()
    for group in export_groups:
        export_rows = group.get("export_rows") or []
        if not isinstance(export_rows, list):
            continue
        # A malformed id skips just this group instead of aborting the export.
        record_id = _parse_record_id(group.get("record_id"))
        if record_id <= 0:
            continue
        for hive_row in build_hive_rows_for_record(
            export_rows,
            record_id=record_id,
            strategy=strategy,
            partition_dt=partition_dt,
            wxindex_threshold=wxindex_threshold,
        ):
            demand_id = str(hive_row["demand_id"])
            if demand_id in seen_demand_ids:
                continue
            seen_demand_ids.add(demand_id)
            rows.append(hive_row)
    return rows


def _build_hive_row(
    *,
    record_id: int,
    strategy: str,
    demand_name: str,
    weight: float,
    demand_type: str,
    partition_dt: str,
) -> dict[str, Any]:
    """Return one Hive row dict for a demand.

    ``demand_name`` is stripped before it is hashed into ``demand_id`` and
    stored. ``video_count``, ``video_list`` and ``extend`` are written as
    fixed placeholder values (None, a fresh empty list, and "{}").
    """
    normalized_name = demand_name.strip()
    return {
        "record_id": record_id,
        "strategy": strategy,
        "demand_id": build_demand_id(
            strategy=strategy,
            demand_name=normalized_name,
            partition_dt=partition_dt,
        ),
        "demand_name": normalized_name,
        "weight": weight,
        "type": demand_type,
        "video_count": None,
        "video_list": [],
        "extend": "{}",
        "dt": partition_dt,
    }