| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362 |
- """
- ToolHub - 远程工具库集成模块
- 将 http://43.106.118.91:8001 的工具库 API 包装为 Agent 可调用的工具。
- 提供四个工具:
- 1. toolhub_health - 健康检查
- 2. toolhub_search - 搜索/发现远程工具
- 3. toolhub_call - 调用远程工具
- 4. toolhub_create - 创建新远程工具(异步)
- 设计要点:
- - toolhub_call 的 params 是动态的(取决于 tool_id),用 dict 类型兼容所有工具
- - toolhub_create 是异步任务,支持轮询等待完成
- - 所有接口返回 ToolResult,包含结构化输出和 long_term_memory 摘要
- """
- import json
- import time
- from typing import Any, Dict, Optional
- import httpx
- from agent.tools import tool, ToolResult
- # ── 配置 ─────────────────────────────────────────────
- TOOLHUB_BASE_URL = "http://43.106.118.91:8001"
- DEFAULT_TIMEOUT = 30.0
- CREATE_POLL_TIMEOUT = 600.0 # create 最长轮询 10 分钟
- CREATE_POLL_INTERVAL = 5.0 # 每 5 秒轮询一次
- # ── 工具实现 ──────────────────────────────────────────
- @tool(
- display={
- "zh": {"name": "ToolHub 健康检查", "params": {}},
- "en": {"name": "ToolHub Health Check", "params": {}},
- }
- )
- async def toolhub_health() -> ToolResult:
- """检查 ToolHub 远程工具库服务是否可用
- 检查 ToolHub 服务的健康状态,确认服务是否正常运行。
- 建议在调用其他 toolhub 工具之前先检查。
- Returns:
- ToolResult 包含服务健康状态信息
- """
- try:
- async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
- resp = await client.get(f"{TOOLHUB_BASE_URL}/health")
- resp.raise_for_status()
- data = resp.json()
- return ToolResult(
- title="ToolHub 健康检查",
- output=json.dumps(data, ensure_ascii=False, indent=2),
- long_term_memory=f"ToolHub service at {TOOLHUB_BASE_URL} is healthy.",
- )
- except httpx.ConnectError:
- return ToolResult(
- title="ToolHub 健康检查",
- output="",
- error=f"无法连接到 ToolHub 服务 {TOOLHUB_BASE_URL},请确认服务已启动。",
- )
- except Exception as e:
- return ToolResult(
- title="ToolHub 健康检查",
- output="",
- error=str(e),
- )
- @tool(
- display={
- "zh": {"name": "搜索 ToolHub 工具", "params": {"keyword": "搜索关键词"}},
- "en": {"name": "Search ToolHub", "params": {"keyword": "Search keyword"}},
- }
- )
- async def toolhub_search(keyword: Optional[str] = None) -> ToolResult:
- """搜索 ToolHub 远程工具库中可用的工具
- 从 ToolHub 工具库中搜索可用工具,返回每个工具的完整信息,包括:
- tool_id、名称、分类、状态、参数列表(含类型、是否必填、默认值、枚举值)、输出 schema 等。
- 调用 toolhub_call 之前,应先使用此工具了解目标工具的 tool_id 和所需参数。
- Args:
- keyword: 搜索关键词,为空则返回所有工具
- Returns:
- ToolResult 包含匹配的工具列表及其参数说明
- """
- try:
- payload = {"keyword": keyword} if keyword else {}
- async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
- resp = await client.post(
- f"{TOOLHUB_BASE_URL}/search_tools", json=payload
- )
- resp.raise_for_status()
- data = resp.json()
- total = data.get("total", 0)
- tools = data.get("tools", [])
- # 构建给 LLM 的结构化摘要
- summaries = []
- for t in tools:
- params_desc = []
- for p in t.get("params", []):
- req = "必填" if p["required"] else "可选"
- desc = p.get("description", "")
- default_str = f", 默认={p['default']}" if p.get("default") is not None else ""
- enum_str = f", 可选值={p['enum']}" if p.get("enum") else ""
- params_desc.append(
- f" - {p['name']} ({p['type']}, {req}): {desc}{default_str}{enum_str}"
- )
- tool_block = (
- f"[{t['tool_id']}] {t['name']}\n"
- f" 状态: {t['state']}\n"
- f" 描述: {t.get('description', '')}\n"
- f" 流式: {t.get('stream_support', False)}"
- )
- if params_desc:
- tool_block += "\n 参数:\n" + "\n".join(params_desc)
- else:
- tool_block += "\n 参数: 无"
- summaries.append(tool_block)
- output_text = f"共找到 {total} 个工具:\n\n" + "\n\n".join(summaries)
- # 附上完整 JSON 供精确引用
- full_json = json.dumps(data, ensure_ascii=False, indent=2)
- return ToolResult(
- title=f"ToolHub 搜索{f': {keyword}' if keyword else ''}",
- output=f"{output_text}\n\n--- 完整数据 ---\n{full_json}",
- long_term_memory=(
- f"ToolHub {'搜索 ' + repr(keyword) + ' ' if keyword else ''}"
- f"共 {total} 个工具: "
- + ", ".join(t["tool_id"] for t in tools[:10])
- + ("..." if total > 10 else "")
- ),
- )
- except Exception as e:
- return ToolResult(
- title="搜索 ToolHub 工具失败",
- output="",
- error=str(e),
- )
- @tool(
- display={
- "zh": {
- "name": "调用 ToolHub 工具",
- "params": {"tool_id": "工具ID", "params": "工具参数"},
- },
- "en": {
- "name": "Call ToolHub Tool",
- "params": {"tool_id": "Tool ID", "params": "Tool parameters"},
- },
- }
- )
- async def toolhub_call(
- tool_id: str,
- params: Optional[Dict[str, Any]] = None,
- ) -> ToolResult:
- """调用 ToolHub 远程工具库中的指定工具
- 通过 tool_id 调用 ToolHub 工具库中的某个工具,传入该工具所需的参数。
- 不同工具的参数不同,请先用 toolhub_search 查询目标工具的参数说明。
- 参数通过 params 字典传入,键名和类型需与工具定义一致。
- 例如调用图片拼接工具:
- tool_id="image_stitcher"
- params={"images": [...], "direction": "grid", "columns": 2}
- Args:
- tool_id: 要调用的工具 ID(从 toolhub_search 获取)
- params: 工具参数字典,键值对根据目标工具的参数定义决定
- Returns:
- ToolResult 包含工具执行结果
- """
- try:
- payload = {
- "tool_id": tool_id,
- "params": params or {},
- }
- async with httpx.AsyncClient(timeout=60.0) as client:
- resp = await client.post(
- f"{TOOLHUB_BASE_URL}/select_tool", json=payload
- )
- resp.raise_for_status()
- data = resp.json()
- status = data.get("status")
- if status == "success":
- result = data.get("result", {})
- result_str = json.dumps(result, ensure_ascii=False, indent=2)
- # 如果结果中有 base64 图片,提取为 images 附件
- images = []
- if isinstance(result, dict) and result.get("image"):
- images.append({
- "type": "base64",
- "media_type": "image/png",
- "data": result["image"],
- })
- # 输出中替换超长 base64,避免占满 context
- result_display = {k: v for k, v in result.items() if k != "image"}
- result_display["image"] = f"<base64 image, {len(result['image'])} chars>"
- result_str = json.dumps(result_display, ensure_ascii=False, indent=2)
- return ToolResult(
- title=f"ToolHub [{tool_id}] 执行成功",
- output=result_str,
- long_term_memory=f"Called ToolHub tool '{tool_id}' → success",
- images=images,
- )
- else:
- error_msg = data.get("error", "未知错误")
- return ToolResult(
- title=f"ToolHub [{tool_id}] 执行失败",
- output=json.dumps(data, ensure_ascii=False, indent=2),
- error=error_msg,
- )
- except httpx.TimeoutException:
- return ToolResult(
- title=f"ToolHub [{tool_id}] 调用超时",
- output="",
- error=f"调用工具 {tool_id} 超时(60s),工具可能需要更长处理时间。",
- )
- except Exception as e:
- return ToolResult(
- title=f"ToolHub [{tool_id}] 调用失败",
- output="",
- error=str(e),
- )
- @tool(
- display={
- "zh": {
- "name": "创建 ToolHub 工具",
- "params": {"description": "工具描述", "wait": "是否等待完成"},
- },
- "en": {
- "name": "Create ToolHub Tool",
- "params": {"description": "Tool description", "wait": "Wait for completion"},
- },
- }
- )
- async def toolhub_create(
- description: str,
- wait: bool = True,
- ) -> ToolResult:
- """在 ToolHub 远程工具库中创建一个新工具
- 向 ToolHub 提交创建工具的请求。ToolHub 会根据描述自动生成工具代码、
- 构建运行环境并注册到工具库中。
- 创建过程是异步的:提交后返回 task_id,可通过轮询查看进度。
- 设置 wait=True(默认)会自动轮询直到完成或超时(10分钟)。
- Args:
- description: 工具的自然语言描述,说明工具应该做什么。
- 例如:"创建一个简单的文本计数工具,输入文本,返回字数和字符数"
- wait: 是否等待任务完成。True=轮询等待结果(最长10分钟),
- False=仅提交并返回 task_id
- Returns:
- ToolResult 包含创建结果或任务进度信息
- """
- try:
- payload = {"description": description}
- async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
- resp = await client.post(
- f"{TOOLHUB_BASE_URL}/create_tool", json=payload
- )
- resp.raise_for_status()
- data = resp.json()
- task_id = data.get("task_id")
- status = data.get("status")
- if not task_id:
- return ToolResult(
- title="创建 ToolHub 工具失败",
- output=json.dumps(data, ensure_ascii=False, indent=2),
- error="未返回 task_id",
- )
- # 不等待,直接返回 task_id
- if not wait:
- return ToolResult(
- title="ToolHub 工具创建已提交",
- output=f"task_id: {task_id}\nstatus: {status}",
- long_term_memory=f"Submitted ToolHub creation task {task_id}: {description[:80]}",
- )
- # 轮询等待完成
- start_time = time.time()
- async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
- while time.time() - start_time < CREATE_POLL_TIMEOUT:
- await _async_sleep(CREATE_POLL_INTERVAL)
- resp = await client.get(
- f"{TOOLHUB_BASE_URL}/tasks/{task_id}", timeout=DEFAULT_TIMEOUT
- )
- task = resp.json()
- task_status = task.get("status")
- if task_status == "completed":
- result = task.get("result", "")
- return ToolResult(
- title="ToolHub 工具创建完成",
- output=(
- f"task_id: {task_id}\n"
- f"result: {str(result)[:500]}\n\n"
- f"耗时: {time.time() - start_time:.0f}s\n"
- f"可使用 toolhub_search 查看新工具。"
- ),
- long_term_memory=f"ToolHub tool created (task {task_id}): {description[:60]}",
- )
- if task_status == "failed":
- error = task.get("error", "unknown")
- return ToolResult(
- title="ToolHub 工具创建失败",
- output=json.dumps(task, ensure_ascii=False, indent=2),
- error=f"Task {task_id} failed: {error}",
- )
- # 超时
- elapsed = time.time() - start_time
- return ToolResult(
- title="ToolHub 工具创建超时",
- output=f"task_id: {task_id}\n轮询 {elapsed:.0f}s 后仍未完成。",
- error=f"Task {task_id} 未在 {CREATE_POLL_TIMEOUT:.0f}s 内完成,请稍后查询。",
- )
- except Exception as e:
- return ToolResult(
- title="创建 ToolHub 工具失败",
- output="",
- error=str(e),
- )
- # ── 辅助函数 ─────────────────────────────────────────
- async def _async_sleep(seconds: float):
- """异步 sleep"""
- import asyncio
- await asyncio.sleep(seconds)
|