| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401 |
- """新热事件需求写入 Hive 的行构建逻辑。"""
from __future__ import annotations

import hashlib
import math
from typing import Any

from app.hot_content.demand_export import (
    ITEM_TYPE_ELEMENT,
    ITEM_TYPE_PHRASE,
    attach_wxindex_metadata,
    build_demand_export_rows,
)
from app.hot_content.demand_quality import (
    attach_quality_scores_to_export_rows,
    build_feature_combo_text,
    build_matched_element_texts,
    quality_passed,
)
# Values of a row's "point_category" that allow a record's title to be retained.
TITLE_RETAIN_POINT_CATEGORIES = frozenset(("灵感点", "目的点"))

# Values written to the Hive "type" column and passed to the quality gate.
TYPE_FEATURE_POINT = "特征点"
TYPE_PHRASE = "短语"

# The record's highest wxindex score is divided by this to get the row weight.
WEIGHT_DIVISOR = 1e6
def _normalize_demand_key(value: str) -> str:
    """Return ``value`` with all whitespace removed.

    The result serves as a comparison key so that texts differing only in
    whitespace compare equal.
    """
    fragments = value.split()
    return "".join(fragments)
def build_demand_id(*, strategy: str, demand_name: str, partition_dt: str) -> str:
    """Return the MD5 hex digest that identifies a demand.

    The digest is computed over the UTF-8 bytes of ``strategy``,
    ``demand_name`` and ``partition_dt`` concatenated in that order with no
    separator between them.
    """
    payload = "{}{}{}".format(strategy, demand_name, partition_dt)
    digest = hashlib.md5(payload.encode("utf-8"))
    return digest.hexdigest()
def _dedupe_texts(texts: list[str]) -> list[str]:
    """Return the stripped, non-blank texts with duplicates removed.

    Input order is preserved and the first occurrence wins. Two texts count
    as duplicates when either their stripped form or their whitespace-free
    form has already been seen.
    """
    unique: list[str] = []
    known_keys: set[str] = set()
    for candidate in (str(item).strip() for item in texts):
        if not candidate:
            continue
        compact = _normalize_demand_key(candidate)
        if candidate in known_keys or compact in known_keys:
            continue
        known_keys.add(candidate)
        known_keys.add(compact)
        unique.append(candidate)
    return unique
def _record_wxindex_score(export_rows: list[dict[str, Any]]) -> float:
    """Return the highest usable ``wxindex_latest_score`` among the rows.

    A missing or falsy score counts as ``0``. Values that cannot be
    converted to a finite float (unparseable, NaN, infinite or overflowing)
    are ignored.

    Returns:
        The maximum usable score, or ``0.0`` when no row yields one.
    """
    scores: list[float] = []
    for row in export_rows:
        try:
            score = float(row.get("wxindex_latest_score") or 0)
        except (TypeError, ValueError, OverflowError):
            continue
        # NaN makes max() depend on row order and defeats the `< threshold`
        # comparison callers apply to the result, so treat non-finite values
        # like any other unparseable score.
        if not math.isfinite(score):
            continue
        scores.append(score)
    return max(scores, default=0.0)
def _should_retain_title(
    export_rows: list[dict[str, Any]],
    *,
    wxindex_threshold: float,
) -> bool:
    """Return whether the record behind ``export_rows`` keeps its title.

    The title is kept when the record's wxindex score is not below
    ``wxindex_threshold`` and at least one row has an inspiration/goal
    point category together with a matched demand.
    """
    below_threshold = _record_wxindex_score(export_rows) < wxindex_threshold
    if below_threshold:
        return False
    for row in export_rows:
        if _has_inspiration_or_goal_demand_match(row):
            return True
    return False
def _has_inspiration_or_goal_demand_match(row: dict[str, Any]) -> bool:
    """Return whether ``row`` is a title-retaining point with a matched demand.

    True only when the stripped ``point_category`` is one of
    ``TITLE_RETAIN_POINT_CATEGORIES`` and ``matched_demand`` is non-blank.
    """
    category = str(row.get("point_category") or "").strip()
    return category in TITLE_RETAIN_POINT_CATEGORIES and bool(
        str(row.get("matched_demand") or "").strip()
    )
def _has_matched_demand(row: dict[str, Any]) -> bool:
    """Return whether ``row`` carries a non-blank ``matched_demand`` value."""
    demand = row.get("matched_demand") or ""
    return str(demand).strip() != ""
def _export_row_passes_quality(
    row: dict[str, Any],
    *,
    export_rows: list[dict[str, Any]],
    event_sense_json: dict[str, Any] | None,
    senior_fit_json: dict[str, Any] | None,
    event_threshold: float,
    senior_threshold: float,
) -> bool:
    """Return whether ``row`` passes the quality gate for its item type.

    Phrase rows are checked as ``TYPE_PHRASE``. Element rows are checked as
    ``TYPE_FEATURE_POINT`` and must also carry a matched demand. Rows of any
    other item type, and rows whose ``item_text`` is blank, fail.

    Args:
        row: The export row to evaluate.
        export_rows: Unused; kept so existing callers keep working.
        event_sense_json: Forwarded to ``quality_passed``.
        senior_fit_json: Forwarded to ``quality_passed``.
        event_threshold: Forwarded to ``quality_passed``.
        senior_threshold: Forwarded to ``quality_passed``.
    """
    item_type = str(row.get("item_type") or "")
    if item_type == ITEM_TYPE_PHRASE:
        demand_type = TYPE_PHRASE
    elif item_type == ITEM_TYPE_ELEMENT and _has_matched_demand(row):
        demand_type = TYPE_FEATURE_POINT
    else:
        return False

    demand_text = str(row.get("item_text") or "").strip()
    if not demand_text:
        # Blank texts are dropped before Hive rows are built, so a blank row
        # must not be reported as passing, whatever its item type.
        return False

    return quality_passed(
        demand_type=demand_type,
        demand_text=demand_text,
        event_sense_json=event_sense_json,
        senior_fit_json=senior_fit_json,
        event_threshold=event_threshold,
        senior_threshold=senior_threshold,
    )
def is_export_row_as_demand(
    row: dict[str, Any],
    export_rows: list[dict[str, Any]],
    *,
    wxindex_threshold: float,
    event_sense_json: dict[str, Any] | None = None,
    senior_fit_json: dict[str, Any] | None = None,
    event_threshold: float = 0.0,
    senior_threshold: float = 0.0,
) -> int:
    """Tell whether ``row`` counts as a demand under the ODPS sync rules.

    The row qualifies when the record's title is retained, the row has a
    matched demand, and the row passes the quality gate. The quality gate is
    only applied when at least one of ``event_sense_json`` and
    ``senior_fit_json`` is provided.

    Returns:
        ``1`` when the row qualifies, otherwise ``0``.
    """
    title_retained = _should_retain_title(
        export_rows, wxindex_threshold=wxindex_threshold
    )
    if not (title_retained and _has_matched_demand(row)):
        return 0

    # Without any quality payload there is nothing to score against.
    if event_sense_json is None and senior_fit_json is None:
        return 1

    passed = _export_row_passes_quality(
        row,
        export_rows=export_rows,
        event_sense_json=event_sense_json,
        senior_fit_json=senior_fit_json,
        event_threshold=event_threshold,
        senior_threshold=senior_threshold,
    )
    return 1 if passed else 0
def _build_export_rows_for_record(
    record: dict[str, Any],
    *,
    wxindex_threshold: float,
    event_sense_json: dict[str, Any] | None,
    senior_fit_json: dict[str, Any] | None,
    event_threshold: float,
    senior_threshold: float,
) -> list[dict[str, Any]]:
    """Build the export rows for one record, with wxindex and quality data.

    Returns an empty list when ``contribution_demand_match_json`` is not a
    dict. ``contribution_points_json`` and ``wxindex_trend_json`` are passed
    on as ``None`` unless they are dicts.
    """
    match_result = record.get("contribution_demand_match_json")
    if not isinstance(match_result, dict):
        return []

    points = record.get("contribution_points_json")
    if not isinstance(points, dict):
        points = None
    trend = record.get("wxindex_trend_json")
    if not isinstance(trend, dict):
        trend = None

    base_rows = build_demand_export_rows(
        match_result,
        contribution_points=points,
        trend_json=trend,
    )
    rows_with_wxindex = attach_wxindex_metadata(
        base_rows,
        trend,
        wxindex_threshold=wxindex_threshold,
        event_sense_json=event_sense_json,
        senior_fit_json=senior_fit_json,
        event_threshold=event_threshold,
        senior_threshold=senior_threshold,
    )
    return attach_quality_scores_to_export_rows(
        rows_with_wxindex,
        event_sense_json=event_sense_json,
        senior_fit_json=senior_fit_json,
    )
def build_hive_rows_for_record(
    export_rows: list[dict[str, Any]],
    *,
    record_id: int,
    strategy: str,
    partition_dt: str,
    wxindex_threshold: float,
    event_sense_json: dict[str, Any] | None = None,
    senior_fit_json: dict[str, Any] | None = None,
    event_threshold: float = 0.0,
    senior_threshold: float = 0.0,
) -> list[dict[str, Any]]:
    """Build the Hive rows for a single record's export rows.

    Nothing is emitted unless the record's title is retained. Otherwise rows
    are produced, in this order, for the feature-combo text, each matched
    element text, and each matched phrase text, keeping only the texts that
    pass ``quality_passed``. Every row shares one weight: the record's
    wxindex score divided by ``WEIGHT_DIVISOR``.

    Rows whose ``demand_id`` was already emitted for this record are dropped
    (first occurrence wins). The id does not include the demand type, so a
    feature-combo text equal to an element text, or a phrase equal to an
    element, would otherwise yield duplicate ids.

    Args:
        export_rows: Export rows belonging to one record.
        record_id: Identifier stored on every emitted row.
        strategy: Strategy name stored on the row and hashed into the id.
        partition_dt: Partition date stored on the row and hashed into the id.
        wxindex_threshold: Minimum wxindex score for the title to be retained.
        event_sense_json: Forwarded to ``quality_passed``.
        senior_fit_json: Forwarded to ``quality_passed``.
        event_threshold: Forwarded to ``quality_passed``.
        senior_threshold: Forwarded to ``quality_passed``.

    Returns:
        The Hive row dicts, or an empty list when the title is not retained.
    """
    if not _should_retain_title(export_rows, wxindex_threshold=wxindex_threshold):
        return []

    def passes(demand_type: str, demand_text: str) -> bool:
        # Single place for the quality-gate arguments shared by every check.
        return quality_passed(
            demand_type=demand_type,
            demand_text=demand_text,
            event_sense_json=event_sense_json,
            senior_fit_json=senior_fit_json,
            event_threshold=event_threshold,
            senior_threshold=senior_threshold,
        )

    weight = _record_wxindex_score(export_rows) / WEIGHT_DIVISOR
    hive_rows: list[dict[str, Any]] = []
    seen_demand_ids: set[str] = set()

    def emit(demand_type: str, demand_name: str) -> None:
        hive_row = _build_hive_row(
            record_id=record_id,
            strategy=strategy,
            demand_name=demand_name,
            weight=weight,
            demand_type=demand_type,
            partition_dt=partition_dt,
        )
        demand_id = str(hive_row["demand_id"])
        if demand_id in seen_demand_ids:
            return
        seen_demand_ids.add(demand_id)
        hive_rows.append(hive_row)

    feature_combo = build_feature_combo_text(export_rows)
    element_texts = build_matched_element_texts(export_rows)

    # Filter phrases by quality first, then de-duplicate, so that a failing
    # variant cannot shadow a passing one that differs only in whitespace.
    passing_phrases: list[str] = []
    for row in export_rows:
        if str(row.get("item_type") or "") != ITEM_TYPE_PHRASE:
            continue
        if not _has_matched_demand(row):
            continue
        phrase = str(row.get("item_text") or "").strip()
        if passes(TYPE_PHRASE, phrase):
            passing_phrases.append(phrase)
    phrase_texts = _dedupe_texts(passing_phrases)

    if feature_combo and passes(TYPE_FEATURE_POINT, feature_combo):
        emit(TYPE_FEATURE_POINT, feature_combo)
    for element_text in element_texts:
        if passes(TYPE_FEATURE_POINT, element_text):
            emit(TYPE_FEATURE_POINT, element_text)
    for phrase_text in phrase_texts:
        emit(TYPE_PHRASE, phrase_text)
    return hive_rows
def build_hive_rows_for_odps_record(
    record: dict[str, Any],
    *,
    strategy: str,
    partition_dt: str,
    wxindex_threshold: float,
    event_threshold: float,
    senior_threshold: float,
) -> list[dict[str, Any]]:
    """Build the Hive rows for one ODPS record.

    The record's ``demand_event_sense_json`` and ``demand_senior_fit_json``
    are replaced by empty dicts when they are not dicts. Export rows are then
    built from the record and converted to Hive rows, using the record's
    ``id`` (``0`` when missing or falsy) as the record id.
    """
    raw_event_sense = record.get("demand_event_sense_json")
    raw_senior_fit = record.get("demand_senior_fit_json")
    quality_kwargs: dict[str, Any] = {
        "event_sense_json": raw_event_sense if isinstance(raw_event_sense, dict) else {},
        "senior_fit_json": raw_senior_fit if isinstance(raw_senior_fit, dict) else {},
        "event_threshold": event_threshold,
        "senior_threshold": senior_threshold,
    }

    export_rows = _build_export_rows_for_record(
        record,
        wxindex_threshold=wxindex_threshold,
        **quality_kwargs,
    )
    return build_hive_rows_for_record(
        export_rows,
        record_id=int(record.get("id") or 0),
        strategy=strategy,
        partition_dt=partition_dt,
        wxindex_threshold=wxindex_threshold,
        **quality_kwargs,
    )
def build_hive_rows_from_export_groups(
    export_groups: list[dict[str, Any]],
    *,
    strategy: str,
    partition_dt: str,
    wxindex_threshold: float,
    event_threshold: float = 0.0,
    senior_threshold: float = 0.0,
) -> list[dict[str, Any]]:
    """Build de-duplicated Hive rows from pre-built export groups.

    Groups whose ``export_rows`` is not a list, or whose ``record_id`` is not
    positive, are skipped. Quality payloads that are not dicts are passed on
    as ``None``. Across all groups only the first row for each ``demand_id``
    is kept, in the order first encountered.
    """
    # Dicts keep insertion order, so this doubles as an ordered "seen" set.
    first_row_by_demand_id: dict[str, dict[str, Any]] = {}
    for group in export_groups:
        export_rows = group.get("export_rows") or []
        if not isinstance(export_rows, list):
            continue
        record_id = int(group.get("record_id") or 0)
        if record_id <= 0:
            continue

        event_sense = group.get("demand_event_sense_json")
        if not isinstance(event_sense, dict):
            event_sense = None
        senior_fit = group.get("demand_senior_fit_json")
        if not isinstance(senior_fit, dict):
            senior_fit = None

        record_rows = build_hive_rows_for_record(
            export_rows,
            record_id=record_id,
            strategy=strategy,
            partition_dt=partition_dt,
            wxindex_threshold=wxindex_threshold,
            event_sense_json=event_sense,
            senior_fit_json=senior_fit,
            event_threshold=event_threshold,
            senior_threshold=senior_threshold,
        )
        for hive_row in record_rows:
            first_row_by_demand_id.setdefault(str(hive_row["demand_id"]), hive_row)
    return list(first_row_by_demand_id.values())
def build_hive_rows_from_odps_records(
    records: list[dict[str, Any]],
    *,
    strategy: str,
    partition_dt: str,
    wxindex_threshold: float,
    event_threshold: float,
    senior_threshold: float,
) -> list[dict[str, Any]]:
    """Build de-duplicated Hive rows from raw ODPS records.

    Records whose ``id`` is missing or not positive are skipped. Across all
    records only the first row for each ``demand_id`` is kept, in the order
    first encountered.
    """
    # Dicts keep insertion order, so this doubles as an ordered "seen" set.
    first_row_by_demand_id: dict[str, dict[str, Any]] = {}
    for record in records:
        if int(record.get("id") or 0) <= 0:
            continue
        record_rows = build_hive_rows_for_odps_record(
            record,
            strategy=strategy,
            partition_dt=partition_dt,
            wxindex_threshold=wxindex_threshold,
            event_threshold=event_threshold,
            senior_threshold=senior_threshold,
        )
        for hive_row in record_rows:
            first_row_by_demand_id.setdefault(str(hive_row["demand_id"]), hive_row)
    return list(first_row_by_demand_id.values())
def _build_hive_row(
    *,
    record_id: int,
    strategy: str,
    demand_name: str,
    weight: float,
    demand_type: str,
    partition_dt: str,
) -> dict[str, Any]:
    """Assemble one Hive row dict for a demand.

    ``demand_name`` is stripped before it is stored and before it is hashed
    into ``demand_id``. ``video_count``, ``video_list`` and ``extend`` are
    filled with fixed placeholder values.
    """
    name = demand_name.strip()
    demand_id = build_demand_id(
        strategy=strategy,
        demand_name=name,
        partition_dt=partition_dt,
    )
    row: dict[str, Any] = {
        "record_id": record_id,
        "strategy": strategy,
        "demand_id": demand_id,
        "demand_name": name,
        "weight": weight,
        "type": demand_type,
        "video_count": None,
        "video_list": [],
        "extend": "{}",
        "dt": partition_dt,
    }
    return row
|