""" 原子知识保存工具 提供便捷的 API 让 Agent 快速保存结构化的原子知识 """ import os import re import json import yaml import logging from datetime import datetime from pathlib import Path from typing import List, Dict, Optional, Any from agent.tools import tool, ToolResult, ToolContext from ...llm.openrouter import openrouter_llm_call logger = logging.getLogger(__name__) def _generate_knowledge_id() -> str: """生成知识原子 ID""" return f"research-{datetime.now().strftime('%Y%m%d-%H%M%S')}" def _format_yaml_list(items: List[str], indent: int = 2) -> str: """格式化 YAML 列表""" if not items: return "[]" indent_str = " " * indent return "\n" + "\n".join(f"{indent_str}- {item}" for item in items) @tool() async def save_knowledge( scenario: str, content: str, tags_type: List[str], urls: List[str] = None, agent_id: str = "research_agent", score: int = 3, trace_id: str = "", ) -> ToolResult: """ 保存原子知识到本地文件(JSON 格式) Args: scenario: 任务描述(在什么情景下 + 要完成什么目标 + 得到能达成一个什么结果) content: 核心内容 tags_type: 知识类型标签,可选:tool, usercase, definition, plan urls: 参考来源链接列表(论文/GitHub/博客等) agent_id: 执行此调研的 agent ID score: 初始评分 1-5(默认 3) trace_id: 当前 trace ID(可选) Returns: 保存结果 """ try: # 生成 ID knowledge_id = _generate_knowledge_id() # 准备目录 knowledge_dir = Path(".cache/knowledge_atoms") knowledge_dir.mkdir(parents=True, exist_ok=True) # 构建文件路径(使用 .json 扩展名) file_path = knowledge_dir / f"{knowledge_id}.json" # 构建 JSON 数据结构 knowledge_data = { "id": knowledge_id, "trace_id": trace_id or "N/A", "tags": { "type": tags_type }, "scenario": scenario, "content": content, "trace": { "urls": urls or [], "agent_id": agent_id, "timestamp": datetime.now().isoformat() }, "eval": { "score": score, "helpful": 0, "harmful": 0, "helpful_history": [], "harmful_history": [] }, "metrics": { "helpful": 1, "harmful": 0 }, "created_at": datetime.now().strftime('%Y-%m-%d %H:%M:%S') } # 保存为 JSON 文件 with open(file_path, "w", encoding="utf-8") as f: json.dump(knowledge_data, f, ensure_ascii=False, indent=2) return ToolResult( title="✅ 原子知识已保存", output=f"知识 ID: {knowledge_id}\n文件路径: {file_path}\n\n场景:\n{scenario[:100]}...", long_term_memory=f"保存原子知识: {knowledge_id} - {scenario[:50]}", metadata={"knowledge_id": knowledge_id, "file_path": str(file_path)} ) except Exception as e: return ToolResult( title="❌ 保存失败", output=f"错误: {str(e)}", error=str(e) ) @tool() async def update_knowledge( knowledge_id: str, add_helpful_case: Optional[Dict[str, str]] = None, add_harmful_case: Optional[Dict[str, str]] = None, update_score: Optional[int] = None, ) -> ToolResult: """ 更新已有的原子知识的评估反馈 Args: knowledge_id: 知识 ID(如 research-20260302-001) add_helpful_case: 添加好用的案例 {"case_id": "...", "scenario": "...", "result": "...", "timestamp": "..."} add_harmful_case: 添加不好用的案例 {"case_id": "...", "scenario": "...", "result": "...", "timestamp": "..."} update_score: 更新评分(1-5) Returns: 更新结果 """ try: # 查找文件 knowledge_dir = Path(".cache/knowledge_atoms") file_path = knowledge_dir / f"{knowledge_id}.md" if not file_path.exists(): return ToolResult( title="❌ 文件不存在", output=f"未找到知识文件: {file_path}", error="文件不存在" ) # 读取现有内容 with open(file_path, "r", encoding="utf-8") as f: content = f.read() # 更新内容 updated = False import re if add_helpful_case: # 增加 helpful 计数 helpful_match = re.search(r"helpful: (\d+)", content) current_helpful = int(helpful_match.group(1)) if helpful_match else 0 content = re.sub( r"helpful: \d+", f"helpful: {current_helpful + 1}", content ) # 添加案例到 helpful_history case_yaml = f""" - case_id: {add_helpful_case.get('case_id', 'unknown')} scenario: "{add_helpful_case.get('scenario', '')}" result: "{add_helpful_case.get('result', '')}" timestamp: {add_helpful_case.get('timestamp', datetime.now().isoformat())}""" if "helpful_history: []" in content: content = content.replace( "helpful_history: []", f"helpful_history:\n{case_yaml}" ) else: # 在 helpful_history 后追加 content = re.sub( r"(helpful_history:.*?)(\n harmful)", f"\\1\n{case_yaml}\\2", content, flags=re.DOTALL ) updated = True if add_harmful_case: # 增加 harmful 计数 harmful_match = re.search(r"harmful: (\d+)", content) current_harmful = int(harmful_match.group(1)) if harmful_match else 0 content = re.sub( r"harmful: \d+", f"harmful: {current_harmful + 1}", content ) # 添加案例到 harmful_history case_yaml = f""" - case_id: {add_harmful_case.get('case_id', 'unknown')} scenario: "{add_harmful_case.get('scenario', '')}" result: "{add_harmful_case.get('result', '')}" timestamp: {add_harmful_case.get('timestamp', datetime.now().isoformat())}""" if "harmful_history: []" in content: content = content.replace( "harmful_history: []", f"harmful_history:\n{case_yaml}" ) else: # 在 harmful_history 后追加 content = re.sub( r"(harmful_history:.*?)(\nmetrics)", f"\\1\n{case_yaml}\\2", content, flags=re.DOTALL ) updated = True if update_score is not None: content = re.sub( r"score: \d+", f"score: {update_score}", content ) updated = True if not updated: return ToolResult( title="⚠️ 无更新", output="未指定任何更新内容", long_term_memory="尝试更新原子知识但未指定更新内容" ) # 保存更新 with open(file_path, "w", encoding="utf-8") as f: f.write(content) summary = [] if add_helpful_case: summary.append(f"添加 helpful 案例: {add_helpful_case.get('case_id')}") if add_harmful_case: summary.append(f"添加 harmful 案例: {add_harmful_case.get('case_id')}") if update_score: summary.append(f"更新评分: {update_score}") return ToolResult( title="✅ 原子知识已更新", output=f"知识 ID: {knowledge_id}\n文件路径: {file_path}\n\n更新内容:\n" + "\n".join(f"- {s}" for s in summary), long_term_memory=f"更新原子知识: {knowledge_id}" ) except Exception as e: return ToolResult( title="❌ 更新失败", output=f"错误: {str(e)}", error=str(e) ) @tool() async def list_knowledge( limit: int = 10, tags_type: Optional[List[str]] = None, ) -> ToolResult: """ 列出已保存的原子知识 Args: limit: 返回数量限制(默认 10) tags_type: 按类型过滤(可选) Returns: 知识列表 """ try: knowledge_dir = Path(".cache/knowledge_atoms") if not knowledge_dir.exists(): return ToolResult( title="📂 知识库为空", output="还没有保存任何原子知识", long_term_memory="知识库为空" ) # 获取所有文件 files = sorted(knowledge_dir.glob("*.md"), key=lambda x: x.stat().st_mtime, reverse=True) if not files: return ToolResult( title="📂 知识库为空", output="还没有保存任何原子知识", long_term_memory="知识库为空" ) # 读取并过滤 results = [] for file_path in files[:limit]: with open(file_path, "r", encoding="utf-8") as f: content = f.read() # 提取关键信息 import re id_match = re.search(r"id: (.+)", content) scenario_match = re.search(r"scenario: \|\n (.+)", content) score_match = re.search(r"score: (\d+)", content) knowledge_id = id_match.group(1) if id_match else "unknown" scenario = scenario_match.group(1) if scenario_match else "N/A" score = score_match.group(1) if score_match else "N/A" results.append(f"- [{knowledge_id}] (⭐{score}) {scenario[:60]}...") output = f"共找到 {len(files)} 条原子知识,显示最近 {len(results)} 条:\n\n" + "\n".join(results) return ToolResult( title="📚 原子知识列表", output=output, long_term_memory=f"列出 {len(results)} 条原子知识" ) except Exception as e: return ToolResult( title="❌ 列表失败", output=f"错误: {str(e)}", error=str(e) ) # ===== 语义检索功能 ===== async def _route_knowledge_by_llm(query_text: str, metadata_list: List[Dict], k: int = 5) -> List[str]: """ 第一阶段:语义路由。 让 LLM 挑选出 2*k 个语义相关的 ID。 """ if not metadata_list: return [] # 扩大筛选范围到 2*k routing_k = k * 2 routing_data = [ { "id": m["id"], "tags": m["tags"], "scenario": m["scenario"][:100] # 只取前100字符 } for m in metadata_list ] prompt = f""" 你是一个知识检索专家。根据用户的当前任务需求,从下列原子知识元数据中挑选出最相关的最多 {routing_k} 个知识 ID。 任务需求:"{query_text}" 可选知识列表: {json.dumps(routing_data, ensure_ascii=False, indent=1)} 请直接输出 ID 列表,用逗号分隔(例如: research-20260302-001, research-20260302-002)。若无相关项请输出 "None"。 """ try: print(f"\n[Step 1: 知识语义路由] 任务: '{query_text}' | 候选总数: {len(metadata_list)} | 目标提取数: {routing_k}") response = await openrouter_llm_call( messages=[{"role": "user", "content": prompt}], model="google/gemini-2.0-flash-001" ) content = response.get("content", "").strip() selected_ids = [idx.strip() for idx in re.split(r'[,\s]+', content) if idx.strip().startswith("research-")] print(f"[Step 1: 知识语义路由] LLM 初选 ID ({len(selected_ids)}个): {selected_ids}") return selected_ids except Exception as e: logger.error(f"LLM 知识路由失败: {e}") return [] async def _get_structured_knowledge(query_text: str, top_k: int = 5, min_score: int = 3) -> List[Dict]: """ 语义检索原子知识 1. 解析知识库文件(支持 JSON 和 YAML 格式) 2. 语义路由:提取 2*k 个 ID 3. 质量精排:基于评分筛选出最终的 k 个 """ knowledge_dir = Path(".cache/knowledge_atoms") if not knowledge_dir.exists(): print(f"[Knowledge System] 警告: 知识库目录不存在 ({knowledge_dir})") return [] # 同时支持 .json 和 .md 文件 json_files = list(knowledge_dir.glob("*.json")) md_files = list(knowledge_dir.glob("*.md")) files = json_files + md_files if not files: print(f"[Knowledge System] 警告: 知识库为空") return [] # --- 阶段 1: 解析所有知识文件 --- content_map = {} metadata_list = [] for file_path in files: try: with open(file_path, "r", encoding="utf-8") as f: content = f.read() # 根据文件扩展名选择解析方式 if file_path.suffix == ".json": # 解析 JSON 格式 metadata = json.loads(content) else: # 解析 YAML frontmatter(兼容旧格式) yaml_match = re.search(r'^---\n(.*?)\n---', content, re.DOTALL) if not yaml_match: logger.warning(f"跳过无效文件: {file_path}") continue metadata = yaml.safe_load(yaml_match.group(1)) if not isinstance(metadata, dict): logger.warning(f"跳过损坏的知识文件: {file_path}") continue kid = metadata.get("id") if not kid: logger.warning(f"跳过缺少 id 的知识文件: {file_path}") continue # 提取 scenario 和 content scenario = metadata.get("scenario", "").strip() content_text = metadata.get("content", "").strip() meta_item = { "id": kid, "tags": metadata.get("tags", {}), "scenario": scenario, "score": metadata.get("eval", {}).get("score", 3), "helpful": metadata.get("metrics", {}).get("helpful", 0), "harmful": metadata.get("metrics", {}).get("harmful", 0), } metadata_list.append(meta_item) content_map[kid] = { "scenario": scenario, "content": content_text, "score": meta_item["score"], "helpful": meta_item["helpful"], "harmful": meta_item["harmful"], } except Exception as e: logger.error(f"解析知识文件失败 {file_path}: {e}") continue if not metadata_list: print(f"[Knowledge System] 警告: 没有有效的知识条目") return [] # --- 阶段 2: 语义路由 (取 2*k) --- candidate_ids = await _route_knowledge_by_llm(query_text, metadata_list, k=top_k) # --- 阶段 3: 质量精排 (根据评分和反馈选出最终的 k) --- print(f"[Step 2: 知识质量精排] 正在根据评分和反馈进行打分...") scored_items = [] for kid in candidate_ids: if kid in content_map: item = content_map[kid] score = item["score"] helpful = item["helpful"] harmful = item["harmful"] # 计算综合分:基础分 + helpful - harmful*2 quality_score = score + helpful - (harmful * 2.0) # 过滤门槛:评分低于 min_score 或质量分过低 if score < min_score or quality_score < 0: print(f" - 剔除低质量知识: {kid} (Score: {score}, Helpful: {helpful}, Harmful: {harmful})") continue scored_items.append({ "id": kid, "scenario": item["scenario"], "content": item["content"], "score": score, "quality_score": quality_score }) # 按照质量分排序 final_sorted = sorted(scored_items, key=lambda x: x["quality_score"], reverse=True) # 截取最终的 top_k result = final_sorted[:top_k] print(f"[Step 2: 知识质量精排] 最终选定知识: {[it['id'] for it in result]}") print(f"[Knowledge System] 检索结束。\n") return result @tool() async def search_knowledge( query: str, top_k: int = 5, min_score: int = 3, tags_type: Optional[List[str]] = None, context: Optional[ToolContext] = None, ) -> ToolResult: """ 语义检索原子知识库 Args: query: 搜索查询(任务描述) top_k: 返回数量(默认 5) min_score: 最低评分过滤(默认 3) tags_type: 按类型过滤(tool/usercase/definition/plan) context: 工具上下文 Returns: 相关知识列表 """ try: relevant_items = await _get_structured_knowledge( query_text=query, top_k=top_k, min_score=min_score ) if not relevant_items: return ToolResult( title="🔍 未找到相关知识", output=f"查询: {query}\n\n知识库中暂无相关的高质量知识。建议进行调研。", long_term_memory=f"知识检索: 未找到相关知识 - {query[:50]}" ) # 格式化输出 output_lines = [f"查询: {query}\n", f"找到 {len(relevant_items)} 条相关知识:\n"] for idx, item in enumerate(relevant_items, 1): output_lines.append(f"\n### {idx}. [{item['id']}] (⭐ {item['score']})") output_lines.append(f"**场景**: {item['scenario'][:150]}...") output_lines.append(f"**内容**: {item['content'][:200]}...") return ToolResult( title="✅ 知识检索成功", output="\n".join(output_lines), long_term_memory=f"知识检索: 找到 {len(relevant_items)} 条相关知识 - {query[:50]}", metadata={ "count": len(relevant_items), "knowledge_ids": [item["id"] for item in relevant_items], "items": relevant_items } ) except Exception as e: logger.error(f"知识检索失败: {e}") return ToolResult( title="❌ 检索失败", output=f"错误: {str(e)}", error=str(e) )