cache_manager.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
"""
Knowledge Manager local cache and curation tool.

Responsibilities:
1. Maintain the local pre_upload_list.json knowledge-graph draft.
2. Provide commit_to_database to distribute the draft into the database
   (Requirements, Capabilities, Tools, Knowledge).
"""
  7. import os
  8. import json
  9. import logging
  10. import httpx
  11. from pathlib import Path
  12. from typing import Dict, Any, List, Optional
  13. from datetime import datetime
  14. from agent.tools import tool, ToolResult
  15. logger = logging.getLogger(__name__)
  16. # 缓存目录
  17. CACHE_DIR = Path(".cache/.knowledge")
  18. PRE_UPLOAD_FILE = CACHE_DIR / "pre_upload_list.json"
  19. def _ensure_dirs():
  20. """确保缓存目录存在"""
  21. CACHE_DIR.mkdir(parents=True, exist_ok=True)
  22. @tool()
  23. async def organize_cached_data(merge: bool = True) -> ToolResult:
  24. """旧指令保留口。现在已废弃,无需调用。"""
  25. return ToolResult(title="ℹ️ 提示", output="请直接使用 cache_research_data。")
  26. @tool()
  27. async def cache_research_data(entity_type: str, data: Dict[str, Any]) -> ToolResult:
  28. """
  29. 【极端重要】由于通过文本级读写组装极庞大的嵌套 JSON 文件很容易导致大模型截断、忘闭合引发毁灭性覆盖或奔溃,
  30. 任何要在 JSON 缓存中「安全追加」一条信息的操作请仅限调用此工具!不要使用 write_file!
  31. Args:
  32. entity_type: 所属的数据类别,仅能填入 "requirements", "capabilities", "tools" 或是 "knowledge"。
  33. data: 具体那单独一条你想草拟或记录的数据实体结构(请直接传递 Json Object, 我们会在底层完成拼接保存)。
  34. Returns:
  35. 缓存操作的执行结果
  36. """
  37. _ensure_dirs()
  38. if entity_type not in ("requirements", "capabilities", "tools", "knowledge"):
  39. return ToolResult(
  40. title="❌ 参数异常",
  41. output=f"传入的 entity_type = {entity_type} 不合法。必须是 requirements, capabilities, tools, knowledge。请重新确认参数类型!",
  42. error="Invalid entity_type"
  43. )
  44. # 0. 数据格式硬校验 (Schema Validation)
  45. if entity_type == "knowledge":
  46. if "task" not in data:
  47. return ToolResult(
  48. title="❌ 校验失败",
  49. output="【严重错误】写入 knowledge 必须包含 'task' 字段!不能用 'title' 代替。请修正 JSON 结构后重新调用本工具。",
  50. error="Missing 'task' field"
  51. )
  52. if "tags" in data and not isinstance(data["tags"], dict):
  53. return ToolResult(
  54. title="❌ 校验失败",
  55. output="【严重错误】knowledge 的 'tags' 字段强制要求为字典格式 (如 `{\"标签名\": \"\"}`),绝对不能是数组(List)。请修正后重新调用本工具。",
  56. error="Invalid 'tags' format"
  57. )
  58. elif entity_type == "capabilities":
  59. if "name" not in data or "description" not in data:
  60. return ToolResult(
  61. title="❌ 校验失败",
  62. output="写入 capabilities 必须包含 'name' 和 'description' 字段。请修正后再调用。",
  63. error="Missing capability fields"
  64. )
  65. elif entity_type == "requirements":
  66. if "description" not in data:
  67. return ToolResult(
  68. title="❌ 校验失败",
  69. output="写入 requirements 必须包含 'description' 字段。请修正后再调用。",
  70. error="Missing requirement fields"
  71. )
  72. try:
  73. # 1. 内存层安全读取
  74. if PRE_UPLOAD_FILE.exists():
  75. with open(PRE_UPLOAD_FILE, "r", encoding="utf-8") as f:
  76. try:
  77. cache_dict = json.load(f)
  78. except json.JSONDecodeError:
  79. # 如果原文件损坏,进行挽救性备份并重新初始化
  80. backup_file = CACHE_DIR / f"pre_upload_list_backup_{int(datetime.now().timestamp())}.json"
  81. os.rename(PRE_UPLOAD_FILE, backup_file)
  82. cache_dict = {"requirements": [], "capabilities": [], "tools": [], "knowledge": []}
  83. else:
  84. cache_dict = {"requirements": [], "capabilities": [], "tools": [], "knowledge": []}
  85. # 2. 追加
  86. if entity_type not in cache_dict:
  87. cache_dict[entity_type] = []
  88. # 去重更新与追加 (以 ID 为准)
  89. data_id = data.get("id")
  90. replaced = False
  91. if data_id:
  92. for idx, existing in enumerate(cache_dict[entity_type]):
  93. if existing.get("id") == data_id:
  94. cache_dict[entity_type][idx] = data
  95. replaced = True
  96. break
  97. if not replaced:
  98. cache_dict[entity_type].append(data)
  99. # 3. 稳妥写盘
  100. with open(PRE_UPLOAD_FILE, "w", encoding="utf-8") as f:
  101. json.dump(cache_dict, f, ensure_ascii=False, indent=2)
  102. action = "更新" if replaced else "新建"
  103. return ToolResult(title="✅ 存入草稿箱成功", output=f"成功将一条 {entity_type} {action}写入到了缓存文件!当前此类别规模: {len(cache_dict[entity_type])} 个。")
  104. except Exception as e:
  105. logger.error(f"Cache save failed: {e}")
  106. return ToolResult(title="❌ 系统异常", output=f"执行时发生底层错误: {str(e)}", error=str(e))
  107. @tool(
  108. description=(
  109. "将本地维护好的 pre_upload_list.json 预上传图谱草稿,"
  110. "分发提交到远端真实的 Requirements、Capabilities、Tools、Knowledge 数据表中。"
  111. )
  112. )
  113. async def commit_to_database() -> ToolResult:
  114. """
  115. 提交预处理好的知识图谱到数据库(涵盖 Requirements, Capabilities, Tools, Knowledge)。
  116. 从 .cache/.knowledge/pre_upload_list.json 读取。
  117. Returns:
  118. 提交结果和统计信息
  119. """
  120. try:
  121. if not PRE_UPLOAD_FILE.exists():
  122. return ToolResult(
  123. title="⚠️ 无草稿可提交",
  124. output=f"未找到预处理草稿文件:{PRE_UPLOAD_FILE}"
  125. )
  126. with open(PRE_UPLOAD_FILE, "r", encoding="utf-8") as f:
  127. data = json.load(f)
  128. reqs = data.get("requirements", [])
  129. caps = data.get("capabilities", [])
  130. tools_list = data.get("tools", [])
  131. knowledges = data.get("knowledge", [])
  132. api_base = os.environ.get("KNOWHUB_API", "http://localhost:9999")
  133. saved_reqs, saved_caps, saved_tools, saved_knows = 0, 0, 0, 0
  134. errors = []
  135. async with httpx.AsyncClient(timeout=30.0) as client:
  136. # 1. 提交 Requirements
  137. for r in reqs:
  138. try:
  139. res = await client.post(f"{api_base}/api/requirement", json=r)
  140. res.raise_for_status()
  141. saved_reqs += 1
  142. except Exception as e:
  143. errors.append(f"提交需求失败 {r.get('id', '')}: {e}")
  144. # 2. 提交 Capabilities(仅包含经过严格验证的条目)
  145. for c in caps:
  146. try:
  147. res = await client.post(f"{api_base}/api/capability", json=c)
  148. res.raise_for_status()
  149. saved_caps += 1
  150. except Exception as e:
  151. errors.append(f"提交能力失败 {c.get('id', '')}: {e}")
  152. # 3. 提交 Tools
  153. for t in tools_list:
  154. try:
  155. res = await client.post(f"{api_base}/api/tool", json=t)
  156. res.raise_for_status()
  157. saved_tools += 1
  158. except Exception as e:
  159. errors.append(f"提交工具失败 {t.get('id', '')}: {e}")
  160. # 4. 提交 Knowledge
  161. from agent.tools.builtin.knowledge import knowledge_save
  162. for k in knowledges:
  163. try:
  164. raw_tags = k.get("tags", {})
  165. if isinstance(raw_tags, list):
  166. raw_tags = {str(item): "" for item in raw_tags}
  167. await knowledge_save(
  168. task=k.get("task", k.get("title", "补充知识")),
  169. content=k.get("content", ""),
  170. types=k.get("types", []),
  171. score=k.get("score", 3),
  172. source_category=k.get("source", {}).get("category", "exp"),
  173. capability_ids=k.get("capability_ids", []),
  174. tool_ids=k.get("tool_ids", []),
  175. tags=raw_tags
  176. )
  177. saved_knows += 1
  178. except Exception as e:
  179. errors.append(f"提交知识失败: {e}")
  180. # 若完全没有错误,清空草稿
  181. if not errors:
  182. PRE_UPLOAD_FILE.unlink(missing_ok=True)
  183. output = (f"已成功将图谱发往数据库流水线。\n"
  184. f"- 写入 Requirement: {saved_reqs} 个\n"
  185. f"- 写入 Capability: {saved_caps} 个\n"
  186. f"- 写入 Tool: {saved_tools} 个\n"
  187. f"- 写入 Knowledge: {saved_knows} 条\n")
  188. if errors:
  189. output += "\n伴随部分异常:\n" + "\n".join(f"- {e}" for e in errors[:5])
  190. return ToolResult(
  191. title="✅ 提交入库完成",
  192. output=output
  193. )
  194. except Exception as e:
  195. logger.error(f"提交到数据库失败: {e}")
  196. return ToolResult(
  197. title="❌ 提交失败",
  198. output=f"错误: {str(e)}",
  199. error=str(e)
  200. )
  201. @tool(
  202. description="查看当前预整理草稿的统计明细,或者通过传入 entity_id 获取某条特定草稿的完整 JSON 详情。"
  203. )
  204. async def list_cache_status(entity_id: Optional[str] = None) -> ToolResult:
  205. """
  206. 查看草稿状态(pre_upload_list.json)或特定记录的详情。
  207. Args:
  208. entity_id: (可选) 如果传入实体 ID(如 'REQ_001'),将返回该条目的完整草稿详情。不传则仅列出概要。
  209. """
  210. _ensure_dirs()
  211. if not PRE_UPLOAD_FILE.exists():
  212. return ToolResult(title="ℹ️ 暂无草稿", output="pre_upload_list.json 不存在。")
  213. try:
  214. with open(PRE_UPLOAD_FILE, "r", encoding="utf-8") as f:
  215. data = json.load(f)
  216. if entity_id:
  217. # 查找具体详情
  218. for group in ("requirements", "capabilities", "tools", "knowledge"):
  219. for item in data.get(group, []):
  220. if item.get("id") == entity_id:
  221. return ToolResult(
  222. title=f"📄 草稿详情: {entity_id}",
  223. output=json.dumps(item, ensure_ascii=False, indent=2)
  224. )
  225. return ToolResult(title="⚠️ 找不到对象", output=f"在草稿箱中未找到 ID 为 {entity_id} 的实体。")
  226. # 仅返回统计明细
  227. reqs = data.get("requirements", [])
  228. caps = data.get("capabilities", [])
  229. tools = data.get("tools", [])
  230. knows = data.get("knowledge", [])
  231. output_lines = [f"当前草稿数据({PRE_UPLOAD_FILE.name}):(通过传参 entity_id 查看具体完整JSON)"]
  232. output_lines.append(f"\n- 需求 ({len(reqs)}条):")
  233. for r in reqs: output_lines.append(f" • [{r.get('id')}] {r.get('description', '')[:50]}...")
  234. output_lines.append(f"\n- 能力 ({len(caps)}条):")
  235. for c in caps: output_lines.append(f" • [{c.get('id')}] {c.get('name', '')}")
  236. output_lines.append(f"\n- 工具 ({len(tools)}条):")
  237. for t in tools: output_lines.append(f" • [{t.get('id')}] {t.get('name', '')}")
  238. output_lines.append(f"\n- 知识 ({len(knows)}条):")
  239. for k in knows: output_lines.append(f" • [{k.get('id')}] {k.get('title', '')[:50]}...")
  240. return ToolResult(title="📁 草稿状态明细", output="\n".join(output_lines))
  241. except Exception as e:
  242. return ToolResult(title="❌ 读取状态失败", output=str(e), error=str(e))