import re
from dataclasses import dataclass, field, replace
from typing import Any, Dict, List, Optional

from ._const import (
    ConfigCode,
    DemandRecommendConst,
    DemandSource,
    MatchMethod,
)

# ──────────────────────────────────────────────
# Value coercion helpers
# ──────────────────────────────────────────────


def _is_missing(value: Any) -> bool:
    """Return True for None and float NaN.

    NaN is treated as missing because tabular readers may encode NULL cells
    that way (NaN is the only value that is not equal to itself).
    """
    return value is None or (isinstance(value, float) and value != value)


def _to_str(value: Any) -> str:
    """Convert *value* to ``str``, mapping missing values (None/NaN) to ``""``.

    Plain ``str(None)`` would produce the truthy string ``"None"``, which the
    truthiness checks in DemandStrategyParser would mistake for real data.
    """
    return "" if _is_missing(value) else str(value)


def _to_int(value: Any, default: int = 0) -> int:
    """Convert *value* to ``int``; missing, empty or unparseable input gives *default*.

    Float-like input such as ``12.0`` or ``"12.0"`` is accepted (truncated).
    """
    if _is_missing(value) or value == "":
        return default
    try:
        return int(value)
    except (TypeError, ValueError):
        pass
    try:
        return int(float(value))
    except (TypeError, ValueError, OverflowError):
        return default


def _to_float(value: Any, default: float = 0.0) -> float:
    """Convert *value* to ``float``; None, empty or unparseable input gives *default*."""
    if value is None or value == "":
        return default
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


# ──────────────────────────────────────────────
# Dataclasses
# ──────────────────────────────────────────────


@dataclass
class DemandRecord:
    """One raw row of the upstream demand table.

    Field names are currently inferred from the result table; align them with
    the real upstream schema later.
    """

    dt: str = ""
    action_type: str = ""
    match_experiment_id: str = ""
    demand_source_crowd: str = ""
    demand_strategy: str = ""
    match_strategy: str = ""
    match_video_rule: str = ""
    demand_id: str = ""
    crowd_channel: str = ""
    crowd_segment: str = ""
    crowd_package: str = ""
    conversion_target: str = ""
    partner: str = ""
    account: str = ""
    scene_value: str = ""
    demand_source: str = ""
    drive_dim_time: str = ""
    drive_dim_space: str = ""
    demand_filter_strategy: str = ""
    demand_video_id: int = 0
    demand_video_title: str = ""
    scene_content_id: str = ""
    scene_content_title: str = ""
    demand_topic: str = ""
    demand_feature_points: str = ""

    @classmethod
    def from_dict(cls, data: Optional[Dict[str, Any]]) -> "DemandRecord":
        """Build a record from a raw row dict.

        Missing keys and null-like values (None/NaN) become ``""`` for text
        fields and ``0`` for ``demand_video_id``; a ``None`` row yields an
        all-default record.
        """
        data = data or {}
        return cls(
            dt=_to_str(data.get("dt")),
            action_type=_to_str(data.get("action_type")),
            match_experiment_id=_to_str(data.get("match_experiment_id")),
            demand_source_crowd=_to_str(data.get("demand_source_crowd")),
            demand_strategy=_to_str(data.get("demand_strategy")),
            match_strategy=_to_str(data.get("match_strategy")),
            match_video_rule=_to_str(data.get("match_video_rule")),
            demand_id=_to_str(data.get("demand_id")),
            crowd_channel=_to_str(data.get("crowd_channel")),
            crowd_segment=_to_str(data.get("crowd_segment")),
            crowd_package=_to_str(data.get("crowd_package")),
            conversion_target=_to_str(data.get("conversion_target")),
            partner=_to_str(data.get("partner")),
            account=_to_str(data.get("account")),
            scene_value=_to_str(data.get("scene_value")),
            demand_source=_to_str(data.get("demand_source")),
            drive_dim_time=_to_str(data.get("drive_dim_time")),
            drive_dim_space=_to_str(data.get("drive_dim_space")),
            demand_filter_strategy=_to_str(data.get("demand_filter_strategy")),
            demand_video_id=_to_int(data.get("demand_video_id")),
            demand_video_title=_to_str(data.get("demand_video_title")),
            scene_content_id=_to_str(data.get("scene_content_id")),
            scene_content_title=_to_str(data.get("scene_content_title")),
            demand_topic=_to_str(data.get("demand_topic")),
            demand_feature_points=_to_str(data.get("demand_feature_points")),
        )


@dataclass
class MatchStrategy:
    """Match execution strategy parsed from a DemandRecord."""

    demand_id: str
    experiment_id: str
    dt: str
    match_methods: List[str] = field(default_factory=list)
    config_codes: List[str] = field(default_factory=list)
    top_n: int = DemandRecommendConst.DEFAULT_TOPN
    # Topic + feature points joined by build_query_text.
    query_text: str = ""
    video_id: int = 0
    content_id: str = ""
    # Copied from DemandRecord.demand_filter_strategy.
    filter_rule: str = ""
    # True when the strategy text mentions "多路" or more than one method applies.
    multi_recall_fusion: bool = False


@dataclass
class MatchResult:
    """A single match result."""

    dt: str
    demand_id: str
    match_experiment_id: str
    match_method: str
    config_code: str
    video_id: int
    score: float
    rank_position: int = 0
    video_title: str = ""
    video_detail: Optional[Dict[str, Any]] = None
    # Score components read from recallWithScore-style items; they stay 0
    # when the API response does not carry them.
    sim: float = 0.0
    sim_norm: float = 0.0
    rov: float = 0.0
    rov_norm: float = 0.0


# ──────────────────────────────────────────────
# Strategy Parser
# ──────────────────────────────────────────────

# Accepts "topN=20", "topN: 20", "topN = 20", "TOPN：20" (full-width colon), "topN20".
_TOP_N_PATTERN = re.compile(r"topN\s*[=:：]?\s*(\d+)", re.IGNORECASE)


class DemandStrategyParser:
    """Parse a DemandRecord into a MatchStrategy."""

    @staticmethod
    def select_config_codes(match_strategy: str) -> List[str]:
        """Derive the configCode list from the match-strategy text.

        Each ``STRATEGY_CONFIG_MAP`` keyword found in the text contributes its
        code once, in map order. With no text, or no keyword hit, the list is
        just ``DEFAULT_CONFIG_CODE``.
        """
        if not match_strategy:
            return [DemandRecommendConst.DEFAULT_CONFIG_CODE]
        codes: List[str] = []
        for keyword, code in DemandRecommendConst.STRATEGY_CONFIG_MAP.items():
            if keyword in match_strategy and code not in codes:
                codes.append(code)
        return codes or [DemandRecommendConst.DEFAULT_CONFIG_CODE]

    @staticmethod
    def _has_input_for(method: str, demand: DemandRecord) -> bool:
        """Return True when *demand* carries the input field *method* needs."""
        if method == MatchMethod.VIDEO_ID:
            return demand.demand_video_id > 0
        if method == MatchMethod.CONTENT_ID:
            return bool(demand.scene_content_id)
        if method == MatchMethod.TEXT:
            return bool(demand.demand_topic or demand.demand_feature_points)
        # Methods not handled here are never selected.
        return False

    @staticmethod
    def select_match_methods(demand: DemandRecord) -> List[str]:
        """Derive match methods from ``match_video_rule`` plus the available fields.

        A method is selected when one of its ``RULE_METHOD_MAP`` keywords
        appears in the rule and the record has the input that method needs.
        Each method appears at most once, because several keywords may map to
        the same method. Falls back to ``_fallback_methods`` when the rule is
        empty or yields nothing.
        """
        rule = demand.match_video_rule
        if not rule:
            return DemandStrategyParser._fallback_methods(demand)
        methods: List[str] = []
        for keyword, method in DemandRecommendConst.RULE_METHOD_MAP.items():
            if keyword not in rule or method in methods:
                continue
            if DemandStrategyParser._has_input_for(method, demand):
                methods.append(method)
        return methods or DemandStrategyParser._fallback_methods(demand)

    @staticmethod
    def _fallback_methods(demand: DemandRecord) -> List[str]:
        """Infer methods from the available fields when ``match_video_rule`` gives nothing.

        Order is VIDEO_ID, CONTENT_ID, TEXT. TEXT is the final fallback even
        when no text fields are present.
        """
        candidates = (MatchMethod.VIDEO_ID, MatchMethod.CONTENT_ID, MatchMethod.TEXT)
        methods = [
            m for m in candidates if DemandStrategyParser._has_input_for(m, demand)
        ]
        return methods or [MatchMethod.TEXT]

    @staticmethod
    def parse_top_n(match_strategy: str) -> int:
        """Parse the ``topN`` parameter from the match-strategy text.

        Returns ``DEFAULT_TOPN`` when the text is empty, has no ``topN`` entry,
        or specifies a non-positive value (which would otherwise return nothing).
        """
        if not match_strategy:
            return DemandRecommendConst.DEFAULT_TOPN
        m = _TOP_N_PATTERN.search(match_strategy)
        if m:
            n = int(m.group(1))
            if n > 0:
                return n
        return DemandRecommendConst.DEFAULT_TOPN

    @classmethod
    def parse(cls, demand: DemandRecord) -> MatchStrategy:
        """Parse one demand record into a complete MatchStrategy."""
        # Computed once; it feeds both match_methods and the fusion flag.
        methods = cls.select_match_methods(demand)
        strategy_text = demand.match_strategy or ""
        return MatchStrategy(
            demand_id=demand.demand_id,
            experiment_id=demand.match_experiment_id,
            dt=demand.dt,
            match_methods=methods,
            config_codes=cls.select_config_codes(demand.match_strategy),
            top_n=cls.parse_top_n(demand.match_strategy),
            query_text=build_query_text(demand.demand_topic, demand.demand_feature_points),
            video_id=demand.demand_video_id,
            content_id=demand.scene_content_id,
            filter_rule=demand.demand_filter_strategy,
            multi_recall_fusion=("多路" in strategy_text) or len(methods) > 1,
        )


# ──────────────────────────────────────────────
# Helpers
# ──────────────────────────────────────────────


def build_query_text(topic: str, feature_points: str) -> str:
    """Join the topic and feature points into a single retrieval query.

    Empty or whitespace-only parts are dropped. The remaining parts are
    stripped and joined with the full-width full stop "。". Returns ``""``
    when nothing is left.
    """
    parts = [p.strip() for p in (topic, feature_points) if p and p.strip()]
    return "。".join(parts)


def _extract_items(api_response: Optional[Dict[str, Any]]) -> List[Any]:
    """Return the raw item list of a successful (``code == 0``) API response.

    ``data`` may be the list itself (matchTopNVideo) or a dict carrying
    ``items`` (recallTest / recallWithScore). Any other shape yields ``[]``.
    """
    if not api_response or api_response.get("code") != 0:
        return []
    data = api_response.get("data")
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        return data.get("items") or []
    return []


def parse_recall_items(
    api_response: Dict[str, Any],
    strategy: MatchStrategy,
    match_method: str,
    config_code: str,
) -> List[MatchResult]:
    """Convert a recall API response into MatchResult objects.

    Items without a usable video id (``id`` or ``videoId``) are skipped.
    ``rank_position`` numbers the kept items 1..N in response order, so
    skipped items leave no gaps. Missing or null numeric fields become 0.0.
    """
    results: List[MatchResult] = []
    for item in _extract_items(api_response):
        if not isinstance(item, dict):
            continue
        vid = _to_int(item.get("id") or item.get("videoId"))
        if not vid:
            continue
        results.append(MatchResult(
            dt=strategy.dt,
            demand_id=strategy.demand_id,
            match_experiment_id=strategy.experiment_id,
            match_method=match_method,
            config_code=config_code,
            video_id=vid,
            score=_to_float(item.get("score")),
            rank_position=len(results) + 1,
            video_title=_to_str(item.get("title")),
            video_detail=item.get("videoDetail"),
            sim=_to_float(item.get("sim")),
            sim_norm=_to_float(item.get("simNorm")),
            rov=_to_float(item.get("rov")),
            rov_norm=_to_float(item.get("rovNorm")),
        ))
    return results


def parse_scored_items(
    api_response: Dict[str, Any],
    strategy: MatchStrategy,
    config_code: str,
) -> List[MatchResult]:
    """Convert a recallWithScore response into MatchResult objects (TEXT method).

    Items without a usable ``videoId`` are skipped, and ranks are contiguous
    1..N. A zero or missing ``score`` falls back to ``sim``. A per-item
    ``configCode`` overrides *config_code*. A non-dict ``videoDetail`` is
    replaced by ``{}``.
    """
    results: List[MatchResult] = []
    for item in _extract_items(api_response):
        if not isinstance(item, dict):
            continue
        vid = _to_int(item.get("videoId"))
        if not vid:
            continue
        detail = item.get("videoDetail")
        if not isinstance(detail, dict):
            detail = {}
        # Prefer the real title from videoDetail; fall back to ``text`` (the vectorised topic).
        title = detail.get("title") or detail.get("选题") or item.get("text")
        sim = _to_float(item.get("sim"))
        results.append(MatchResult(
            dt=strategy.dt,
            demand_id=strategy.demand_id,
            match_experiment_id=strategy.experiment_id,
            match_method=MatchMethod.TEXT,
            config_code=item.get("configCode") or config_code,
            video_id=vid,
            score=_to_float(item.get("score")) or sim,
            rank_position=len(results) + 1,
            video_title=_to_str(title),
            video_detail=detail,
            sim=sim,
            sim_norm=_to_float(item.get("simNorm")),
            rov=_to_float(item.get("rov")),
            rov_norm=_to_float(item.get("rovNorm")),
        ))
    return results


def merge_multi_recall(
    result_groups: List[List[MatchResult]],
    top_n: int,
) -> List[MatchResult]:
    """Merge multi-route recall results.

    Results are de-duplicated by ``video_id``, keeping the highest score; on a
    tie the first one seen wins. The survivors are sorted by score in
    descending order and the first *top_n* are returned, re-ranked 1..N.

    The returned objects are copies, so the input results keep their own
    ``rank_position``. A non-positive *top_n* returns ``[]``.
    """
    best: Dict[int, MatchResult] = {}
    for group in result_groups:
        for r in group:
            current = best.get(r.video_id)
            if current is None or r.score > current.score:
                best[r.video_id] = r
    if top_n is not None and top_n <= 0:
        return []
    ranked = sorted(best.values(), key=lambda x: x.score, reverse=True)[:top_n]
    return [replace(r, rank_position=i) for i, r in enumerate(ranked, start=1)]