""" Knowledge Manager 本地缓存工具 负责: 1. 接收调研数据,存入本地缓存 2. 整理缓存数据(去重、合并、关联) 3. 可选提交到数据库或仅保存本地 """ import json import logging from pathlib import Path from typing import Dict, Any, List, Optional from datetime import datetime from agent.tools import tool, ToolResult logger = logging.getLogger(__name__) # 缓存目录 CACHE_DIR = Path(".cache/.knowledge") BUFFER_DIR = CACHE_DIR / "buffer" ORGANIZED_DIR = CACHE_DIR / "organized" def _ensure_dirs(): """确保缓存目录存在""" BUFFER_DIR.mkdir(parents=True, exist_ok=True) ORGANIZED_DIR.mkdir(parents=True, exist_ok=True) @tool( description=( "缓存调研数据到本地(不入库)。" "接受 JSON 字符串或字典,自动解析。" "适用于增量上传场景,先缓存后整理。" ) ) async def cache_research_data( data: str | Dict[str, Any], source: str = "unknown", ) -> ToolResult: """ 缓存调研数据到本地(不入库) Args: data: 调研结果(JSON 字符串或字典),包含 tools/resources/knowledge source: 数据来源标识(如 agent_id) Returns: 缓存确认和统计 Examples: # 方式 1:直接传字典 cache_research_data(data={"knowledge": [...]}, source="agent_research") # 方式 2:传 JSON 字符串 cache_research_data(data='{"knowledge": [...]}', source="agent_research") """ try: _ensure_dirs() # 自动解析 JSON 字符串 if isinstance(data, str): try: data = json.loads(data) except json.JSONDecodeError as e: return ToolResult( title="❌ JSON 解析失败", output=f"无法解析 JSON 字符串: {e}", error=str(e) ) # 生成文件名 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"{source}_{timestamp}.json" filepath = BUFFER_DIR / filename # 写入缓存 with open(filepath, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) # 统计 stats = [] if data.get("tools"): stats.append(f"工具: {len(data['tools'])} 个") if data.get("resources"): stats.append(f"资源: {len(data['resources'])} 个") if data.get("knowledge"): stats.append(f"知识: {len(data['knowledge'])} 个") return ToolResult( title="✅ 已缓存到本地", output=f"文件: {filename}\n\n" + "\n".join(f"- {s}" for s in stats), metadata={"filepath": str(filepath), "stats": stats} ) except Exception as e: logger.error(f"缓存失败: {e}") return ToolResult( title="❌ 缓存失败", output=f"错误: {str(e)}", error=str(e) ) @tool() async def organize_cached_data( merge: bool = True, ) -> ToolResult: """ 整理缓存数据(去重、合并) Args: merge: 是否合并所有缓存文件 Returns: 整理后的数据统计 """ try: _ensure_dirs() # 读取所有缓存文件 buffer_files = list(BUFFER_DIR.glob("*.json")) if not buffer_files: return ToolResult( title="ℹ️ 无缓存数据", output="buffer 目录为空" ) all_tools = [] all_resources = [] all_knowledge = [] for filepath in buffer_files: with open(filepath, "r", encoding="utf-8") as f: data = json.load(f) all_tools.extend(data.get("tools", [])) all_resources.extend(data.get("resources", [])) all_knowledge.extend(data.get("knowledge", [])) # 去重(基于名称/标题) def dedupe_by_key(items: List[Dict], key: str) -> List[Dict]: seen = set() result = [] for item in items: identifier = item.get(key) if identifier and identifier not in seen: seen.add(identifier) result.append(item) return result tools_deduped = dedupe_by_key(all_tools, "名称") resources_deduped = dedupe_by_key(all_resources, "标题") knowledge_deduped = dedupe_by_key(all_knowledge, "内容") # 保存整理后的数据 organized_data = { "tools": tools_deduped, "resources": resources_deduped, "knowledge": knowledge_deduped, "organized_at": datetime.now().isoformat(), "source_files": [f.name for f in buffer_files] } timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") organized_file = ORGANIZED_DIR / f"organized_{timestamp}.json" with open(organized_file, "w", encoding="utf-8") as f: json.dump(organized_data, f, ensure_ascii=False, indent=2) # 清空 buffer(可选) if merge: for filepath in buffer_files: filepath.unlink() stats = [ f"工具: {len(all_tools)} → {len(tools_deduped)} (去重 {len(all_tools) - len(tools_deduped)})", f"资源: {len(all_resources)} → {len(resources_deduped)} (去重 {len(all_resources) - len(resources_deduped)})", f"知识: {len(all_knowledge)} → {len(knowledge_deduped)} (去重 {len(all_knowledge) - len(knowledge_deduped)})", ] return ToolResult( title="✅ 整理完成", output=f"文件: {organized_file.name}\n\n" + "\n".join(f"- {s}" for s in stats), metadata={"filepath": str(organized_file), "stats": organized_data} ) except Exception as e: logger.error(f"整理失败: {e}") return ToolResult( title="❌ 整理失败", output=f"错误: {str(e)}", error=str(e) ) @tool() async def commit_to_database( organized_file: Optional[str] = None, ) -> ToolResult: """ 将整理后的数据提交到数据库,建立完整的关联关系 Args: organized_file: 指定要提交的文件(不指定则提交最新的) Returns: 提交结果统计(按知识类型分组) """ try: _ensure_dirs() # 找到要提交的文件 if organized_file: filepath = ORGANIZED_DIR / organized_file else: organized_files = sorted(ORGANIZED_DIR.glob("organized_*.json")) if not organized_files: return ToolResult( title="ℹ️ 无整理数据", output="organized 目录为空,请先调用 organize_cached_data" ) filepath = organized_files[-1] if not filepath.exists(): return ToolResult( title="❌ 文件不存在", output=f"文件: {filepath.name}" ) # 读取数据 with open(filepath, "r", encoding="utf-8") as f: data = json.load(f) # 导入数据库工具 from agent.tools.builtin.knowledge import resource_save, knowledge_save # 统计变量 saved_tools = 0 saved_resources = 0 knowledge_by_type = {"tool": 0, "strategy": 0, "case": 0, "experience": 0} errors = [] # 映射:resource_id -> knowledge_ids(用于反向关联) resource_to_knowledge = {} # 第一步:保存资源 for resource in data.get("resources", []): try: resource_id = resource.get("id", f"resource_{saved_resources}") await resource_save( resource_id=resource_id, title=resource.get("标题", ""), body=resource.get("内容", ""), content_type=resource.get("类型", "text"), metadata=resource.get("元数据", {}) ) saved_resources += 1 resource_to_knowledge[resource_id] = [] except Exception as e: errors.append(f"资源 {resource.get('标题')}: {e}") # 第二步:保存知识,建立 resource_ids 关联 for knowledge in data.get("knowledge", []): try: # 提取关联的 resource_ids resource_ids = knowledge.get("resource_ids", []) # 保存知识 result = await knowledge_save( task=knowledge.get("主题", ""), content=knowledge.get("内容", ""), types=knowledge.get("类型", []), tags=knowledge.get("标签", {}), resource_ids=resource_ids, ) # 统计知识类型 types = knowledge.get("类型", []) for t in types: if t in knowledge_by_type: knowledge_by_type[t] += 1 # 提取 knowledge_id,用于反向关联 knowledge_id = result.metadata.get("knowledge_id") if knowledge_id: for rid in resource_ids: if rid in resource_to_knowledge: resource_to_knowledge[rid].append(knowledge_id) except Exception as e: errors.append(f"知识 {knowledge.get('主题')}: {e}") # 第三步:保存工具(作为 resource,类型为 tool) for tool in data.get("tools", []): try: tool_id = f"tools/{tool.get('分类', 'misc')}/{tool.get('名称', 'unknown')}" await resource_save( resource_id=tool_id, title=tool.get("名称", ""), body=json.dumps(tool, ensure_ascii=False, indent=2), content_type="tool", metadata={ "category": tool.get("分类", ""), "introduction": tool.get("简介", ""), **tool.get("工具信息", {}) } ) saved_tools += 1 except Exception as e: errors.append(f"工具 {tool.get('名称')}: {e}") # 构建统计输出 stats_lines = [ "**knowledge 表**:", f"- 工具知识: {knowledge_by_type['tool']} 条", f"- 工序知识: {knowledge_by_type['strategy']} 条", f"- 用例知识: {knowledge_by_type['case']} 条", f"- 执行经验: {knowledge_by_type['experience']} 条", "", "**resources 表**:", f"- 资源: {saved_resources} 个", f"- 工具: {saved_tools} 个", ] if errors: stats_lines.append(f"\n**错误**: {len(errors)} 个") output = "已提交到数据库\n\n" + "\n".join(stats_lines) if errors: output += "\n\n错误详情:\n" + "\n".join(f"- {e}" for e in errors[:5]) return ToolResult( title="✅ 提交到数据库完成", output=output, metadata={ "saved": { "tools": saved_tools, "resources": saved_resources, "knowledge": knowledge_by_type } } ) except Exception as e: logger.error(f"提交到数据库失败: {e}") return ToolResult( title="❌ 提交失败", output=f"错误: {str(e)}", error=str(e) ) @tool( description=( "查看缓存状态,包括 buffer 和 organized 中的文件列表、统计信息。" "用于了解当前有多少数据在缓存中等待处理。" ) ) async def list_cache_status() -> ToolResult: """ 查看缓存状态(buffer 和 organized 中的文件) Returns: 缓存目录中的文件列表和统计 """ _ensure_dirs() buffer_files = sorted(BUFFER_DIR.glob("*.json")) organized_files = sorted(ORGANIZED_DIR.glob("*.json")) lines = [f"**Buffer** ({len(buffer_files)} 个文件):"] total_tools = 0 total_resources = 0 total_knowledge = 0 for f in buffer_files: size = f.stat().st_size # 读取文件统计 try: with open(f, "r", encoding="utf-8") as file: data = json.load(file) tools = len(data.get("tools", [])) resources = len(data.get("resources", [])) knowledge = len(data.get("knowledge", [])) total_tools += tools total_resources += resources total_knowledge += knowledge lines.append( f" - {f.name} ({size // 1024}KB): " f"工具 {tools}, 资源 {resources}, 知识 {knowledge}" ) except Exception: lines.append(f" - {f.name} ({size // 1024}KB)") if buffer_files: lines.append( f"\n **合计**: 工具 {total_tools}, 资源 {total_resources}, 知识 {total_knowledge}" ) lines.append(f"\n**Organized** ({len(organized_files)} 个文件):") for f in organized_files: size = f.stat().st_size lines.append(f" - {f.name} ({size // 1024}KB)") return ToolResult( title="📁 缓存状态", output="\n".join(lines), metadata={ "files": [f.name for f in buffer_files], "total": { "tools": total_tools, "resources": total_resources, "knowledge": total_knowledge } } )