import json import logging import time from pathlib import Path from tool_agent.models import ToolMeta, ToolStatus from tool_agent.tool.tool_store import PostgreSQLToolStore logging.basicConfig(level=logging.INFO, format='%(message)s') def migrate(): print("=" * 50) print(" 开始迁移数据:registry.json -> PostgreSQL Tool Store") print("=" * 50) try: store = PostgreSQLToolStore() except Exception as e: print(f"Failed to connect to database: {e}") return # Check for legacy groups groups_path = Path("data/groups.json") group_map = {} if groups_path.exists(): try: gdata = json.loads(groups_path.read_text(encoding="utf-8")) for g in gdata.get("groups", []): # Clean up group names if they represent providers # e.g. runcomfy_lifecycle -> runcomfy g_id = g.get("group_id", "") provider_name = g_id.replace("_lifecycle", "").replace("_task", "") group_map[g_id] = provider_name # Create tools mapping based on old groupings for t_id in g.get("tool_ids", []): # We'll use this if registry doesn't have it directly populated pass except Exception: pass registry_path = Path("data/registry.json") if not registry_path.exists(): print("找不到 data/registry.json 文件,无法迁移。") return data = json.loads(registry_path.read_text(encoding="utf-8")) tools_migrated = 0 for item in data.get("tools", []): try: tool = ToolMeta(**item) # Map ToolStatus to Postgres string status_str = "已接入" if tool.status == ToolStatus.ACTIVE else "未接入" # Legacy groups serve as providers now provider_ids = [] for gid in tool.group_ids: mapped_pid = group_map.get(gid, gid) if mapped_pid not in provider_ids: provider_ids.append(mapped_pid) # 兼容性:自动推断一些已知的裸工具 Provider if not provider_ids: if "liblib" in tool.tool_id.lower(): provider_ids.append("liblib") elif "flux" in tool.tool_id.lower(): provider_ids.append("flux") tool_dict = { "id": tool.tool_id, "name": tool.name, "version": "1.0.0", "introduction": tool.description, "tutorial": "", "input": tool.input_schema, "output": tool.output_schema, "updated_time": int(time.time()), "status": status_str, "provider_ids": provider_ids, } store.insert_or_update(tool_dict) print(f"[SUCCESS] 迁移工具: {tool.tool_id} (Provider: {provider_ids})") tools_migrated += 1 except Exception as e: print(f"[ERROR] 工具 {item.get('tool_id')} 迁移失败: {e}") store.close() print("=" * 50) print(f" 迁移完成!共成功处理 {tools_migrated} 个工具。") print(" 现在的 tool_provider 关系表已经建立,可直接从数据库加载!") print("=" * 50) if __name__ == "__main__": migrate()