| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224 |
- import json
- import logging
- import uuid
- import os
- import asyncio
- from typing import List
- from agent.tools import tool, ToolResult
- from agent.llm.openrouter import openrouter_llm_call
- # 导入底层 Postgres 资产表依赖
- from tool_agent.tool.tool_store import PostgreSQLToolStore
- from tool_agent.tool.capability import PostgreSQLCapabilityStore
- logger = logging.getLogger(__name__)
# System prompt (in Chinese) for the capability-extraction LLM pass.
# It instructs the model to analyse a new tool plus its linked knowledge
# articles against the existing atomic-capability dictionary and emit a
# strict JSON array of operations: "create" (brand-new capability with a
# temporary NEW_* id), "attach" (tool implements an existing capability),
# or "update_and_attach" (attach plus refreshed name/criterion/description).
# NOTE: this is a runtime string sent verbatim to the model — do not edit
# or reformat its content; the downstream parser relies on this schema.
SYSTEM_PROMPT_CAPABILITY = """你是一个专业的能力分析师。
你的任务是从给定的【待分析新工具】的使用介绍和它挂载的【相关背景知识文章】中,提取出它对整网【原子能力表】的贡献,并选择新建或是融合。
## 定义与格式
1. 原子能力是面向需求、跨工具的独立完整业务单元。
2. 端到端型工具(如Midjourney)直接抽取能力;编排平台工具(如ComfyUI节点群)从实际搭建的工作流中提取能力视角(不要原子化平台或独立节点本身)。
请输出严格的 JSON 数组结构:
[
{
"action": "create",
"tool_id": "<当前待分析的工具ID>",
"knowledge_ids": ["<哪些传入的相关知识促成了这个能力,填入真实的知识ID>"],
"capability_id": "NEW_<任意数字字母临时ID>",
"name": "<总结提炼的新能力统称名>",
"criterion": "<客观统一的判定标准>",
"description": "<抽象的能力需求场景说明>",
"implement_description": "<在该特定工具里是如何实现该能力的(具体操作或调用链)>"
},
{
"action": "attach",
"tool_id": "<当前待分析的工具ID>",
"knowledge_ids": ["<关联的知识ID,可为空数组>"],
"capability_id": "<来自下面【已有全量能力库】字典中完全等价功能的真实ID>",
"implement_description": "<在该特定工具里实现此老能力的具体手法>"
},
{
"action": "update_and_attach",
"tool_id": "<当前待分析的工具ID>",
"knowledge_ids": ["<关联的知识ID>"],
"capability_id": "<来自【已有全量能力库】的真实ID>",
"name": "<更统整包容的全局更好命名(如果不改就留空不变)>",
"criterion": "<更新优化后的判定标准>",
"description": "<更新优化后的描述>",
"implement_description": "<在该特定工具中如何实现>"
}
]
请绝对不要输出 markdown 包装,仅输出原生的合法 JSON。如果一个工具覆盖了多个独立原子能力,请为每个能力出具一条动作操作。
"""
- def _fetch_knowledge_map(cursor, k_ids: list):
- if not k_ids: return {}
- placeholders = ','.join(['%s'] * len(k_ids))
- cursor.execute(f"SELECT row_to_json(knowledge) as data FROM knowledge WHERE id IN ({placeholders})", list(k_ids))
- mapping = {}
- for r in cursor.fetchall():
- d = r['data']
- text = str(d.get('content', d.get('markdown', d.get('description', ''))))
- mapping[d['id']] = {"title": d.get('title', ''), "content": text[:4000]}
- return mapping
async def extract_capabilities_with_claude(existing_caps, tool_batch, knowledge_map):
    """Run one LLM capability-extraction pass over a batch of tools.

    Builds a prompt from the existing capability dictionary, the tools'
    descriptions and their linked knowledge articles, then parses the
    model's response into a list of operation dicts.

    Args:
        existing_caps: Capability rows (dicts with ``id``/``name``;
            ``criterion`` optional).
        tool_batch: Tool rows to analyse (dicts with ``id``/``name``;
            description fields may be absent).
        knowledge_map: ``{knowledge_id: {"title", "content"}}`` context.

    Returns:
        A list of dicts each guaranteed to contain an ``"action"`` key;
        ``[]`` on API failure or unparseable model output.
    """
    cap_str = json.dumps(
        [{"id": c["id"], "name": c["name"], "criterion": c.get("criterion", "")} for c in existing_caps],
        ensure_ascii=False,
    )
    # .get() on the description fields so one malformed tool row cannot
    # raise KeyError and abort the whole batch.
    tool_str = json.dumps(
        [
            {
                "id": t["id"],
                "name": t["name"],
                "desc": t.get("introduction", ""),
                "docs": t.get("tutorial", ""),
                "associated_knowledge": t.get("knowledge_ids", []),
            }
            for t in tool_batch
        ],
        ensure_ascii=False,
    )
    knowledge_str = json.dumps(knowledge_map, ensure_ascii=False)

    prompt = f"【现有全量原子能力库字典】:\n{cap_str}\n\n【相关背景知识文章】:\n{knowledge_str}\n\n【本次待分析抽取合并的工具列表】:\n{tool_str}\n\n请严格输出JSON操作数组:"

    messages = [
        {"role": "system", "content": SYSTEM_PROMPT_CAPABILITY},
        {"role": "user", "content": prompt},
    ]

    result_text = ""
    try:
        result = await openrouter_llm_call(
            messages=messages,
            model="anthropic/claude-sonnet-4-5",
            temperature=0.2,
        )
        result_text = result.get("content", "")
    except Exception as e:
        logger.error(f"OpenRouter API failed: {e}")
        return []

    # Best-effort audit trail of the raw model output; a full disk or
    # read-only filesystem must not kill the extraction itself.
    try:
        with open("raw_capability_responses.log", "a", encoding="utf-8") as f:
            f.write(f"\n--- Synchronous Capability Batch Output ---\n{result_text}\n")
    except OSError as e:
        logger.warning(f"Could not write raw_capability_responses.log: {e}")

    try:
        # Strip optional markdown code fences the model may emit despite
        # being told not to.
        clean_json = result_text.strip()
        if clean_json.startswith("```json"):
            clean_json = clean_json[7:]
        elif clean_json.startswith("```"):
            clean_json = clean_json[3:]
        if clean_json.endswith("```"):
            clean_json = clean_json[:-3]

        data = json.loads(clean_json.strip())
        # Accept either a lone operation object or an array of them; drop
        # anything that is not a dict carrying an "action" key.
        if isinstance(data, dict):
            return [data] if "action" in data else []
        if isinstance(data, list):
            return [item for item in data if isinstance(item, dict) and "action" in item]
        return []
    except Exception as e:
        logger.error(f"Failed to parse capability JSON: {e}")
        return []
@tool()
async def sync_atomic_capabilities(target_tool_ids: List[str]) -> ToolResult:
    """
    One-shot strong-sync tool (built for agents such as the Librarian).
    For newly discovered or changed tools/knowledge sources it fetches the
    related records, has the LLM analyse them, and applies the resulting
    operations directly to the underlying PostgreSQL tables, typically
    within tens of seconds. Returns a summary report of the applied
    additions and updates.

    Args:
        target_tool_ids: Required. IDs of the tools the LLM should review
            for capability extraction. Keep the list very small (e.g. 1-3
            ids) so the synchronous call returns within ~15 seconds.
    """
    # Guard: full global scans are no longer allowed — callers must name targets.
    if not target_tool_ids:
        return ToolResult(title="❌ 参数错误", output="必须提供 target_tool_ids,独立系统不再允许发起全量全局扫描避免阻塞。", error="Missing target_tool_ids")
    logger.info(f"开启单通道同步能力萃取 (目标: {target_tool_ids})...")

    cap_store = PostgreSQLCapabilityStore()
    tool_store = PostgreSQLToolStore()
    # NOTE(review): reaches into the store's private cursor accessor for the
    # ad-hoc knowledge query — confirm this is the intended API.
    k_cursor = cap_store._get_cursor()
    stats = {"created": 0, "attached": 0, "updated": 0, "knowledge_inherited": 0}

    try:
        existing_caps = cap_store.list_all(limit=5000)
        all_tools = tool_store.list_all(limit=2000)

        target_tools = [t for t in all_tools if t.get("id") in target_tool_ids]
        if not target_tools:
            return ToolResult(title="❌ 未找到工具", output=f"找不到任何由 {target_tool_ids} 制定的接入工具")
        # Collect the knowledge articles hard-linked to the target tools.
        batch_k_ids = set([k for t in target_tools for k in t.get("knowledge_ids", [])])
        k_map = _fetch_knowledge_map(k_cursor, list(batch_k_ids))

        # LLM performs the abstract reasoning and emits the operation list.
        ops = await extract_capabilities_with_claude(existing_caps, target_tools, k_map)

        if not ops:
            return ToolResult(title="ℹ️ 分析完成", output="大模型判定当前工具没有提取出任何有效或创新的功能资产。")

        # Maps the LLM's temporary "NEW_*" ids to the real generated ids so
        # phase-two attach ops can reference capabilities created in phase one.
        temp_id_mapping = {}
        # Apply phase one: create the brand-new atomic capabilities first.
        for op in ops:
            if op.get("action") == "create" and op.get("capability_id") and op.get("tool_id"):
                real_id = f"cap-{uuid.uuid4().hex[:12]}"
                temp_id_mapping[op.get("capability_id")] = real_id

                t_id = op.get("tool_id")
                inherited_knowledge = op.get("knowledge_ids", [])
                stats["knowledge_inherited"] += len(inherited_knowledge)

                cap_store.insert_or_update({
                    "id": real_id,
                    "name": op.get("name", ""),
                    "criterion": op.get("criterion", ""),
                    "description": op.get("description", ""),
                    "tool_ids": [t_id],
                    # Per-tool implementation notes keyed by tool id.
                    "implements": {t_id: op.get("implement_description", "")},
                    "knowledge_ids": inherited_knowledge
                })
                stats["created"] += 1
        # Apply phase two: attach tools to existing capabilities and refresh them.
        for op in ops:
            action = op.get("action")
            if action in ("attach", "update_and_attach") and op.get("capability_id") and op.get("tool_id"):
                # Resolve temporary NEW_* ids to real ones; fall back to the
                # id as given (an existing capability).
                c_id = temp_id_mapping.get(op.get("capability_id"), op.get("capability_id"))
                existing_cap = cap_store.get_by_id(c_id)
                # Skip ops referencing ids the LLM hallucinated.
                if not existing_cap: continue

                if action == "update_and_attach":
                    # `or` keeps the old value when the LLM left the field empty.
                    existing_cap["name"] = op.get("name") or existing_cap.get("name")
                    existing_cap["criterion"] = op.get("criterion") or existing_cap.get("criterion")
                    existing_cap["description"] = op.get("description") or existing_cap.get("description")
                    stats["updated"] += 1

                t_id = op.get("tool_id")
                imp_desc = op.get("implement_description", "")

                # Register this tool on the capability (idempotent append).
                tool_ids = existing_cap.get("tool_ids", [])
                if t_id not in tool_ids: tool_ids.append(t_id)
                existing_cap["tool_ids"] = tool_ids

                implements = existing_cap.get("implements", {})
                implements[t_id] = imp_desc
                existing_cap["implements"] = implements

                # Merge in any newly cited knowledge ids, deduplicated.
                op_k_ids = op.get("knowledge_ids", [])
                if op_k_ids:
                    existing_k_ids = set(existing_cap.get("knowledge_ids", []))
                    new_k_ids = [k for k in op_k_ids if k not in existing_k_ids]
                    if new_k_ids:
                        existing_k_ids.update(new_k_ids)
                        existing_cap["knowledge_ids"] = list(existing_k_ids)
                        stats["knowledge_inherited"] += len(new_k_ids)

                cap_store.insert_or_update(existing_cap)
                # Counts both "attach" and "update_and_attach" actions.
                stats["attached"] += 1
        return ToolResult(
            title="✅ 强同步萃取完成",
            output=f"强同步萃取完毕并入库: 新生能力 {stats['created']}, 修缮扩写 {stats['updated']}, 同化挂载 {stats['attached']} (沿袭知识网脉络 {stats['knowledge_inherited']} 条).\n\n详情记录:\n" + json.dumps(ops, ensure_ascii=False, indent=2)
        )
    except Exception as e:
        logger.error(f"Sync capability extraction failed: {e}")
        # Roll back any partially applied writes from this sync.
        cap_store.conn.rollback()
        return ToolResult(title="❌ 系统异常", output=f"执行时发生错误: {str(e)}", error=str(e))
    finally:
        # Always release DB resources, including on the early returns above.
        k_cursor.close()
        cap_store.close()
        tool_store.close()
|