""" ToolHub - 远程工具库集成模块 将 http://43.106.118.91:8001 的工具库 API 包装为 Agent 可调用的工具。 提供四个工具: 1. toolhub_health - 健康检查 2. toolhub_search - 搜索/发现远程工具 3. toolhub_call - 调用远程工具 4. toolhub_create - 创建新远程工具(异步) 设计要点: - toolhub_call 的 params 是动态的(取决于 tool_id),用 dict 类型兼容所有工具 - toolhub_create 是异步任务,支持轮询等待完成 - 所有接口返回 ToolResult,包含结构化输出和 long_term_memory 摘要 """ import json import time from typing import Any, Dict, Optional import httpx from agent.tools import tool, ToolResult # ── 配置 ───────────────────────────────────────────── TOOLHUB_BASE_URL = "http://43.106.118.91:8001" DEFAULT_TIMEOUT = 30.0 CREATE_POLL_TIMEOUT = 600.0 # create 最长轮询 10 分钟 CREATE_POLL_INTERVAL = 5.0 # 每 5 秒轮询一次 # ── 工具实现 ────────────────────────────────────────── @tool( display={ "zh": {"name": "ToolHub 健康检查", "params": {}}, "en": {"name": "ToolHub Health Check", "params": {}}, } ) async def toolhub_health() -> ToolResult: """检查 ToolHub 远程工具库服务是否可用 检查 ToolHub 服务的健康状态,确认服务是否正常运行。 建议在调用其他 toolhub 工具之前先检查。 Returns: ToolResult 包含服务健康状态信息 """ try: async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client: resp = await client.get(f"{TOOLHUB_BASE_URL}/health") resp.raise_for_status() data = resp.json() return ToolResult( title="ToolHub 健康检查", output=json.dumps(data, ensure_ascii=False, indent=2), long_term_memory=f"ToolHub service at {TOOLHUB_BASE_URL} is healthy.", ) except httpx.ConnectError: return ToolResult( title="ToolHub 健康检查", output="", error=f"无法连接到 ToolHub 服务 {TOOLHUB_BASE_URL},请确认服务已启动。", ) except Exception as e: return ToolResult( title="ToolHub 健康检查", output="", error=str(e), ) @tool( display={ "zh": {"name": "搜索 ToolHub 工具", "params": {"keyword": "搜索关键词"}}, "en": {"name": "Search ToolHub", "params": {"keyword": "Search keyword"}}, } ) async def toolhub_search(keyword: Optional[str] = None) -> ToolResult: """搜索 ToolHub 远程工具库中可用的工具 从 ToolHub 工具库中搜索可用工具,返回每个工具的完整信息,包括: tool_id、名称、分类、状态、参数列表(含类型、是否必填、默认值、枚举值)、输出 schema 等。 调用 toolhub_call 之前,应先使用此工具了解目标工具的 tool_id 和所需参数。 Args: keyword: 搜索关键词,为空则返回所有工具 Returns: ToolResult 包含匹配的工具列表及其参数说明 """ try: payload = {"keyword": keyword} if keyword else {} async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client: resp = await client.post( f"{TOOLHUB_BASE_URL}/search_tools", json=payload ) resp.raise_for_status() data = resp.json() total = data.get("total", 0) tools = data.get("tools", []) # 构建给 LLM 的结构化摘要 summaries = [] for t in tools: params_desc = [] for p in t.get("params", []): req = "必填" if p["required"] else "可选" desc = p.get("description", "") default_str = f", 默认={p['default']}" if p.get("default") is not None else "" enum_str = f", 可选值={p['enum']}" if p.get("enum") else "" params_desc.append( f" - {p['name']} ({p['type']}, {req}): {desc}{default_str}{enum_str}" ) tool_block = ( f"[{t['tool_id']}] {t['name']}\n" f" 状态: {t['state']}\n" f" 描述: {t.get('description', '')}\n" f" 流式: {t.get('stream_support', False)}" ) if params_desc: tool_block += "\n 参数:\n" + "\n".join(params_desc) else: tool_block += "\n 参数: 无" summaries.append(tool_block) output_text = f"共找到 {total} 个工具:\n\n" + "\n\n".join(summaries) # 附上完整 JSON 供精确引用 full_json = json.dumps(data, ensure_ascii=False, indent=2) return ToolResult( title=f"ToolHub 搜索{f': {keyword}' if keyword else ''}", output=f"{output_text}\n\n--- 完整数据 ---\n{full_json}", long_term_memory=( f"ToolHub {'搜索 ' + repr(keyword) + ' ' if keyword else ''}" f"共 {total} 个工具: " + ", ".join(t["tool_id"] for t in tools[:10]) + ("..." if total > 10 else "") ), ) except Exception as e: return ToolResult( title="搜索 ToolHub 工具失败", output="", error=str(e), ) @tool( display={ "zh": { "name": "调用 ToolHub 工具", "params": {"tool_id": "工具ID", "params": "工具参数"}, }, "en": { "name": "Call ToolHub Tool", "params": {"tool_id": "Tool ID", "params": "Tool parameters"}, }, } ) async def toolhub_call( tool_id: str, params: Optional[Dict[str, Any]] = None, ) -> ToolResult: """调用 ToolHub 远程工具库中的指定工具 通过 tool_id 调用 ToolHub 工具库中的某个工具,传入该工具所需的参数。 不同工具的参数不同,请先用 toolhub_search 查询目标工具的参数说明。 参数通过 params 字典传入,键名和类型需与工具定义一致。 例如调用图片拼接工具: tool_id="image_stitcher" params={"images": [...], "direction": "grid", "columns": 2} Args: tool_id: 要调用的工具 ID(从 toolhub_search 获取) params: 工具参数字典,键值对根据目标工具的参数定义决定 Returns: ToolResult 包含工具执行结果 """ try: payload = { "tool_id": tool_id, "params": params or {}, } async with httpx.AsyncClient(timeout=60.0) as client: resp = await client.post( f"{TOOLHUB_BASE_URL}/select_tool", json=payload ) resp.raise_for_status() data = resp.json() status = data.get("status") if status == "success": result = data.get("result", {}) result_str = json.dumps(result, ensure_ascii=False, indent=2) # 如果结果中有 base64 图片,提取为 images 附件 images = [] if isinstance(result, dict) and result.get("image"): images.append({ "type": "base64", "media_type": "image/png", "data": result["image"], }) # 输出中替换超长 base64,避免占满 context result_display = {k: v for k, v in result.items() if k != "image"} result_display["image"] = f"" result_str = json.dumps(result_display, ensure_ascii=False, indent=2) return ToolResult( title=f"ToolHub [{tool_id}] 执行成功", output=result_str, long_term_memory=f"Called ToolHub tool '{tool_id}' → success", images=images, ) else: error_msg = data.get("error", "未知错误") return ToolResult( title=f"ToolHub [{tool_id}] 执行失败", output=json.dumps(data, ensure_ascii=False, indent=2), error=error_msg, ) except httpx.TimeoutException: return ToolResult( title=f"ToolHub [{tool_id}] 调用超时", output="", error=f"调用工具 {tool_id} 超时(60s),工具可能需要更长处理时间。", ) except Exception as e: return ToolResult( title=f"ToolHub [{tool_id}] 调用失败", output="", error=str(e), ) @tool( display={ "zh": { "name": "创建 ToolHub 工具", "params": {"description": "工具描述", "wait": "是否等待完成"}, }, "en": { "name": "Create ToolHub Tool", "params": {"description": "Tool description", "wait": "Wait for completion"}, }, } ) async def toolhub_create( description: str, wait: bool = True, ) -> ToolResult: """在 ToolHub 远程工具库中创建一个新工具 向 ToolHub 提交创建工具的请求。ToolHub 会根据描述自动生成工具代码、 构建运行环境并注册到工具库中。 创建过程是异步的:提交后返回 task_id,可通过轮询查看进度。 设置 wait=True(默认)会自动轮询直到完成或超时(10分钟)。 Args: description: 工具的自然语言描述,说明工具应该做什么。 例如:"创建一个简单的文本计数工具,输入文本,返回字数和字符数" wait: 是否等待任务完成。True=轮询等待结果(最长10分钟), False=仅提交并返回 task_id Returns: ToolResult 包含创建结果或任务进度信息 """ try: payload = {"description": description} async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client: resp = await client.post( f"{TOOLHUB_BASE_URL}/create_tool", json=payload ) resp.raise_for_status() data = resp.json() task_id = data.get("task_id") status = data.get("status") if not task_id: return ToolResult( title="创建 ToolHub 工具失败", output=json.dumps(data, ensure_ascii=False, indent=2), error="未返回 task_id", ) # 不等待,直接返回 task_id if not wait: return ToolResult( title="ToolHub 工具创建已提交", output=f"task_id: {task_id}\nstatus: {status}", long_term_memory=f"Submitted ToolHub creation task {task_id}: {description[:80]}", ) # 轮询等待完成 start_time = time.time() async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client: while time.time() - start_time < CREATE_POLL_TIMEOUT: await _async_sleep(CREATE_POLL_INTERVAL) resp = await client.get( f"{TOOLHUB_BASE_URL}/tasks/{task_id}", timeout=DEFAULT_TIMEOUT ) task = resp.json() task_status = task.get("status") if task_status == "completed": result = task.get("result", "") return ToolResult( title="ToolHub 工具创建完成", output=( f"task_id: {task_id}\n" f"result: {str(result)[:500]}\n\n" f"耗时: {time.time() - start_time:.0f}s\n" f"可使用 toolhub_search 查看新工具。" ), long_term_memory=f"ToolHub tool created (task {task_id}): {description[:60]}", ) if task_status == "failed": error = task.get("error", "unknown") return ToolResult( title="ToolHub 工具创建失败", output=json.dumps(task, ensure_ascii=False, indent=2), error=f"Task {task_id} failed: {error}", ) # 超时 elapsed = time.time() - start_time return ToolResult( title="ToolHub 工具创建超时", output=f"task_id: {task_id}\n轮询 {elapsed:.0f}s 后仍未完成。", error=f"Task {task_id} 未在 {CREATE_POLL_TIMEOUT:.0f}s 内完成,请稍后查询。", ) except Exception as e: return ToolResult( title="创建 ToolHub 工具失败", output="", error=str(e), ) # ── 辅助函数 ───────────────────────────────────────── async def _async_sleep(seconds: float): """异步 sleep""" import asyncio await asyncio.sleep(seconds)