""" Knowledge Manager 本地缓存与整理工具 负责: 1. 维护本地的 pre_upload_list.json 图谱草稿 2. 提供 commit_to_database 将草稿分发入库(Requirements, Capabilities, Tools, Knowledge) """ import os import json import logging import httpx from pathlib import Path from typing import Dict, Any, List, Optional from datetime import datetime from agent.tools import tool, ToolResult logger = logging.getLogger(__name__) # 缓存目录 CACHE_DIR = Path(".cache/.knowledge") PRE_UPLOAD_FILE = CACHE_DIR / "pre_upload_list.json" def _ensure_dirs(): """确保缓存目录存在""" CACHE_DIR.mkdir(parents=True, exist_ok=True) @tool() async def organize_cached_data(merge: bool = True) -> ToolResult: """为了兼容旧指令保留。现在实际上不再需要独立调用。""" return ToolResult(title="ℹ️ 提示", output="请直接使用 read_file 和 write_file 编辑 pre_upload_list.json。") @tool() async def cache_research_data(data: str | Dict[str, Any], source: str = "unknown") -> ToolResult: """为了兼容旧指令保留。现在实际上不再需要独立调用。""" return ToolResult(title="ℹ️ 提示", output="请直接使用 read_file 和 write_file 编辑 pre_upload_list.json。") @tool( description=( "将本地维护好的 pre_upload_list.json 预上传图谱草稿," "分发提交到远端真实的 Requirements、Capabilities、Tools、Knowledge 数据表中。" ) ) async def commit_to_database() -> ToolResult: """ 提交预处理好的知识图谱到数据库(涵盖 Requirements, Capabilities, Tools, Knowledge)。 从 .cache/.knowledge/pre_upload_list.json 读取。 Returns: 提交结果和统计信息 """ try: if not PRE_UPLOAD_FILE.exists(): return ToolResult( title="⚠️ 无草稿可提交", output=f"未找到预处理草稿文件:{PRE_UPLOAD_FILE}" ) with open(PRE_UPLOAD_FILE, "r", encoding="utf-8") as f: data = json.load(f) reqs = data.get("requirements", []) caps = data.get("capabilities", []) tools_list = data.get("tools", []) knowledges = data.get("knowledge", []) api_base = os.environ.get("KNOWHUB_API", "http://localhost:9999") saved_reqs, saved_caps, saved_tools, saved_knows = 0, 0, 0, 0 errors = [] async with httpx.AsyncClient(timeout=30.0) as client: # 1. 提交 Requirements for r in reqs: try: res = await client.post(f"{api_base}/api/requirement", json=r) res.raise_for_status() saved_reqs += 1 except Exception as e: errors.append(f"提交需求失败 {r.get('id', '')}: {e}") # 2. 提交 Capabilities(仅包含经过严格验证的条目) for c in caps: try: res = await client.post(f"{api_base}/api/capability", json=c) res.raise_for_status() saved_caps += 1 except Exception as e: errors.append(f"提交能力失败 {c.get('id', '')}: {e}") # 3. 提交 Tools for t in tools_list: try: res = await client.post(f"{api_base}/api/tool", json=t) res.raise_for_status() saved_tools += 1 except Exception as e: errors.append(f"提交工具失败 {t.get('id', '')}: {e}") # 4. 提交 Knowledge from agent.tools.builtin.knowledge import knowledge_save for k in knowledges: try: await knowledge_save( task=k.get("task", "补充知识"), content=k.get("content", ""), types=k.get("types", []), score=k.get("score", 3), tools=k.get("tools", []), support_capability=k.get("support_capability", []), source_category=k.get("source", {}).get("category", "exp") ) saved_knows += 1 except Exception as e: errors.append(f"提交知识失败: {e}") # 若完全没有错误,清空草稿 if not errors: PRE_UPLOAD_FILE.unlink(missing_ok=True) output = (f"已成功将图谱发往数据库流水线。\n" f"- 写入 Requirement: {saved_reqs} 个\n" f"- 写入 Capability: {saved_caps} 个\n" f"- 写入 Tool: {saved_tools} 个\n" f"- 写入 Knowledge: {saved_knows} 条\n") if errors: output += "\n伴随部分异常:\n" + "\n".join(f"- {e}" for e in errors[:5]) return ToolResult( title="✅ 提交入库完成", output=output ) except Exception as e: logger.error(f"提交到数据库失败: {e}") return ToolResult( title="❌ 提交失败", output=f"错误: {str(e)}", error=str(e) ) @tool( description="查看当前预整理草稿的统计信息。" ) async def list_cache_status() -> ToolResult: """ 查看草稿状态(pre_upload_list.json) """ _ensure_dirs() if not PRE_UPLOAD_FILE.exists(): return ToolResult(title="ℹ️ 暂无草稿", output="pre_upload_list.json 不存在。") try: with open(PRE_UPLOAD_FILE, "r", encoding="utf-8") as f: data = json.load(f) reqs = len(data.get("requirements", [])) caps = len(data.get("capabilities", [])) tools = len(data.get("tools", [])) knows = len(data.get("knowledge", [])) output = (f"当前草稿数据({PRE_UPLOAD_FILE.name}):\n" f" - 需求: {reqs}\n" f" - 能力: {caps}\n" f" - 工具: {tools}\n" f" - 知识: {knows}\n") return ToolResult(title="📁 草稿状态", output=output) except Exception as e: return ToolResult(title="❌ 读取状态失败", output=str(e), error=str(e))