| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287 |
- """
- Knowledge Manager 本地缓存与整理工具
- 负责:
- 1. 维护本地的 pre_upload_list.json 图谱草稿
- 2. 提供 commit_to_database 将草稿分发入库(Requirements, Capabilities, Tools, Knowledge)
- """
- import os
- import json
- import logging
- import httpx
- from pathlib import Path
- from typing import Dict, Any, List, Optional
- from datetime import datetime
- from agent.tools import tool, ToolResult
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Directory that holds the local draft cache (relative to the working directory).
CACHE_DIR = Path(".cache") / ".knowledge"
# JSON draft file that collects entities until they are committed.
PRE_UPLOAD_FILE = CACHE_DIR.joinpath("pre_upload_list.json")
def _ensure_dirs():
    """Create the cache directory, including missing parents, if it is absent."""
    os.makedirs(CACHE_DIR, exist_ok=True)
# Deprecated tool kept as a stub. It ignores ``merge`` and only returns a hint
# that points the caller at cache_research_data.
# NOTE(review): the docstring is left verbatim because @tool() presumably exposes
# it as the model-facing tool description -- confirm against agent.tools.
@tool()
async def organize_cached_data(merge: bool = True) -> ToolResult:
    """旧指令保留口。现在已废弃,无需调用。"""
    redirect_hint = ToolResult(
        title="ℹ️ 提示",
        output="请直接使用 cache_research_data。",
    )
    return redirect_hint
def _load_draft_cache() -> Dict[str, Any]:
    """Read the draft file and return its content as a dict.

    A missing file yields an empty draft. A file that cannot be decoded, or whose
    top-level JSON value is not an object, is renamed to a timestamped backup and
    replaced by an empty draft.
    """
    if not PRE_UPLOAD_FILE.exists():
        return {"requirements": [], "capabilities": [], "tools": [], "knowledge": []}

    try:
        with open(PRE_UPLOAD_FILE, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError):
        loaded = None

    if isinstance(loaded, dict):
        return loaded

    # The rename happens only after the read handle is closed; renaming a file
    # that is still open fails on Windows.
    backup_file = CACHE_DIR / f"pre_upload_list_backup_{int(datetime.now().timestamp())}.json"
    os.rename(PRE_UPLOAD_FILE, backup_file)
    return {"requirements": [], "capabilities": [], "tools": [], "knowledge": []}


def _store_draft_cache(cache_dict: Dict[str, Any]) -> None:
    """Persist the draft through a temp file so a failed dump keeps the old file intact."""
    tmp_file = PRE_UPLOAD_FILE.with_name(PRE_UPLOAD_FILE.name + ".tmp")
    try:
        with open(tmp_file, "w", encoding="utf-8") as f:
            json.dump(cache_dict, f, ensure_ascii=False, indent=2)
        # os.replace swaps the finished file in, overwriting the previous draft.
        os.replace(tmp_file, PRE_UPLOAD_FILE)
    except Exception:
        tmp_file.unlink(missing_ok=True)
        raise


# Appends (or, when an entry with the same "id" exists, replaces) one entity in
# the local draft file and returns a ToolResult describing the outcome.
#   entity_type: one of "requirements", "capabilities", "tools", "knowledge".
#   data:        the entity as a dict; validated per category before saving.
# Validation failures and I/O errors are reported through the returned
# ToolResult (with ``error`` set) instead of being raised.
# NOTE(review): the docstring is left verbatim because @tool() presumably exposes
# it as the model-facing tool description -- confirm against agent.tools.
@tool()
async def cache_research_data(entity_type: str, data: Dict[str, Any]) -> ToolResult:
    """
    【极端重要】由于通过文本级读写组装极庞大的嵌套 JSON 文件很容易导致大模型截断、忘闭合引发毁灭性覆盖或奔溃,
    任何要在 JSON 缓存中「安全追加」一条信息的操作请仅限调用此工具!不要使用 write_file!

    Args:
        entity_type: 所属的数据类别,仅能填入 "requirements", "capabilities", "tools" 或是 "knowledge"。
        data: 具体那单独一条你想草拟或记录的数据实体结构(请直接传递 Json Object, 我们会在底层完成拼接保存)。

    Returns:
        缓存操作的执行结果
    """
    _ensure_dirs()
    if entity_type not in ("requirements", "capabilities", "tools", "knowledge"):
        return ToolResult(
            title="❌ 参数异常",
            output=f"传入的 entity_type = {entity_type} 不合法。必须是 requirements, capabilities, tools, knowledge。请重新确认参数类型!",
            error="Invalid entity_type"
        )

    # Reject non-object payloads up front; everything below relies on dict access.
    if not isinstance(data, dict):
        return ToolResult(
            title="❌ 校验失败",
            output="data 必须是一个 JSON Object(字典)。请修正后重新调用本工具。",
            error="Invalid data type"
        )

    # 0. Hard schema validation per entity category.
    if entity_type == "knowledge":
        if "task" not in data:
            return ToolResult(
                title="❌ 校验失败",
                output="【严重错误】写入 knowledge 必须包含 'task' 字段!不能用 'title' 代替。请修正 JSON 结构后重新调用本工具。",
                error="Missing 'task' field"
            )
        if "tags" in data and not isinstance(data["tags"], dict):
            return ToolResult(
                title="❌ 校验失败",
                output="【严重错误】knowledge 的 'tags' 字段强制要求为字典格式 (如 `{\"标签名\": \"\"}`),绝对不能是数组(List)。请修正后重新调用本工具。",
                error="Invalid 'tags' format"
            )
    elif entity_type == "capabilities":
        if "name" not in data or "description" not in data:
            return ToolResult(
                title="❌ 校验失败",
                output="写入 capabilities 必须包含 'name' 和 'description' 字段。请修正后再调用。",
                error="Missing capability fields"
            )
    elif entity_type == "requirements":
        if "description" not in data:
            return ToolResult(
                title="❌ 校验失败",
                output="写入 requirements 必须包含 'description' 字段。请修正后再调用。",
                error="Missing requirement fields"
            )

    try:
        # 1. Load the current draft (an unusable file is backed up and reset).
        cache_dict = _load_draft_cache()

        # 2. Make sure the target category is a list before touching it.
        if not isinstance(cache_dict.get(entity_type), list):
            cache_dict[entity_type] = []
        entries = cache_dict[entity_type]

        # Entries are de-duplicated by "id": an existing entry with the same id
        # is replaced in place, anything else is appended.
        data_id = data.get("id")
        replaced = False
        if data_id:
            for idx, existing in enumerate(entries):
                if isinstance(existing, dict) and existing.get("id") == data_id:
                    entries[idx] = data
                    replaced = True
                    break
        if not replaced:
            entries.append(data)

        # 3. Write back without ever truncating the live draft file.
        _store_draft_cache(cache_dict)

        action = "更新" if replaced else "新建"
        return ToolResult(title="✅ 存入草稿箱成功", output=f"成功将一条 {entity_type} {action}写入到了缓存文件!当前此类别规模: {len(entries)} 个。")

    except Exception as e:
        # Tool boundary: report the failure to the caller and keep the traceback in the log.
        logger.exception("Cache save failed: %s", e)
        return ToolResult(title="❌ 系统异常", output=f"执行时发生底层错误: {str(e)}", error=str(e))
def _commit_entries(draft: Dict[str, Any], key: str) -> List[Any]:
    """Return the list stored under ``key`` in the draft, or an empty list if it is not a list."""
    value = draft.get(key)
    return value if isinstance(value, list) else []


async def _post_draft_entities(
    client: "httpx.AsyncClient",
    url: str,
    entities: List[Any],
    label: str,
    errors: List[str],
) -> List[Any]:
    """POST every entity to ``url``; return the entities that failed and record why in ``errors``."""
    failed = []
    for entity in entities:
        try:
            res = await client.post(url, json=entity)
            res.raise_for_status()
        except Exception as e:
            entity_id = entity.get("id", "") if isinstance(entity, dict) else ""
            errors.append(f"{label} {entity_id}: {e}")
            failed.append(entity)
    return failed


async def _save_draft_knowledge(entries: List[Any], errors: List[str]) -> List[Any]:
    """Send every knowledge entry through knowledge_save; return the entries that failed."""
    from agent.tools.builtin.knowledge import knowledge_save

    failed = []
    for k in entries:
        try:
            raw_tags = k.get("tags", {})
            if isinstance(raw_tags, list):
                # Legacy drafts may carry tags as a list; convert to the dict form.
                raw_tags = {str(item): "" for item in raw_tags}

            source = k.get("source")
            category = source.get("category", "exp") if isinstance(source, dict) else "exp"

            result = await knowledge_save(
                task=k.get("task", k.get("title", "补充知识")),
                content=k.get("content", ""),
                types=k.get("types", []),
                score=k.get("score", 3),
                source_category=category,
                capability_ids=k.get("capability_ids", []),
                tool_ids=k.get("tool_ids", []),
                tags=raw_tags
            )
            # NOTE(review): knowledge_save presumably reports failures through a
            # ToolResult-like ``error`` attribute instead of raising -- confirm.
            # Treating a reported error as a failure keeps the entry in the draft.
            reported = getattr(result, "error", None)
            if reported:
                raise RuntimeError(reported)
        except Exception as e:
            errors.append(f"提交知识失败: {e}")
            failed.append(k)
    return failed


def _rewrite_draft_file(remaining: Dict[str, Any]) -> None:
    """Replace the draft file with ``remaining`` via a temp file so a failed dump keeps the old file."""
    tmp_file = PRE_UPLOAD_FILE.with_name(PRE_UPLOAD_FILE.name + ".tmp")
    try:
        with open(tmp_file, "w", encoding="utf-8") as f:
            json.dump(remaining, f, ensure_ascii=False, indent=2)
        os.replace(tmp_file, PRE_UPLOAD_FILE)
    except Exception:
        tmp_file.unlink(missing_ok=True)
        raise


# Reads the local draft file and submits its four categories: requirements,
# capabilities and tools are POSTed to the service at $KNOWHUB_API (default
# http://localhost:9999); knowledge entries go through knowledge_save.
# When everything succeeds the draft file is deleted. When some entries fail,
# only the failed entries are written back, so a retry does not re-submit
# entries that were already accepted.
# Returns a ToolResult with per-category counts; unexpected failures are
# returned with ``error`` set instead of being raised.
# NOTE(review): the docstring is left verbatim because the @tool decorator may
# read it at runtime -- confirm against agent.tools.
@tool(
    description=(
        "将本地维护好的 pre_upload_list.json 预上传图谱草稿,"
        "分发提交到远端真实的 Requirements、Capabilities、Tools、Knowledge 数据表中。"
    )
)
async def commit_to_database() -> ToolResult:
    """
    提交预处理好的知识图谱到数据库(涵盖 Requirements, Capabilities, Tools, Knowledge)。
    从 .cache/.knowledge/pre_upload_list.json 读取。

    Returns:
        提交结果和统计信息
    """
    try:
        if not PRE_UPLOAD_FILE.exists():
            return ToolResult(
                title="⚠️ 无草稿可提交",
                output=f"未找到预处理草稿文件:{PRE_UPLOAD_FILE}"
            )
        with open(PRE_UPLOAD_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
        if not isinstance(data, dict):
            raise ValueError(f"{PRE_UPLOAD_FILE.name} 的顶层结构必须是 JSON Object")

        reqs = _commit_entries(data, "requirements")
        caps = _commit_entries(data, "capabilities")
        tools_list = _commit_entries(data, "tools")
        knowledges = _commit_entries(data, "knowledge")
        api_base = os.environ.get("KNOWHUB_API", "http://localhost:9999")
        errors: List[str] = []

        async with httpx.AsyncClient(timeout=30.0) as client:
            # 1. Requirements
            failed_reqs = await _post_draft_entities(
                client, f"{api_base}/api/requirement", reqs, "提交需求失败", errors
            )
            # 2. Capabilities
            failed_caps = await _post_draft_entities(
                client, f"{api_base}/api/capability", caps, "提交能力失败", errors
            )
            # 3. Tools
            failed_tools = await _post_draft_entities(
                client, f"{api_base}/api/tool", tools_list, "提交工具失败", errors
            )
            # 4. Knowledge
            failed_knows = await _save_draft_knowledge(knowledges, errors)

        saved_reqs = len(reqs) - len(failed_reqs)
        saved_caps = len(caps) - len(failed_caps)
        saved_tools = len(tools_list) - len(failed_tools)
        saved_knows = len(knowledges) - len(failed_knows)

        had_failures = bool(errors)
        if not had_failures:
            # Everything was accepted: the draft is no longer needed.
            PRE_UPLOAD_FILE.unlink(missing_ok=True)
        else:
            # Keep only what still needs submitting; unknown extra keys are preserved.
            remaining = dict(data)
            remaining.update(
                requirements=failed_reqs,
                capabilities=failed_caps,
                tools=failed_tools,
                knowledge=failed_knows,
            )
            try:
                _rewrite_draft_file(remaining)
            except Exception as e:
                errors.append(f"回写草稿失败: {e}")

        output = (f"已成功将图谱发往数据库流水线。\n"
                  f"- 写入 Requirement: {saved_reqs} 个\n"
                  f"- 写入 Capability: {saved_caps} 个\n"
                  f"- 写入 Tool: {saved_tools} 个\n"
                  f"- 写入 Knowledge: {saved_knows} 条\n")
        if errors:
            output += "\n伴随部分异常:\n" + "\n".join(f"- {e}" for e in errors[:5])
            if len(errors) > 5:
                # Make the truncation visible instead of silently hiding errors.
                output += f"\n- ……共 {len(errors)} 条异常,仅显示前 5 条"
            output += "\n未成功的条目已保留在草稿中,可修正后再次提交。"
        return ToolResult(
            title="⚠️ 提交入库完成(部分失败)" if had_failures else "✅ 提交入库完成",
            output=output
        )
    except Exception as e:
        # Tool boundary: report the failure to the caller and keep the traceback in the log.
        logger.exception("提交到数据库失败: %s", e)
        return ToolResult(
            title="❌ 提交失败",
            output=f"错误: {str(e)}",
            error=str(e)
        )
def _status_entries(draft: Dict[str, Any], key: str) -> List[Any]:
    """Return the list stored under ``key`` in the draft, or an empty list if it is not a list."""
    value = draft.get(key)
    return value if isinstance(value, list) else []


def _status_field(item: Any, key: str) -> Any:
    """Return ``item[key]`` for dict items; None for a missing key or a non-dict item."""
    return item.get(key) if isinstance(item, dict) else None


def _status_preview(value: Any, limit: int = 50) -> str:
    """Render ``value`` as text cut to ``limit`` characters, adding '...' only when cut."""
    text = "" if value is None else str(value)
    return text if len(text) <= limit else f"{text[:limit]}..."


# Shows what is currently in the local draft file.
#   entity_id: optional id; when given, the first draft entry with that id
#              (searched across all four categories) is returned as pretty JSON.
#              Without it, a per-category summary is returned.
# Knowledge entries are summarised by their 'task' field (falling back to
# 'title'), because 'task' is the field the cache schema requires.
# Read/parse failures are returned as a ToolResult with ``error`` set.
# NOTE(review): the docstring is left verbatim because the @tool decorator may
# read it at runtime (e.g. for argument descriptions) -- confirm against agent.tools.
@tool(
    description="查看当前预整理草稿的统计明细,或者通过传入 entity_id 获取某条特定草稿的完整 JSON 详情。"
)
async def list_cache_status(entity_id: Optional[str] = None) -> ToolResult:
    """
    查看草稿状态(pre_upload_list.json)或特定记录的详情。
    Args:
        entity_id: (可选) 如果传入实体 ID(如 'REQ_001'),将返回该条目的完整草稿详情。不传则仅列出概要。
    """
    _ensure_dirs()
    if not PRE_UPLOAD_FILE.exists():
        return ToolResult(title="ℹ️ 暂无草稿", output="pre_upload_list.json 不存在。")
    try:
        with open(PRE_UPLOAD_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
        if not isinstance(data, dict):
            raise ValueError(f"{PRE_UPLOAD_FILE.name} 的顶层结构必须是 JSON Object")

        if entity_id:
            # Detail mode: look the id up in every category.
            for group in ("requirements", "capabilities", "tools", "knowledge"):
                for item in _status_entries(data, group):
                    if _status_field(item, "id") == entity_id:
                        return ToolResult(
                            title=f"📄 草稿详情: {entity_id}",
                            output=json.dumps(item, ensure_ascii=False, indent=2)
                        )
            return ToolResult(title="⚠️ 找不到对象", output=f"在草稿箱中未找到 ID 为 {entity_id} 的实体。")

        # Summary mode.
        reqs = _status_entries(data, "requirements")
        caps = _status_entries(data, "capabilities")
        tools = _status_entries(data, "tools")
        knows = _status_entries(data, "knowledge")
        output_lines = [f"当前草稿数据({PRE_UPLOAD_FILE.name}):(通过传参 entity_id 查看具体完整JSON)"]

        output_lines.append(f"\n- 需求 ({len(reqs)}条):")
        output_lines.extend(
            f" • [{_status_field(r, 'id')}] {_status_preview(_status_field(r, 'description'))}"
            for r in reqs
        )

        output_lines.append(f"\n- 能力 ({len(caps)}条):")
        output_lines.extend(
            f" • [{_status_field(c, 'id')}] {_status_field(c, 'name') or ''}"
            for c in caps
        )

        output_lines.append(f"\n- 工具 ({len(tools)}条):")
        output_lines.extend(
            f" • [{_status_field(t, 'id')}] {_status_field(t, 'name') or ''}"
            for t in tools
        )

        output_lines.append(f"\n- 知识 ({len(knows)}条):")
        output_lines.extend(
            f" • [{_status_field(k, 'id')}] "
            f"{_status_preview(_status_field(k, 'task') or _status_field(k, 'title'))}"
            for k in knows
        )
        return ToolResult(title="📁 草稿状态明细", output="\n".join(output_lines))
    except Exception as e:
        return ToolResult(title="❌ 读取状态失败", output=str(e), error=str(e))
|