| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
import asyncio
import logging
from typing import Any, Dict, List, Optional

from app.infra.shared.http_client import AsyncHttpClient

from ._const import (
    ConfigCode,
    DemandRecommendConst,
    MatchMethod,
)
from ._utils import (
    DemandRecord,
    DemandStrategyParser,
    MatchResult,
    MatchStrategy,
    merge_multi_recall,
    parse_recall_items,
    parse_scored_items,
)
class DemandVideoMatchEngine:
    """Unified matching engine: read the strategy, call the match API, return MatchResult lists.

    Every HTTP call goes through ``AsyncHttpClient`` against ``base_url`` and
    the endpoint paths declared in ``DemandRecommendConst.ApiPath``.
    """

    # Shared logger so that best-effort failures are visible instead of silent.
    _logger = logging.getLogger(__name__)

    def __init__(self, base_url: str = DemandRecommendConst.BASE_URL):
        """Store the service base URL with any trailing slashes removed.

        Args:
            base_url: Root URL of the matching service; endpoint paths are
                appended directly to it.
        """
        self.base_url = base_url.rstrip("/")

    # ── Unified entry point ──

    async def match(self, demand: DemandRecord) -> List[MatchResult]:
        """Match one demand record against every (match_method, config_code) route.

        The strategy is parsed from ``demand``; each route is called one after
        another (serially, to avoid firing too many requests for one demand).
        A failing route is logged and skipped so it does not block the others.

        Args:
            demand: The demand record to match.

        Returns:
            ``[]`` when the strategy has no methods/config codes or no route
            produced results. Otherwise, when ``strategy.multi_recall_fusion``
            is set and more than one route returned results, the output of
            ``merge_multi_recall``; else the first non-empty route's results
            truncated to ``strategy.top_n``.
        """
        strategy = DemandStrategyParser.parse(demand)
        if not strategy.match_methods or not strategy.config_codes:
            return []

        result_groups: List[List[MatchResult]] = []
        # Coroutines are created lazily, right before being awaited, so that an
        # early exit (e.g. cancellation) never leaves un-awaited coroutines.
        for method in strategy.match_methods:
            for code in strategy.config_codes:
                try:
                    results = await self._match_one(method, code, strategy)
                except Exception:
                    # Best effort: one failing route must not block the rest.
                    self._logger.warning(
                        "match route failed: demand_id=%s method=%s config_code=%s",
                        strategy.demand_id,
                        method,
                        code,
                        exc_info=True,
                    )
                    continue
                if results:
                    result_groups.append(results)

        if not result_groups:
            return []
        if strategy.multi_recall_fusion and len(result_groups) > 1:
            return merge_multi_recall(result_groups, strategy.top_n)
        # Single-route recall: take the first non-empty group.
        return result_groups[0][:strategy.top_n]

    async def _match_one(
        self,
        method: str,
        config_code: str,
        strategy: MatchStrategy,
    ) -> List[MatchResult]:
        """Run a single (method, config_code) route and parse its response.

        Args:
            method: One of the ``MatchMethod`` values; anything else yields ``[]``.
            config_code: Config code forwarded to the API as ``configCode``.
            strategy: Parsed strategy supplying the query key and ``top_n``.

        Returns:
            The items produced by ``parse_recall_items`` for the API response.
        """
        async with AsyncHttpClient(timeout=DemandRecommendConst.API_TIMEOUT) as client:
            # Plain equality chain on purpose: it stays correct whether the
            # MatchMethod members are plain strings or enum members.
            if method == MatchMethod.TEXT:
                resp = await self._match_by_text(
                    client, strategy.query_text, config_code, strategy.top_n
                )
            elif method == MatchMethod.VIDEO_ID:
                resp = await self._match_by_video_id(
                    client, strategy.video_id, config_code, strategy.top_n
                )
            elif method == MatchMethod.CONTENT_ID:
                resp = await self._match_by_content_id(
                    client, strategy.content_id, config_code, strategy.top_n
                )
            else:
                return []
            return parse_recall_items(resp, strategy, method, config_code)

    # ── API calls ──

    async def _post_json(
        self,
        client: AsyncHttpClient,
        path: str,
        body: Dict[str, Any],
    ) -> Dict[str, Any]:
        """POST ``body`` as JSON to ``base_url + path``.

        Returns:
            The response when it is a dict, otherwise an empty dict.
        """
        resp = await client.post(f"{self.base_url}{path}", json=body)
        return resp if isinstance(resp, dict) else {}

    async def _match_by_text(
        self,
        client: AsyncHttpClient,
        query_text: str,
        config_code: str,
        top_n: int,
    ) -> Dict[str, Any]:
        """Call the MATCH_BY_TEXT endpoint with a free-text query."""
        return await self._post_json(
            client,
            DemandRecommendConst.ApiPath.MATCH_BY_TEXT,
            {
                "queryText": query_text,
                "configCode": config_code,
                "topN": top_n,
            },
        )

    async def _match_by_video_id(
        self,
        client: AsyncHttpClient,
        video_id: int,
        config_code: str,
        top_n: int,
    ) -> Dict[str, Any]:
        """Call the MATCH_BY_VIDEO_ID endpoint with a video id."""
        return await self._post_json(
            client,
            DemandRecommendConst.ApiPath.MATCH_BY_VIDEO_ID,
            {
                "videoId": video_id,
                "configCode": config_code,
                "topN": top_n,
            },
        )

    async def _match_by_content_id(
        self,
        client: AsyncHttpClient,
        content_id: str,
        config_code: str,
        top_n: int,
    ) -> Dict[str, Any]:
        """Call the MATCH_TOP_N_VIDEO endpoint with a channel content id."""
        return await self._post_json(
            client,
            DemandRecommendConst.ApiPath.MATCH_TOP_N_VIDEO,
            {
                "channelContentId": content_id,
                "configCode": config_code,
                "topN": top_n,
            },
        )

    # ── Title matching ──

    async def _recall_with_score(
        self,
        client: AsyncHttpClient,
        query_text: str,
        config_code: str,
        top_n: int,
        alpha: float = 0.6,
        sim_min: float = 0.7,
    ) -> Dict[str, Any]:
        """Call the RECALL_WITH_SCORE endpoint.

        Per the original author's note this is /videoSearch/recallWithScore and
        the service ranks by a combination of sim and rov; the ranking itself
        happens server-side and is not visible here.
        """
        return await self._post_json(
            client,
            DemandRecommendConst.ApiPath.RECALL_WITH_SCORE,
            {
                "queryText": query_text,
                "configCode": config_code,
                "topN": top_n,
                "alpha": alpha,
                "simMin": sim_min,
            },
        )

    async def match_by_title(
        self,
        title: str,
        config_code: Optional[str] = None,
        top_n: int = 10,
        use_scoring: bool = False,
        alpha: float = 0.6,
        sim_min: float = 0.7,
    ) -> List[MatchResult]:
        """Match a single title: query the service with the title text.

        Args:
            title: Title text; surrounding whitespace is stripped.
            config_code: Config code; falls back to
                ``DemandRecommendConst.DEFAULT_CONFIG_CODE`` when falsy.
            top_n: Number of results requested, default 10.
            use_scoring: When True, use the recall-with-score endpoint and
                ``parse_scored_items``; otherwise the plain text-match endpoint
                and ``parse_recall_items``.
            alpha: Only sent when ``use_scoring`` is True (relevance weight,
                0~1 per the original note).
            sim_min: Only sent when ``use_scoring`` is True (similarity
                pre-filter threshold per the original note).

        Returns:
            Parsed match results, or ``[]`` for an empty/blank title.
        """
        query = title.strip() if title else ""
        if not query:
            return []

        code = config_code or DemandRecommendConst.DEFAULT_CONFIG_CODE
        strategy = MatchStrategy(
            demand_id="",
            experiment_id="",
            dt="",
            match_methods=[MatchMethod.TEXT],
            config_codes=[code],
            top_n=top_n,
            query_text=query,
        )
        async with AsyncHttpClient(timeout=DemandRecommendConst.API_TIMEOUT) as client:
            if use_scoring:
                resp = await self._recall_with_score(
                    client, query, code, top_n, alpha, sim_min
                )
                return parse_scored_items(resp, strategy, code)
            resp = await self._match_by_text(client, query, code, top_n)
            return parse_recall_items(resp, strategy, MatchMethod.TEXT, code)

    async def match_titles_batch(
        self,
        titles: List[str],
        config_code: Optional[str] = None,
        top_n: int = 10,
        use_scoring: bool = False,
        alpha: float = 0.6,
        sim_min: float = 0.7,
    ) -> Dict[str, List[MatchResult]]:
        """Match a batch of titles serially, one request at a time.

        Args:
            titles: Title texts; ``None`` is treated as an empty list.
            config_code: Forwarded to ``match_by_title``.
            top_n: Results requested per title, default 10.
            use_scoring: Forwarded to ``match_by_title``.
            alpha: Forwarded to ``match_by_title``.
            sim_min: Forwarded to ``match_by_title``.

        Returns:
            ``{title: [MatchResult, ...]}`` keyed by the title exactly as it was
            passed in (not stripped). Blank titles and titles whose lookup
            raised map to ``[]``.
        """
        results: Dict[str, List[MatchResult]] = {}
        for title in titles or []:
            query = title.strip() if title else ""
            if not query:
                results[title] = []
                continue
            try:
                results[title] = await self.match_by_title(
                    query,
                    config_code=config_code,
                    top_n=top_n,
                    use_scoring=use_scoring,
                    alpha=alpha,
                    sim_min=sim_min,
                )
            except Exception:
                # Best effort: a failing title yields no results but is logged.
                self._logger.warning(
                    "title match failed: title=%r", title, exc_info=True
                )
                results[title] = []
        return results
|