| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418 |
- """
- Knowledge Manager 本地缓存工具
- 负责:
- 1. 接收调研数据,存入本地缓存
- 2. 整理缓存数据(去重、合并、关联)
- 3. 可选提交到数据库或仅保存本地
- """
- import json
- import logging
- from pathlib import Path
- from typing import Dict, Any, List, Optional
- from datetime import datetime
- from agent.tools import tool, ToolResult
- logger = logging.getLogger(__name__)
# Local cache layout: raw research uploads land in ``buffer``; the
# deduplicated, merged snapshots produced by organize_cached_data land
# in ``organized``.
CACHE_DIR = Path(".cache/.knowledge")
BUFFER_DIR = CACHE_DIR / "buffer"        # incoming, unprocessed uploads
ORGANIZED_DIR = CACHE_DIR / "organized"  # merged + deduplicated snapshots
def _ensure_dirs() -> None:
    """Create the buffer and organized cache directories if absent."""
    for directory in (BUFFER_DIR, ORGANIZED_DIR):
        directory.mkdir(parents=True, exist_ok=True)
@tool(
    description=(
        "缓存调研数据到本地(不入库)。"
        "接受 JSON 字符串或字典,自动解析。"
        "适用于增量上传场景,先缓存后整理。"
    )
)
async def cache_research_data(
    data: str | Dict[str, Any],
    source: str = "unknown",
) -> ToolResult:
    """Cache research data to a local buffer file (no database write).

    Args:
        data: Research payload (JSON string or dict); may contain
            ``tools`` / ``resources`` / ``knowledge`` lists.
        source: Identifier of the producer (e.g. an agent id), used as
            the cache-file name prefix.

    Returns:
        ToolResult confirming the write, with per-section counts and
        the cache file path in ``metadata``.

    Examples:
        # Option 1: pass a dict directly
        cache_research_data(data={"knowledge": [...]}, source="agent_research")
        # Option 2: pass a JSON string
        cache_research_data(data='{"knowledge": [...]}', source="agent_research")
    """
    try:
        _ensure_dirs()
        # Accept a JSON string for convenience; report a dedicated parse
        # error instead of failing later on dict access.
        if isinstance(data, str):
            try:
                data = json.loads(data)
            except json.JSONDecodeError as e:
                return ToolResult(
                    title="❌ JSON 解析失败",
                    output=f"无法解析 JSON 字符串: {e}",
                    error=str(e)
                )
        # Validate BEFORE writing: previously a non-dict payload (e.g. a
        # JSON array) was written to disk and only then crashed on
        # ``data.get``, leaving a stray cache file behind.
        if not isinstance(data, dict):
            return ToolResult(
                title="❌ JSON 解析失败",
                output=f"无法解析 JSON 字符串: 期望 JSON 对象,得到 {type(data).__name__}",
                error="payload is not a JSON object"
            )
        # One file per upload: <source>_<timestamp>.json
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{source}_{timestamp}.json"
        filepath = BUFFER_DIR / filename
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        # Per-section counts for the confirmation message.
        stats = []
        if data.get("tools"):
            stats.append(f"工具: {len(data['tools'])} 个")
        if data.get("resources"):
            stats.append(f"资源: {len(data['resources'])} 个")
        if data.get("knowledge"):
            stats.append(f"知识: {len(data['knowledge'])} 个")
        return ToolResult(
            title="✅ 已缓存到本地",
            # Bug fix: report the actual cache file name — the output was
            # previously the literal placeholder "(unknown)".
            output=f"文件: {filename}\n\n" + "\n".join(f"- {s}" for s in stats),
            metadata={"filepath": str(filepath), "stats": stats}
        )
    except Exception as e:
        logger.error(f"缓存失败: {e}")
        return ToolResult(
            title="❌ 缓存失败",
            output=f"错误: {str(e)}",
            error=str(e)
        )
@tool()
async def organize_cached_data(
    merge: bool = True,
) -> ToolResult:
    """Merge all buffered cache files into one deduplicated snapshot.

    Entries are deduplicated by their identifying field: tools by
    "名称" (name), resources by "标题" (title), knowledge by "内容"
    (content). The first occurrence wins. The snapshot is written to
    the ``organized`` directory.

    Args:
        merge: When True (default), delete the buffer files after the
            snapshot is written; when False, keep them so the same data
            can be re-organized later. The merge/dedup itself always
            happens regardless of this flag.

    Returns:
        ToolResult with before/after counts per section and the
        snapshot path in ``metadata``.
    """
    try:
        _ensure_dirs()
        buffer_files = list(BUFFER_DIR.glob("*.json"))
        if not buffer_files:
            return ToolResult(
                title="ℹ️ 无缓存数据",
                output="buffer 目录为空"
            )
        # Concatenate every section across all buffered uploads.
        all_tools: List[Dict] = []
        all_resources: List[Dict] = []
        all_knowledge: List[Dict] = []
        for filepath in buffer_files:
            with open(filepath, "r", encoding="utf-8") as f:
                data = json.load(f)
            all_tools.extend(data.get("tools", []))
            all_resources.extend(data.get("resources", []))
            all_knowledge.extend(data.get("knowledge", []))

        def dedupe_by_key(items: List[Dict], key: str) -> List[Dict]:
            """Keep the first item per non-empty ``key`` value."""
            seen = set()
            result = []
            for item in items:
                identifier = item.get(key)
                if identifier and identifier not in seen:
                    seen.add(identifier)
                    result.append(item)
            return result

        tools_deduped = dedupe_by_key(all_tools, "名称")
        resources_deduped = dedupe_by_key(all_resources, "标题")
        knowledge_deduped = dedupe_by_key(all_knowledge, "内容")
        # Persist the snapshot together with its provenance.
        organized_data = {
            "tools": tools_deduped,
            "resources": resources_deduped,
            "knowledge": knowledge_deduped,
            "organized_at": datetime.now().isoformat(),
            "source_files": [f.name for f in buffer_files]
        }
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        organized_file = ORGANIZED_DIR / f"organized_{timestamp}.json"
        with open(organized_file, "w", encoding="utf-8") as f:
            json.dump(organized_data, f, ensure_ascii=False, indent=2)
        # Clear the buffer only when the caller opted in (the flag gates
        # deletion, not the merge).
        if merge:
            for filepath in buffer_files:
                filepath.unlink()
        stats = [
            f"工具: {len(all_tools)} → {len(tools_deduped)} (去重 {len(all_tools) - len(tools_deduped)})",
            f"资源: {len(all_resources)} → {len(resources_deduped)} (去重 {len(all_resources) - len(resources_deduped)})",
            f"知识: {len(all_knowledge)} → {len(knowledge_deduped)} (去重 {len(all_knowledge) - len(knowledge_deduped)})",
        ]
        return ToolResult(
            title="✅ 整理完成",
            output=f"文件: {organized_file.name}\n\n" + "\n".join(f"- {s}" for s in stats),
            metadata={"filepath": str(organized_file), "stats": organized_data}
        )
    except Exception as e:
        logger.error(f"整理失败: {e}")
        return ToolResult(
            title="❌ 整理失败",
            output=f"错误: {str(e)}",
            error=str(e)
        )
@tool()
async def commit_to_database(
    organized_file: Optional[str] = None,
) -> ToolResult:
    """Commit an organized snapshot to the database with cross-links.

    Saves, in order: resources, knowledge entries (linked to resources
    via ``resource_ids``), then tools (stored as resources with content
    type "tool"). Per-item failures are collected into an error list
    rather than aborting the whole commit.

    Args:
        organized_file: File name inside the organized directory;
            defaults to the most recent ``organized_*.json``.

    Returns:
        ToolResult with counts grouped by knowledge type, plus up to
        five error details when individual items failed.
    """
    try:
        _ensure_dirs()
        # Resolve which snapshot to commit.
        if organized_file:
            filepath = ORGANIZED_DIR / organized_file
        else:
            # File names embed a sortable timestamp, so the lexicographic
            # maximum is the newest snapshot.
            organized_files = sorted(ORGANIZED_DIR.glob("organized_*.json"))
            if not organized_files:
                return ToolResult(
                    title="ℹ️ 无整理数据",
                    output="organized 目录为空,请先调用 organize_cached_data"
                )
            filepath = organized_files[-1]
        if not filepath.exists():
            return ToolResult(
                title="❌ 文件不存在",
                output=f"文件: {filepath.name}"
            )
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)
        # Imported lazily — presumably to avoid an import cycle with the
        # knowledge tool package (assumption, TODO confirm).
        from agent.tools.builtin.knowledge import resource_save, knowledge_save
        saved_tools = 0
        saved_resources = 0
        knowledge_by_type = {"tool": 0, "strategy": 0, "case": 0, "experience": 0}
        errors = []
        # resource_id -> [knowledge_id, ...] reverse links.
        # NOTE(review): populated below but never persisted anywhere —
        # confirm whether a reverse-link write-back step is still planned.
        resource_to_knowledge = {}
        # Step 1: save resources first so knowledge can reference them.
        for resource in data.get("resources", []):
            try:
                resource_id = resource.get("id", f"resource_{saved_resources}")
                await resource_save(
                    resource_id=resource_id,
                    title=resource.get("标题", ""),
                    body=resource.get("内容", ""),
                    content_type=resource.get("类型", "text"),
                    metadata=resource.get("元数据", {})
                )
                saved_resources += 1
                resource_to_knowledge[resource_id] = []
            except Exception as e:
                errors.append(f"资源 {resource.get('标题')}: {e}")
        # Step 2: save knowledge entries, linking them to resource ids.
        for knowledge in data.get("knowledge", []):
            try:
                resource_ids = knowledge.get("resource_ids", [])
                result = await knowledge_save(
                    task=knowledge.get("主题", ""),
                    content=knowledge.get("内容", ""),
                    types=knowledge.get("类型", []),
                    tags=knowledge.get("标签", {}),
                    resource_ids=resource_ids,
                )
                # Tally per-type counts; unknown types are ignored.
                for t in knowledge.get("类型", []):
                    if t in knowledge_by_type:
                        knowledge_by_type[t] += 1
                # Record the reverse knowledge -> resource links.
                knowledge_id = result.metadata.get("knowledge_id")
                if knowledge_id:
                    for rid in resource_ids:
                        if rid in resource_to_knowledge:
                            resource_to_knowledge[rid].append(knowledge_id)
            except Exception as e:
                errors.append(f"知识 {knowledge.get('主题')}: {e}")
        # Step 3: save tools as resources with content type "tool".
        # Loop variable renamed (was ``tool``) so it no longer shadows the
        # module-level ``tool`` decorator.
        for tool_entry in data.get("tools", []):
            try:
                tool_id = f"tools/{tool_entry.get('分类', 'misc')}/{tool_entry.get('名称', 'unknown')}"
                await resource_save(
                    resource_id=tool_id,
                    title=tool_entry.get("名称", ""),
                    body=json.dumps(tool_entry, ensure_ascii=False, indent=2),
                    content_type="tool",
                    metadata={
                        "category": tool_entry.get("分类", ""),
                        "introduction": tool_entry.get("简介", ""),
                        **tool_entry.get("工具信息", {})
                    }
                )
                saved_tools += 1
            except Exception as e:
                errors.append(f"工具 {tool_entry.get('名称')}: {e}")
        # Build the human-readable commit summary.
        stats_lines = [
            "**knowledge 表**:",
            f"- 工具知识: {knowledge_by_type['tool']} 条",
            f"- 工序知识: {knowledge_by_type['strategy']} 条",
            f"- 用例知识: {knowledge_by_type['case']} 条",
            f"- 执行经验: {knowledge_by_type['experience']} 条",
            "",
            "**resources 表**:",
            f"- 资源: {saved_resources} 个",
            f"- 工具: {saved_tools} 个",
        ]
        if errors:
            stats_lines.append(f"\n**错误**: {len(errors)} 个")
        output = "已提交到数据库\n\n" + "\n".join(stats_lines)
        if errors:
            # Cap the detail list at five to keep the message readable.
            output += "\n\n错误详情:\n" + "\n".join(f"- {e}" for e in errors[:5])
        return ToolResult(
            title="✅ 提交到数据库完成",
            output=output,
            metadata={
                "saved": {
                    "tools": saved_tools,
                    "resources": saved_resources,
                    "knowledge": knowledge_by_type
                }
            }
        )
    except Exception as e:
        logger.error(f"提交到数据库失败: {e}")
        return ToolResult(
            title="❌ 提交失败",
            output=f"错误: {str(e)}",
            error=str(e)
        )
@tool(
    description=(
        "查看缓存状态,包括 buffer 和 organized 中的文件列表、统计信息。"
        "用于了解当前有多少数据在缓存中等待处理。"
    )
)
async def list_cache_status() -> ToolResult:
    """Report the files in the buffer and organized cache directories.

    Returns:
        ToolResult listing each cache file with its size and, where the
        JSON is readable, per-section entry counts; aggregate totals go
        in ``metadata``.
    """
    # Wrapped in try/except for consistency with the other tools in this
    # module, which all return an error ToolResult instead of raising.
    try:
        _ensure_dirs()
        buffer_files = sorted(BUFFER_DIR.glob("*.json"))
        organized_files = sorted(ORGANIZED_DIR.glob("*.json"))
        lines = [f"**Buffer** ({len(buffer_files)} 个文件):"]
        total_tools = 0
        total_resources = 0
        total_knowledge = 0
        for f in buffer_files:
            size = f.stat().st_size
            try:
                with open(f, "r", encoding="utf-8") as file:
                    data = json.load(file)
                tools = len(data.get("tools", []))
                resources = len(data.get("resources", []))
                knowledge = len(data.get("knowledge", []))
                total_tools += tools
                total_resources += resources
                total_knowledge += knowledge
                lines.append(
                    f" - {f.name} ({size // 1024}KB): "
                    f"工具 {tools}, 资源 {resources}, 知识 {knowledge}"
                )
            except Exception:
                # Unreadable/corrupt JSON: still list the file, without counts.
                lines.append(f" - {f.name} ({size // 1024}KB)")
        if buffer_files:
            lines.append(
                f"\n **合计**: 工具 {total_tools}, 资源 {total_resources}, 知识 {total_knowledge}"
            )
        lines.append(f"\n**Organized** ({len(organized_files)} 个文件):")
        for f in organized_files:
            size = f.stat().st_size
            lines.append(f" - {f.name} ({size // 1024}KB)")
        return ToolResult(
            title="📁 缓存状态",
            output="\n".join(lines),
            metadata={
                "files": [f.name for f in buffer_files],
                "total": {
                    "tools": total_tools,
                    "resources": total_resources,
                    "knowledge": total_knowledge
                }
            }
        )
    except Exception as e:
        logger.error(f"查看缓存状态失败: {e}")
        return ToolResult(
            title="❌ 查看缓存状态失败",
            output=f"错误: {str(e)}",
            error=str(e)
        )
|