| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- """
- Knowledge Manager 本地缓存与整理工具
- 负责:
- 1. 维护本地的 pre_upload_list.json 图谱草稿
- 2. 提供 commit_to_database 将草稿分发入库(Requirements, Capabilities, Tools, Knowledge)
- """
- import os
- import json
- import logging
- import httpx
- from pathlib import Path
- from typing import Dict, Any, List, Optional
- from datetime import datetime
- from agent.tools import tool, ToolResult
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Cache directory (a relative path, so it resolves against the working directory).
CACHE_DIR = Path(".cache") / ".knowledge"
# JSON draft of the knowledge graph that is waiting to be uploaded.
PRE_UPLOAD_FILE = CACHE_DIR.joinpath("pre_upload_list.json")
def _ensure_dirs():
    """Create the cache directory (and any missing parents) if it is absent."""
    # exist_ok makes repeated calls harmless.
    CACHE_DIR.mkdir(exist_ok=True, parents=True)
@tool()
async def organize_cached_data(merge: bool = True) -> ToolResult:
    """Kept for compatibility with old instructions; no longer needs to be called on its own.

    The ``merge`` argument is accepted but not used. The returned hint tells
    the caller to edit pre_upload_list.json with read_file / write_file.
    """
    hint = "请直接使用 read_file 和 write_file 编辑 pre_upload_list.json。"
    return ToolResult(title="ℹ️ 提示", output=hint)
@tool()
async def cache_research_data(data: str | Dict[str, Any], source: str = "unknown") -> ToolResult:
    """Kept for compatibility with old instructions; no longer needs to be called on its own.

    Neither ``data`` nor ``source`` is used. The returned hint tells the
    caller to edit pre_upload_list.json with read_file / write_file.
    """
    hint = "请直接使用 read_file 和 write_file 编辑 pre_upload_list.json。"
    return ToolResult(title="ℹ️ 提示", output=hint)
# Maximum number of individual error messages echoed back in the tool output.
_MAX_REPORTED_ERRORS = 5


async def _post_draft_items(
    client: "httpx.AsyncClient",
    url: str,
    items: List[Any],
    label: str,
    errors: List[str],
) -> int:
    """POST each draft item to ``url`` as JSON and return how many succeeded.

    A failure on one item is appended to ``errors`` (prefixed with ``label``)
    and does not stop the remaining items from being submitted.
    """
    saved = 0
    for item in items:
        try:
            response = await client.post(url, json=item)
            response.raise_for_status()
            saved += 1
        except Exception as exc:
            # The draft is a hand-edited JSON file, so an item may not be a
            # dict; looking up its id must never raise inside this handler.
            item_id = item.get("id", "") if isinstance(item, dict) else ""
            errors.append(f"{label} {item_id}: {exc}")
    return saved


async def _save_knowledge_items(items: List[Any], errors: List[str]) -> int:
    """Store each knowledge entry via ``knowledge_save``; return the success count.

    Failures are appended to ``errors`` and do not stop the remaining entries.
    """
    # Imported lazily, as in the original code — presumably to avoid a
    # circular import between tool modules; confirm before hoisting.
    from agent.tools.builtin.knowledge import knowledge_save

    saved = 0
    for item in items:
        try:
            # Treat an explicit ``"source": null`` the same as a missing key.
            source = item.get("source") or {}
            result = await knowledge_save(
                task=item.get("task", "补充知识"),
                content=item.get("content", ""),
                types=item.get("types", []),
                score=item.get("score", 3),
                tools=item.get("tools", []),
                support_capability=item.get("support_capability", []),
                source_category=source.get("category", "exp"),
            )
            # NOTE(review): knowledge_save's return type is not visible here.
            # If it reports failure through an ``error`` attribute (as
            # ToolResult does) instead of raising, count that as a failure so
            # the draft is not deleted after a silent error. Confirm.
            failure = getattr(result, "error", None)
            if failure:
                raise RuntimeError(failure)
            saved += 1
        except Exception as exc:
            errors.append(f"提交知识失败: {exc}")
    return saved


@tool(
    description=(
        "将本地维护好的 pre_upload_list.json 预上传图谱草稿,"
        "分发提交到远端真实的 Requirements、Capabilities、Tools、Knowledge 数据表中。"
    )
)
async def commit_to_database() -> ToolResult:
    """
    Commit the prepared knowledge-graph draft to the database.

    Reads .cache/.knowledge/pre_upload_list.json and submits its
    ``requirements``, ``capabilities`` and ``tools`` entries to the HTTP API
    whose base URL comes from the ``KNOWHUB_API`` environment variable
    (default ``http://localhost:9999``); ``knowledge`` entries are stored
    through ``knowledge_save``. The draft file is deleted only when every
    entry was submitted without error.

    Returns:
        ToolResult with the per-category success counts and, if any
        submissions failed, the first few error messages.
    """
    try:
        if not PRE_UPLOAD_FILE.exists():
            return ToolResult(
                title="⚠️ 无草稿可提交",
                output=f"未找到预处理草稿文件:{PRE_UPLOAD_FILE}"
            )

        with open(PRE_UPLOAD_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
        if not isinstance(data, dict):
            raise ValueError(
                f"{PRE_UPLOAD_FILE.name} 顶层必须是 JSON 对象,实际为 {type(data).__name__}"
            )

        # ``or []`` also covers sections that are present but set to null.
        reqs = data.get("requirements") or []
        caps = data.get("capabilities") or []
        tools_list = data.get("tools") or []
        knowledges = data.get("knowledge") or []

        # Strip a trailing slash so the joined URLs never contain "//api/...".
        api_base = os.environ.get("KNOWHUB_API", "http://localhost:9999").rstrip("/")
        errors: List[str] = []

        async with httpx.AsyncClient(timeout=30.0) as client:
            # 1. Requirements
            saved_reqs = await _post_draft_items(
                client, f"{api_base}/api/requirement", reqs, "提交需求失败", errors
            )
            # 2. Capabilities
            saved_caps = await _post_draft_items(
                client, f"{api_base}/api/capability", caps, "提交能力失败", errors
            )
            # 3. Tools
            saved_tools = await _post_draft_items(
                client, f"{api_base}/api/tool", tools_list, "提交工具失败", errors
            )

        # 4. Knowledge (does not use the HTTP client)
        saved_knows = await _save_knowledge_items(knowledges, errors)

        # Only discard the draft when nothing failed. On partial failure the
        # draft is kept unchanged, so a retry re-submits entries that already
        # succeeded — NOTE(review): confirm the endpoints tolerate duplicates.
        if not errors:
            PRE_UPLOAD_FILE.unlink(missing_ok=True)

        output = (
            "已成功将图谱发往数据库流水线。\n"
            f"- 写入 Requirement: {saved_reqs} 个\n"
            f"- 写入 Capability: {saved_caps} 个\n"
            f"- 写入 Tool: {saved_tools} 个\n"
            f"- 写入 Knowledge: {saved_knows} 条\n"
        )
        if errors:
            shown = errors[:_MAX_REPORTED_ERRORS]
            output += "\n伴随部分异常:\n" + "\n".join(f"- {e}" for e in shown)
            hidden = len(errors) - len(shown)
            if hidden > 0:
                # Make the truncation visible instead of silently dropping errors.
                output += f"\n- ... 另有 {hidden} 条异常未显示"

        return ToolResult(
            # Do not show a success mark when some submissions failed.
            title="⚠️ 提交入库完成(存在失败项)" if errors else "✅ 提交入库完成",
            output=output
        )
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("提交到数据库失败: %s", e)
        return ToolResult(
            title="❌ 提交失败",
            output=f"错误: {str(e)}",
            error=str(e)
        )
@tool(
    description="查看当前预整理草稿的统计信息。"
)
async def list_cache_status() -> ToolResult:
    """
    Show the status of the draft (pre_upload_list.json).

    Ensures the cache directory exists, then reports how many entries the
    draft holds under each of its four sections. Any error while reading or
    counting is returned as a failed ToolResult instead of being raised.
    """
    _ensure_dirs()

    draft_missing = not PRE_UPLOAD_FILE.exists()
    if draft_missing:
        return ToolResult(title="ℹ️ 暂无草稿", output="pre_upload_list.json 不存在。")

    # (display label, key in the draft JSON), in output order.
    sections = (
        ("需求", "requirements"),
        ("能力", "capabilities"),
        ("工具", "tools"),
        ("知识", "knowledge"),
    )
    try:
        with PRE_UPLOAD_FILE.open("r", encoding="utf-8") as draft_file:
            draft = json.load(draft_file)

        # Count every section first, then render the report.
        counts = [(label, len(draft.get(key, []))) for label, key in sections]
        parts = [f"当前草稿数据({PRE_UPLOAD_FILE.name}):\n"]
        for label, count in counts:
            parts.append(f" - {label}: {count}\n")
        return ToolResult(title="📁 草稿状态", output="".join(parts))
    except Exception as exc:
        message = str(exc)
        return ToolResult(title="❌ 读取状态失败", output=message, error=message)
|