cache_manager.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. """
  2. Knowledge Manager 本地缓存工具
  3. 负责:
  4. 1. 接收调研数据,存入本地缓存
  5. 2. 整理缓存数据(去重、合并、关联)
  6. 3. 可选提交到数据库或仅保存本地
  7. """
  8. import json
  9. import logging
  10. from pathlib import Path
  11. from typing import Dict, Any, List, Optional
  12. from datetime import datetime
  13. from agent.tools import tool, ToolResult
  14. logger = logging.getLogger(__name__)
  15. # 缓存目录
  16. CACHE_DIR = Path(".cache/.knowledge")
  17. BUFFER_DIR = CACHE_DIR / "buffer"
  18. ORGANIZED_DIR = CACHE_DIR / "organized"
  19. def _ensure_dirs():
  20. """确保缓存目录存在"""
  21. BUFFER_DIR.mkdir(parents=True, exist_ok=True)
  22. ORGANIZED_DIR.mkdir(parents=True, exist_ok=True)
  23. @tool(
  24. description=(
  25. "缓存调研数据到本地(不入库)。"
  26. "接受 JSON 字符串或字典,自动解析。"
  27. "适用于增量上传场景,先缓存后整理。"
  28. )
  29. )
  30. async def cache_research_data(
  31. data: str | Dict[str, Any],
  32. source: str = "unknown",
  33. ) -> ToolResult:
  34. """
  35. 缓存调研数据到本地(不入库)
  36. Args:
  37. data: 调研结果(JSON 字符串或字典),包含 tools/resources/knowledge
  38. source: 数据来源标识(如 agent_id)
  39. Returns:
  40. 缓存确认和统计
  41. Examples:
  42. # 方式 1:直接传字典
  43. cache_research_data(data={"knowledge": [...]}, source="agent_research")
  44. # 方式 2:传 JSON 字符串
  45. cache_research_data(data='{"knowledge": [...]}', source="agent_research")
  46. """
  47. try:
  48. _ensure_dirs()
  49. # 自动解析 JSON 字符串
  50. if isinstance(data, str):
  51. try:
  52. data = json.loads(data)
  53. except json.JSONDecodeError as e:
  54. return ToolResult(
  55. title="❌ JSON 解析失败",
  56. output=f"无法解析 JSON 字符串: {e}",
  57. error=str(e)
  58. )
  59. # 生成文件名
  60. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  61. filename = f"{source}_{timestamp}.json"
  62. filepath = BUFFER_DIR / filename
  63. # 写入缓存
  64. with open(filepath, "w", encoding="utf-8") as f:
  65. json.dump(data, f, ensure_ascii=False, indent=2)
  66. # 统计
  67. stats = []
  68. if data.get("tools"):
  69. stats.append(f"工具: {len(data['tools'])} 个")
  70. if data.get("resources"):
  71. stats.append(f"资源: {len(data['resources'])} 个")
  72. if data.get("knowledge"):
  73. stats.append(f"知识: {len(data['knowledge'])} 个")
  74. return ToolResult(
  75. title="✅ 已缓存到本地",
  76. output=f"文件: {filename}\n\n" + "\n".join(f"- {s}" for s in stats),
  77. metadata={"filepath": str(filepath), "stats": stats}
  78. )
  79. except Exception as e:
  80. logger.error(f"缓存失败: {e}")
  81. return ToolResult(
  82. title="❌ 缓存失败",
  83. output=f"错误: {str(e)}",
  84. error=str(e)
  85. )
  86. @tool()
  87. async def organize_cached_data(
  88. merge: bool = True,
  89. ) -> ToolResult:
  90. """
  91. 整理缓存数据(去重、合并)
  92. Args:
  93. merge: 是否合并所有缓存文件
  94. Returns:
  95. 整理后的数据统计
  96. """
  97. try:
  98. _ensure_dirs()
  99. # 读取所有缓存文件
  100. buffer_files = list(BUFFER_DIR.glob("*.json"))
  101. if not buffer_files:
  102. return ToolResult(
  103. title="ℹ️ 无缓存数据",
  104. output="buffer 目录为空"
  105. )
  106. all_tools = []
  107. all_resources = []
  108. all_knowledge = []
  109. for filepath in buffer_files:
  110. with open(filepath, "r", encoding="utf-8") as f:
  111. data = json.load(f)
  112. all_tools.extend(data.get("tools", []))
  113. all_resources.extend(data.get("resources", []))
  114. all_knowledge.extend(data.get("knowledge", []))
  115. # 去重(基于名称/标题)
  116. def dedupe_by_key(items: List[Dict], key: str) -> List[Dict]:
  117. seen = set()
  118. result = []
  119. for item in items:
  120. identifier = item.get(key)
  121. if identifier and identifier not in seen:
  122. seen.add(identifier)
  123. result.append(item)
  124. return result
  125. tools_deduped = dedupe_by_key(all_tools, "名称")
  126. resources_deduped = dedupe_by_key(all_resources, "标题")
  127. knowledge_deduped = dedupe_by_key(all_knowledge, "内容")
  128. # 保存整理后的数据
  129. organized_data = {
  130. "tools": tools_deduped,
  131. "resources": resources_deduped,
  132. "knowledge": knowledge_deduped,
  133. "organized_at": datetime.now().isoformat(),
  134. "source_files": [f.name for f in buffer_files]
  135. }
  136. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  137. organized_file = ORGANIZED_DIR / f"organized_{timestamp}.json"
  138. with open(organized_file, "w", encoding="utf-8") as f:
  139. json.dump(organized_data, f, ensure_ascii=False, indent=2)
  140. # 清空 buffer(可选)
  141. if merge:
  142. for filepath in buffer_files:
  143. filepath.unlink()
  144. stats = [
  145. f"工具: {len(all_tools)} → {len(tools_deduped)} (去重 {len(all_tools) - len(tools_deduped)})",
  146. f"资源: {len(all_resources)} → {len(resources_deduped)} (去重 {len(all_resources) - len(resources_deduped)})",
  147. f"知识: {len(all_knowledge)} → {len(knowledge_deduped)} (去重 {len(all_knowledge) - len(knowledge_deduped)})",
  148. ]
  149. return ToolResult(
  150. title="✅ 整理完成",
  151. output=f"文件: {organized_file.name}\n\n" + "\n".join(f"- {s}" for s in stats),
  152. metadata={"filepath": str(organized_file), "stats": organized_data}
  153. )
  154. except Exception as e:
  155. logger.error(f"整理失败: {e}")
  156. return ToolResult(
  157. title="❌ 整理失败",
  158. output=f"错误: {str(e)}",
  159. error=str(e)
  160. )
  161. @tool()
  162. async def commit_to_database(
  163. organized_file: Optional[str] = None,
  164. ) -> ToolResult:
  165. """
  166. 将整理后的数据提交到数据库,建立完整的关联关系
  167. Args:
  168. organized_file: 指定要提交的文件(不指定则提交最新的)
  169. Returns:
  170. 提交结果统计(按知识类型分组)
  171. """
  172. try:
  173. _ensure_dirs()
  174. # 找到要提交的文件
  175. if organized_file:
  176. filepath = ORGANIZED_DIR / organized_file
  177. else:
  178. organized_files = sorted(ORGANIZED_DIR.glob("organized_*.json"))
  179. if not organized_files:
  180. return ToolResult(
  181. title="ℹ️ 无整理数据",
  182. output="organized 目录为空,请先调用 organize_cached_data"
  183. )
  184. filepath = organized_files[-1]
  185. if not filepath.exists():
  186. return ToolResult(
  187. title="❌ 文件不存在",
  188. output=f"文件: {filepath.name}"
  189. )
  190. # 读取数据
  191. with open(filepath, "r", encoding="utf-8") as f:
  192. data = json.load(f)
  193. # 导入数据库工具
  194. from agent.tools.builtin.knowledge import resource_save, knowledge_save
  195. # 统计变量
  196. saved_tools = 0
  197. saved_resources = 0
  198. knowledge_by_type = {"tool": 0, "strategy": 0, "case": 0, "experience": 0}
  199. errors = []
  200. # 映射:resource_id -> knowledge_ids(用于反向关联)
  201. resource_to_knowledge = {}
  202. # 第一步:保存资源
  203. for resource in data.get("resources", []):
  204. try:
  205. resource_id = resource.get("id", f"resource_{saved_resources}")
  206. await resource_save(
  207. resource_id=resource_id,
  208. title=resource.get("标题", ""),
  209. body=resource.get("内容", ""),
  210. content_type=resource.get("类型", "text"),
  211. metadata=resource.get("元数据", {})
  212. )
  213. saved_resources += 1
  214. resource_to_knowledge[resource_id] = []
  215. except Exception as e:
  216. errors.append(f"资源 {resource.get('标题')}: {e}")
  217. # 第二步:保存知识,建立 resource_ids 关联
  218. for knowledge in data.get("knowledge", []):
  219. try:
  220. # 提取关联的 resource_ids
  221. resource_ids = knowledge.get("resource_ids", [])
  222. # 保存知识
  223. result = await knowledge_save(
  224. task=knowledge.get("主题", ""),
  225. content=knowledge.get("内容", ""),
  226. types=knowledge.get("类型", []),
  227. tags=knowledge.get("标签", {}),
  228. resource_ids=resource_ids,
  229. )
  230. # 统计知识类型
  231. types = knowledge.get("类型", [])
  232. for t in types:
  233. if t in knowledge_by_type:
  234. knowledge_by_type[t] += 1
  235. # 提取 knowledge_id,用于反向关联
  236. knowledge_id = result.metadata.get("knowledge_id")
  237. if knowledge_id:
  238. for rid in resource_ids:
  239. if rid in resource_to_knowledge:
  240. resource_to_knowledge[rid].append(knowledge_id)
  241. except Exception as e:
  242. errors.append(f"知识 {knowledge.get('主题')}: {e}")
  243. # 第三步:保存工具(作为 resource,类型为 tool)
  244. for tool in data.get("tools", []):
  245. try:
  246. tool_id = f"tools/{tool.get('分类', 'misc')}/{tool.get('名称', 'unknown')}"
  247. await resource_save(
  248. resource_id=tool_id,
  249. title=tool.get("名称", ""),
  250. body=json.dumps(tool, ensure_ascii=False, indent=2),
  251. content_type="tool",
  252. metadata={
  253. "category": tool.get("分类", ""),
  254. "introduction": tool.get("简介", ""),
  255. **tool.get("工具信息", {})
  256. }
  257. )
  258. saved_tools += 1
  259. except Exception as e:
  260. errors.append(f"工具 {tool.get('名称')}: {e}")
  261. # 构建统计输出
  262. stats_lines = [
  263. "**knowledge 表**:",
  264. f"- 工具知识: {knowledge_by_type['tool']} 条",
  265. f"- 工序知识: {knowledge_by_type['strategy']} 条",
  266. f"- 用例知识: {knowledge_by_type['case']} 条",
  267. f"- 执行经验: {knowledge_by_type['experience']} 条",
  268. "",
  269. "**resources 表**:",
  270. f"- 资源: {saved_resources} 个",
  271. f"- 工具: {saved_tools} 个",
  272. ]
  273. if errors:
  274. stats_lines.append(f"\n**错误**: {len(errors)} 个")
  275. output = "已提交到数据库\n\n" + "\n".join(stats_lines)
  276. if errors:
  277. output += "\n\n错误详情:\n" + "\n".join(f"- {e}" for e in errors[:5])
  278. return ToolResult(
  279. title="✅ 提交到数据库完成",
  280. output=output,
  281. metadata={
  282. "saved": {
  283. "tools": saved_tools,
  284. "resources": saved_resources,
  285. "knowledge": knowledge_by_type
  286. }
  287. }
  288. )
  289. except Exception as e:
  290. logger.error(f"提交到数据库失败: {e}")
  291. return ToolResult(
  292. title="❌ 提交失败",
  293. output=f"错误: {str(e)}",
  294. error=str(e)
  295. )
  296. @tool(
  297. description=(
  298. "查看缓存状态,包括 buffer 和 organized 中的文件列表、统计信息。"
  299. "用于了解当前有多少数据在缓存中等待处理。"
  300. )
  301. )
  302. async def list_cache_status() -> ToolResult:
  303. """
  304. 查看缓存状态(buffer 和 organized 中的文件)
  305. Returns:
  306. 缓存目录中的文件列表和统计
  307. """
  308. _ensure_dirs()
  309. buffer_files = sorted(BUFFER_DIR.glob("*.json"))
  310. organized_files = sorted(ORGANIZED_DIR.glob("*.json"))
  311. lines = [f"**Buffer** ({len(buffer_files)} 个文件):"]
  312. total_tools = 0
  313. total_resources = 0
  314. total_knowledge = 0
  315. for f in buffer_files:
  316. size = f.stat().st_size
  317. # 读取文件统计
  318. try:
  319. with open(f, "r", encoding="utf-8") as file:
  320. data = json.load(file)
  321. tools = len(data.get("tools", []))
  322. resources = len(data.get("resources", []))
  323. knowledge = len(data.get("knowledge", []))
  324. total_tools += tools
  325. total_resources += resources
  326. total_knowledge += knowledge
  327. lines.append(
  328. f" - {f.name} ({size // 1024}KB): "
  329. f"工具 {tools}, 资源 {resources}, 知识 {knowledge}"
  330. )
  331. except Exception:
  332. lines.append(f" - {f.name} ({size // 1024}KB)")
  333. if buffer_files:
  334. lines.append(
  335. f"\n **合计**: 工具 {total_tools}, 资源 {total_resources}, 知识 {total_knowledge}"
  336. )
  337. lines.append(f"\n**Organized** ({len(organized_files)} 个文件):")
  338. for f in organized_files:
  339. size = f.stat().st_size
  340. lines.append(f" - {f.name} ({size // 1024}KB)")
  341. return ToolResult(
  342. title="📁 缓存状态",
  343. output="\n".join(lines),
  344. metadata={
  345. "files": [f.name for f in buffer_files],
  346. "total": {
  347. "tools": total_tools,
  348. "resources": total_resources,
  349. "knowledge": total_knowledge
  350. }
  351. }
  352. )