topic_search.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. """
  2. 选题检索工具 - 根据关键词在数据库中匹配已有帖子的选题
  3. 用于 Agent 执行时自主调取参考数据。
  4. """
  5. import json
  6. import os
  7. from typing import Any, Dict, List, Optional
  8. import httpx
  9. from agent.tools import tool, ToolResult
  10. # 选题检索 API 配置
  11. TOPIC_SEARCH_BASE_URL = os.getenv("TOPIC_SEARCH_BASE_URL", "http://192.168.81.89:8000")
  12. DEFAULT_TIMEOUT = 30.0
  13. async def _call_search_api(keywords: List[str]) -> Optional[List[Dict[str, Any]]]:
  14. """调用选题检索 API,返回结果列表。"""
  15. url = f"{TOPIC_SEARCH_BASE_URL.rstrip('/')}/search"
  16. payload = {"keywords": keywords}
  17. try:
  18. async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
  19. resp = await client.post(url, json=payload)
  20. resp.raise_for_status()
  21. data = resp.json()
  22. except httpx.HTTPStatusError as e:
  23. raise RuntimeError(f"API 请求失败: {e.response.status_code} - {e.response.text[:200]}")
  24. except Exception as e:
  25. raise RuntimeError(f"请求异常: {str(e)}")
  26. # 兼容多种响应格式
  27. if isinstance(data, list):
  28. return data[:5]
  29. if isinstance(data, dict):
  30. items = data.get("data") or data.get("results") or data.get("items") or []
  31. return list(items)[:5] if isinstance(items, (list, tuple)) else []
  32. return []
  33. def _pick_first(results: List[Dict[str, Any]]) -> Dict[str, Any]:
  34. """从结果中取第一条。"""
  35. if not results:
  36. raise ValueError("无可用结果")
  37. return results[0]
  38. @tool(
  39. description="根据关键词在数据库中检索已有帖子的选题,用于创作参考。最多返回5条,取第一条输出。",
  40. display={
  41. "zh": {
  42. "name": "爆款选题检索",
  43. "params": {
  44. "keywords": "关键词列表",
  45. },
  46. },
  47. },
  48. )
  49. async def topic_search(keywords: List[str]) -> ToolResult:
  50. """
  51. 根据关键词检索数据库中已有帖子的选题,取第一条作为参考。
  52. Args:
  53. keywords: 关键词列表,如 ["中老年健康养生", "爆款", "知识科普"]
  54. Returns:
  55. ToolResult: 选题参考内容
  56. """
  57. if not keywords:
  58. return ToolResult(
  59. title="选题检索失败",
  60. output="",
  61. error="请提供至少一个关键词",
  62. )
  63. try:
  64. results = await _call_search_api(keywords)
  65. except RuntimeError as e:
  66. return ToolResult(
  67. title="选题检索失败",
  68. output="",
  69. error=str(e),
  70. )
  71. if not results:
  72. return ToolResult(
  73. title="选题检索",
  74. output=json.dumps({"message": "未找到匹配的选题", "keywords": keywords}, ensure_ascii=False, indent=2),
  75. )
  76. try:
  77. best = _pick_first(results)
  78. except ValueError:
  79. return ToolResult(
  80. title="选题检索",
  81. output=json.dumps({"message": "无可用结果", "keywords": keywords}, ensure_ascii=False, indent=2),
  82. )
  83. output = json.dumps(best, ensure_ascii=False, indent=2)
  84. return ToolResult(
  85. title="选题检索 - 参考数据",
  86. output=output,
  87. long_term_memory=f"检索到选题参考,关键词: {', '.join(keywords)}",
  88. )