# match_engine.py
import asyncio
import logging
from typing import Any, Dict, List, Optional

from app.infra.shared.http_client import AsyncHttpClient

from ._const import (
    ConfigCode,
    DemandRecommendConst,
    MatchMethod,
)
from ._utils import (
    DemandRecord,
    DemandStrategyParser,
    MatchResult,
    MatchStrategy,
    merge_multi_recall,
    parse_recall_items,
    parse_scored_items,
)
  18. class DemandVideoMatchEngine:
  19. """统一匹配引擎:读策略 → 调用 API → 返回 MatchResult 列表"""
  20. def __init__(self, base_url: str = DemandRecommendConst.BASE_URL):
  21. self.base_url = base_url.rstrip("/")
  22. # ── 统一入口 ──
  23. async def match(self, demand: DemandRecord) -> List[MatchResult]:
  24. """匹配单条需求,返回去重/合并后的结果"""
  25. strategy = DemandStrategyParser.parse(demand)
  26. if not strategy.match_methods or not strategy.config_codes:
  27. return []
  28. # 构建所有 (match_method, config_code) 笛卡尔积任务
  29. tasks = []
  30. for method in strategy.match_methods:
  31. for code in strategy.config_codes:
  32. tasks.append(self._match_one(method, code, strategy))
  33. # 串行执行同一需求的多个 API 调用(避免单需求打太多请求)
  34. all_result_groups: List[List[MatchResult]] = []
  35. for task in tasks:
  36. try:
  37. results = await task
  38. if results:
  39. all_result_groups.append(results)
  40. except Exception:
  41. continue # 单路失败不阻塞其他路
  42. if not all_result_groups:
  43. return []
  44. if strategy.multi_recall_fusion and len(all_result_groups) > 1:
  45. return merge_multi_recall(all_result_groups, strategy.top_n)
  46. # 单路召回:取第一个非空组
  47. return all_result_groups[0][:strategy.top_n]
  48. async def _match_one(
  49. self,
  50. method: str,
  51. config_code: str,
  52. strategy: MatchStrategy,
  53. ) -> List[MatchResult]:
  54. """执行单路匹配"""
  55. async with AsyncHttpClient(timeout=DemandRecommendConst.API_TIMEOUT) as client:
  56. if method == MatchMethod.TEXT:
  57. resp = await self._match_by_text(client, strategy.query_text, config_code, strategy.top_n)
  58. elif method == MatchMethod.VIDEO_ID:
  59. resp = await self._match_by_video_id(client, strategy.video_id, config_code, strategy.top_n)
  60. elif method == MatchMethod.CONTENT_ID:
  61. resp = await self._match_by_content_id(client, strategy.content_id, config_code, strategy.top_n)
  62. else:
  63. return []
  64. return parse_recall_items(resp, strategy, method, config_code)
  65. # ── API 调用 ──
  66. async def _match_by_text(
  67. self,
  68. client: AsyncHttpClient,
  69. query_text: str,
  70. config_code: str,
  71. top_n: int,
  72. ) -> Dict[str, Any]:
  73. url = f"{self.base_url}{DemandRecommendConst.ApiPath.MATCH_BY_TEXT}"
  74. body = {
  75. "queryText": query_text,
  76. "configCode": config_code,
  77. "topN": top_n,
  78. }
  79. resp = await client.post(url, json=body)
  80. return resp if isinstance(resp, dict) else {}
  81. async def _match_by_video_id(
  82. self,
  83. client: AsyncHttpClient,
  84. video_id: int,
  85. config_code: str,
  86. top_n: int,
  87. ) -> Dict[str, Any]:
  88. url = f"{self.base_url}{DemandRecommendConst.ApiPath.MATCH_BY_VIDEO_ID}"
  89. body = {
  90. "videoId": video_id,
  91. "configCode": config_code,
  92. "topN": top_n,
  93. }
  94. resp = await client.post(url, json=body)
  95. return resp if isinstance(resp, dict) else {}
  96. async def _match_by_content_id(
  97. self,
  98. client: AsyncHttpClient,
  99. content_id: str,
  100. config_code: str,
  101. top_n: int,
  102. ) -> Dict[str, Any]:
  103. url = f"{self.base_url}{DemandRecommendConst.ApiPath.MATCH_TOP_N_VIDEO}"
  104. body = {
  105. "channelContentId": content_id,
  106. "configCode": config_code,
  107. "topN": top_n,
  108. }
  109. resp = await client.post(url, json=body)
  110. return resp if isinstance(resp, dict) else {}
  111. # ── 标题匹配 ──
  112. async def _recall_with_score(
  113. self,
  114. client: AsyncHttpClient,
  115. query_text: str,
  116. config_code: str,
  117. top_n: int,
  118. alpha: float = 0.6,
  119. sim_min: float = 0.7,
  120. ) -> Dict[str, Any]:
  121. """调用 /videoSearch/recallWithScore,返回综合考虑 sim+rov 的排序结果"""
  122. url = f"{self.base_url}{DemandRecommendConst.ApiPath.RECALL_WITH_SCORE}"
  123. body = {
  124. "queryText": query_text,
  125. "configCode": config_code,
  126. "topN": top_n,
  127. "alpha": alpha,
  128. "simMin": sim_min,
  129. }
  130. resp = await client.post(url, json=body)
  131. return resp if isinstance(resp, dict) else {}
  132. async def match_by_title(
  133. self,
  134. title: str,
  135. config_code: str = None,
  136. top_n: int = 10,
  137. use_scoring: bool = False,
  138. alpha: float = 0.6,
  139. sim_min: float = 0.7,
  140. ) -> List[MatchResult]:
  141. """单条标题匹配:以标题文本在向量库中检索最相似的视频。
  142. Args:
  143. title: 标题文本
  144. config_code: 向量维度编码,默认 VIDEO_TOPIC
  145. top_n: 返回数量,默认 10
  146. use_scoring: True 走 recallWithScore(sim+rov 综合排序)
  147. alpha: 仅 use_scoring=True 时生效,相关性权重 0~1
  148. sim_min: 仅 use_scoring=True 时生效,相似度粗筛阈值
  149. """
  150. if not title or not title.strip():
  151. return []
  152. code = config_code or DemandRecommendConst.DEFAULT_CONFIG_CODE
  153. strategy = MatchStrategy(
  154. demand_id="",
  155. experiment_id="",
  156. dt="",
  157. match_methods=[MatchMethod.TEXT],
  158. config_codes=[code],
  159. top_n=top_n,
  160. query_text=title.strip(),
  161. )
  162. async with AsyncHttpClient(timeout=DemandRecommendConst.API_TIMEOUT) as client:
  163. if use_scoring:
  164. resp = await self._recall_with_score(
  165. client, title.strip(), code, top_n, alpha, sim_min
  166. )
  167. return parse_scored_items(resp, strategy, code)
  168. else:
  169. resp = await self._match_by_text(
  170. client, title.strip(), code, top_n
  171. )
  172. return parse_recall_items(resp, strategy, MatchMethod.TEXT, code)
  173. async def match_titles_batch(
  174. self,
  175. titles: List[str],
  176. config_code: str = None,
  177. top_n: int = 10,
  178. use_scoring: bool = False,
  179. alpha: float = 0.6,
  180. sim_min: float = 0.7,
  181. ) -> Dict[str, List[MatchResult]]:
  182. """批量标题匹配:串行逐条匹配,返回 {title: [MatchResult, ...]}。
  183. Args:
  184. titles: 标题文本列表
  185. config_code: 向量维度编码,默认 VIDEO_TOPIC
  186. top_n: 每个标题返回数量,默认 10
  187. use_scoring: True 走 recallWithScore
  188. alpha: 仅 use_scoring=True 时生效
  189. sim_min: 仅 use_scoring=True 时生效
  190. """
  191. results: Dict[str, List[MatchResult]] = {}
  192. for title in titles:
  193. key = title.strip() if title else ""
  194. if not key:
  195. results[title] = []
  196. continue
  197. try:
  198. results[title] = await self.match_by_title(
  199. key, config_code=config_code, top_n=top_n,
  200. use_scoring=use_scoring, alpha=alpha, sim_min=sim_min,
  201. )
  202. except Exception:
  203. results[title] = []
  204. return results