|
|
@@ -0,0 +1,320 @@
|
|
|
+import asyncio
|
|
|
+import json
|
|
|
+import logging
|
|
|
+import os
|
|
|
+import uuid
|
|
|
+from typing import Dict, Any
|
|
|
+from datetime import datetime
|
|
|
+from pathlib import Path
|
|
|
+
|
|
|
+from psycopg2.extras import RealDictCursor
|
|
|
+from claude_agent_sdk import tool, ClaudeAgentOptions, ClaudeSDKClient, AssistantMessage, TextBlock
|
|
|
+
|
|
|
+# import app specific configurations
|
|
|
+from tool_agent.tool.tool_store import PostgreSQLToolStore
|
|
|
+from tool_agent.tool.capability import PostgreSQLCapabilityStore
|
|
|
+
|
|
|
# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)

# Data directory that stores the tickets and the cached match results.
# NOTE(review): the path is relative, so it resolves against the process's
# working directory — confirm that is intended.
TICKET_DIR = Path("data/maintenance_tickets")
# Created at import time so later ticket writes can assume the directory exists.
TICKET_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
+
|
|
|
+SYSTEM_PROMPT = """你是一个专业的 API 工具库去重专家。
|
|
|
+你的任务是将一份【未接入工具列表】与一份核心的【已接入工具列表】进行比对归并。
|
|
|
+如果某一个【未接入工具】(哪怕换了名字叫别的)在【已接入工具】中已经有同样的本质功能,请找出这种映射关系。
|
|
|
+
|
|
|
+例如:
|
|
|
+未接入:"通过 Midjourney 生成图片" 或 "Midjourney 查进度"
|
|
|
+已接入:由于我们有 `midjourney_submit_job` 和 `midjourney_query_job_status`,请寻找最合理的对应。
|
|
|
+
|
|
|
+【重要领域等价字典】请你在判定时,严格遵从以下我们架构中的内置别名等价逻辑:
|
|
|
+1. "Nano Banana" 或者 "nanobanana" 等同于 "Google Gemini 生图" 或 "Imagen 3"
|
|
|
+2. "BFL" 等同于 "Flux 官方生图 API"
|
|
|
+3. "LiblibAI" 或 "哩布哩布" 等同于任何有关 Controlnet (不论是 OpenPose/Canny 等变体) 的支持
|
|
|
+4. "RunComfy" 等同于 "ComfyUI 远程实例或工作流执行"
|
|
|
+5. "即梦" 或 "ji_meng" 同等对应其系列任务
|
|
|
+
|
|
|
+如果未能找到合理的已接入工具应对,则不输出映射。
|
|
|
+
|
|
|
+请输出一个合法的 JSON 数组,必须是一个 list of objects:
|
|
|
+[
|
|
|
+ {
|
|
|
+ "unconnected_id": "<未接入的工具id>",
|
|
|
+ "connected_id": "<对应最合理的已接入的工具id>",
|
|
|
+ "reason": "<非常简短的一句话匹配理由>"
|
|
|
+ }
|
|
|
+]
|
|
|
+
|
|
|
+请绝对不要包含任何 Markdown 语法、```json 标签,直接输出原生 JSON 字符串。
|
|
|
+"""
|
|
|
+
|
|
|
def _parse_match_response(raw_text):
    """Decode the model's reply into a list of candidate match dicts.

    Raises ``ValueError`` when no JSON document can be recovered from the text.
    """
    cleaned = raw_text.strip()
    # The system prompt forbids Markdown, but strip a code fence if one shows up anyway.
    if cleaned.startswith("```"):
        cleaned = cleaned[3:]
        if cleaned[:4].lower() == "json":
            cleaned = cleaned[4:]
    if cleaned.endswith("```"):
        cleaned = cleaned[:-3]
    cleaned = cleaned.strip()

    try:
        data = json.loads(cleaned)
    except ValueError:
        # Fall back to the outermost [...] span so a sentence before or after
        # the array does not throw away the whole batch.
        start, end = cleaned.find("["), cleaned.rfind("]")
        if start == -1 or end <= start:
            raise
        data = json.loads(cleaned[start:end + 1])

    if isinstance(data, dict):
        # A single bare object is accepted as a one-element result.
        return [data] if "unconnected_id" in data else []
    if isinstance(data, list):
        return [item for item in data if isinstance(item, dict)]
    return []


def _keep_known_matches(candidates, connected_tools, unconnected_batch):
    """Keep only matches whose two ids were actually offered to the model."""
    # Keyed by str(id) so that "12" coming back from the model still matches an id of 12;
    # the stored value restores the id exactly as the caller supplied it.
    connected_ids = {str(t["id"]): t["id"] for t in connected_tools}
    unconnected_ids = {str(t["id"]): t["id"] for t in unconnected_batch}

    kept = []
    for item in candidates:
        uid_key = str(item.get("unconnected_id"))
        cid_key = str(item.get("connected_id"))
        if uid_key not in unconnected_ids or cid_key not in connected_ids:
            # These ids later select the rows that get deleted, so an invented or
            # swapped id must never reach the merge plan.
            logger.warning(f"Discarding match with ids not present in the prompt: {item}")
            continue
        kept.append({
            **item,
            "unconnected_id": unconnected_ids[uid_key],
            "connected_id": connected_ids[cid_key],
        })
    return kept


async def match_tools_with_claude(connected_tools, unconnected_batch):
    """Ask Claude which unconnected tools duplicate an already connected tool.

    Args:
        connected_tools: Dicts with ``id``, ``name`` and ``introduction`` keys
            describing the connected tools used as the reference set.
        unconnected_batch: Dicts of the same shape for the tools to classify.

    Returns:
        A list of dicts carrying ``unconnected_id`` and ``connected_id`` (plus
        whatever else the model returned, e.g. ``reason``). Only pairs whose
        ids were part of the request are returned. The list is empty when the
        batch is empty, the API call fails, or the reply cannot be parsed;
        this function does not raise for those cases.
    """
    if not unconnected_batch:
        return []

    def _summaries(tools):
        return [{"id": t["id"], "name": t["name"], "desc": t["introduction"]} for t in tools]

    conn_str = json.dumps(_summaries(connected_tools), ensure_ascii=False)
    unconn_str = json.dumps(_summaries(unconnected_batch), ensure_ascii=False)

    prompt = f"【已接入工具库(作为对标参考)】:\n{conn_str}\n\n【本次需判定匹配的未接入工具】:\n{unconn_str}\n\n请输出JSON结果:"

    options = ClaudeAgentOptions(model="claude-sonnet-4-5", system_prompt=SYSTEM_PROMPT)
    chunks = []
    try:
        async with ClaudeSDKClient(options=options) as client:
            await client.query(prompt)
            async for msg in client.receive_response():
                if not isinstance(msg, AssistantMessage):
                    continue
                chunks.extend(block.text for block in msg.content if isinstance(block, TextBlock))
    except Exception as e:
        # Boundary with the external service: report and let the caller move on.
        logger.error(f"Claude API failed: {e}")
        return []
    result_text = "".join(chunks)

    # Keep the untouched reply on disk as a fallback record. This is best
    # effort: a failed write must not cost us the batch.
    try:
        with open("raw_claude_responses.log", "a", encoding="utf-8") as f:
            f.write(f"\n--- Batch Output ---\n{result_text}\n")
    except OSError as e:
        logger.warning(f"Could not append to raw_claude_responses.log: {e}")

    try:
        candidates = _parse_match_response(result_text)
    except (ValueError, RecursionError) as e:
        logger.error(f"Failed to parse JSON response: {e}\nResponse was: {result_text[:200]}")
        return []

    return _keep_known_matches(candidates, connected_tools, unconnected_batch)
|
|
|
+
|
|
|
def apply_database_merge(store: "PostgreSQLToolStore", match_plan: list):
    """Apply a reviewed match plan: fold each unconnected tool into its connected twin.

    For every usable ``unconnected_id`` / ``connected_id`` pair the connected
    tool's ``introduction`` and ``tutorial`` are extended with the unconnected
    tool's text, the rows of ``capability_tool``, ``tool_knowledge`` and
    ``tool_provider`` that reference the unconnected tool are re-created for
    the connected tool, and the unconnected tool is deleted. Each pair is
    committed on its own; a pair that fails is rolled back, logged and skipped.

    Args:
        store: Tool store; only its ``conn`` attribute is used (``cursor``,
            ``commit``, ``rollback``).
        match_plan: Match dicts. Entries that are not dicts, lack one of the
            ids, or map a tool onto itself are ignored.

    Returns:
        The number of pairs whose merge was committed.
    """
    # Filter the plan before touching the database.
    pairs = []
    for match in match_plan or []:
        if not isinstance(match, dict):
            continue
        uid = match.get("unconnected_id")
        cid = match.get("connected_id")
        if not uid or not cid:
            continue
        # Merging a tool into itself would end with that tool being deleted.
        if str(uid) == str(cid):
            continue
        pairs.append((uid, cid))
    if not pairs:
        return 0

    # Link tables that reference a tool, with the column that forms the other half of each link.
    link_tables = (
        ("capability_tool", "capability_id"),
        ("tool_knowledge", "knowledge_id"),
        ("tool_provider", "provider_id"),
    )

    merged_count = 0
    cursor = store.conn.cursor(cursor_factory=RealDictCursor)
    try:
        for uid, cid in pairs:
            try:
                cursor.execute("SELECT introduction, tutorial, version FROM tool WHERE id = %s", (uid,))
                u_tool = cursor.fetchone()
                cursor.execute("SELECT introduction, tutorial, version FROM tool WHERE id = %s", (cid,))
                c_tool = cursor.fetchone()
                if not u_tool or not c_tool:
                    # One side no longer exists (e.g. already merged earlier in this plan).
                    continue

                u_ver = u_tool.get('version') or ''
                c_ver = c_tool.get('version') or ''
                # Prefer the connected tool's version unless it is empty or the "1.0.0" placeholder.
                latest_ver = c_ver if c_ver and c_ver != "1.0.0" else (u_ver or "1.0.0")
                version_prefix = f"已支持的最新版本: {latest_ver}\n\n"

                new_intro = version_prefix + (c_tool['introduction'] or '') + "\n\n[融合补充描述]:\n" + (u_tool['introduction'] or '')
                new_tutorial = (c_tool['tutorial'] or '') + "\n\n" + (u_tool['tutorial'] or '')
                # Both texts are capped at 4000 characters before being written back.
                cursor.execute(
                    "UPDATE tool SET introduction=%s, tutorial=%s WHERE id=%s",
                    (new_intro[:4000], new_tutorial[:4000], cid),
                )

                # Table and column names come from the constant tuple above, never from
                # the plan, so formatting them into the statement is safe.
                for table, other_col in link_tables:
                    cursor.execute(
                        f"INSERT INTO {table} ({other_col}, tool_id) "
                        f"SELECT {other_col}, %s FROM {table} WHERE tool_id = %s "
                        "ON CONFLICT DO NOTHING",
                        (cid, uid),
                    )
                for table, _ in link_tables:
                    cursor.execute(f"DELETE FROM {table} WHERE tool_id = %s", (uid,))
                cursor.execute("DELETE FROM tool WHERE id = %s", (uid,))

                store.conn.commit()
                # Counted only once the commit went through.
                merged_count += 1
            except Exception as e:
                logger.error(f"处理 {uid} 的融合时出错: {e}")
                store.conn.rollback()  # undo only this pair; earlier pairs are already committed
                continue
    finally:
        cursor.close()

    return merged_count
|
|
|
+
|
|
|
+
|
|
|
+# ===========================================================================
|
|
|
+# 异步提单状态机 (Tickets State Machine)
|
|
|
+# ===========================================================================
|
|
|
+
|
|
|
+class TicketManager:
|
|
|
+ @classmethod
|
|
|
+ def save_ticket(cls, ticket_id: str, data: Dict[str, Any]):
|
|
|
+ filepath = TICKET_DIR / f"{ticket_id}.json"
|
|
|
+ with open(filepath, "w", encoding="utf-8") as f:
|
|
|
+ json.dump(data, f, ensure_ascii=False, indent=2)
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def load_ticket(cls, ticket_id: str) -> Dict[str, Any] | None:
|
|
|
+ filepath = TICKET_DIR / f"{ticket_id}.json"
|
|
|
+ if not filepath.exists():
|
|
|
+ return None
|
|
|
+ with open(filepath, "r", encoding="utf-8") as f:
|
|
|
+ return json.load(f)
|
|
|
+
|
|
|
+
|
|
|
async def _deduplication_background_task(ticket_id: str):
    """Run the LLM matching for one ticket and record progress in the ticket file.

    The ticket goes ``PROCESSING`` -> ``READY_FOR_REVIEW`` when every batch has
    been evaluated, ``COMPLETED`` when there is nothing to evaluate, or
    ``FAILED`` when an exception interrupts the run. Nothing is merged here;
    the collected matches are only stored on the ticket.

    Args:
        ticket_id: Id of the ticket file to create and keep updated.
    """
    logger.info(f"[Ticket {ticket_id}] 启动异步清洗去重计算任务...")

    created_at = datetime.now().isoformat()
    all_matches = []

    def _write(status, progress):
        # Every save rewrites the whole file, so created_at has to be repeated
        # each time or it is lost after the first update.
        TicketManager.save_ticket(ticket_id, {
            "status": status,
            "progress": progress,
            "created_at": created_at,
            "matches": all_matches
        })

    store = None
    try:
        _write("PROCESSING", "Initialization")

        # Opened inside the try block so that a connection or query failure
        # marks the ticket FAILED instead of leaving it in PROCESSING.
        store = PostgreSQLToolStore()
        tools = store.list_all(limit=2000)

        connected = [t for t in tools if t.get("status") == "已接入"]
        unconnected = [t for t in tools if t.get("status") != "已接入"]

        if not unconnected:
            _write("COMPLETED", "No unconnected tools found.")
            return

        batch_size = 10
        total_batches = (len(unconnected) + batch_size - 1) // batch_size

        for batch_no, start in enumerate(range(0, len(unconnected), batch_size), start=1):
            batch = unconnected[start:start + batch_size]
            logger.info(f"正在交给 Claude 引擎评估第 {batch_no}/{total_batches} 批 ({len(batch)} items) ...")

            # Refresh the ticket before each call so pollers can see the task is alive.
            _write("PROCESSING", f"Evaluating batch {batch_no}/{total_batches}")

            matches = await match_tools_with_claude(connected, batch)
            if matches:
                all_matches.extend(matches)

        # All batches evaluated; the plan now waits for manual approval.
        _write("READY_FOR_REVIEW", "All LLM deductions complete. Ready for manual apply.")
    except Exception as e:
        logger.exception(f"[Ticket {ticket_id}] 运维背景清洗错误: {e}")
        try:
            _write("FAILED", f"Error: {str(e)}")
        except (OSError, ValueError, TypeError):
            # Nobody awaits this task, so an error raised here would only show
            # up as "Task exception was never retrieved".
            logger.exception(f"[Ticket {ticket_id}] could not record the FAILED status")
    finally:
        if store is not None:
            store.close()
|
|
|
+
|
|
|
+
|
|
|
+# ===========================================================================
|
|
|
+# Agents 外暴的 MCP Tools
|
|
|
+# ===========================================================================
|
|
|
+
|
|
|
+def _result(data: dict) -> dict:
|
|
|
+ return {"content": [{"type": "text", "text": json.dumps(data, ensure_ascii=False, indent=2)}]}
|
|
|
+
|
|
|
+
|
|
|
# Strong references to the background tasks that are still running. The event
# loop keeps only weak references to tasks, so a task nobody else references
# can be garbage-collected before it finishes.
_BACKGROUND_TASKS: set = set()


@tool(name="create_merge_unconnected_tools_ticket", description="""
发起一个异步评估工具。用于调用大模型全量分析所有的“已接入工具”和“未接入工具”,
进行智能清理注册表的后台任务。这是一个非常耗时的操作(可能需要 15 分钟),
所以调用它会立即返回一个 Ticket ID,你需要记录下来并在稍后轮询。
""", input_schema={"type": "object", "properties": {}})
async def create_merge_unconnected_tools_ticket(args):
    """Start the deduplication analysis in the background and return its ticket id.

    Args:
        args: Tool arguments; none are read.

    Returns:
        A tool result whose JSON text carries ``info``, ``ticket_id`` and ``next_step``.
    """
    ticket_id = f"MERGE-{uuid.uuid4().hex[:8].upper()}"

    task = asyncio.create_task(_deduplication_background_task(ticket_id))
    _BACKGROUND_TASKS.add(task)
    # Drop the reference once the task is done so the set does not grow without bound.
    task.add_done_callback(_BACKGROUND_TASKS.discard)

    return _result({
        "info": "异步清理计算作业已下发!请不要阻碍当前进程流。",
        "ticket_id": ticket_id,
        "next_step": "调用 track_maintenance_ticket_status,带着这个 ticket_id 查看执行是否完毕。"
    })
|
|
|
+
|
|
|
+
|
|
|
@tool(name="track_maintenance_ticket_status", description="""
追踪工单的处理情况。当 status 为 READY_FOR_REVIEW 时,代表匹配的去重方案已经生成完毕。
你需要将 matches 里的干预信息详细打印出来展示给用户确认审查,征求他们意见。
""", input_schema={
    "type": "object",
    "properties": {"ticket_id": {"type": "string"}},
    "required": ["ticket_id"]
})
async def track_maintenance_ticket_status(args):
    """Report the stored state of a ticket.

    For a ticket in ``READY_FOR_REVIEW`` the ``matches`` list is replaced by
    ``matches_preview`` (at most 50 entries followed by a truncation notice)
    and ``matches_total`` (the full number of matches). Tickets in any other
    status are returned as stored.

    Args:
        args: Tool arguments; ``ticket_id`` is required.

    Returns:
        A tool result with the ticket as JSON text, or an ``error`` payload
        when the ticket cannot be loaded.
    """
    ticket = TicketManager.load_ticket(args["ticket_id"])
    if not ticket:
        return _result({"error": "找不到该工单,请检查输入或文件丢失。"})

    # Cap the number of matches sent back so the reply stays a manageable size.
    if ticket.get("status") == "READY_FOR_REVIEW":
        preview_limit = 50
        matches = ticket.pop("matches", None) or []
        # Approval applies every match, not only the previewed ones, so the
        # reviewer needs to see the real total.
        ticket["matches_total"] = len(matches)
        preview = matches[:preview_limit]
        if len(matches) > preview_limit:
            preview.append({"notice": "为防止通信拥堵,其余被截断隐去..."})
        ticket["matches_preview"] = preview

    return _result(ticket)
|
|
|
+
|
|
|
+
|
|
|
@tool(name="approve_maintenance_ticket", description="""
当用户同意了你的工具合并匹配干预报表,你就可以带着票号调用这个指令。
它会立即触发物理上的外键数据库更替、重组、剔除操作,完成真正的合并洗污!
""", input_schema={
    "type": "object",
    "properties": {"ticket_id": {"type": "string"}},
    "required": ["ticket_id"]
})
async def approve_maintenance_ticket(args):
    """Apply the match plan of a reviewed ticket to the database.

    Only a ticket in ``READY_FOR_REVIEW`` is applied. On success the ticket is
    saved as ``COMPLETED`` with a progress message that states how many tools
    were merged.

    Args:
        args: Tool arguments; ``ticket_id`` is required.

    Returns:
        A tool result with ``success`` and ``info`` on success, or an ``error``
        payload when the ticket is missing, is in the wrong status, or the
        merge raised.
    """
    ticket_id = args["ticket_id"]
    ticket = TicketManager.load_ticket(ticket_id)
    if not ticket:
        return _result({"error": "找不到该工单,请检查。"})

    if ticket.get("status") != "READY_FOR_REVIEW":
        return _result({"error": f"目前的工单状态为 {ticket.get('status')},而不是 READY_FOR_REVIEW, 不具备施加的前提!"})

    store = None
    try:
        # Created inside the try block so a connection failure is reported
        # through the same error payload as any other failure.
        store = PostgreSQLToolStore()
        count = apply_database_merge(store, ticket.get("matches", []))
        ticket["status"] = "COMPLETED"
        ticket["progress"] = f"成功融合淘汰了 {count} 个边缘旧工具与它们的关系网!"
        TicketManager.save_ticket(ticket_id, ticket)
        return _result({"success": True, "info": ticket["progress"]})
    except Exception as e:
        # Keep the traceback in the log; the caller only receives the message.
        logger.exception(f"[Ticket {ticket_id}] approve_maintenance_ticket failed: {e}")
        return _result({"error": f"物理落地时产生了报错:{e}"})
    finally:
        if store is not None:
            store.close()
|
|
|
+
|
|
|
+
|