import json
import logging
import uuid
import os
import asyncio
from typing import List

from agent.tools import tool, ToolResult
from agent.llm.openrouter import openrouter_llm_call

# Low-level Postgres asset-table dependencies.
from tool_agent.tool.tool_store import PostgreSQLToolStore
from tool_agent.tool.capability import PostgreSQLCapabilityStore

logger = logging.getLogger(__name__)

# Runtime prompt consumed by the LLM — must stay byte-identical (it is parsed by the model, not by us).
SYSTEM_PROMPT_CAPABILITY = """你是一个专业的能力分析师。 你的任务是从给定的【待分析新工具】的使用介绍和它挂载的【相关背景知识文章】中,提取出它对整网【原子能力表】的贡献,并选择新建或是融合。 ## 定义与格式 1. 原子能力是面向需求、跨工具的独立完整业务单元。 2. 端到端型工具(如Midjourney)直接抽取能力;编排平台工具(如ComfyUI节点群)从实际搭建的工作流中提取能力视角(不要原子化平台或独立节点本身)。 请输出严格的 JSON 数组结构: [ { "action": "create", "tool_id": "<当前待分析的工具ID>", "knowledge_ids": ["<哪些传入的相关知识促成了这个能力,填入真实的知识ID>"], "capability_id": "NEW_<任意数字字母临时ID>", "name": "<总结提炼的新能力统称名>", "criterion": "<客观统一的判定标准>", "description": "<抽象的能力需求场景说明>", "implement_description": "<在该特定工具里是如何实现该能力的(具体操作或调用链)>" }, { "action": "attach", "tool_id": "<当前待分析的工具ID>", "knowledge_ids": ["<关联的知识ID,可为空数组>"], "capability_id": "<来自下面【已有全量能力库】字典中完全等价功能的真实ID>", "implement_description": "<在该特定工具里实现此老能力的具体手法>" }, { "action": "update_and_attach", "tool_id": "<当前待分析的工具ID>", "knowledge_ids": ["<关联的知识ID>"], "capability_id": "<来自【已有全量能力库】的真实ID>", "name": "<更统整包容的全局更好命名(如果不改就留空不变)>", "criterion": "<更新优化后的判定标准>", "description": "<更新优化后的描述>", "implement_description": "<在该特定工具中如何实现>" } ] 请绝对不要输出 markdown 包装,仅输出原生的合法 JSON。如果一个工具覆盖了多个独立原子能力,请为每个能力出具一条动作操作。 """


def _fetch_knowledge_map(cursor, k_ids: list) -> dict:
    """Fetch the knowledge rows for *k_ids* and return {id: {"title", "content"}}.

    Content is taken from the first non-missing of 'content' / 'markdown' /
    'description' and truncated to 4000 chars to bound prompt size.
    NOTE(review): assumes *cursor* yields dict-like rows (e.g. RealDictCursor) —
    confirm against PostgreSQLCapabilityStore._get_cursor.
    """
    if not k_ids:
        return {}
    # Parameterized IN-list: one %s placeholder per id (never string-built SQL).
    placeholders = ','.join(['%s'] * len(k_ids))
    cursor.execute(f"SELECT row_to_json(knowledge) as data FROM knowledge WHERE id IN ({placeholders})", list(k_ids))
    mapping = {}
    for r in cursor.fetchall():
        d = r['data']
        text = str(d.get('content', d.get('markdown', d.get('description', ''))))
        mapping[d['id']] = {"title": d.get('title', ''), "content": text[:4000]}
    return mapping


def _parse_capability_ops(result_text: str) -> list:
    """Parse the raw LLM response into a list of op dicts (each must carry 'action').

    Tolerates accidental markdown code fences; returns [] on any parse failure.
    """
    try:
        clean_json = result_text.strip()
        # Strip a leading ```json / ``` fence and a trailing ``` fence if present.
        if clean_json.startswith("```json"):
            clean_json = clean_json[7:]
        elif clean_json.startswith("```"):
            clean_json = clean_json[3:]
        if clean_json.endswith("```"):
            clean_json = clean_json[:-3]
        data = json.loads(clean_json.strip())
        if isinstance(data, dict):
            # A single bare op object is accepted and wrapped.
            if "action" in data:
                return [data]
            return []
        elif isinstance(data, list):
            return [item for item in data if isinstance(item, dict) and "action" in item]
        return []
    except Exception as e:
        logger.error(f"Failed to parse capability JSON: {e}")
        return []


async def extract_capabilities_with_claude(existing_caps, tool_batch, knowledge_map) -> list:
    """Ask the LLM to map *tool_batch* onto the capability table.

    Returns a list of validated op dicts (create / attach / update_and_attach),
    or [] when the call or parsing fails.
    """
    cap_str = json.dumps([{"id": c["id"], "name": c["name"], "criterion": c.get("criterion", "")} for c in existing_caps], ensure_ascii=False)
    tool_str = json.dumps([{"id": t["id"], "name": t["name"], "desc": t["introduction"], "docs": t["tutorial"], "associated_knowledge": t.get("knowledge_ids", [])} for t in tool_batch], ensure_ascii=False)
    knowledge_str = json.dumps(knowledge_map, ensure_ascii=False)
    prompt = f"【现有全量原子能力库字典】:\n{cap_str}\n\n【相关背景知识文章】:\n{knowledge_str}\n\n【本次待分析抽取合并的工具列表】:\n{tool_str}\n\n请严格输出JSON操作数组:"
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT_CAPABILITY},
        {"role": "user", "content": prompt}
    ]
    result_text = ""
    try:
        result = await openrouter_llm_call(
            messages=messages,
            model="anthropic/claude-sonnet-4-5",
            temperature=0.2
        )
        result_text = result.get("content", "")
    except Exception as e:
        logger.error(f"OpenRouter API failed: {e}")
        return []
    # Best-effort audit trail: a failure to append the raw response must not
    # discard an otherwise-successful LLM result.
    try:
        with open("raw_capability_responses.log", "a", encoding="utf-8") as f:
            f.write(f"\n--- Synchronous Capability Batch Output ---\n{result_text}\n")
    except OSError:
        logger.warning("Could not append raw capability response to audit log", exc_info=True)
    return _parse_capability_ops(result_text)


def _apply_create_op(cap_store, op, temp_id_mapping: dict, stats: dict) -> None:
    """Materialize one 'create' op as a new capability row; record temp->real id."""
    real_id = f"cap-{uuid.uuid4().hex[:12]}"
    temp_id_mapping[op.get("capability_id")] = real_id
    t_id = op.get("tool_id")
    inherited_knowledge = op.get("knowledge_ids", [])
    stats["knowledge_inherited"] += len(inherited_knowledge)
    cap_store.insert_or_update({
        "id": real_id,
        "name": op.get("name", ""),
        "criterion": op.get("criterion", ""),
        "description": op.get("description", ""),
        "tool_ids": [t_id],
        "implements": {t_id: op.get("implement_description", "")},
        "knowledge_ids": inherited_knowledge
    })
    stats["created"] += 1


def _apply_attach_op(cap_store, op, temp_id_mapping: dict, stats: dict) -> None:
    """Apply one 'attach' / 'update_and_attach' op onto an existing capability.

    Temp NEW_* ids from stage one are resolved through *temp_id_mapping*.
    Silently skips ops whose capability cannot be found.
    """
    action = op.get("action")
    c_id = temp_id_mapping.get(op.get("capability_id"), op.get("capability_id"))
    existing_cap = cap_store.get_by_id(c_id)
    if not existing_cap:
        return
    if action == "update_and_attach":
        # Empty/missing fields from the model mean "keep the current value".
        existing_cap["name"] = op.get("name") or existing_cap.get("name")
        existing_cap["criterion"] = op.get("criterion") or existing_cap.get("criterion")
        existing_cap["description"] = op.get("description") or existing_cap.get("description")
        stats["updated"] += 1
    t_id = op.get("tool_id")
    imp_desc = op.get("implement_description", "")
    tool_ids = existing_cap.get("tool_ids", [])
    if t_id not in tool_ids:
        tool_ids.append(t_id)
    existing_cap["tool_ids"] = tool_ids
    implements = existing_cap.get("implements", {})
    implements[t_id] = imp_desc
    existing_cap["implements"] = implements
    op_k_ids = op.get("knowledge_ids", [])
    if op_k_ids:
        # Order-preserving merge: append only the ids not already present
        # (the previous set()-based merge produced nondeterministic ordering).
        known = set(existing_cap.get("knowledge_ids", []))
        new_k_ids = [k for k in op_k_ids if k not in known]
        if new_k_ids:
            existing_cap["knowledge_ids"] = list(existing_cap.get("knowledge_ids", [])) + new_k_ids
            stats["knowledge_inherited"] += len(new_k_ids)
    cap_store.insert_or_update(existing_cap)
    stats["attached"] += 1


@tool()
async def sync_atomic_capabilities(target_tool_ids: List[str]) -> ToolResult:
    """One-shot strong-sync tool (built for agents such as the Librarian).

    For the given newly discovered or changed tools, it fetches their bound
    knowledge, runs LLM capability analysis, and applies the resulting
    PostgreSQL updates directly, returning a report of what was created,
    updated and attached.

    Args:
        target_tool_ids: Required. Tool IDs to run capability extraction on.
            Pass very few at a time (e.g. 1-3) so the call returns within
            ~15 seconds; full-catalog scans are deliberately disallowed.
    """
    if not target_tool_ids:
        return ToolResult(title="❌ 参数错误", output="必须提供 target_tool_ids,独立系统不再允许发起全量全局扫描避免阻塞。", error="Missing target_tool_ids")
    logger.info(f"开启单通道同步能力萃取 (目标: {target_tool_ids})...")
    cap_store = PostgreSQLCapabilityStore()
    tool_store = PostgreSQLToolStore()
    k_cursor = cap_store._get_cursor()
    stats = {"created": 0, "attached": 0, "updated": 0, "knowledge_inherited": 0}
    try:
        existing_caps = cap_store.list_all(limit=5000)
        all_tools = tool_store.list_all(limit=2000)
        target_tools = [t for t in all_tools if t.get("id") in target_tool_ids]
        if not target_tools:
            return ToolResult(title="❌ 未找到工具", output=f"找不到任何由 {target_tool_ids} 制定的接入工具")

        # Collect the knowledge hard-bound to the target tools.
        batch_k_ids = {k for t in target_tools for k in t.get("knowledge_ids", [])}
        k_map = _fetch_knowledge_map(k_cursor, list(batch_k_ids))

        # LLM builds the create/attach/update op matrix.
        ops = await extract_capabilities_with_claude(existing_caps, target_tools, k_map)
        if not ops:
            return ToolResult(title="ℹ️ 分析完成", output="大模型判定当前工具没有提取出任何有效或创新的功能资产。")

        temp_id_mapping = {}
        # Stage one: create new atomic capabilities first, so stage two can
        # resolve the model's temporary NEW_* ids to real ones.
        for op in ops:
            if op.get("action") == "create" and op.get("capability_id") and op.get("tool_id"):
                _apply_create_op(cap_store, op, temp_id_mapping, stats)

        # Stage two: attach tools to (and optionally refresh) existing capabilities.
        for op in ops:
            if op.get("action") in ("attach", "update_and_attach") and op.get("capability_id") and op.get("tool_id"):
                _apply_attach_op(cap_store, op, temp_id_mapping, stats)

        return ToolResult(
            title="✅ 强同步萃取完成",
            output=f"强同步萃取完毕并入库: 新生能力 {stats['created']}, 修缮扩写 {stats['updated']}, 同化挂载 {stats['attached']} (沿袭知识网脉络 {stats['knowledge_inherited']} 条).\n\n详情记录:\n" + json.dumps(ops, ensure_ascii=False, indent=2)
        )
    except Exception as e:
        logger.error(f"Sync capability extraction failed: {e}")
        try:
            # Roll back any partially-applied writes; must never mask the
            # original error if the connection is already gone.
            cap_store.conn.rollback()
        except Exception:
            logger.warning("Rollback after sync failure did not succeed", exc_info=True)
        return ToolResult(title="❌ 系统异常", output=f"执行时发生错误: {str(e)}", error=str(e))
    finally:
        # Nested finally: a failing close must not leak the remaining resources.
        try:
            k_cursor.close()
        finally:
            try:
                cap_store.close()
            finally:
                tool_store.close()