merge_unconnected_tools.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. #!/usr/bin/env python3
  2. import asyncio
  3. import json
  4. import logging
  5. import argparse
  6. from psycopg2.extras import RealDictCursor
  7. # 导入应用内的模块
  8. from tool_agent.tool.tool_store import PostgreSQLToolStore
  9. from claude_agent_sdk import ClaudeAgentOptions, ClaudeSDKClient, AssistantMessage, TextBlock
# Emit INFO-and-above records using a compact "LEVEL: message" layout.
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
logger = logging.getLogger(__name__)

# System prompt passed to the model via ClaudeAgentOptions(system_prompt=...).
# It is written in Chinese and is sent verbatim, so it must not be edited
# casually. In summary it instructs the model to:
#   * compare a list of "unconnected" tools against the list of "connected"
#     tools and report which unconnected tool duplicates which connected one,
#     even when the names differ;
#   * honour a built-in alias dictionary (Nano Banana ~ Google Gemini image
#     generation / Imagen 3, BFL ~ official Flux API, LiblibAI ~ ControlNet
#     support, RunComfy ~ remote ComfyUI execution, 即梦 / ji_meng ~ its own
#     task family);
#   * omit a tool entirely when no reasonable connected counterpart exists;
#   * answer with a bare JSON array of objects carrying the keys
#     "unconnected_id", "connected_id" and "reason", without Markdown fences.
SYSTEM_PROMPT = """你是一个专业的 API 工具库去重专家。
你的任务是将一份【未接入工具列表】与一份核心的【已接入工具列表】进行比对归并。
如果某一个【未接入工具】(哪怕换了名字叫别的)在【已接入工具】中已经有同样的本质功能,请找出这种映射关系。
例如:
未接入:"通过 Midjourney 生成图片" 或 "Midjourney 查进度"
已接入:由于我们有 `midjourney_submit_job` 和 `midjourney_query_job_status`,请寻找最合理的对应。
【重要领域等价字典】请你在判定时,严格遵从以下我们架构中的内置别名等价逻辑:
1. "Nano Banana" 或者 "nanobanana" 等同于 "Google Gemini 生图" 或 "Imagen 3"
2. "BFL" 等同于 "Flux 官方生图 API"
3. "LiblibAI" 或 "哩布哩布" 等同于任何有关 Controlnet (不论是 OpenPose/Canny 等变体) 的支持
4. "RunComfy" 等同于 "ComfyUI 远程实例或工作流执行"
5. "即梦" 或 "ji_meng" 同等对应其系列任务
如果未能找到合理的已接入工具应对,则不输出映射。
请输出一个合法的 JSON 数组,必须是一个 list of objects:
[
{
"unconnected_id": "<未接入的工具id>",
"connected_id": "<对应最合理的已接入的工具id>",
"reason": "<非常简短的一句话匹配理由>"
}
]
请绝对不要包含任何 Markdown 语法、```json 标签,直接输出原生 JSON 字符串。
"""
  35. async def match_tools_with_claude(connected_tools, unconnected_batch):
  36. conn_str = json.dumps([{"id": t["id"], "name": t["name"], "desc": t["introduction"]} for t in connected_tools], ensure_ascii=False)
  37. unconn_str = json.dumps([{"id": t["id"], "name": t["name"], "desc": t["introduction"]} for t in unconnected_batch], ensure_ascii=False)
  38. prompt = f"【已接入工具库(作为对标参考)】:\n{conn_str}\n\n【本次需判定匹配的未接入工具】:\n{unconn_str}\n\n请输出JSON结果:"
  39. options = ClaudeAgentOptions(model="claude-sonnet-4-5", system_prompt=SYSTEM_PROMPT)
  40. # 建立客户端调用
  41. result_text = ""
  42. try:
  43. async with ClaudeSDKClient(options=options) as client:
  44. await client.query(prompt)
  45. async for msg in client.receive_response():
  46. if isinstance(msg, AssistantMessage):
  47. for block in msg.content:
  48. if isinstance(block, TextBlock):
  49. result_text += block.text
  50. except Exception as e:
  51. logger.error(f"Claude API failed: {e}")
  52. return []
  53. # 毫无保留地保存最原始的字符串落盘(防崩备用方案)
  54. with open("raw_claude_responses.log", "a", encoding="utf-8") as f:
  55. f.write(f"\n--- Batch Output ---\n{result_text}\n")
  56. # 尝试解析 JSON
  57. try:
  58. # 去掉可能的 Markdown 包装
  59. clean_json = result_text.strip()
  60. if clean_json.startswith("```json"):
  61. clean_json = clean_json[7:]
  62. elif clean_json.startswith("```"):
  63. clean_json = clean_json[3:]
  64. if clean_json.endswith("```"):
  65. clean_json = clean_json[:-3]
  66. data = json.loads(clean_json.strip())
  67. if isinstance(data, dict):
  68. if "unconnected_id" in data:
  69. return [data]
  70. return []
  71. elif isinstance(data, list):
  72. return [item for item in data if isinstance(item, dict)]
  73. return []
  74. except Exception as e:
  75. logger.error(f"Failed to parse JSON response: {e}\nResponse was: {result_text[:200]}")
  76. return []
  77. def apply_database_merge(store: PostgreSQLToolStore, match_plan: list):
  78. """根据生成的合法匹配图,执行数据库替换"""
  79. if not match_plan:
  80. logger.info("没有任何匹配数据可执行落地!")
  81. return
  82. c = store.conn.cursor(cursor_factory=RealDictCursor)
  83. merged_count = 0
  84. for match in match_plan:
  85. uid = match.get("unconnected_id")
  86. cid = match.get("connected_id")
  87. if not uid or not cid:
  88. continue
  89. logger.info(f"==> 执行合并映射 [未接入: {uid} -> 已接入: {cid}]")
  90. try:
  91. # 1. 查询出原未接入工具的特征介绍准备拼接
  92. c.execute("SELECT introduction, tutorial, version FROM tool WHERE id = %s", (uid,))
  93. u_tool = c.fetchone()
  94. c.execute("SELECT introduction, tutorial, version FROM tool WHERE id = %s", (cid,))
  95. c_tool = c.fetchone()
  96. if u_tool and c_tool:
  97. u_ver = u_tool.get('version') or ''
  98. c_ver = c_tool.get('version') or ''
  99. # Extract best version, favor c_ver if exists and not placeholder
  100. latest_ver = c_ver if c_ver and c_ver != "1.0.0" else (u_ver if u_ver else "1.0.0")
  101. version_prefix = f"已支持的最新版本: {latest_ver}\n\n"
  102. new_intro = version_prefix + (c_tool['introduction'] or '') + "\n\n[融合补充描述]:\n" + (u_tool['introduction'] or '')
  103. new_tutorial = (c_tool['tutorial'] or '') + "\n\n" + (u_tool['tutorial'] or '')
  104. c.execute("UPDATE tool SET introduction=%s, tutorial=%s WHERE id=%s", (new_intro[:4000], new_tutorial[:4000], cid))
  105. # 2. 跟原表里外键绑定的迁移过去(只搬迁关系,利用 ON CONFLICT 防止主键重复)
  106. tables_to_migrate = [
  107. ("capability_tool", "capability_id", "tool_id"),
  108. ("tool_knowledge", "knowledge_id", "tool_id"),
  109. ("tool_provider", "provider_id", "tool_id")
  110. ]
  111. for table, _, tool_col in tables_to_migrate:
  112. # 找出旧工具的所有关联
  113. c.execute(f"SELECT * FROM {table} WHERE {tool_col} = %s", (uid,))
  114. rows = c.fetchall()
  115. for row in rows:
  116. if table == "capability_tool":
  117. c.execute("INSERT INTO capability_tool (capability_id, tool_id) VALUES (%s, %s) ON CONFLICT DO NOTHING", (row['capability_id'], cid))
  118. elif table == "tool_knowledge":
  119. c.execute("INSERT INTO tool_knowledge (knowledge_id, tool_id) VALUES (%s, %s) ON CONFLICT DO NOTHING", (row['knowledge_id'], cid))
  120. elif table == "tool_provider":
  121. c.execute("INSERT INTO tool_provider (provider_id, tool_id) VALUES (%s, %s) ON CONFLICT DO NOTHING", (row['provider_id'], cid))
  122. # 3. 将原未接入工具删除 (cascade delete if properly constructed by DB, else manually delete relations first)
  123. # 既然我们已经读了并插入到了新的关联里,我们需要手动删除旧的外键绑定好让 CASCADE 清晰或者直接删
  124. c.execute("DELETE FROM capability_tool WHERE tool_id = %s", (uid,))
  125. c.execute("DELETE FROM tool_knowledge WHERE tool_id = %s", (uid,))
  126. c.execute("DELETE FROM tool_provider WHERE tool_id = %s", (uid,))
  127. c.execute("DELETE FROM tool WHERE id = %s", (uid,))
  128. merged_count += 1
  129. except Exception as e:
  130. logger.error(f"处理 {uid} 的融合时出错: {e}")
  131. store.conn.rollback() # 出错则回滚该条
  132. continue
  133. # 单条合并成功可提交
  134. store.conn.commit()
  135. c.close()
  136. logger.info(f"所有指定合并执行完毕,已彻底抹除融合了 {merged_count} 个废弃节点工具!")
  137. async def run_workflow(apply=False):
  138. store = PostgreSQLToolStore()
  139. tools = store.list_all(limit=2000)
  140. connected = [t for t in tools if t.get("status") == "已接入"]
  141. unconnected = [t for t in tools if t.get("status") != "已接入"]
  142. logger.info(f"提取出 {len(connected)} 个基础已接入工具,以及 {len(unconnected)} 个未接入待判决工具。")
  143. if not unconnected:
  144. logger.info("没有任何未接入工具需要处理。")
  145. store.close()
  146. return
  147. # 尝试从本地缓存文件读取,避免重复消耗大模型调用
  148. import os
  149. match_file = "matches.json"
  150. if os.path.exists(match_file):
  151. logger.info(f"发现本地存在的缓存文件 {match_file},将直接读取现有分析结果!")
  152. with open(match_file, "r", encoding="utf-8") as f:
  153. all_matches = json.load(f)
  154. else:
  155. # 切割 batch,降低大模型的单次信息处理压力,提升专注度
  156. batch_size = 10
  157. all_matches = []
  158. total_batches = (len(unconnected) // batch_size) + (1 if len(unconnected) % batch_size != 0 else 0)
  159. for i in range(0, len(unconnected), batch_size):
  160. batch = unconnected[i:i + batch_size]
  161. logger.info(f"正在交给 Claude 引擎评估第 {i//batch_size + 1}/{total_batches} 批 ({len(batch)} items) ...")
  162. matches = await match_tools_with_claude(connected, batch)
  163. if matches:
  164. all_matches.extend(matches)
  165. # 缓存映射结果
  166. with open(match_file, "w", encoding="utf-8") as f:
  167. json.dump(all_matches, f, ensure_ascii=False, indent=2)
  168. logger.info(f"\n====================== 预览报告 ======================")
  169. if not all_matches:
  170. logger.info("Claude 没有找到任何可以安全结合的工具!")
  171. else:
  172. logger.info(f"Claude 匹配上了 {len(all_matches)} 个工具,建议的更换路线如下:")
  173. for m in all_matches:
  174. logger.info(f" - [冗余废除] {m.get('unconnected_id')} ==> [主工具] {m.get('connected_id')}")
  175. logger.info(f" 理由: {m.get('reason','')}")
  176. logger.info(f"====================================================\n")
  177. if apply:
  178. logger.info("启动真实的数据库落地置换 (Apply mode is ON)...")
  179. apply_database_merge(store, all_matches)
  180. else:
  181. logger.info("目前为 Dry-Run 模式,如果审查匹配无误觉得干的漂亮,请携带 --apply 开关挂载实际融合执行!")
  182. store.close()
  183. if __name__ == "__main__":
  184. parser = argparse.ArgumentParser()
  185. parser.add_argument("--apply", action="store_true", help="Apply DB migrations physically")
  186. args = parser.parse_args()
  187. asyncio.run(run_workflow(apply=args.apply))