''' 工具库模块,提供工具库中的工具调用、保存新的待接入工具的信息 分为三个函数: 1. 调用工具库中的工具 2. 保存新的待接入工具的信息 3. 获取工具调用参数信息 ''' import requests import os import json import re TOOL_SERVER_URL = "http://47.84.182.56:8001/tools/call" def call_tool(tool_name: str, arguments: dict): """ 调用工具库中的工具 :param tool_name: 工具名称 :param arguments: 工具参数字典 :return: 工具调用结果 """ url = f"{TOOL_SERVER_URL}/{tool_name}" headers = { 'Content-Type': 'application/json' } try: response = requests.post(url, headers=headers, json=arguments) response.raise_for_status() return response.json() except requests.RequestException as e: # 在实际生产中可能需要更复杂的错误处理或日志记录 return {"error": f"Failed to call tool {tool_name}: {str(e)}"} def save_tool_info(tool_name: str, tool_doc: str): """ 保存新的待接入工具的信息 :param tool_name: 工具名称 :param tool_doc: 工具文档字符串 :return: 保存的文件路径 """ # 获取当前文件所在目录 current_dir = os.path.dirname(os.path.abspath(__file__)) # 创建 tool_infos 目录(如果不存在) save_dir = os.path.join(current_dir, 'tool_infos') if not os.path.exists(save_dir): os.makedirs(save_dir) file_path = os.path.join(save_dir, f"{tool_name}.txt") try: with open(file_path, 'w', encoding='utf-8') as f: f.write(tool_doc) return file_path except IOError as e: return f"Error saving tool info: {str(e)}" def get_all_tool_infos() -> str: """ 获取所有已保存工具的信息 :return: 所有工具信息的拼接字符串 """ current_dir = os.path.dirname(os.path.abspath(__file__)) save_dir = os.path.join(current_dir, 'tool_infos') if not os.path.exists(save_dir): return "" tool_infos = [] try: for filename in os.listdir(save_dir): if filename.endswith(".txt"): file_path = os.path.join(save_dir, filename) with open(file_path, 'r', encoding='utf-8') as f: content = f.read().strip() tool_infos.append(f"--- Tool: {filename[:-4]} ---\n{content}\n") except Exception as e: return f"Error reading tool infos: {str(e)}" return "\n".join(tool_infos) def get_tool_info(tool_name: str) -> str: """ 获取指定工具的信息 :param tool_name: 工具名称 :return: 工具信息文本,如果不存在返回空字符串 """ current_dir = os.path.dirname(os.path.abspath(__file__)) save_dir = os.path.join(current_dir, 'tool_infos') if not os.path.exists(save_dir): return "" file_path = os.path.join(save_dir, f"{tool_name}.txt") if not os.path.exists(file_path): return "" try: with open(file_path, 'r', encoding='utf-8') as f: return f.read().strip() except Exception as e: return f"Error reading tool info: {str(e)}" def get_tool_params(tools_id: str) -> str: """ 根据tools_id获取工具调用参数信息 :param tools_id: 工具调用ID :return: 工具参数信息的JSON字符串,如果未找到返回空字符串 """ current_dir = os.path.dirname(os.path.abspath(__file__)) prompt_dir = os.path.join(current_dir, 'prompt') params_file = os.path.join(prompt_dir, 'all_tools_params.md') if not os.path.exists(params_file): return "" try: with open(params_file, 'r', encoding='utf-8') as f: content = f.read() # 查找工具调用ID的位置 # 格式:工具调用ID: id_pattern = f"工具调用ID:{re.escape(tools_id)}" match = re.search(id_pattern, content) if not match: return "" # 从匹配位置开始往后找JSON块 start_index = match.end() json_start = content.find('{', start_index) if json_start == -1: return "" # 简单的JSON提取逻辑:找匹配的大括号 brace_count = 0 json_end = -1 for i in range(json_start, len(content)): if content[i] == '{': brace_count += 1 elif content[i] == '}': brace_count -= 1 if brace_count == 0: json_end = i + 1 break if json_end != -1: return content[json_start:json_end] return "" except Exception as e: return f"Error reading tool params: {str(e)}"