#!/usr/bin/env python3
"""Merge duplicate "unconnected" tools into equivalent "connected" tools.

The script loads every tool from the PostgreSQL tool store and splits them by
status ("已接入" = connected). It asks Claude, in small batches, which
unconnected tools duplicate an existing connected tool. The proposed mapping is
cached in ``matches.json`` and printed as a preview. With ``--apply``, each
validated pair is merged in the database: descriptions are combined, relation
rows are moved to the connected tool, and the unconnected tool is deleted.
"""
import argparse
import asyncio
import json
import logging
import os

from psycopg2.extras import RealDictCursor

# Application-internal modules
from tool_agent.tool.tool_store import PostgreSQLToolStore
from claude_agent_sdk import ClaudeAgentOptions, ClaudeSDKClient, AssistantMessage, TextBlock

logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
logger = logging.getLogger(__name__)

# Status value that marks a tool as already connected (integrated).
CONNECTED_STATUS = "已接入"
# Upper bound passed to store.list_all(); tools beyond it are not considered.
TOOL_LIST_LIMIT = 2000
# Number of unconnected tools sent to Claude per request (smaller = more focused).
BATCH_SIZE = 10
# Cached model output, reused on later runs to avoid repeated model calls.
MATCH_CACHE_FILE = "matches.json"
# Append-only log of raw model replies, kept in case parsing fails.
RAW_RESPONSE_LOG = "raw_claude_responses.log"

# (table, relation column, tool column) for every link table referencing tool.id.
# These names are interpolated into SQL, so they must stay hard-coded constants.
RELATION_TABLES = (
    ("capability_tool", "capability_id", "tool_id"),
    ("tool_knowledge", "knowledge_id", "tool_id"),
    ("tool_provider", "provider_id", "tool_id"),
)

SYSTEM_PROMPT = """你是一个专业的 API 工具库去重专家。
你的任务是将一份【未接入工具列表】与一份核心的【已接入工具列表】进行比对归并。
如果某一个【未接入工具】(哪怕换了名字叫别的)在【已接入工具】中已经有同样的本质功能,请找出这种映射关系。
例如:
未接入:"通过 Midjourney 生成图片" 或 "Midjourney 查进度"
已接入:由于我们有 `midjourney_submit_job` 和 `midjourney_query_job_status`,请寻找最合理的对应。

【重要领域等价字典】请你在判定时,严格遵从以下我们架构中的内置别名等价逻辑:
1. "Nano Banana" 或者 "nanobanana" 等同于 "Google Gemini 生图" 或 "Imagen 3"
2. "BFL" 等同于 "Flux 官方生图 API"
3. "LiblibAI" 或 "哩布哩布" 等同于任何有关 Controlnet (不论是 OpenPose/Canny 等变体) 的支持
4. "RunComfy" 等同于 "ComfyUI 远程实例或工作流执行"
5. "即梦" 或 "ji_meng" 同等对应其系列任务

如果未能找到合理的已接入工具应对,则不输出映射。
请输出一个合法的 JSON 数组,必须是一个 list of objects:
[
  {
    "unconnected_id": "<未接入的工具id>",
    "connected_id": "<对应最合理的已接入的工具id>",
    "reason": "<非常简短的一句话匹配理由>"
  }
]
请绝对不要包含任何 Markdown 语法、```json 标签,直接输出原生 JSON 字符串。
"""


def _tools_to_json(tools):
    """Serialize tools as the compact id/name/desc JSON list shown to Claude."""
    return json.dumps(
        [{"id": t["id"], "name": t["name"], "desc": t.get("introduction")} for t in tools],
        ensure_ascii=False,
    )


def _strip_code_fence(text):
    """Remove a surrounding Markdown code fence (```json ... ``` or ``` ... ```), if any."""
    text = text.strip()
    if text.startswith("```json"):
        text = text[7:]
    elif text.startswith("```"):
        text = text[3:]
    if text.endswith("```"):
        text = text[:-3]
    return text.strip()


def _parse_matches(result_text):
    """Parse Claude's reply into a list of match dicts.

    A bare JSON object is accepted only if it has an ``unconnected_id`` key.
    Non-dict list items are dropped.

    Raises:
        ValueError: if no JSON value can be decoded from the reply.
    """
    clean_json = _strip_code_fence(result_text)
    try:
        data = json.loads(clean_json)
    except ValueError:
        # The model sometimes wraps the array in prose despite the prompt;
        # retry on the outermost [...] span before giving up.
        start, end = clean_json.find("["), clean_json.rfind("]")
        if start == -1 or end <= start:
            raise
        data = json.loads(clean_json[start:end + 1])
    if isinstance(data, dict):
        return [data] if "unconnected_id" in data else []
    if isinstance(data, list):
        return [item for item in data if isinstance(item, dict)]
    return []


async def match_tools_with_claude(connected_tools, unconnected_batch):
    """Ask Claude which tools in *unconnected_batch* duplicate a connected tool.

    Args:
        connected_tools: tool dicts (``id``, ``name``, ``introduction``) used as
            the reference set.
        unconnected_batch: tool dicts to classify in this request.

    Returns:
        The dicts produced by the model, expected to carry ``unconnected_id``,
        ``connected_id`` and ``reason``. Ids are NOT validated here. Returns
        ``[]`` if the API call fails or the reply cannot be parsed; both cases
        are logged.
    """
    conn_str = _tools_to_json(connected_tools)
    unconn_str = _tools_to_json(unconnected_batch)
    prompt = f"【已接入工具库(作为对标参考)】:\n{conn_str}\n\n【本次需判定匹配的未接入工具】:\n{unconn_str}\n\n请输出JSON结果:"
    options = ClaudeAgentOptions(model="claude-sonnet-4-5", system_prompt=SYSTEM_PROMPT)

    text_parts = []
    try:
        async with ClaudeSDKClient(options=options) as client:
            await client.query(prompt)
            async for msg in client.receive_response():
                if isinstance(msg, AssistantMessage):
                    text_parts.extend(
                        block.text for block in msg.content if isinstance(block, TextBlock)
                    )
    except Exception as e:
        logger.error(f"Claude API failed: {e}")
        return []
    result_text = "".join(text_parts)

    # Persist the raw reply verbatim so nothing is lost if parsing fails.
    with open(RAW_RESPONSE_LOG, "a", encoding="utf-8") as f:
        f.write(f"\n--- Batch Output ---\n{result_text}\n")

    try:
        return _parse_matches(result_text)
    except ValueError as e:
        logger.error(f"Failed to parse JSON response: {e}\nResponse was: {result_text[:200]}")
        return []


def _filter_valid_matches(matches, connected, unconnected):
    """Return only the model-proposed matches that are safe to merge.

    A match is kept only if ``unconnected_id`` names a currently unconnected
    tool and ``connected_id`` names a currently connected tool. Each unconnected
    tool is kept at most once. This rejects hallucinated or swapped ids, which
    would otherwise make the merge delete a connected tool. It also rejects
    stale entries from an old cache file.

    Ids are compared as strings in case the model echoes numeric ids as
    strings. Kept matches carry the ids exactly as the store returned them.
    """
    if not isinstance(matches, list):
        logger.warning("匹配结果不是 JSON 数组,已忽略")
        return []
    connected_ids = {str(t["id"]): t["id"] for t in connected}
    unconnected_ids = {str(t["id"]): t["id"] for t in unconnected}
    valid = []
    seen = set()
    for match in matches:
        if not isinstance(match, dict):
            continue
        uid, cid = match.get("unconnected_id"), match.get("connected_id")
        u_key, c_key = str(uid), str(cid)
        if u_key not in unconnected_ids or c_key not in connected_ids:
            logger.warning(f"跳过无效映射 {uid} -> {cid}: id 不在当前的未接入/已接入工具集合中")
            continue
        if u_key in seen:
            logger.warning(f"跳过重复映射 {uid} -> {cid}: 该未接入工具已有映射")
            continue
        seen.add(u_key)
        valid.append({**match, "unconnected_id": unconnected_ids[u_key], "connected_id": connected_ids[c_key]})
    return valid


def _merged_fields(c_tool, u_tool):
    """Build the new (introduction, tutorial) for the connected tool after absorbing u_tool."""
    u_ver = u_tool.get('version') or ''
    c_ver = c_tool.get('version') or ''
    # Prefer the connected tool's version unless it is missing or the "1.0.0" placeholder.
    latest_ver = c_ver if c_ver and c_ver != "1.0.0" else (u_ver or "1.0.0")
    version_prefix = f"已支持的最新版本: {latest_ver}\n\n"
    new_intro = version_prefix + (c_tool['introduction'] or '') + "\n\n[融合补充描述]:\n" + (u_tool['introduction'] or '')
    new_tutorial = (c_tool['tutorial'] or '') + "\n\n" + (u_tool['tutorial'] or '')
    # Truncated to 4000 chars; presumably a column/size limit - TODO confirm against schema.
    return new_intro[:4000], new_tutorial[:4000]


def _merge_one(c, uid, cid):
    """Merge tool *uid* into tool *cid* using cursor *c*, without committing.

    Returns False, having written nothing, if either tool row is missing.
    """
    c.execute("SELECT introduction, tutorial, version FROM tool WHERE id = %s", (uid,))
    u_tool = c.fetchone()
    c.execute("SELECT introduction, tutorial, version FROM tool WHERE id = %s", (cid,))
    c_tool = c.fetchone()
    if not u_tool or not c_tool:
        # Without both rows the links would be moved to a non-existent tool
        # and the source deleted anyway, so skip the pair entirely.
        logger.warning(f"跳过 {uid} -> {cid}: 数据库中找不到对应工具")
        return False

    # 1. Append the unconnected tool's description/tutorial to the connected tool.
    new_intro, new_tutorial = _merged_fields(c_tool, u_tool)
    c.execute("UPDATE tool SET introduction=%s, tutorial=%s WHERE id=%s", (new_intro, new_tutorial, cid))

    # 2. Move link rows to the connected tool. ON CONFLICT DO NOTHING skips
    #    links that would violate a unique constraint (presumably links cid
    #    already has). Then drop the source's links.
    for table, rel_col, tool_col in RELATION_TABLES:
        c.execute(f"SELECT {rel_col} FROM {table} WHERE {tool_col} = %s", (uid,))
        rel_ids = [row[rel_col] for row in c.fetchall()]
        c.executemany(
            f"INSERT INTO {table} ({rel_col}, {tool_col}) VALUES (%s, %s) ON CONFLICT DO NOTHING",
            [(rel_id, cid) for rel_id in rel_ids],
        )
        c.execute(f"DELETE FROM {table} WHERE {tool_col} = %s", (uid,))

    # 3. Delete the now-redundant unconnected tool.
    c.execute("DELETE FROM tool WHERE id = %s", (uid,))
    return True


def apply_database_merge(store: PostgreSQLToolStore, match_plan: list):
    """Apply each ``unconnected_id -> connected_id`` pair in *match_plan* to the database.

    For each pair, the unconnected tool's introduction and tutorial are
    appended to the connected tool's. Its capability, knowledge and provider
    links are moved to the connected tool, and the unconnected tool row is
    deleted.

    Each pair runs in its own transaction: it is committed on success, and
    rolled back and skipped on error. Pairs with a missing id, identical ids,
    or a tool row absent from the database are skipped.

    Callers should pass matches already validated against tool statuses
    (``run_workflow`` does this).
    """
    if not match_plan:
        logger.info("没有任何匹配数据可执行落地!")
        return

    merged_count = 0
    with store.conn.cursor(cursor_factory=RealDictCursor) as c:
        for match in match_plan:
            uid = match.get("unconnected_id")
            cid = match.get("connected_id")
            if not uid or not cid:
                continue
            if uid == cid:
                # Merging a tool into itself would end by deleting it.
                logger.warning(f"跳过自映射 {uid}: 源与目标为同一工具")
                continue

            logger.info(f"==> 执行合并映射 [未接入: {uid} -> 已接入: {cid}]")
            try:
                if _merge_one(c, uid, cid):
                    store.conn.commit()  # each successful pair is committed on its own
                    merged_count += 1
                else:
                    store.conn.rollback()  # nothing was written; just end the read transaction
            except Exception as e:
                logger.exception(f"处理 {uid} 的融合时出错: {e}")
                store.conn.rollback()  # roll back only this pair and continue

    logger.info(f"所有指定合并执行完毕,已彻底抹除融合了 {merged_count} 个废弃节点工具!")


async def _collect_matches(connected, unconnected):
    """Return raw model matches: from the cache file if present, else by querying Claude."""
    if os.path.exists(MATCH_CACHE_FILE):
        logger.info(f"发现本地存在的缓存文件 {MATCH_CACHE_FILE},将直接读取现有分析结果!")
        with open(MATCH_CACHE_FILE, "r", encoding="utf-8") as f:
            return json.load(f)

    all_matches = []
    total_batches = (len(unconnected) + BATCH_SIZE - 1) // BATCH_SIZE
    for i in range(0, len(unconnected), BATCH_SIZE):
        batch = unconnected[i:i + BATCH_SIZE]
        logger.info(f"正在交给 Claude 引擎评估第 {i//BATCH_SIZE + 1}/{total_batches} 批 ({len(batch)} items) ...")
        all_matches.extend(await match_tools_with_claude(connected, batch))

    # NOTE: a batch whose API call failed contributes no matches and is cached
    # as such; delete the cache file to force a fresh analysis.
    with open(MATCH_CACHE_FILE, "w", encoding="utf-8") as f:
        json.dump(all_matches, f, ensure_ascii=False, indent=2)
    return all_matches


def _log_preview(matches):
    """Log the human-readable preview of the proposed merges."""
    logger.info("\n====================== 预览报告 ======================")
    if not matches:
        logger.info("Claude 没有找到任何可以安全结合的工具!")
    else:
        logger.info(f"Claude 匹配上了 {len(matches)} 个工具,建议的更换路线如下:")
        for m in matches:
            logger.info(f" - [冗余废除] {m.get('unconnected_id')} ==> [主工具] {m.get('connected_id')}")
            logger.info(f" 理由: {m.get('reason','')}")
    logger.info("====================================================\n")


async def run_workflow(apply=False):
    """Run the full dedup workflow: load tools, match with Claude, preview, optionally merge.

    Args:
        apply: when True, perform the database merge. Otherwise only the
            preview is logged (dry run).
    """
    store = PostgreSQLToolStore()
    try:
        tools = store.list_all(limit=TOOL_LIST_LIMIT)
        if len(tools) >= TOOL_LIST_LIMIT:
            logger.warning(f"工具数量达到读取上限 {TOOL_LIST_LIMIT},可能有工具未被纳入比对")
        connected = [t for t in tools if t.get("status") == CONNECTED_STATUS]
        unconnected = [t for t in tools if t.get("status") != CONNECTED_STATUS]
        logger.info(f"提取出 {len(connected)} 个基础已接入工具,以及 {len(unconnected)} 个未接入待判决工具。")

        if not unconnected:
            logger.info("没有任何未接入工具需要处理。")
            return
        if not connected:
            logger.info("没有任何已接入工具可作为对标参考。")
            return

        raw_matches = await _collect_matches(connected, unconnected)
        # Validate against the current DB state: drops hallucinated or swapped
        # ids and entries made stale by earlier merges.
        all_matches = _filter_valid_matches(raw_matches, connected, unconnected)

        _log_preview(all_matches)

        if apply:
            logger.info("启动真实的数据库落地置换 (Apply mode is ON)...")
            apply_database_merge(store, all_matches)
        else:
            logger.info("目前为 Dry-Run 模式,如果审查匹配无误觉得干的漂亮,请携带 --apply 开关挂载实际融合执行!")
    finally:
        store.close()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--apply", action="store_true", help="Apply DB migrations physically")
    args = parser.parse_args()
    asyncio.run(run_workflow(apply=args.apply))