| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559 |
- """
- 原子知识保存工具
- 提供便捷的 API 让 Agent 快速保存结构化的原子知识
- """
- import os
- import re
- import json
- import yaml
- import logging
- from datetime import datetime
- from pathlib import Path
- from typing import List, Dict, Optional, Any
- from agent.tools import tool, ToolResult, ToolContext
- from ...llm.openrouter import openrouter_llm_call
- logger = logging.getLogger(__name__)
- def _generate_knowledge_id() -> str:
- """生成知识原子 ID"""
- return f"research-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
- def _format_yaml_list(items: List[str], indent: int = 2) -> str:
- """格式化 YAML 列表"""
- if not items:
- return "[]"
- indent_str = " " * indent
- return "\n" + "\n".join(f"{indent_str}- {item}" for item in items)
- @tool()
- async def save_knowledge(
- scenario: str,
- content: str,
- tags_type: List[str],
- urls: List[str] = None,
- agent_id: str = "research_agent",
- score: int = 3,
- trace_id: str = "",
- ) -> ToolResult:
- """
- 保存原子知识到本地文件(JSON 格式)
- Args:
- scenario: 任务描述(在什么情景下 + 要完成什么目标 + 得到能达成一个什么结果)
- content: 核心内容
- tags_type: 知识类型标签,可选:tool, usercase, definition, plan
- urls: 参考来源链接列表(论文/GitHub/博客等)
- agent_id: 执行此调研的 agent ID
- score: 初始评分 1-5(默认 3)
- trace_id: 当前 trace ID(可选)
- Returns:
- 保存结果
- """
- try:
- # 生成 ID
- knowledge_id = _generate_knowledge_id()
- # 准备目录
- knowledge_dir = Path(".cache/knowledge_atoms")
- knowledge_dir.mkdir(parents=True, exist_ok=True)
- # 构建文件路径(使用 .json 扩展名)
- file_path = knowledge_dir / f"{knowledge_id}.json"
- # 构建 JSON 数据结构
- knowledge_data = {
- "id": knowledge_id,
- "trace_id": trace_id or "N/A",
- "tags": {
- "type": tags_type
- },
- "scenario": scenario,
- "content": content,
- "trace": {
- "urls": urls or [],
- "agent_id": agent_id,
- "timestamp": datetime.now().isoformat()
- },
- "eval": {
- "score": score,
- "helpful": 0,
- "harmful": 0,
- "helpful_history": [],
- "harmful_history": []
- },
- "metrics": {
- "helpful": 1,
- "harmful": 0
- },
- "created_at": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- }
- # 保存为 JSON 文件
- with open(file_path, "w", encoding="utf-8") as f:
- json.dump(knowledge_data, f, ensure_ascii=False, indent=2)
- return ToolResult(
- title="✅ 原子知识已保存",
- output=f"知识 ID: {knowledge_id}\n文件路径: {file_path}\n\n场景:\n{scenario[:100]}...",
- long_term_memory=f"保存原子知识: {knowledge_id} - {scenario[:50]}",
- metadata={"knowledge_id": knowledge_id, "file_path": str(file_path)}
- )
- except Exception as e:
- return ToolResult(
- title="❌ 保存失败",
- output=f"错误: {str(e)}",
- error=str(e)
- )
- @tool()
- async def update_knowledge(
- knowledge_id: str,
- add_helpful_case: Optional[Dict[str, str]] = None,
- add_harmful_case: Optional[Dict[str, str]] = None,
- update_score: Optional[int] = None,
- ) -> ToolResult:
- """
- 更新已有的原子知识的评估反馈
- Args:
- knowledge_id: 知识 ID(如 research-20260302-001)
- add_helpful_case: 添加好用的案例 {"case_id": "...", "scenario": "...", "result": "...", "timestamp": "..."}
- add_harmful_case: 添加不好用的案例 {"case_id": "...", "scenario": "...", "result": "...", "timestamp": "..."}
- update_score: 更新评分(1-5)
- Returns:
- 更新结果
- """
- try:
- # 查找文件
- knowledge_dir = Path(".cache/knowledge_atoms")
- file_path = knowledge_dir / f"{knowledge_id}.md"
- if not file_path.exists():
- return ToolResult(
- title="❌ 文件不存在",
- output=f"未找到知识文件: {file_path}",
- error="文件不存在"
- )
- # 读取现有内容
- with open(file_path, "r", encoding="utf-8") as f:
- content = f.read()
- # 更新内容
- updated = False
- import re
- if add_helpful_case:
- # 增加 helpful 计数
- helpful_match = re.search(r"helpful: (\d+)", content)
- current_helpful = int(helpful_match.group(1)) if helpful_match else 0
- content = re.sub(
- r"helpful: \d+",
- f"helpful: {current_helpful + 1}",
- content
- )
- # 添加案例到 helpful_history
- case_yaml = f""" - case_id: {add_helpful_case.get('case_id', 'unknown')}
- scenario: "{add_helpful_case.get('scenario', '')}"
- result: "{add_helpful_case.get('result', '')}"
- timestamp: {add_helpful_case.get('timestamp', datetime.now().isoformat())}"""
- if "helpful_history: []" in content:
- content = content.replace(
- "helpful_history: []",
- f"helpful_history:\n{case_yaml}"
- )
- else:
- # 在 helpful_history 后追加
- content = re.sub(
- r"(helpful_history:.*?)(\n harmful)",
- f"\\1\n{case_yaml}\\2",
- content,
- flags=re.DOTALL
- )
- updated = True
- if add_harmful_case:
- # 增加 harmful 计数
- harmful_match = re.search(r"harmful: (\d+)", content)
- current_harmful = int(harmful_match.group(1)) if harmful_match else 0
- content = re.sub(
- r"harmful: \d+",
- f"harmful: {current_harmful + 1}",
- content
- )
- # 添加案例到 harmful_history
- case_yaml = f""" - case_id: {add_harmful_case.get('case_id', 'unknown')}
- scenario: "{add_harmful_case.get('scenario', '')}"
- result: "{add_harmful_case.get('result', '')}"
- timestamp: {add_harmful_case.get('timestamp', datetime.now().isoformat())}"""
- if "harmful_history: []" in content:
- content = content.replace(
- "harmful_history: []",
- f"harmful_history:\n{case_yaml}"
- )
- else:
- # 在 harmful_history 后追加
- content = re.sub(
- r"(harmful_history:.*?)(\nmetrics)",
- f"\\1\n{case_yaml}\\2",
- content,
- flags=re.DOTALL
- )
- updated = True
- if update_score is not None:
- content = re.sub(
- r"score: \d+",
- f"score: {update_score}",
- content
- )
- updated = True
- if not updated:
- return ToolResult(
- title="⚠️ 无更新",
- output="未指定任何更新内容",
- long_term_memory="尝试更新原子知识但未指定更新内容"
- )
- # 保存更新
- with open(file_path, "w", encoding="utf-8") as f:
- f.write(content)
- summary = []
- if add_helpful_case:
- summary.append(f"添加 helpful 案例: {add_helpful_case.get('case_id')}")
- if add_harmful_case:
- summary.append(f"添加 harmful 案例: {add_harmful_case.get('case_id')}")
- if update_score:
- summary.append(f"更新评分: {update_score}")
- return ToolResult(
- title="✅ 原子知识已更新",
- output=f"知识 ID: {knowledge_id}\n文件路径: {file_path}\n\n更新内容:\n" + "\n".join(f"- {s}" for s in summary),
- long_term_memory=f"更新原子知识: {knowledge_id}"
- )
- except Exception as e:
- return ToolResult(
- title="❌ 更新失败",
- output=f"错误: {str(e)}",
- error=str(e)
- )
- @tool()
- async def list_knowledge(
- limit: int = 10,
- tags_type: Optional[List[str]] = None,
- ) -> ToolResult:
- """
- 列出已保存的原子知识
- Args:
- limit: 返回数量限制(默认 10)
- tags_type: 按类型过滤(可选)
- Returns:
- 知识列表
- """
- try:
- knowledge_dir = Path(".cache/knowledge_atoms")
- if not knowledge_dir.exists():
- return ToolResult(
- title="📂 知识库为空",
- output="还没有保存任何原子知识",
- long_term_memory="知识库为空"
- )
- # 获取所有文件
- files = sorted(knowledge_dir.glob("*.md"), key=lambda x: x.stat().st_mtime, reverse=True)
- if not files:
- return ToolResult(
- title="📂 知识库为空",
- output="还没有保存任何原子知识",
- long_term_memory="知识库为空"
- )
- # 读取并过滤
- results = []
- for file_path in files[:limit]:
- with open(file_path, "r", encoding="utf-8") as f:
- content = f.read()
- # 提取关键信息
- import re
- id_match = re.search(r"id: (.+)", content)
- scenario_match = re.search(r"scenario: \|\n (.+)", content)
- score_match = re.search(r"score: (\d+)", content)
- knowledge_id = id_match.group(1) if id_match else "unknown"
- scenario = scenario_match.group(1) if scenario_match else "N/A"
- score = score_match.group(1) if score_match else "N/A"
- results.append(f"- [{knowledge_id}] (⭐{score}) {scenario[:60]}...")
- output = f"共找到 {len(files)} 条原子知识,显示最近 {len(results)} 条:\n\n" + "\n".join(results)
- return ToolResult(
- title="📚 原子知识列表",
- output=output,
- long_term_memory=f"列出 {len(results)} 条原子知识"
- )
- except Exception as e:
- return ToolResult(
- title="❌ 列表失败",
- output=f"错误: {str(e)}",
- error=str(e)
- )
- # ===== 语义检索功能 =====
- async def _route_knowledge_by_llm(query_text: str, metadata_list: List[Dict], k: int = 5) -> List[str]:
- """
- 第一阶段:语义路由。
- 让 LLM 挑选出 2*k 个语义相关的 ID。
- """
- if not metadata_list:
- return []
- # 扩大筛选范围到 2*k
- routing_k = k * 2
- routing_data = [
- {
- "id": m["id"],
- "tags": m["tags"],
- "scenario": m["scenario"][:100] # 只取前100字符
- } for m in metadata_list
- ]
- prompt = f"""
- 你是一个知识检索专家。根据用户的当前任务需求,从下列原子知识元数据中挑选出最相关的最多 {routing_k} 个知识 ID。
- 任务需求:"{query_text}"
- 可选知识列表:
- {json.dumps(routing_data, ensure_ascii=False, indent=1)}
- 请直接输出 ID 列表,用逗号分隔(例如: research-20260302-001, research-20260302-002)。若无相关项请输出 "None"。
- """
- try:
- print(f"\n[Step 1: 知识语义路由] 任务: '{query_text}' | 候选总数: {len(metadata_list)} | 目标提取数: {routing_k}")
- response = await openrouter_llm_call(
- messages=[{"role": "user", "content": prompt}],
- model="google/gemini-2.0-flash-001"
- )
- content = response.get("content", "").strip()
- selected_ids = [idx.strip() for idx in re.split(r'[,\s]+', content) if idx.strip().startswith("research-")]
- print(f"[Step 1: 知识语义路由] LLM 初选 ID ({len(selected_ids)}个): {selected_ids}")
- return selected_ids
- except Exception as e:
- logger.error(f"LLM 知识路由失败: {e}")
- return []
- async def _get_structured_knowledge(query_text: str, top_k: int = 5, min_score: int = 3) -> List[Dict]:
- """
- 语义检索原子知识
- 1. 解析知识库文件(支持 JSON 和 YAML 格式)
- 2. 语义路由:提取 2*k 个 ID
- 3. 质量精排:基于评分筛选出最终的 k 个
- """
- knowledge_dir = Path(".cache/knowledge_atoms")
- if not knowledge_dir.exists():
- print(f"[Knowledge System] 警告: 知识库目录不存在 ({knowledge_dir})")
- return []
- # 同时支持 .json 和 .md 文件
- json_files = list(knowledge_dir.glob("*.json"))
- md_files = list(knowledge_dir.glob("*.md"))
- files = json_files + md_files
- if not files:
- print(f"[Knowledge System] 警告: 知识库为空")
- return []
- # --- 阶段 1: 解析所有知识文件 ---
- content_map = {}
- metadata_list = []
- for file_path in files:
- try:
- with open(file_path, "r", encoding="utf-8") as f:
- content = f.read()
- # 根据文件扩展名选择解析方式
- if file_path.suffix == ".json":
- # 解析 JSON 格式
- metadata = json.loads(content)
- else:
- # 解析 YAML frontmatter(兼容旧格式)
- yaml_match = re.search(r'^---\n(.*?)\n---', content, re.DOTALL)
- if not yaml_match:
- logger.warning(f"跳过无效文件: {file_path}")
- continue
- metadata = yaml.safe_load(yaml_match.group(1))
- if not isinstance(metadata, dict):
- logger.warning(f"跳过损坏的知识文件: {file_path}")
- continue
- kid = metadata.get("id")
- if not kid:
- logger.warning(f"跳过缺少 id 的知识文件: {file_path}")
- continue
- # 提取 scenario 和 content
- scenario = metadata.get("scenario", "").strip()
- content_text = metadata.get("content", "").strip()
- meta_item = {
- "id": kid,
- "tags": metadata.get("tags", {}),
- "scenario": scenario,
- "score": metadata.get("eval", {}).get("score", 3),
- "helpful": metadata.get("metrics", {}).get("helpful", 0),
- "harmful": metadata.get("metrics", {}).get("harmful", 0),
- }
- metadata_list.append(meta_item)
- content_map[kid] = {
- "scenario": scenario,
- "content": content_text,
- "score": meta_item["score"],
- "helpful": meta_item["helpful"],
- "harmful": meta_item["harmful"],
- }
- except Exception as e:
- logger.error(f"解析知识文件失败 {file_path}: {e}")
- continue
- if not metadata_list:
- print(f"[Knowledge System] 警告: 没有有效的知识条目")
- return []
- # --- 阶段 2: 语义路由 (取 2*k) ---
- candidate_ids = await _route_knowledge_by_llm(query_text, metadata_list, k=top_k)
- # --- 阶段 3: 质量精排 (根据评分和反馈选出最终的 k) ---
- print(f"[Step 2: 知识质量精排] 正在根据评分和反馈进行打分...")
- scored_items = []
- for kid in candidate_ids:
- if kid in content_map:
- item = content_map[kid]
- score = item["score"]
- helpful = item["helpful"]
- harmful = item["harmful"]
- # 计算综合分:基础分 + helpful - harmful*2
- quality_score = score + helpful - (harmful * 2.0)
- # 过滤门槛:评分低于 min_score 或质量分过低
- if score < min_score or quality_score < 0:
- print(f" - 剔除低质量知识: {kid} (Score: {score}, Helpful: {helpful}, Harmful: {harmful})")
- continue
- scored_items.append({
- "id": kid,
- "scenario": item["scenario"],
- "content": item["content"],
- "score": score,
- "quality_score": quality_score
- })
- # 按照质量分排序
- final_sorted = sorted(scored_items, key=lambda x: x["quality_score"], reverse=True)
- # 截取最终的 top_k
- result = final_sorted[:top_k]
- print(f"[Step 2: 知识质量精排] 最终选定知识: {[it['id'] for it in result]}")
- print(f"[Knowledge System] 检索结束。\n")
- return result
- @tool()
- async def search_knowledge(
- query: str,
- top_k: int = 5,
- min_score: int = 3,
- tags_type: Optional[List[str]] = None,
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- """
- 语义检索原子知识库
- Args:
- query: 搜索查询(任务描述)
- top_k: 返回数量(默认 5)
- min_score: 最低评分过滤(默认 3)
- tags_type: 按类型过滤(tool/usercase/definition/plan)
- context: 工具上下文
- Returns:
- 相关知识列表
- """
- try:
- relevant_items = await _get_structured_knowledge(
- query_text=query,
- top_k=top_k,
- min_score=min_score
- )
- if not relevant_items:
- return ToolResult(
- title="🔍 未找到相关知识",
- output=f"查询: {query}\n\n知识库中暂无相关的高质量知识。建议进行调研。",
- long_term_memory=f"知识检索: 未找到相关知识 - {query[:50]}"
- )
- # 格式化输出
- output_lines = [f"查询: {query}\n", f"找到 {len(relevant_items)} 条相关知识:\n"]
- for idx, item in enumerate(relevant_items, 1):
- output_lines.append(f"\n### {idx}. [{item['id']}] (⭐ {item['score']})")
- output_lines.append(f"**场景**: {item['scenario'][:150]}...")
- output_lines.append(f"**内容**: {item['content'][:200]}...")
- return ToolResult(
- title="✅ 知识检索成功",
- output="\n".join(output_lines),
- long_term_memory=f"知识检索: 找到 {len(relevant_items)} 条相关知识 - {query[:50]}",
- metadata={
- "count": len(relevant_items),
- "knowledge_ids": [item["id"] for item in relevant_items],
- "items": relevant_items
- }
- )
- except Exception as e:
- logger.error(f"知识检索失败: {e}")
- return ToolResult(
- title="❌ 检索失败",
- output=f"错误: {str(e)}",
- error=str(e)
- )
|