cache_manager.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. """
  2. Knowledge Manager 本地缓存与整理工具
  3. 负责:
  4. 1. 维护本地的 pre_upload_list.json 图谱草稿
  5. 2. 提供 commit_to_database 将草稿分发入库(Requirements, Capabilities, Tools, Knowledge)
  6. """
  7. import os
  8. import json
  9. import logging
  10. import httpx
  11. from pathlib import Path
  12. from typing import Dict, Any, List, Optional
  13. from datetime import datetime
  14. from agent.tools import tool, ToolResult
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Local cache directory (relative to the current working directory).
CACHE_DIR = Path(".cache/.knowledge")
# JSON draft file that commit_to_database and list_cache_status read.
PRE_UPLOAD_FILE = CACHE_DIR / "pre_upload_list.json"
  19. def _ensure_dirs():
  20. """确保缓存目录存在"""
  21. CACHE_DIR.mkdir(parents=True, exist_ok=True)
  22. @tool()
  23. async def organize_cached_data(merge: bool = True) -> ToolResult:
  24. """为了兼容旧指令保留。现在实际上不再需要独立调用。"""
  25. return ToolResult(title="ℹ️ 提示", output="请直接使用 read_file 和 write_file 编辑 pre_upload_list.json。")
  26. @tool()
  27. async def cache_research_data(data: str | Dict[str, Any], source: str = "unknown") -> ToolResult:
  28. """为了兼容旧指令保留。现在实际上不再需要独立调用。"""
  29. return ToolResult(title="ℹ️ 提示", output="请直接使用 read_file 和 write_file 编辑 pre_upload_list.json。")
  30. @tool(
  31. description=(
  32. "将本地维护好的 pre_upload_list.json 预上传图谱草稿,"
  33. "分发提交到远端真实的 Requirements、Capabilities、Tools、Knowledge 数据表中。"
  34. )
  35. )
  36. async def commit_to_database() -> ToolResult:
  37. """
  38. 提交预处理好的知识图谱到数据库(涵盖 Requirements, Capabilities, Tools, Knowledge)。
  39. 从 .cache/.knowledge/pre_upload_list.json 读取。
  40. Returns:
  41. 提交结果和统计信息
  42. """
  43. try:
  44. if not PRE_UPLOAD_FILE.exists():
  45. return ToolResult(
  46. title="⚠️ 无草稿可提交",
  47. output=f"未找到预处理草稿文件:{PRE_UPLOAD_FILE}"
  48. )
  49. with open(PRE_UPLOAD_FILE, "r", encoding="utf-8") as f:
  50. data = json.load(f)
  51. reqs = data.get("requirements", [])
  52. caps = data.get("capabilities", [])
  53. tools_list = data.get("tools", [])
  54. knowledges = data.get("knowledge", [])
  55. api_base = os.environ.get("KNOWHUB_API", "http://localhost:9999")
  56. saved_reqs, saved_caps, saved_tools, saved_knows = 0, 0, 0, 0
  57. errors = []
  58. async with httpx.AsyncClient(timeout=30.0) as client:
  59. # 1. 提交 Requirements
  60. for r in reqs:
  61. try:
  62. res = await client.post(f"{api_base}/api/requirement", json=r)
  63. res.raise_for_status()
  64. saved_reqs += 1
  65. except Exception as e:
  66. errors.append(f"提交需求失败 {r.get('id', '')}: {e}")
  67. # 2. 提交 Capabilities(仅包含经过严格验证的条目)
  68. for c in caps:
  69. try:
  70. res = await client.post(f"{api_base}/api/capability", json=c)
  71. res.raise_for_status()
  72. saved_caps += 1
  73. except Exception as e:
  74. errors.append(f"提交能力失败 {c.get('id', '')}: {e}")
  75. # 3. 提交 Tools
  76. for t in tools_list:
  77. try:
  78. res = await client.post(f"{api_base}/api/tool", json=t)
  79. res.raise_for_status()
  80. saved_tools += 1
  81. except Exception as e:
  82. errors.append(f"提交工具失败 {t.get('id', '')}: {e}")
  83. # 4. 提交 Knowledge
  84. from agent.tools.builtin.knowledge import knowledge_save
  85. for k in knowledges:
  86. try:
  87. await knowledge_save(
  88. task=k.get("task", "补充知识"),
  89. content=k.get("content", ""),
  90. types=k.get("types", []),
  91. score=k.get("score", 3),
  92. tools=k.get("tools", []),
  93. support_capability=k.get("support_capability", []),
  94. source_category=k.get("source", {}).get("category", "exp")
  95. )
  96. saved_knows += 1
  97. except Exception as e:
  98. errors.append(f"提交知识失败: {e}")
  99. # 若完全没有错误,清空草稿
  100. if not errors:
  101. PRE_UPLOAD_FILE.unlink(missing_ok=True)
  102. output = (f"已成功将图谱发往数据库流水线。\n"
  103. f"- 写入 Requirement: {saved_reqs} 个\n"
  104. f"- 写入 Capability: {saved_caps} 个\n"
  105. f"- 写入 Tool: {saved_tools} 个\n"
  106. f"- 写入 Knowledge: {saved_knows} 条\n")
  107. if errors:
  108. output += "\n伴随部分异常:\n" + "\n".join(f"- {e}" for e in errors[:5])
  109. return ToolResult(
  110. title="✅ 提交入库完成",
  111. output=output
  112. )
  113. except Exception as e:
  114. logger.error(f"提交到数据库失败: {e}")
  115. return ToolResult(
  116. title="❌ 提交失败",
  117. output=f"错误: {str(e)}",
  118. error=str(e)
  119. )
  120. @tool(
  121. description="查看当前预整理草稿的统计信息。"
  122. )
  123. async def list_cache_status() -> ToolResult:
  124. """
  125. 查看草稿状态(pre_upload_list.json)
  126. """
  127. _ensure_dirs()
  128. if not PRE_UPLOAD_FILE.exists():
  129. return ToolResult(title="ℹ️ 暂无草稿", output="pre_upload_list.json 不存在。")
  130. try:
  131. with open(PRE_UPLOAD_FILE, "r", encoding="utf-8") as f:
  132. data = json.load(f)
  133. reqs = len(data.get("requirements", []))
  134. caps = len(data.get("capabilities", []))
  135. tools = len(data.get("tools", []))
  136. knows = len(data.get("knowledge", []))
  137. output = (f"当前草稿数据({PRE_UPLOAD_FILE.name}):\n"
  138. f" - 需求: {reqs}\n"
  139. f" - 能力: {caps}\n"
  140. f" - 工具: {tools}\n"
  141. f" - 知识: {knows}\n")
  142. return ToolResult(title="📁 草稿状态", output=output)
  143. except Exception as e:
  144. return ToolResult(title="❌ 读取状态失败", output=str(e), error=str(e))