"""Row-building logic for writing trending-event demands into Hive.

Export rows (built by ``app.hot_content.demand_export``) are filtered by a
title-retention rule (wxindex score threshold plus at least one
inspiration/goal-point row with a matched demand) and by demand-quality
checks, then converted into Hive rows keyed by a deterministic ``demand_id``.
"""

from __future__ import annotations

import hashlib
import math
from typing import Any

from app.hot_content.demand_export import (
    ITEM_TYPE_ELEMENT,
    ITEM_TYPE_PHRASE,
    attach_wxindex_metadata,
    build_demand_export_rows,
)
from app.hot_content.demand_quality import (
    attach_quality_scores_to_export_rows,
    build_feature_combo_text,
    build_matched_element_texts,
    quality_passed,
)

# Point categories ("inspiration point" / "goal point") whose matched demands
# allow a record's title to be retained.
TITLE_RETAIN_POINT_CATEGORIES = frozenset({"灵感点", "目的点"})
# Values written to the Hive ``type`` column ("feature point" / "phrase").
TYPE_FEATURE_POINT = "特征点"
TYPE_PHRASE = "短语"
# The record's highest wxindex score is divided by this to get the Hive weight.
WEIGHT_DIVISOR = 1_000_000.0


def _normalize_demand_key(value: str) -> str:
    """Return *value* with all whitespace removed (loose dedup key)."""
    return "".join(value.split())


def build_demand_id(*, strategy: str, demand_name: str, partition_dt: str) -> str:
    """Return the deterministic demand id: MD5 hex digest of the joined fields.

    NOTE: the fields are concatenated without a separator, so e.g.
    ``("ab", "c", dt)`` and ``("a", "bc", dt)`` collide. The format is left
    as-is because changing it would change every generated id.
    """
    raw = f"{strategy}{demand_name}{partition_dt}"
    return hashlib.md5(raw.encode("utf-8")).hexdigest()


def _dedupe_texts(texts: list[str]) -> list[str]:
    """Strip and de-duplicate *texts*, preserving first-seen order.

    Empty (after stripping) texts are dropped. Two texts count as duplicates
    when they are equal after removing all whitespace.
    """
    deduped: list[str] = []
    seen: set[str] = set()
    for raw in texts:
        text = str(raw).strip()
        if not text:
            continue
        keys = {text, _normalize_demand_key(text)}
        if keys & seen:
            continue
        seen.update(keys)
        deduped.append(text)
    return deduped


def _record_wxindex_score(export_rows: list[dict[str, Any]]) -> float:
    """Return the highest finite ``wxindex_latest_score`` across *export_rows*.

    Missing/falsy values count as 0; unparsable values and non-finite values
    (NaN, +/-inf) are ignored. Returns 0.0 when no usable score exists.

    Non-finite values are skipped because NaN makes ``max`` order-dependent
    and ``NaN < threshold`` is always False, which would retain the title
    and produce a NaN (or infinite) Hive weight.
    """
    scores: list[float] = []
    for row in export_rows:
        try:
            score = float(row.get("wxindex_latest_score") or 0)
        except (TypeError, ValueError):
            continue
        if math.isfinite(score):
            scores.append(score)
    return max(scores, default=0.0)


def _should_retain_title(
    export_rows: list[dict[str, Any]],
    *,
    wxindex_threshold: float,
) -> bool:
    """Return whether a record passes the title-retention rule.

    The record's wxindex score must be at least *wxindex_threshold*, and at
    least one row must be an inspiration/goal point with a matched demand.
    """
    if _record_wxindex_score(export_rows) < wxindex_threshold:
        return False
    return any(_has_inspiration_or_goal_demand_match(row) for row in export_rows)


def _has_inspiration_or_goal_demand_match(row: dict[str, Any]) -> bool:
    """Return whether *row* is an inspiration/goal point with a matched demand."""
    point_category = str(row.get("point_category") or "").strip()
    return point_category in TITLE_RETAIN_POINT_CATEGORIES and _has_matched_demand(row)


def _has_matched_demand(row: dict[str, Any]) -> bool:
    """Return whether *row* has a non-blank ``matched_demand``."""
    return bool(str(row.get("matched_demand") or "").strip())


def _export_row_passes_quality(
    row: dict[str, Any],
    *,
    export_rows: list[dict[str, Any]],
    event_sense_json: dict[str, Any] | None,
    senior_fit_json: dict[str, Any] | None,
    event_threshold: float,
    senior_threshold: float,
) -> bool:
    """Return whether a single export row meets the demand-quality thresholds.

    Phrase rows are checked as ``TYPE_PHRASE``. Element rows are checked as
    ``TYPE_FEATURE_POINT`` only when they have a matched demand. Any other
    row, or a row whose stripped ``item_text`` is empty, fails.

    ``export_rows`` is accepted for interface compatibility and is unused.
    """
    item_type = str(row.get("item_type") or "")
    if item_type == ITEM_TYPE_PHRASE:
        demand_type = TYPE_PHRASE
    elif item_type == ITEM_TYPE_ELEMENT and _has_matched_demand(row):
        demand_type = TYPE_FEATURE_POINT
    else:
        return False

    demand_text = str(row.get("item_text") or "").strip()
    if not demand_text:
        # Empty phrase texts are dropped when building Hive rows
        # (``_dedupe_texts``), so they must not count as passing here either.
        return False
    return quality_passed(
        demand_type=demand_type,
        demand_text=demand_text,
        event_sense_json=event_sense_json,
        senior_fit_json=senior_fit_json,
        event_threshold=event_threshold,
        senior_threshold=senior_threshold,
    )


def is_export_row_as_demand(
    row: dict[str, Any],
    export_rows: list[dict[str, Any]],
    *,
    wxindex_threshold: float,
    event_sense_json: dict[str, Any] | None = None,
    senior_fit_json: dict[str, Any] | None = None,
    event_threshold: float = 0.0,
    senior_threshold: float = 0.0,
) -> int:
    """Return 1 if *row* counts as a demand under the ODPS demand-sync rules, else 0.

    The rules: the record's title is retained, the row has a matched demand,
    and, only when at least one of *event_sense_json* / *senior_fit_json* is
    not None, the row passes the quality check.
    """
    if not _should_retain_title(export_rows, wxindex_threshold=wxindex_threshold):
        return 0
    if not _has_matched_demand(row):
        return 0
    if event_sense_json is not None or senior_fit_json is not None:
        if not _export_row_passes_quality(
            row,
            export_rows=export_rows,
            event_sense_json=event_sense_json,
            senior_fit_json=senior_fit_json,
            event_threshold=event_threshold,
            senior_threshold=senior_threshold,
        ):
            return 0
    return 1


def _build_export_rows_for_record(
    record: dict[str, Any],
    *,
    wxindex_threshold: float,
    event_sense_json: dict[str, Any] | None,
    senior_fit_json: dict[str, Any] | None,
    event_threshold: float,
    senior_threshold: float,
) -> list[dict[str, Any]]:
    """Build export rows for one ODPS *record*, with wxindex and quality metadata.

    Returns an empty list when ``contribution_demand_match_json`` is not a
    dict. Non-dict ``contribution_points_json`` / ``wxindex_trend_json``
    values are passed on as None.
    """
    match_result = record.get("contribution_demand_match_json")
    if not isinstance(match_result, dict):
        return []

    contribution_points = record.get("contribution_points_json")
    if not isinstance(contribution_points, dict):
        contribution_points = None
    trend_json = record.get("wxindex_trend_json")
    if not isinstance(trend_json, dict):
        trend_json = None

    export_rows = attach_wxindex_metadata(
        build_demand_export_rows(
            match_result,
            contribution_points=contribution_points,
            trend_json=trend_json,
        ),
        trend_json,
        wxindex_threshold=wxindex_threshold,
        event_sense_json=event_sense_json,
        senior_fit_json=senior_fit_json,
        event_threshold=event_threshold,
        senior_threshold=senior_threshold,
    )
    return attach_quality_scores_to_export_rows(
        export_rows,
        event_sense_json=event_sense_json,
        senior_fit_json=senior_fit_json,
    )


def _extend_unique_rows(
    target: list[dict[str, Any]],
    seen_demand_ids: set[str],
    hive_rows: list[dict[str, Any]],
) -> None:
    """Append *hive_rows* to *target*, skipping demand ids already seen (first wins)."""
    for hive_row in hive_rows:
        demand_id = str(hive_row["demand_id"])
        if demand_id in seen_demand_ids:
            continue
        seen_demand_ids.add(demand_id)
        target.append(hive_row)


def build_hive_rows_for_record(
    export_rows: list[dict[str, Any]],
    *,
    record_id: int,
    strategy: str,
    partition_dt: str,
    wxindex_threshold: float,
    event_sense_json: dict[str, Any] | None = None,
    senior_fit_json: dict[str, Any] | None = None,
    event_threshold: float = 0.0,
    senior_threshold: float = 0.0,
) -> list[dict[str, Any]]:
    """Build the Hive rows for one record's *export_rows*.

    Returns an empty list when the title is not retained. Otherwise, in
    order, emits rows that pass the quality check for:

    1. the feature-combo text (``TYPE_FEATURE_POINT``),
    2. each matched element text (``TYPE_FEATURE_POINT``),
    3. each de-duplicated phrase with a matched demand (``TYPE_PHRASE``).

    All rows share a weight of the record's wxindex score / ``WEIGHT_DIVISOR``.
    Rows whose ``demand_id`` repeats within the record (e.g. a feature combo
    equal to an element text) are dropped, keeping the first.
    """
    if not _should_retain_title(export_rows, wxindex_threshold=wxindex_threshold):
        return []

    def passes(demand_type: str, demand_text: str) -> bool:
        return quality_passed(
            demand_type=demand_type,
            demand_text=demand_text,
            event_sense_json=event_sense_json,
            senior_fit_json=senior_fit_json,
            event_threshold=event_threshold,
            senior_threshold=senior_threshold,
        )

    weight = _record_wxindex_score(export_rows) / WEIGHT_DIVISOR
    feature_combo = build_feature_combo_text(export_rows)
    element_texts = build_matched_element_texts(export_rows)

    phrase_candidates: list[str] = []
    for row in export_rows:
        if str(row.get("item_type") or "") != ITEM_TYPE_PHRASE:
            continue
        if not _has_matched_demand(row):
            continue
        phrase_text = str(row.get("item_text") or "").strip()
        if passes(TYPE_PHRASE, phrase_text):
            phrase_candidates.append(phrase_text)
    phrase_texts = _dedupe_texts(phrase_candidates)

    # (demand_name, demand_type) pairs in output order.
    candidates: list[tuple[str, str]] = []
    if feature_combo and passes(TYPE_FEATURE_POINT, feature_combo):
        candidates.append((feature_combo, TYPE_FEATURE_POINT))
    for element_text in element_texts:
        if passes(TYPE_FEATURE_POINT, element_text):
            candidates.append((element_text, TYPE_FEATURE_POINT))
    candidates.extend((phrase_text, TYPE_PHRASE) for phrase_text in phrase_texts)

    built_rows = [
        _build_hive_row(
            record_id=record_id,
            strategy=strategy,
            demand_name=demand_name,
            weight=weight,
            demand_type=demand_type,
            partition_dt=partition_dt,
        )
        for demand_name, demand_type in candidates
    ]
    # demand_id ignores the type, so identical names must collapse to one row.
    hive_rows: list[dict[str, Any]] = []
    _extend_unique_rows(hive_rows, set(), built_rows)
    return hive_rows


def build_hive_rows_for_odps_record(
    record: dict[str, Any],
    *,
    strategy: str,
    partition_dt: str,
    wxindex_threshold: float,
    event_threshold: float,
    senior_threshold: float,
) -> list[dict[str, Any]]:
    """Build Hive rows for one raw ODPS *record*.

    Non-dict ``demand_event_sense_json`` / ``demand_senior_fit_json`` values
    are replaced with ``{}`` (not None). NOTE(review): the export-groups path
    passes None instead; confirm ``quality_passed`` is meant to treat the two
    differently.
    """
    event_sense_json = record.get("demand_event_sense_json")
    senior_fit_json = record.get("demand_senior_fit_json")
    if not isinstance(event_sense_json, dict):
        event_sense_json = {}
    if not isinstance(senior_fit_json, dict):
        senior_fit_json = {}
    export_rows = _build_export_rows_for_record(
        record,
        wxindex_threshold=wxindex_threshold,
        event_sense_json=event_sense_json,
        senior_fit_json=senior_fit_json,
        event_threshold=event_threshold,
        senior_threshold=senior_threshold,
    )
    return build_hive_rows_for_record(
        export_rows,
        record_id=int(record.get("id") or 0),
        strategy=strategy,
        partition_dt=partition_dt,
        wxindex_threshold=wxindex_threshold,
        event_sense_json=event_sense_json,
        senior_fit_json=senior_fit_json,
        event_threshold=event_threshold,
        senior_threshold=senior_threshold,
    )


def build_hive_rows_from_export_groups(
    export_groups: list[dict[str, Any]],
    *,
    strategy: str,
    partition_dt: str,
    wxindex_threshold: float,
    event_threshold: float = 0.0,
    senior_threshold: float = 0.0,
) -> list[dict[str, Any]]:
    """Build Hive rows from pre-built export groups, unique by ``demand_id``.

    Groups whose ``export_rows`` is not a list or whose ``record_id`` is not
    positive are skipped. Non-dict quality JSON values are passed as None.
    When a ``demand_id`` repeats across groups, the first row wins.
    """
    rows: list[dict[str, Any]] = []
    seen_demand_ids: set[str] = set()
    for group in export_groups:
        export_rows = group.get("export_rows") or []
        if not isinstance(export_rows, list):
            continue
        record_id = int(group.get("record_id") or 0)
        if record_id <= 0:
            continue
        event_sense_json = group.get("demand_event_sense_json")
        senior_fit_json = group.get("demand_senior_fit_json")
        _extend_unique_rows(
            rows,
            seen_demand_ids,
            build_hive_rows_for_record(
                export_rows,
                record_id=record_id,
                strategy=strategy,
                partition_dt=partition_dt,
                wxindex_threshold=wxindex_threshold,
                event_sense_json=(
                    event_sense_json if isinstance(event_sense_json, dict) else None
                ),
                senior_fit_json=(
                    senior_fit_json if isinstance(senior_fit_json, dict) else None
                ),
                event_threshold=event_threshold,
                senior_threshold=senior_threshold,
            ),
        )
    return rows


def build_hive_rows_from_odps_records(
    records: list[dict[str, Any]],
    *,
    strategy: str,
    partition_dt: str,
    wxindex_threshold: float,
    event_threshold: float,
    senior_threshold: float,
) -> list[dict[str, Any]]:
    """Build Hive rows from raw ODPS *records*, unique by ``demand_id``.

    Records whose ``id`` is not positive are skipped. When a ``demand_id``
    repeats across records, the first row wins.
    """
    rows: list[dict[str, Any]] = []
    seen_demand_ids: set[str] = set()
    for record in records:
        record_id = int(record.get("id") or 0)
        if record_id <= 0:
            continue
        _extend_unique_rows(
            rows,
            seen_demand_ids,
            build_hive_rows_for_odps_record(
                record,
                strategy=strategy,
                partition_dt=partition_dt,
                wxindex_threshold=wxindex_threshold,
                event_threshold=event_threshold,
                senior_threshold=senior_threshold,
            ),
        )
    return rows


def _build_hive_row(
    *,
    record_id: int,
    strategy: str,
    demand_name: str,
    weight: float,
    demand_type: str,
    partition_dt: str,
) -> dict[str, Any]:
    """Return one Hive row dict; *demand_name* is stripped before use.

    ``video_count``, ``video_list`` and ``extend`` are filled with empty
    defaults here.
    """
    normalized_name = demand_name.strip()
    return {
        "record_id": record_id,
        "strategy": strategy,
        "demand_id": build_demand_id(
            strategy=strategy,
            demand_name=normalized_name,
            partition_dt=partition_dt,
        ),
        "demand_name": normalized_name,
        "weight": weight,
        "type": demand_type,
        "video_count": None,
        "video_list": [],
        "extend": "{}",
        "dt": partition_dt,
    }