content_deconstruction_search.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. """
  2. 内容解析结果搜索工具 - 根据关键词搜索视频标题和标题的解析结果
  3. 用于 Agent 执行时根据关键词搜索视频内容及其解析结果。
  4. """
  5. import json
  6. import os
  7. from typing import Any, Dict, List, Optional
  8. import httpx
  9. from agent.tools import tool, ToolResult
  10. # API 配置
  11. CONTENT_DECONSTRUCTION_BASE_URL = os.getenv(
  12. "CONTENT_DECONSTRUCTION_BASE_URL", "http://api.piaoquantv.com"
  13. )
  14. DEFAULT_TIMEOUT = 30.0
  15. async def _call_content_deconstruction_api(
  16. keywords: List[str],
  17. ) -> Optional[Dict[str, List[Dict[str, Any]]]]:
  18. """调用内容解析结果搜索 API,返回结果字典。"""
  19. url = f"{CONTENT_DECONSTRUCTION_BASE_URL.rstrip('/')}/supply-demand-engine-service/content/queryContentDeconstructionResultByKeywords"
  20. payload = {"keywords": keywords}
  21. try:
  22. async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
  23. resp = await client.post(
  24. url,
  25. json=payload,
  26. headers={"Content-Type": "application/json"},
  27. )
  28. resp.raise_for_status()
  29. data = resp.json()
  30. except httpx.HTTPStatusError as e:
  31. raise RuntimeError(
  32. f"API 请求失败: {e.response.status_code} - {e.response.text[:200]}"
  33. )
  34. except Exception as e:
  35. raise RuntimeError(f"请求异常: {str(e)}")
  36. # 解析响应格式: {'code': 0, 'msg': 'success', 'data': {...}, 'success': True}
  37. if isinstance(data, dict):
  38. # 检查 code 字段
  39. code = data.get("code", 0)
  40. if code != 0:
  41. msg = data.get("msg", "未知错误")
  42. raise RuntimeError(f"API 返回错误码: {code}, 消息: {msg}")
  43. # 获取 data 字段
  44. result_data = data.get("data", {})
  45. if isinstance(result_data, dict):
  46. return result_data
  47. return {}
  48. return {}
  49. @tool(
  50. description="根据关键词搜索视频标题和标题的解析结果。支持传入多个关键词,返回每个关键词对应的视频列表及其解析信息。",
  51. display={
  52. "zh": {
  53. "name": "内容解析结果搜索",
  54. "params": {
  55. "keywords": "关键词列表,例如:['食用', '禁忌']",
  56. },
  57. },
  58. },
  59. )
  60. async def query_content_deconstruction_by_keywords(
  61. keywords: List[str],
  62. ) -> ToolResult:
  63. """
  64. 根据关键词搜索视频标题和标题的解析结果。
  65. Args:
  66. keywords: 关键词列表,例如:['食用', '禁忌']
  67. Returns:
  68. ToolResult: 包含每个关键词对应的视频列表及其解析结果
  69. """
  70. # 验证关键词列表
  71. if not keywords:
  72. return ToolResult(
  73. title="内容解析结果搜索失败",
  74. output="",
  75. error="关键词列表不能为空",
  76. )
  77. if not isinstance(keywords, list):
  78. return ToolResult(
  79. title="内容解析结果搜索失败",
  80. output="",
  81. error=f"关键词必须是列表类型,当前类型为: {type(keywords).__name__}",
  82. )
  83. # 过滤空字符串
  84. keywords = [kw.strip() for kw in keywords if kw and kw.strip()]
  85. if not keywords:
  86. return ToolResult(
  87. title="内容解析结果搜索失败",
  88. output="",
  89. error="关键词列表中没有有效的关键词",
  90. )
  91. try:
  92. results = await _call_content_deconstruction_api(keywords=keywords)
  93. except RuntimeError as e:
  94. return ToolResult(
  95. title="内容解析结果搜索失败",
  96. output="",
  97. error=str(e),
  98. )
  99. if not results:
  100. return ToolResult(
  101. title="内容解析结果搜索",
  102. output=json.dumps(
  103. {
  104. "message": "未找到相关内容",
  105. "keywords": keywords,
  106. },
  107. ensure_ascii=False,
  108. indent=2,
  109. ),
  110. )
  111. # 统计每个关键词的结果数量
  112. keyword_counts = {
  113. keyword: len(videos) if isinstance(videos, list) else 0
  114. for keyword, videos in results.items()
  115. }
  116. total_count = sum(keyword_counts.values())
  117. output = json.dumps(results, ensure_ascii=False, indent=2)
  118. return ToolResult(
  119. title=f"内容解析结果搜索 - {len(keywords)} 个关键词",
  120. output=output,
  121. long_term_memory=f"检索到内容解析结果,关键词: {', '.join(keywords)},共 {total_count} 条结果",
  122. )