# capability_extractor.py
import json
import logging
import uuid
import os
import asyncio
from typing import List

from agent.tools import tool, ToolResult
from agent.llm.openrouter import openrouter_llm_call
# Low-level Postgres asset-table dependencies (tool + capability stores).
from tool_agent.tool.tool_store import PostgreSQLToolStore
from tool_agent.tool.capability import PostgreSQLCapabilityStore

logger = logging.getLogger(__name__)
# System prompt (in Chinese) instructing the LLM to extract atomic-capability
# contributions from a tool's docs plus its linked knowledge articles, answering
# with a bare JSON array of create / attach / update_and_attach operations.
# Consumed verbatim by extract_capabilities_with_claude — the JSON schema below
# is part of the model contract, so the string must not be reworded.
SYSTEM_PROMPT_CAPABILITY = """你是一个专业的能力分析师。
你的任务是从给定的【待分析新工具】的使用介绍和它挂载的【相关背景知识文章】中,提取出它对整网【原子能力表】的贡献,并选择新建或是融合。
## 定义与格式
1. 原子能力是面向需求、跨工具的独立完整业务单元。
2. 端到端型工具(如Midjourney)直接抽取能力;编排平台工具(如ComfyUI节点群)从实际搭建的工作流中提取能力视角(不要原子化平台或独立节点本身)。
请输出严格的 JSON 数组结构:
[
{
"action": "create",
"tool_id": "<当前待分析的工具ID>",
"knowledge_ids": ["<哪些传入的相关知识促成了这个能力,填入真实的知识ID>"],
"capability_id": "NEW_<任意数字字母临时ID>",
"name": "<总结提炼的新能力统称名>",
"criterion": "<客观统一的判定标准>",
"description": "<抽象的能力需求场景说明>",
"implement_description": "<在该特定工具里是如何实现该能力的(具体操作或调用链)>"
},
{
"action": "attach",
"tool_id": "<当前待分析的工具ID>",
"knowledge_ids": ["<关联的知识ID,可为空数组>"],
"capability_id": "<来自下面【已有全量能力库】字典中完全等价功能的真实ID>",
"implement_description": "<在该特定工具里实现此老能力的具体手法>"
},
{
"action": "update_and_attach",
"tool_id": "<当前待分析的工具ID>",
"knowledge_ids": ["<关联的知识ID>"],
"capability_id": "<来自【已有全量能力库】的真实ID>",
"name": "<更统整包容的全局更好命名(如果不改就留空不变)>",
"criterion": "<更新优化后的判定标准>",
"description": "<更新优化后的描述>",
"implement_description": "<在该特定工具中如何实现>"
}
]
请绝对不要输出 markdown 包装,仅输出原生的合法 JSON。如果一个工具覆盖了多个独立原子能力,请为每个能力出具一条动作操作。
"""
  50. def _fetch_knowledge_map(cursor, k_ids: list):
  51. if not k_ids: return {}
  52. placeholders = ','.join(['%s'] * len(k_ids))
  53. cursor.execute(f"SELECT row_to_json(knowledge) as data FROM knowledge WHERE id IN ({placeholders})", list(k_ids))
  54. mapping = {}
  55. for r in cursor.fetchall():
  56. d = r['data']
  57. text = str(d.get('content', d.get('markdown', d.get('description', ''))))
  58. mapping[d['id']] = {"title": d.get('title', ''), "content": text[:4000]}
  59. return mapping
  60. async def extract_capabilities_with_claude(existing_caps, tool_batch, knowledge_map):
  61. cap_str = json.dumps([{"id": c["id"], "name": c["name"], "criterion": c.get("criterion", "")} for c in existing_caps], ensure_ascii=False)
  62. tool_str = json.dumps([{"id": t["id"], "name": t["name"], "desc": t["introduction"], "docs": t["tutorial"], "associated_knowledge": t.get("knowledge_ids", [])} for t in tool_batch], ensure_ascii=False)
  63. knowledge_str = json.dumps(knowledge_map, ensure_ascii=False)
  64. prompt = f"【现有全量原子能力库字典】:\n{cap_str}\n\n【相关背景知识文章】:\n{knowledge_str}\n\n【本次待分析抽取合并的工具列表】:\n{tool_str}\n\n请严格输出JSON操作数组:"
  65. messages = [
  66. {"role": "system", "content": SYSTEM_PROMPT_CAPABILITY},
  67. {"role": "user", "content": prompt}
  68. ]
  69. result_text = ""
  70. try:
  71. result = await openrouter_llm_call(
  72. messages=messages,
  73. model="anthropic/claude-sonnet-4-5",
  74. temperature=0.2
  75. )
  76. result_text = result.get("content", "")
  77. except Exception as e:
  78. logger.error(f"OpenRouter API failed: {e}")
  79. return []
  80. with open("raw_capability_responses.log", "a", encoding="utf-8") as f:
  81. f.write(f"\n--- Synchronous Capability Batch Output ---\n{result_text}\n")
  82. try:
  83. clean_json = result_text.strip()
  84. if clean_json.startswith("```json"): clean_json = clean_json[7:]
  85. elif clean_json.startswith("```"): clean_json = clean_json[3:]
  86. if clean_json.endswith("```"): clean_json = clean_json[:-3]
  87. data = json.loads(clean_json.strip())
  88. if isinstance(data, dict):
  89. if "action" in data: return [data]
  90. return []
  91. elif isinstance(data, list):
  92. return [item for item in data if isinstance(item, dict) and "action" in item]
  93. return []
  94. except Exception as e:
  95. logger.error(f"Failed to parse capability JSON: {e}")
  96. return []
  97. @tool()
  98. async def sync_atomic_capabilities(target_tool_ids: List[str]) -> ToolResult:
  99. """
  100. 一键式强同步工具(为Librarian等智能体量身打造)。
  101. 针对新发现的或发生变动的特定工具/知识源,它能在数十秒内完成关联获取、大模型分析并直接完成底层 PostgreSQL 更新操作。
  102. 直接返回应用成功后的增减战报。
  103. Args:
  104. target_tool_ids: 必须提供。指定要被大模型执行能力审查提取的工具 ID 列表。建议每次传入数量极少(如 1-3个)以保证 15 秒内同步快速返回。
  105. """
  106. if not target_tool_ids:
  107. return ToolResult(title="❌ 参数错误", output="必须提供 target_tool_ids,独立系统不再允许发起全量全局扫描避免阻塞。", error="Missing target_tool_ids")
  108. logger.info(f"开启单通道同步能力萃取 (目标: {target_tool_ids})...")
  109. cap_store = PostgreSQLCapabilityStore()
  110. tool_store = PostgreSQLToolStore()
  111. k_cursor = cap_store._get_cursor()
  112. stats = {"created": 0, "attached": 0, "updated": 0, "knowledge_inherited": 0}
  113. try:
  114. existing_caps = cap_store.list_all(limit=5000)
  115. all_tools = tool_store.list_all(limit=2000)
  116. target_tools = [t for t in all_tools if t.get("id") in target_tool_ids]
  117. if not target_tools:
  118. return ToolResult(title="❌ 未找到工具", output=f"找不到任何由 {target_tool_ids} 制定的接入工具")
  119. # 拉取目标工具的强绑定相关知识
  120. batch_k_ids = set([k for t in target_tools for k in t.get("knowledge_ids", [])])
  121. k_map = _fetch_knowledge_map(k_cursor, list(batch_k_ids))
  122. # Claude 执行抽象推理构建矩阵
  123. ops = await extract_capabilities_with_claude(existing_caps, target_tools, k_map)
  124. if not ops:
  125. return ToolResult(title="ℹ️ 分析完成", output="大模型判定当前工具没有提取出任何有效或创新的功能资产。")
  126. temp_id_mapping = {}
  127. # 落地阶段一:先创造出新的原子能力
  128. for op in ops:
  129. if op.get("action") == "create" and op.get("capability_id") and op.get("tool_id"):
  130. real_id = f"cap-{uuid.uuid4().hex[:12]}"
  131. temp_id_mapping[op.get("capability_id")] = real_id
  132. t_id = op.get("tool_id")
  133. inherited_knowledge = op.get("knowledge_ids", [])
  134. stats["knowledge_inherited"] += len(inherited_knowledge)
  135. cap_store.insert_or_update({
  136. "id": real_id,
  137. "name": op.get("name", ""),
  138. "criterion": op.get("criterion", ""),
  139. "description": op.get("description", ""),
  140. "tool_ids": [t_id],
  141. "implements": {t_id: op.get("implement_description", "")},
  142. "knowledge_ids": inherited_knowledge
  143. })
  144. stats["created"] += 1
  145. # 落地阶段二:处理老能力的依附和扩展刷新
  146. for op in ops:
  147. action = op.get("action")
  148. if action in ("attach", "update_and_attach") and op.get("capability_id") and op.get("tool_id"):
  149. c_id = temp_id_mapping.get(op.get("capability_id"), op.get("capability_id"))
  150. existing_cap = cap_store.get_by_id(c_id)
  151. if not existing_cap: continue
  152. if action == "update_and_attach":
  153. existing_cap["name"] = op.get("name") or existing_cap.get("name")
  154. existing_cap["criterion"] = op.get("criterion") or existing_cap.get("criterion")
  155. existing_cap["description"] = op.get("description") or existing_cap.get("description")
  156. stats["updated"] += 1
  157. t_id = op.get("tool_id")
  158. imp_desc = op.get("implement_description", "")
  159. tool_ids = existing_cap.get("tool_ids", [])
  160. if t_id not in tool_ids: tool_ids.append(t_id)
  161. existing_cap["tool_ids"] = tool_ids
  162. implements = existing_cap.get("implements", {})
  163. implements[t_id] = imp_desc
  164. existing_cap["implements"] = implements
  165. op_k_ids = op.get("knowledge_ids", [])
  166. if op_k_ids:
  167. existing_k_ids = set(existing_cap.get("knowledge_ids", []))
  168. new_k_ids = [k for k in op_k_ids if k not in existing_k_ids]
  169. if new_k_ids:
  170. existing_k_ids.update(new_k_ids)
  171. existing_cap["knowledge_ids"] = list(existing_k_ids)
  172. stats["knowledge_inherited"] += len(new_k_ids)
  173. cap_store.insert_or_update(existing_cap)
  174. stats["attached"] += 1
  175. return ToolResult(
  176. title="✅ 强同步萃取完成",
  177. output=f"强同步萃取完毕并入库: 新生能力 {stats['created']}, 修缮扩写 {stats['updated']}, 同化挂载 {stats['attached']} (沿袭知识网脉络 {stats['knowledge_inherited']} 条).\n\n详情记录:\n" + json.dumps(ops, ensure_ascii=False, indent=2)
  178. )
  179. except Exception as e:
  180. logger.error(f"Sync capability extraction failed: {e}")
  181. cap_store.conn.rollback()
  182. return ToolResult(title="❌ 系统异常", output=f"执行时发生错误: {str(e)}", error=str(e))
  183. finally:
  184. k_cursor.close()
  185. cap_store.close()
  186. tool_store.close()