content_deconstruction_search.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
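"""
Content-deconstruction result search tool: searches video titles and the
deconstruction (analysis) results of those titles by keyword.

Used by the Agent at execution time to look up video content and its
deconstruction results for a set of keywords.
"""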
  5. import asyncio
  6. import json
  7. import os
  8. from typing import Any, Dict, List, Optional
  9. import httpx
  10. from agent.tools import tool, ToolResult
  11. # API 配置
  12. CONTENT_DECONSTRUCTION_BASE_URL = os.getenv(
  13. "CONTENT_DECONSTRUCTION_BASE_URL", "http://api.piaoquantv.com"
  14. )
  15. DEFAULT_TIMEOUT = 30.0
  16. async def _call_content_deconstruction_api(
  17. keywords: List[str],
  18. ) -> Optional[Dict[str, List[Dict[str, Any]]]]:
  19. """调用内容解析结果搜索 API,返回结果字典。"""
  20. url = f"{CONTENT_DECONSTRUCTION_BASE_URL.rstrip('/')}/supply-demand-engine-service/content/queryContentDeconstructionResultByKeywords"
  21. payload = {"keywords": keywords}
  22. try:
  23. async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
  24. resp = await client.post(
  25. url,
  26. json=payload,
  27. headers={"Content-Type": "application/json"},
  28. )
  29. resp.raise_for_status()
  30. data = resp.json()
  31. except httpx.HTTPStatusError as e:
  32. raise RuntimeError(
  33. f"API 请求失败: {e.response.status_code} - {e.response.text[:200]}"
  34. )
  35. except Exception as e:
  36. raise RuntimeError(f"请求异常: {str(e)}")
  37. # 解析响应格式: {'code': 0, 'msg': 'success', 'data': {...}, 'success': True}
  38. if isinstance(data, dict):
  39. # 检查 code 字段
  40. code = data.get("code", 0)
  41. if code != 0:
  42. msg = data.get("msg", "未知错误")
  43. raise RuntimeError(f"API 返回错误码: {code}, 消息: {msg}")
  44. # 获取 data 字段
  45. result_data = data.get("data", {})
  46. if isinstance(result_data, dict):
  47. return result_data
  48. return {}
  49. return {}
  50. @tool(
  51. description="根据关键词搜索视频标题和标题的解析结果。支持传入多个关键词,自动过滤掉 type 为'选题'的 description,统计每个 contentId 对应的关键词数量,并返回最高匹配比例(匹配最多关键词的 contentId 占所有关键词的比例)。",
  52. display={
  53. "zh": {
  54. "name": "内容解析结果搜索",
  55. "params": {
  56. "keywords": "关键词列表,例如:['食用', '禁忌']",
  57. },
  58. },
  59. },
  60. )
  61. async def query_content_deconstruction_by_keywords(
  62. keywords: List[str],
  63. ) -> ToolResult:
  64. """
  65. 根据关键词搜索视频标题和标题的解析结果。
  66. 处理流程:
  67. 1. 过滤掉 description 中 type 为 "选题" 的项
  68. 2. 统计每个 contentId 对应的关键词数量
  69. 3. 计算并返回最高匹配比例(匹配最多关键词的 contentId 占所有关键词的比例)
  70. Args:
  71. keywords: 关键词列表,例如:['食用', '禁忌']
  72. Returns:
  73. ToolResult: 包含过滤后的结果、contentId 统计信息、最高匹配比例等
  74. """
  75. # 验证关键词列表
  76. if not keywords:
  77. return ToolResult(
  78. title="内容解析结果搜索失败",
  79. output="",
  80. error="关键词列表不能为空",
  81. )
  82. if not isinstance(keywords, list):
  83. return ToolResult(
  84. title="内容解析结果搜索失败",
  85. output="",
  86. error=f"关键词必须是列表类型,当前类型为: {type(keywords).__name__}",
  87. )
  88. # 过滤空字符串
  89. keywords = [kw.strip() for kw in keywords if kw and kw.strip()]
  90. if not keywords:
  91. return ToolResult(
  92. title="内容解析结果搜索失败",
  93. output="",
  94. error="关键词列表中没有有效的关键词",
  95. )
  96. try:
  97. results = await _call_content_deconstruction_api(keywords=keywords)
  98. except RuntimeError as e:
  99. return ToolResult(
  100. title="内容解析结果搜索失败",
  101. output="",
  102. error=str(e),
  103. )
  104. if not results:
  105. return ToolResult(
  106. title="内容解析结果搜索",
  107. output=json.dumps(
  108. {
  109. "max_content_id_ratio": 0.0,
  110. },
  111. ensure_ascii=False,
  112. indent=2,
  113. ),
  114. )
  115. # 1. 过滤 description,去掉 type 为 "选题" 的项
  116. filtered_results = {}
  117. for keyword, videos in results.items():
  118. if not isinstance(videos, list):
  119. continue
  120. filtered_videos = []
  121. for video in videos:
  122. if not isinstance(video, dict):
  123. continue
  124. # 复制视频信息
  125. filtered_video = video.copy()
  126. # 过滤 description,去掉 type 为 "选题" 的项
  127. if "description" in filtered_video and isinstance(filtered_video["description"], list):
  128. filtered_video["description"] = [
  129. desc for desc in filtered_video["description"]
  130. if isinstance(desc, dict) and desc.get("type") != "选题"
  131. ]
  132. # 如果过滤后还有 description,说明该词匹配到了该 contentId
  133. if filtered_video.get("description") and len(filtered_video["description"]) > 0:
  134. filtered_videos.append(filtered_video)
  135. if filtered_videos:
  136. filtered_results[keyword] = filtered_videos
  137. # 2. 统计每个 contentId 对应的关键词数量
  138. content_id_to_keywords: Dict[str, set] = {}
  139. for keyword, videos in filtered_results.items():
  140. for video in videos:
  141. content_id = video.get("contentId")
  142. if content_id:
  143. if content_id not in content_id_to_keywords:
  144. content_id_to_keywords[content_id] = set()
  145. content_id_to_keywords[content_id].add(keyword)
  146. # 3. 计算每个 contentId 对应最多词的比例
  147. max_keyword_count = 0
  148. max_content_id = None
  149. if content_id_to_keywords:
  150. for content_id, matched_keywords in content_id_to_keywords.items():
  151. keyword_count = len(matched_keywords)
  152. if keyword_count > max_keyword_count:
  153. max_keyword_count = keyword_count
  154. max_content_id = content_id
  155. # 计算比例(匹配到的关键词数 / 总关键词数)
  156. max_ratio = max_keyword_count / len(keywords) if keywords else 0.0
  157. # 构建输出结果,只返回最高匹配度
  158. output_data = {
  159. "max_content_id_ratio": round(max_ratio * 100, 2), # 转换为百分比,保留2位小数
  160. }
  161. output = json.dumps(output_data, ensure_ascii=False, indent=2)
  162. return ToolResult(
  163. title=f"内容解析结果搜索 - {len(keywords)} 个关键词",
  164. output=output,
  165. long_term_memory=f"检索到内容解析结果,关键词: {', '.join(keywords)},最高匹配比例: {max_ratio * 100:.2f}%",
  166. )
  167. if __name__ == '__main__':
  168. res = asyncio.run(query_content_deconstruction_by_keywords(['食用', '禁忌']))
  169. print(res)