| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- #!/usr/bin/env python3
import argparse
import asyncio
import json
import logging
import os

from psycopg2.extras import RealDictCursor

# Application-local modules
from tool_agent.tool.tool_store import PostgreSQLToolStore
from claude_agent_sdk import ClaudeAgentOptions, ClaudeSDKClient, AssistantMessage, TextBlock
# Root logging configuration for the whole script: terse "LEVEL: message" lines.
logging.basicConfig(
    format='%(levelname)s: %(message)s',
    level=logging.INFO,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# System prompt (in Chinese) sent to Claude: positions the model as an API
# tool-library dedup expert that maps "unconnected" tools onto functionally
# equivalent "connected" tools, includes a domain alias/equivalence dictionary,
# and demands a raw JSON array of {unconnected_id, connected_id, reason}
# objects with no Markdown fencing. NOTE(review): this is a runtime string
# consumed by the model — do not translate or reformat its contents.
SYSTEM_PROMPT = """你是一个专业的 API 工具库去重专家。
你的任务是将一份【未接入工具列表】与一份核心的【已接入工具列表】进行比对归并。
如果某一个【未接入工具】(哪怕换了名字叫别的)在【已接入工具】中已经有同样的本质功能,请找出这种映射关系。
例如:
未接入:"通过 Midjourney 生成图片" 或 "Midjourney 查进度"
已接入:由于我们有 `midjourney_submit_job` 和 `midjourney_query_job_status`,请寻找最合理的对应。
【重要领域等价字典】请你在判定时,严格遵从以下我们架构中的内置别名等价逻辑:
1. "Nano Banana" 或者 "nanobanana" 等同于 "Google Gemini 生图" 或 "Imagen 3"
2. "BFL" 等同于 "Flux 官方生图 API"
3. "LiblibAI" 或 "哩布哩布" 等同于任何有关 Controlnet (不论是 OpenPose/Canny 等变体) 的支持
4. "RunComfy" 等同于 "ComfyUI 远程实例或工作流执行"
5. "即梦" 或 "ji_meng" 同等对应其系列任务
如果未能找到合理的已接入工具应对,则不输出映射。
请输出一个合法的 JSON 数组,必须是一个 list of objects:
[
{
"unconnected_id": "<未接入的工具id>",
"connected_id": "<对应最合理的已接入的工具id>",
"reason": "<非常简短的一句话匹配理由>"
}
]
请绝对不要包含任何 Markdown 语法、```json 标签,直接输出原生 JSON 字符串。
"""
def _parse_match_json(raw: str) -> list:
    """Parse the model's raw text response into a list of match dicts.

    Strips optional Markdown code fences (tolerated even though the prompt
    forbids them), then accepts either a JSON array (keeping only dict items)
    or a single mapping object that looks like a match entry.
    Returns [] on any parse failure.
    """
    text = raw.strip()
    if text.startswith("```json"):
        text = text[7:]
    elif text.startswith("```"):
        text = text[3:]
    if text.endswith("```"):
        text = text[:-3]
    try:
        data = json.loads(text.strip())
    except Exception as e:
        logger.error(f"Failed to parse JSON response: {e}\nResponse was: {raw[:200]}")
        return []
    if isinstance(data, dict):
        # A lone object is accepted only if it looks like one match entry.
        return [data] if "unconnected_id" in data else []
    if isinstance(data, list):
        return [item for item in data if isinstance(item, dict)]
    return []


async def match_tools_with_claude(connected_tools, unconnected_batch):
    """Ask Claude to map each unconnected tool onto an equivalent connected tool.

    Args:
        connected_tools: list of tool dicts (need "id", "name", "introduction")
            used as the reference catalog.
        unconnected_batch: list of tool dicts to be matched against the catalog.

    Returns:
        A list of {"unconnected_id", "connected_id", "reason"} dicts, or []
        when the API call or JSON parsing fails.

    Side effects:
        Appends the raw model output to raw_claude_responses.log as a
        crash-safe audit trail before any parsing is attempted.
    """
    conn_str = json.dumps([{"id": t["id"], "name": t["name"], "desc": t["introduction"]} for t in connected_tools], ensure_ascii=False)
    unconn_str = json.dumps([{"id": t["id"], "name": t["name"], "desc": t["introduction"]} for t in unconnected_batch], ensure_ascii=False)

    prompt = f"【已接入工具库(作为对标参考)】:\n{conn_str}\n\n【本次需判定匹配的未接入工具】:\n{unconn_str}\n\n请输出JSON结果:"

    options = ClaudeAgentOptions(model="claude-sonnet-4-5", system_prompt=SYSTEM_PROMPT)

    # Stream the response, concatenating every text block into one string.
    result_text = ""
    try:
        async with ClaudeSDKClient(options=options) as client:
            await client.query(prompt)
            async for msg in client.receive_response():
                if isinstance(msg, AssistantMessage):
                    for block in msg.content:
                        if isinstance(block, TextBlock):
                            result_text += block.text
    except Exception as e:
        logger.error(f"Claude API failed: {e}")
        return []

    # Persist the raw output before parsing, so a parse crash loses nothing.
    with open("raw_claude_responses.log", "a", encoding="utf-8") as f:
        f.write(f"\n--- Batch Output ---\n{result_text}\n")

    return _parse_match_json(result_text)
def apply_database_merge(store: PostgreSQLToolStore, match_plan: list):
    """Apply a reviewed match plan to the database.

    For each {"unconnected_id", "connected_id"} pair: merge the unconnected
    tool's introduction/tutorial text into the connected tool, re-point its
    association rows (capability/knowledge/provider links) at the connected
    tool, then delete the unconnected tool and its stale links.

    Each pair is committed independently; a failure rolls back only that
    pair and processing continues with the next one.

    Args:
        store: open PostgreSQLToolStore whose .conn is a live psycopg2 connection.
        match_plan: list of match dicts as produced by match_tools_with_claude.
    """
    if not match_plan:
        logger.info("没有任何匹配数据可执行落地!")
        return

    # Association tables whose rows must be migrated to the surviving tool.
    # NOTE: these identifiers are interpolated into SQL below, so they must
    # remain a hard-coded allowlist — never user- or model-supplied.
    tables_to_migrate = [
        ("capability_tool", "capability_id", "tool_id"),
        ("tool_knowledge", "knowledge_id", "tool_id"),
        ("tool_provider", "provider_id", "tool_id"),
    ]

    merged_count = 0
    # psycopg2 cursors are context managers: this guarantees the cursor is
    # closed even if an unexpected error escapes the per-pair handling.
    with store.conn.cursor(cursor_factory=RealDictCursor) as c:
        for match in match_plan:
            uid = match.get("unconnected_id")
            cid = match.get("connected_id")
            if not uid or not cid:
                continue

            logger.info(f"==> 执行合并映射 [未接入: {uid} -> 已接入: {cid}]")

            try:
                # 1. Load both tools; only merge when both still exist.
                c.execute("SELECT introduction, tutorial, version FROM tool WHERE id = %s", (uid,))
                u_tool = c.fetchone()

                c.execute("SELECT introduction, tutorial, version FROM tool WHERE id = %s", (cid,))
                c_tool = c.fetchone()

                if u_tool and c_tool:
                    u_ver = u_tool.get('version') or ''
                    c_ver = c_tool.get('version') or ''
                    # Prefer the connected tool's version unless it is empty or
                    # the "1.0.0" placeholder; otherwise fall back to the
                    # unconnected tool's version, then to "1.0.0".
                    latest_ver = c_ver if c_ver and c_ver != "1.0.0" else (u_ver if u_ver else "1.0.0")

                    version_prefix = f"已支持的最新版本: {latest_ver}\n\n"

                    new_intro = version_prefix + (c_tool['introduction'] or '') + "\n\n[融合补充描述]:\n" + (u_tool['introduction'] or '')
                    new_tutorial = (c_tool['tutorial'] or '') + "\n\n" + (u_tool['tutorial'] or '')
                    # Truncate to fit the 4000-char columns.
                    c.execute("UPDATE tool SET introduction=%s, tutorial=%s WHERE id=%s",
                              (new_intro[:4000], new_tutorial[:4000], cid))

                    # 2. Re-point association rows at the surviving tool;
                    #    ON CONFLICT DO NOTHING absorbs already-existing links.
                    for table, ref_col, tool_col in tables_to_migrate:
                        c.execute(f"SELECT {ref_col} FROM {table} WHERE {tool_col} = %s", (uid,))
                        for row in c.fetchall():
                            c.execute(
                                f"INSERT INTO {table} ({ref_col}, {tool_col}) VALUES (%s, %s) ON CONFLICT DO NOTHING",
                                (row[ref_col], cid),
                            )

                    # 3. Remove the old links explicitly, then the obsolete tool.
                    for table, _, tool_col in tables_to_migrate:
                        c.execute(f"DELETE FROM {table} WHERE {tool_col} = %s", (uid,))
                    c.execute("DELETE FROM tool WHERE id = %s", (uid,))

                    merged_count += 1
            except Exception as e:
                logger.error(f"处理 {uid} 的融合时出错: {e}")
                store.conn.rollback()  # roll back only this pair, keep going
                continue

            # Commit each processed pair independently.
            store.conn.commit()

    logger.info(f"所有指定合并执行完毕,已彻底抹除融合了 {merged_count} 个废弃节点工具!")
async def run_workflow(apply=False):
    """Run the dedup workflow end to end.

    Loads all tools, partitions them into connected ("已接入") vs. unconnected,
    obtains Claude match suggestions (or loads them from the matches.json
    cache), prints a preview report, and — only when *apply* is True — merges
    the matches into the database.

    Args:
        apply: when True, physically apply the merge plan; otherwise dry-run.
    """
    store = PostgreSQLToolStore()
    try:
        tools = store.list_all(limit=2000)

        connected = [t for t in tools if t.get("status") == "已接入"]
        unconnected = [t for t in tools if t.get("status") != "已接入"]

        logger.info(f"提取出 {len(connected)} 个基础已接入工具,以及 {len(unconnected)} 个未接入待判决工具。")
        if not unconnected:
            logger.info("没有任何未接入工具需要处理。")
            return

        # Reuse a cached match plan if present, to avoid repeated LLM calls.
        match_file = "matches.json"

        if os.path.exists(match_file):
            logger.info(f"发现本地存在的缓存文件 {match_file},将直接读取现有分析结果!")
            with open(match_file, "r", encoding="utf-8") as f:
                all_matches = json.load(f)
        else:
            # Small batches keep each prompt focused and within context limits.
            batch_size = 10
            all_matches = []

            total_batches = (len(unconnected) + batch_size - 1) // batch_size  # ceil division
            for i in range(0, len(unconnected), batch_size):
                batch = unconnected[i:i + batch_size]
                logger.info(f"正在交给 Claude 引擎评估第 {i//batch_size + 1}/{total_batches} 批 ({len(batch)} items) ...")

                matches = await match_tools_with_claude(connected, batch)
                if matches:
                    all_matches.extend(matches)

            # Cache the plan so a re-run can skip the LLM entirely.
            with open(match_file, "w", encoding="utf-8") as f:
                json.dump(all_matches, f, ensure_ascii=False, indent=2)

        logger.info("\n====================== 预览报告 ======================")
        if not all_matches:
            logger.info("Claude 没有找到任何可以安全结合的工具!")
        else:
            logger.info(f"Claude 匹配上了 {len(all_matches)} 个工具,建议的更换路线如下:")
            for m in all_matches:
                logger.info(f" - [冗余废除] {m.get('unconnected_id')} ==> [主工具] {m.get('connected_id')}")
                logger.info(f" 理由: {m.get('reason','')}")

        logger.info("====================================================\n")

        if apply:
            logger.info("启动真实的数据库落地置换 (Apply mode is ON)...")
            apply_database_merge(store, all_matches)
        else:
            logger.info("目前为 Dry-Run 模式,如果审查匹配无误觉得干的漂亮,请携带 --apply 开关挂载实际融合执行!")
    finally:
        # Always release the DB connection, even on unexpected errors.
        store.close()
if __name__ == "__main__":
    # CLI entry point: dry-run by default; pass --apply to mutate the DB.
    cli = argparse.ArgumentParser()
    cli.add_argument("--apply", action="store_true", help="Apply DB migrations physically")
    cli_args = cli.parse_args()

    asyncio.run(run_workflow(apply=cli_args.apply))
|