| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313 |
- import re
- from dataclasses import dataclass, field
- from typing import Any, Dict, List, Optional
- from ._const import (
- ConfigCode,
- DemandRecommendConst,
- DemandSource,
- MatchMethod,
- )
- # ──────────────────────────────────────────────
- # Dataclasses
- # ──────────────────────────────────────────────
@dataclass
class DemandRecord:
    """One raw row of the upstream demand table.

    The field list is provisionally inferred from the result table and is
    meant to be aligned with the real upstream schema later.  Every field has
    a default, so a partially populated row can still be represented.
    """

    dt: str = ""
    action_type: str = ""
    match_experiment_id: str = ""
    demand_source_crowd: str = ""
    demand_strategy: str = ""
    match_strategy: str = ""
    match_video_rule: str = ""
    demand_id: str = ""
    crowd_channel: str = ""
    crowd_segment: str = ""
    crowd_package: str = ""
    conversion_target: str = ""
    partner: str = ""
    account: str = ""
    scene_value: str = ""
    demand_source: str = ""
    drive_dim_time: str = ""
    drive_dim_space: str = ""
    demand_filter_strategy: str = ""
    demand_video_id: int = 0
    demand_video_title: str = ""
    scene_content_id: str = ""
    scene_content_title: str = ""
    demand_topic: str = ""
    demand_feature_points: str = ""

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "DemandRecord":
        """Build a record from a row dict, tolerating missing and null columns.

        Args:
            data: Mapping of column name to raw value.

        Returns:
            A ``DemandRecord`` in which string columns are coerced with
            ``str()`` and ``demand_video_id`` with ``int()``.  A missing key
            or a ``None`` value yields the field default (``""`` / ``0``).

        Raises:
            ValueError: If ``demand_video_id`` holds text that ``int()``
                cannot parse.
        """

        def text(key: str) -> str:
            # str(None) would produce the truthy literal "None", which is
            # indistinguishable from real content downstream.
            value = data.get(key)
            return "" if value is None else str(value)

        return cls(
            dt=text("dt"),
            action_type=text("action_type"),
            match_experiment_id=text("match_experiment_id"),
            demand_source_crowd=text("demand_source_crowd"),
            demand_strategy=text("demand_strategy"),
            match_strategy=text("match_strategy"),
            match_video_rule=text("match_video_rule"),
            demand_id=text("demand_id"),
            crowd_channel=text("crowd_channel"),
            crowd_segment=text("crowd_segment"),
            crowd_package=text("crowd_package"),
            conversion_target=text("conversion_target"),
            partner=text("partner"),
            account=text("account"),
            scene_value=text("scene_value"),
            demand_source=text("demand_source"),
            drive_dim_time=text("drive_dim_time"),
            drive_dim_space=text("drive_dim_space"),
            demand_filter_strategy=text("demand_filter_strategy"),
            # "or 0" maps None / "" to 0 before the int conversion.
            demand_video_id=int(data.get("demand_video_id") or 0),
            demand_video_title=text("demand_video_title"),
            scene_content_id=text("scene_content_id"),
            scene_content_title=text("scene_content_title"),
            demand_topic=text("demand_topic"),
            demand_feature_points=text("demand_feature_points"),
        )
@dataclass
class MatchStrategy:
    """Execution plan for matching, derived from a single DemandRecord."""

    # Identity of the demand this plan belongs to (required, positional).
    demand_id: str
    experiment_id: str
    dt: str

    # Which recall paths to run and with which configCode values.
    match_methods: List[str] = field(default_factory=list)
    config_codes: List[str] = field(default_factory=list)

    # Maximum number of results to keep.
    top_n: int = DemandRecommendConst.DEFAULT_TOPN

    # Inputs for the individual match methods.
    query_text: str = ""
    video_id: int = 0
    content_id: str = ""

    # Filter strategy text copied from the demand row.
    filter_rule: str = ""

    # True when results of several recall paths are to be merged.
    multi_recall_fusion: bool = False
@dataclass
class MatchResult:
    """A single matched video for one demand."""

    # Required fields: where the match came from and what was matched.
    dt: str
    demand_id: str
    match_experiment_id: str
    match_method: str
    config_code: str
    video_id: int
    score: float

    # Optional presentation / ranking data.
    rank_position: int = 0
    video_title: str = ""
    video_detail: Optional[Dict[str, Any]] = None

    # Fields specific to recallWithScore; they stay 0 outside scoring mode.
    sim: float = 0.0
    sim_norm: float = 0.0
    rov: float = 0.0
    rov_norm: float = 0.0
- # ──────────────────────────────────────────────
- # Strategy Parser
- # ──────────────────────────────────────────────
class DemandStrategyParser:
    """Translate a DemandRecord into the MatchStrategy used to run matching."""

    # "topN" followed by an optional ASCII or full-width "=" / ":" separator
    # (whitespace allowed on either side) and the number, case-insensitive.
    _TOP_N_PATTERN = re.compile(r"topN\s*[=:=:]?\s*(\d+)", re.IGNORECASE)

    @staticmethod
    def select_config_codes(match_strategy: str) -> List[str]:
        """Derive the configCode list from the match-strategy text.

        Each keyword of ``DemandRecommendConst.STRATEGY_CONFIG_MAP`` found in
        ``match_strategy`` contributes its code once, in map order.

        Returns:
            The collected codes, or ``[DEFAULT_CONFIG_CODE]`` when the text is
            empty or contains no known keyword.
        """
        codes: List[str] = []
        if match_strategy:
            for keyword, code in DemandRecommendConst.STRATEGY_CONFIG_MAP.items():
                if keyword in match_strategy and code not in codes:
                    codes.append(code)
        return codes or [DemandRecommendConst.DEFAULT_CONFIG_CODE]

    @staticmethod
    def _has_input_for(demand: DemandRecord, method: str) -> bool:
        """Return True when the demand row carries the input ``method`` needs."""
        if method == MatchMethod.VIDEO_ID:
            return demand.demand_video_id > 0
        if method == MatchMethod.CONTENT_ID:
            return bool(demand.scene_content_id)
        if method == MatchMethod.TEXT:
            return bool(demand.demand_topic or demand.demand_feature_points)
        # Methods other than the three above are never selected.
        return False

    @staticmethod
    def select_match_methods(demand: DemandRecord) -> List[str]:
        """Derive the match methods from ``match_video_rule`` and the row data.

        A method is selected when one of its keywords in
        ``DemandRecommendConst.RULE_METHOD_MAP`` occurs in the rule text and
        the row actually has the input that method needs.  Each method is
        listed at most once, so several keywords mapping to the same method
        do not inflate the list.

        Returns:
            The selected methods in map order, or the result of
            ``_fallback_methods`` when the rule is empty or selects nothing.
        """
        rule = demand.match_video_rule
        if not rule:
            return DemandStrategyParser._fallback_methods(demand)

        methods: List[str] = []
        for keyword, method in DemandRecommendConst.RULE_METHOD_MAP.items():
            if (
                keyword in rule
                and method not in methods
                and DemandStrategyParser._has_input_for(demand, method)
            ):
                methods.append(method)

        return methods or DemandStrategyParser._fallback_methods(demand)

    @staticmethod
    def _fallback_methods(demand: DemandRecord) -> List[str]:
        """Pick methods purely from the available fields.

        Used when ``match_video_rule`` cannot be resolved.  Candidates are
        checked in the order video id, content id, text; if the row has no
        usable input at all, text matching is returned as the last resort.
        """
        candidates = (MatchMethod.VIDEO_ID, MatchMethod.CONTENT_ID, MatchMethod.TEXT)
        methods = [
            m for m in candidates if DemandStrategyParser._has_input_for(demand, m)
        ]
        return methods or [MatchMethod.TEXT]

    @staticmethod
    def parse_top_n(match_strategy: str) -> int:
        """Extract the ``topN`` value from the match-strategy text.

        Accepts forms such as ``topN20``, ``topN=20``, ``topn: 20`` and the
        full-width separators used in Chinese text (``topN:20``).

        Returns:
            The parsed positive integer, or
            ``DemandRecommendConst.DEFAULT_TOPN`` when the text is empty, has
            no ``topN`` token, or specifies ``0`` (which would select nothing).
        """
        if match_strategy:
            found = DemandStrategyParser._TOP_N_PATTERN.search(match_strategy)
            if found:
                value = int(found.group(1))
                if value > 0:
                    return value
        return DemandRecommendConst.DEFAULT_TOPN

    @classmethod
    def parse(cls, demand: DemandRecord) -> MatchStrategy:
        """Parse one demand record into a complete MatchStrategy.

        ``multi_recall_fusion`` is set when the strategy text contains "多路"
        or when more than one match method was selected.
        """
        strategy_text = demand.match_strategy or ""
        # Computed once and reused for both the field and the fusion flag.
        methods = cls.select_match_methods(demand)
        return MatchStrategy(
            demand_id=demand.demand_id,
            experiment_id=demand.match_experiment_id,
            dt=demand.dt,
            match_methods=methods,
            config_codes=cls.select_config_codes(strategy_text),
            top_n=cls.parse_top_n(strategy_text),
            query_text=build_query_text(
                demand.demand_topic, demand.demand_feature_points
            ),
            video_id=demand.demand_video_id,
            content_id=demand.scene_content_id,
            filter_rule=demand.demand_filter_strategy,
            multi_recall_fusion="多路" in strategy_text or len(methods) > 1,
        )
- # ──────────────────────────────────────────────
- # Helpers
- # ──────────────────────────────────────────────
def build_query_text(topic: str, feature_points: str) -> str:
    """Join topic and feature points into the retrieval query text.

    Each part is stripped of surrounding whitespace; parts that are ``None``,
    empty or whitespace-only are dropped.  The remaining parts are joined
    with the Chinese full stop "。".

    Returns:
        The joined text, or ``""`` when neither part has content.
    """
    cleaned = ((part or "").strip() for part in (topic, feature_points))
    return "。".join(part for part in cleaned if part)
def parse_recall_items(
    api_response: Dict[str, Any],
    strategy: MatchStrategy,
    match_method: str,
    config_code: str,
) -> List[MatchResult]:
    """Convert a recall API response into a list of MatchResult.

    Two payload shapes are accepted: ``data`` being the item list itself
    (matchTopNVideo) or ``data`` being an object with an ``items`` list
    (recallTest).

    Args:
        api_response: Decoded response; only ``code == 0`` is treated as success.
        strategy: Supplies ``dt``, ``demand_id`` and ``experiment_id`` for
            every result.
        match_method: Match method recorded on every result.
        config_code: configCode recorded on every result.

    Returns:
        One MatchResult per item that has a video id (``id`` or ``videoId``).
        ``rank_position`` is the item's 1-based position in the API list, so
        skipped items leave gaps.  An unsuccessful, empty or malformed
        response yields ``[]``.

    Raises:
        ValueError: If an id or numeric field holds text that cannot be
            converted.
    """
    if not api_response or api_response.get("code") != 0:
        return []

    data = api_response.get("data")
    if isinstance(data, list):
        # matchTopNVideo: data is the item list.
        items = data
    elif isinstance(data, dict):
        # recallTest: data.items[]; a null "items" counts as empty.
        items = data.get("items") or []
    else:
        return []

    results: List[MatchResult] = []
    for rank, item in enumerate(items, start=1):
        if not isinstance(item, dict):
            continue
        vid = item.get("id") or item.get("videoId")
        if not vid:
            continue
        title = item.get("title")
        results.append(MatchResult(
            dt=strategy.dt,
            demand_id=strategy.demand_id,
            match_experiment_id=strategy.experiment_id,
            match_method=match_method,
            config_code=config_code,
            video_id=int(vid),
            # "or 0" keeps JSON nulls from reaching float().
            score=float(item.get("score") or 0),
            rank_position=rank,
            video_title="" if title is None else str(title),
            video_detail=item.get("videoDetail"),
            sim=float(item.get("sim") or 0),
            sim_norm=float(item.get("simNorm") or 0),
            rov=float(item.get("rov") or 0),
            rov_norm=float(item.get("rovNorm") or 0),
        ))
    return results
def parse_scored_items(
    api_response: Dict[str, Any],
    strategy: MatchStrategy,
    config_code: str,
) -> List[MatchResult]:
    """Convert a recallWithScore response into a list of MatchResult.

    Args:
        api_response: Decoded response; only ``code == 0`` is treated as success.
        strategy: Supplies ``dt``, ``demand_id`` and ``experiment_id`` for
            every result.
        config_code: Fallback configCode for items that do not carry their own.

    Returns:
        One MatchResult (``match_method`` = ``MatchMethod.TEXT``) per item
        that has a ``videoId``.  ``rank_position`` is the item's 1-based
        position in the API list, so skipped items leave gaps.  An
        unsuccessful, empty or malformed response yields ``[]``.

    Raises:
        ValueError: If an id or numeric field holds text that cannot be
            converted.
    """
    if not api_response or api_response.get("code") != 0:
        return []

    data = api_response.get("data")
    if isinstance(data, dict):
        # A null "items" counts as empty.
        items = data.get("items") or []
    elif isinstance(data, list):
        # Tolerate a bare item list, as parse_recall_items does.
        items = data
    else:
        return []

    results: List[MatchResult] = []
    for rank, item in enumerate(items, start=1):
        if not isinstance(item, dict):
            continue
        vid = item.get("videoId")
        if not vid:
            continue

        detail = item.get("videoDetail")
        if not isinstance(detail, dict):
            detail = {}

        # Prefer the real title from videoDetail; otherwise fall back to
        # "text" (the vectorised topic).
        title = detail.get("title") or detail.get("选题")
        if not title:
            text = item.get("text")
            title = "" if text is None else text

        results.append(MatchResult(
            dt=strategy.dt,
            demand_id=strategy.demand_id,
            match_experiment_id=strategy.experiment_id,
            match_method=MatchMethod.TEXT,
            config_code=item.get("configCode") or config_code,
            video_id=int(vid),
            # Use "sim" when "score" is absent/zero; the final "or 0" keeps
            # JSON nulls from reaching float().
            score=float(item.get("score") or item.get("sim") or 0),
            rank_position=rank,
            video_title=str(title),
            video_detail=detail,
            sim=float(item.get("sim") or 0),
            sim_norm=float(item.get("simNorm") or 0),
            rov=float(item.get("rov") or 0),
            rov_norm=float(item.get("rovNorm") or 0),
        ))
    return results
def merge_multi_recall(
    result_groups: List[List[MatchResult]],
    top_n: int,
) -> List[MatchResult]:
    """Merge the results of several recall paths into one ranked list.

    Results are de-duplicated by ``video_id`` keeping the highest-scoring
    entry (the first one seen wins a tie), sorted by score descending, cut to
    ``top_n`` and re-numbered from 1.

    Args:
        result_groups: One result list per recall path; ``None`` or empty
            groups are ignored.
        top_n: Maximum number of results to return; values <= 0 yield ``[]``.

    Returns:
        Shallow copies of the winning results carrying the merged
        ``rank_position``.  The input objects are left untouched, so the
        per-path ranks remain valid.
    """
    import copy  # local import: only this function needs it

    best: Dict[int, MatchResult] = {}
    for group in result_groups or []:
        for candidate in group or []:
            current = best.get(candidate.video_id)
            if current is None or candidate.score > current.score:
                best[candidate.video_id] = candidate

    # max() guards against a negative top_n slicing from the wrong end.
    limit = max(top_n, 0)
    ranked = sorted(best.values(), key=lambda r: r.score, reverse=True)[:limit]

    merged: List[MatchResult] = []
    for position, winner in enumerate(ranked, start=1):
        clone = copy.copy(winner)
        clone.rank_position = position
        merged.append(clone)
    return merged
|