| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183 |
- """
- 原子知识保存工具
- 提供便捷的 API 让 Agent 快速保存结构化的原子知识
- """
- import os
- import re
- import json
- import yaml
- import logging
- from datetime import datetime
- from pathlib import Path
- from typing import List, Dict, Optional, Any
- from agent.tools import tool, ToolResult, ToolContext
- from ...llm.openrouter import openrouter_llm_call
- logger = logging.getLogger(__name__)
- def _generate_knowledge_id() -> str:
- """生成知识原子 ID(带微秒和随机后缀避免冲突)"""
- import uuid
- timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
- random_suffix = uuid.uuid4().hex[:4]
- return f"knowledge-{timestamp}-{random_suffix}"
- def _format_yaml_list(items: List[str], indent: int = 2) -> str:
- """格式化 YAML 列表"""
- if not items:
- return "[]"
- indent_str = " " * indent
- return "\n" + "\n".join(f"{indent_str}- {item}" for item in items)
@tool()
async def save_knowledge(
    scenario: str,
    content: str,
    tags_type: List[str],
    urls: Optional[List[str]] = None,
    agent_id: str = "research_agent",
    score: int = 3,
    trace_id: str = "",
) -> ToolResult:
    """
    Persist a structured knowledge atom as a local JSON file.

    Args:
        scenario: Task description (situation + goal + expected outcome).
        content: Core knowledge content.
        tags_type: Knowledge type tags; one of: tool, usercase, definition, plan, strategy.
        urls: Reference links (papers / GitHub / blogs, ...). Defaults to empty.
        agent_id: ID of the agent that performed this research.
        score: Initial score 1-5 (default 3).
        trace_id: Current trace ID (optional).

    Returns:
        ToolResult with the new knowledge ID and file path.
    """
    try:
        # Generate a collision-resistant ID.
        knowledge_id = _generate_knowledge_id()
        # Ensure the storage directory exists.
        knowledge_dir = Path(".cache/knowledge_atoms")
        knowledge_dir.mkdir(parents=True, exist_ok=True)
        # Atoms are stored one-per-file using the ID as the file name.
        file_path = knowledge_dir / f"{knowledge_id}.json"
        # Build the JSON payload.
        # Fix: the `urls` annotation was `List[str] = None`; it is Optional.
        knowledge_data = {
            "id": knowledge_id,
            "trace_id": trace_id or "N/A",
            "tags": {
                "type": tags_type
            },
            "scenario": scenario,
            "content": content,
            "trace": {
                "urls": urls or [],
                "agent_id": agent_id,
                "timestamp": datetime.now().isoformat()
            },
            "eval": {
                "score": score,
                "helpful": 0,
                "harmful": 0,
                "helpful_history": [],
                "harmful_history": []
            },
            # NOTE(review): metrics.helpful starts at 1 while eval.helpful
            # starts at 0 — presumably an optimistic prior for the quality
            # ranking in retrieval; confirm this asymmetry is intended.
            "metrics": {
                "helpful": 1,
                "harmful": 0
            },
            "created_at": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(knowledge_data, f, ensure_ascii=False, indent=2)
        return ToolResult(
            title="✅ 原子知识已保存",
            output=f"知识 ID: {knowledge_id}\n文件路径: {file_path}\n\n场景:\n{scenario[:100]}...",
            long_term_memory=f"保存原子知识: {knowledge_id} - {scenario[:50]}",
            metadata={"knowledge_id": knowledge_id, "file_path": str(file_path)}
        )
    except Exception as e:
        return ToolResult(
            title="❌ 保存失败",
            output=f"错误: {str(e)}",
            error=str(e)
        )
@tool()
async def update_knowledge(
    knowledge_id: str,
    add_helpful_case: Optional[Dict[str, str]] = None,
    add_harmful_case: Optional[Dict[str, str]] = None,
    update_score: Optional[int] = None,
    evolve_feedback: Optional[str] = None,
) -> ToolResult:
    """
    Update the evaluation feedback on an existing knowledge atom.

    Args:
        knowledge_id: Knowledge ID (e.g. research-20260302-001).
        add_helpful_case: A positive case {"case_id": "...", "scenario": "...", "result": "...", "timestamp": "..."}.
        add_harmful_case: A negative case, same shape as add_helpful_case.
        update_score: New score (1-5).
        evolve_feedback: Evolution feedback; when given, content is rewritten via LLM.

    Returns:
        ToolResult summarizing the applied updates.
    """
    try:
        # Locate the file (JSON preferred; legacy Markdown supported).
        knowledge_dir = Path(".cache/knowledge_atoms")
        json_path = knowledge_dir / f"{knowledge_id}.json"
        md_path = knowledge_dir / f"{knowledge_id}.md"
        file_path = None
        if json_path.exists():
            file_path = json_path
            is_json = True
        elif md_path.exists():
            file_path = md_path
            is_json = False
        else:
            return ToolResult(
                title="❌ 文件不存在",
                output=f"未找到知识文件: {knowledge_id}",
                error="文件不存在"
            )
        with open(file_path, "r", encoding="utf-8") as f:
            content = f.read()
        # Parse according to format.
        if is_json:
            data = json.loads(content)
        else:
            # Legacy format keeps all metadata in a YAML frontmatter block.
            yaml_match = re.search(r'^---\n(.*?)\n---', content, re.DOTALL)
            if not yaml_match:
                return ToolResult(
                    title="❌ 格式错误",
                    output=f"无法解析知识文件格式: {file_path}",
                    error="格式错误"
                )
            data = yaml.safe_load(yaml_match.group(1))
        # Robustness fix: legacy/hand-edited files may lack "eval"/"metrics";
        # the original `data["eval"]["helpful"] += 1` raised KeyError there.
        # Normalize the structure before applying increments.
        eval_block = data.setdefault("eval", {})
        eval_block.setdefault("score", 3)
        eval_block.setdefault("helpful", 0)
        eval_block.setdefault("harmful", 0)
        eval_block.setdefault("helpful_history", [])
        eval_block.setdefault("harmful_history", [])
        metrics = data.setdefault("metrics", {})
        metrics.setdefault("helpful", 0)
        metrics.setdefault("harmful", 0)
        updated = False
        summary = []
        if add_helpful_case:
            eval_block["helpful"] += 1
            eval_block["helpful_history"].append(add_helpful_case)
            metrics["helpful"] += 1
            summary.append(f"添加 helpful 案例: {add_helpful_case.get('case_id')}")
            updated = True
        if add_harmful_case:
            eval_block["harmful"] += 1
            eval_block["harmful_history"].append(add_harmful_case)
            metrics["harmful"] += 1
            summary.append(f"添加 harmful 案例: {add_harmful_case.get('case_id')}")
            updated = True
        if update_score is not None:
            eval_block["score"] = update_score
            summary.append(f"更新评分: {update_score}")
            updated = True
        # Experience-evolution: rewrite content via LLM when feedback is given.
        if evolve_feedback:
            old_content = data.get("content", "")
            evolved_content = await _evolve_knowledge_with_llm(old_content, evolve_feedback)
            data["content"] = evolved_content
            metrics["helpful"] += 1
            summary.append(f"知识进化: 基于反馈重写内容")
            updated = True
        if not updated:
            return ToolResult(
                title="⚠️ 无更新",
                output="未指定任何更新内容",
                long_term_memory="尝试更新原子知识但未指定更新内容"
            )
        data["updated_at"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        # Write back in the same format the file was read in.
        if is_json:
            with open(file_path, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
        else:
            meta_str = yaml.dump(data, allow_unicode=True).strip()
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(f"---\n{meta_str}\n---\n")
        return ToolResult(
            title="✅ 原子知识已更新",
            output=f"知识 ID: {knowledge_id}\n文件路径: {file_path}\n\n更新内容:\n" + "\n".join(f"- {s}" for s in summary),
            long_term_memory=f"更新原子知识: {knowledge_id}"
        )
    except Exception as e:
        return ToolResult(
            title="❌ 更新失败",
            output=f"错误: {str(e)}",
            error=str(e)
        )
@tool()
async def list_knowledge(
    limit: int = 10,
    tags_type: Optional[List[str]] = None,
) -> ToolResult:
    """
    List saved knowledge atoms, most recently modified first.

    Args:
        limit: Maximum number of entries to show (default 10).
        tags_type: Optional filter on tags.type.

    Returns:
        ToolResult with a formatted listing.
    """
    try:
        knowledge_dir = Path(".cache/knowledge_atoms")
        if not knowledge_dir.exists():
            return ToolResult(
                title="📂 知识库为空",
                output="还没有保存任何原子知识",
                long_term_memory="知识库为空"
            )
        # Bug fix: save_knowledge writes .json files, but this listing only
        # globbed *.md, so newly saved atoms never appeared. Scan both.
        files = sorted(
            list(knowledge_dir.glob("*.json")) + list(knowledge_dir.glob("*.md")),
            key=lambda x: x.stat().st_mtime,
            reverse=True,
        )
        if not files:
            return ToolResult(
                title="📂 知识库为空",
                output="还没有保存任何原子知识",
                long_term_memory="知识库为空"
            )
        results = []
        for file_path in files:
            if len(results) >= limit:
                break
            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    content = f.read()
                if file_path.suffix == ".json":
                    data = json.loads(content)
                    knowledge_id = data.get("id", "unknown")
                    scenario = data.get("scenario") or "N/A"
                    score = data.get("eval", {}).get("score", "N/A")
                    tag_types = data.get("tags", {}).get("type", [])
                else:
                    # Legacy Markdown: best-effort field extraction via regex.
                    id_match = re.search(r"id: (.+)", content)
                    scenario_match = re.search(r"scenario: \|\n (.+)", content)
                    score_match = re.search(r"score: (\d+)", content)
                    knowledge_id = id_match.group(1) if id_match else "unknown"
                    scenario = scenario_match.group(1) if scenario_match else "N/A"
                    score = score_match.group(1) if score_match else "N/A"
                    tag_types = []
                # Fix: tags_type was documented as a filter but never applied.
                if tags_type:
                    if isinstance(tag_types, str):
                        tag_types = [tag_types]
                    if not any(t in tag_types for t in tags_type):
                        continue
                results.append(f"- [{knowledge_id}] (⭐{score}) {scenario[:60]}...")
            except Exception as e:
                logger.warning(f"解析知识文件失败 {file_path}: {e}")
                continue
        if not results:
            return ToolResult(
                title="📂 知识库为空",
                output="还没有保存任何原子知识",
                long_term_memory="知识库为空"
            )
        output = f"共找到 {len(files)} 条原子知识,显示最近 {len(results)} 条:\n\n" + "\n".join(results)
        return ToolResult(
            title="📚 原子知识列表",
            output=output,
            long_term_memory=f"列出 {len(results)} 条原子知识"
        )
    except Exception as e:
        return ToolResult(
            title="❌ 列表失败",
            output=f"错误: {str(e)}",
            error=str(e)
        )
- # ===== 语义检索功能 =====
async def _route_knowledge_by_llm(query_text: str, metadata_list: List[Dict], k: int = 5) -> List[str]:
    """Stage 1: semantic routing — ask the LLM for up to ``2*k`` relevant IDs.

    NOTE(review): this function is defined twice in this module (an identical
    copy appears further down); the later definition shadows this one.
    Consider deleting one of the two copies.
    """
    if not metadata_list:
        return []
    # Over-select by 2x so the quality re-ranking stage has candidates to drop.
    routing_k = k * 2
    routing_data = []
    for meta in metadata_list:
        routing_data.append({
            "id": meta["id"],
            "tags": meta["tags"],
            "scenario": meta["scenario"][:100],  # first 100 chars only
        })
    prompt = f"""
你是一个知识检索专家。根据用户的当前任务需求,从下列原子知识元数据中挑选出最相关的最多 {routing_k} 个知识 ID。
任务需求:"{query_text}"
可选知识列表:
{json.dumps(routing_data, ensure_ascii=False, indent=1)}
请直接输出 ID 列表,用逗号分隔(例如: knowledge-20260302-001, research-20260302-002)。若无相关项请输出 "None"。
"""
    try:
        print(f"\n[Step 1: 知识语义路由] 任务: '{query_text}' | 候选总数: {len(metadata_list)} | 目标提取数: {routing_k}")
        response = await openrouter_llm_call(
            messages=[{"role": "user", "content": prompt}],
            model="google/gemini-2.0-flash-001"
        )
        content = response.get("content", "").strip()
        # Keep only tokens that look like knowledge IDs; discards "None" too.
        selected_ids = []
        for token in re.split(r'[,\s]+', content):
            token = token.strip()
            if token.startswith(("knowledge-", "research-")):
                selected_ids.append(token)
        print(f"[Step 1: 知识语义路由] LLM 初选 ID ({len(selected_ids)}个): {selected_ids}")
        return selected_ids
    except Exception as e:
        logger.error(f"LLM 知识路由失败: {e}")
        return []
async def _evolve_knowledge_with_llm(old_content: str, feedback: str) -> str:
    """Rewrite a knowledge atom's content via LLM, folding in field feedback.

    On any failure (LLM error, or a suspiciously short reply) the original
    content is preserved and the feedback is appended as an update note.
    """
    prompt = f"""你是一个 AI Agent 知识库管理员。请根据反馈建议,对现有的知识内容进行重写进化。
【原知识内容】:
{old_content}
【实战反馈建议】:
{feedback}
【重写要求】:
1. 融合知识:将反馈中的避坑指南、新参数或修正后的选择逻辑融入原知识,使其更具通用性和准确性。
2. 保持结构:如果原内容有特定格式(如 Markdown、代码示例等),请保持该格式。
3. 语言:简洁直接,使用中文。
4. 禁止:严禁输出任何开场白、解释语或额外的 Markdown 标题,直接返回重写后的正文。
"""
    try:
        response = await openrouter_llm_call(
            messages=[{"role": "user", "content": prompt}],
            model="google/gemini-2.0-flash-001"
        )
        rewritten = response.get("content", "").strip()
        # Sanity check: an empty or tiny reply means the rewrite failed.
        if len(rewritten) < 5:
            raise ValueError("LLM output too short")
        return rewritten
    except Exception as e:
        logger.warning(f"知识进化失败,采用追加模式回退: {e}")
        stamp = datetime.now().strftime('%Y-%m-%d')
        return f"{old_content}\n\n---\n[Update {stamp}]: {feedback}"
async def _route_knowledge_by_llm(query_text: str, metadata_list: List[Dict], k: int = 5) -> List[str]:
    """Stage 1: semantic routing — the LLM picks up to ``2*k`` relevant IDs.

    NOTE(review): this is the second, identical definition of this function
    in the module; it shadows the earlier copy. Consider removing one.
    """
    if not metadata_list:
        return []
    # Widen the selection to 2*k; stage 2 re-ranks and trims to k.
    routing_k = k * 2
    routing_data = [
        {
            "id": entry["id"],
            "tags": entry["tags"],
            # Only the first 100 chars of the scenario go into the prompt.
            "scenario": entry["scenario"][:100],
        }
        for entry in metadata_list
    ]
    prompt = f"""
你是一个知识检索专家。根据用户的当前任务需求,从下列原子知识元数据中挑选出最相关的最多 {routing_k} 个知识 ID。
任务需求:"{query_text}"
可选知识列表:
{json.dumps(routing_data, ensure_ascii=False, indent=1)}
请直接输出 ID 列表,用逗号分隔(例如: knowledge-20260302-001, research-20260302-002)。若无相关项请输出 "None"。
"""
    try:
        print(f"\n[Step 1: 知识语义路由] 任务: '{query_text}' | 候选总数: {len(metadata_list)} | 目标提取数: {routing_k}")
        response = await openrouter_llm_call(
            messages=[{"role": "user", "content": prompt}],
            model="google/gemini-2.0-flash-001"
        )
        raw = response.get("content", "").strip()
        # A "None" reply or any non-ID token is filtered out here.
        selected_ids = [
            piece.strip()
            for piece in re.split(r'[,\s]+', raw)
            if piece.strip().startswith(("knowledge-", "research-"))
        ]
        print(f"[Step 1: 知识语义路由] LLM 初选 ID ({len(selected_ids)}个): {selected_ids}")
        return selected_ids
    except Exception as e:
        logger.error(f"LLM 知识路由失败: {e}")
        return []
async def _get_structured_knowledge(
    query_text: str,
    top_k: int = 5,
    min_score: int = 3,
    context: Optional[Any] = None,
    tags_filter: Optional[List[str]] = None
) -> List[Dict]:
    """
    Semantic retrieval of knowledge atoms (experiences included).

    Pipeline:
      1. Parse every file in the knowledge store (JSON and legacy YAML-frontmatter).
      2. Semantic routing: LLM pre-selects ~2*top_k candidate IDs.
      3. Quality re-ranking: score-based filtering down to the final top_k.

    Args:
        query_text: The query / task description.
        top_k: Number of results to return.
        min_score: Minimum base score; lower-scored atoms are dropped.
        context: Context object (kept for experience-interface compatibility; unused here).
        tags_filter: Restrict to atoms whose tags.type intersects this list
            (e.g. ["strategy"] returns only experiences).

    Returns:
        List of result dicts sorted by descending quality score.
    """
    knowledge_dir = Path(".cache/knowledge_atoms")
    if not knowledge_dir.exists():
        print(f"[Knowledge System] 警告: 知识库目录不存在 ({knowledge_dir})")
        return []
    # Both .json and legacy .md files are part of the store.
    json_files = list(knowledge_dir.glob("*.json"))
    md_files = list(knowledge_dir.glob("*.md"))
    files = json_files + md_files
    if not files:
        print(f"[Knowledge System] 警告: 知识库为空")
        return []
    # --- Stage 1: parse every knowledge file ---
    content_map: Dict[str, Dict[str, Any]] = {}   # id -> full entry (used after routing)
    metadata_list: List[Dict[str, Any]] = []      # lightweight items fed to the LLM router
    for file_path in files:
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                content = f.read()
            # Dispatch parsing on the file extension.
            if file_path.suffix == ".json":
                metadata = json.loads(content)
            else:
                # Legacy format: all metadata lives in a YAML frontmatter block.
                yaml_match = re.search(r'^---\n(.*?)\n---', content, re.DOTALL)
                if not yaml_match:
                    logger.warning(f"跳过无效文件: {file_path}")
                    continue
                metadata = yaml.safe_load(yaml_match.group(1))
            if not isinstance(metadata, dict):
                logger.warning(f"跳过损坏的知识文件: {file_path}")
                continue
            kid = metadata.get("id")
            if not kid:
                logger.warning(f"跳过缺少 id 的知识文件: {file_path}")
                continue
            # Pull out the two text fields used for routing and output.
            scenario = metadata.get("scenario", "").strip()
            content_text = metadata.get("content", "").strip()
            # Optional tag filtering (e.g. only "strategy" atoms).
            tags = metadata.get("tags", {})
            if tags_filter:
                # tags.type may be a single string or a list; normalize first.
                tag_types = tags.get("type", [])
                if isinstance(tag_types, str):
                    tag_types = [tag_types]
                if not any(tag in tag_types for tag in tags_filter):
                    continue  # skip atoms that match none of the filter tags
            meta_item = {
                "id": kid,
                "tags": tags,
                "scenario": scenario,
                "score": metadata.get("eval", {}).get("score", 3),
                "helpful": metadata.get("metrics", {}).get("helpful", 0),
                "harmful": metadata.get("metrics", {}).get("harmful", 0),
            }
            metadata_list.append(meta_item)
            content_map[kid] = {
                "scenario": scenario,
                "content": content_text,
                "tags": tags,
                "score": meta_item["score"],
                "helpful": meta_item["helpful"],
                "harmful": meta_item["harmful"],
            }
        except Exception as e:
            logger.error(f"解析知识文件失败 {file_path}: {e}")
            continue
    if not metadata_list:
        print(f"[Knowledge System] 警告: 没有有效的知识条目")
        return []
    # --- Stage 2: semantic routing (LLM pre-selects ~2*k candidates) ---
    candidate_ids = await _route_knowledge_by_llm(query_text, metadata_list, k=top_k)
    # --- Stage 3: quality re-ranking (score + feedback picks the final k) ---
    print(f"[Step 2: 知识质量精排] 正在根据评分和反馈进行打分...")
    scored_items = []
    for kid in candidate_ids:
        if kid in content_map:
            item = content_map[kid]
            score = item["score"]
            helpful = item["helpful"]
            harmful = item["harmful"]
            # Composite quality: base score + helpful, harmful penalized 2x.
            quality_score = score + helpful - (harmful * 2.0)
            # Drop atoms below the score floor or with a negative composite.
            if score < min_score or quality_score < 0:
                print(f"  - 剔除低质量知识: {kid} (Score: {score}, Helpful: {helpful}, Harmful: {harmful})")
                continue
            scored_items.append({
                "id": kid,
                "scenario": item["scenario"],
                "content": item["content"],
                "tags": item["tags"],
                "score": score,
                "quality_score": quality_score,
                "metrics": {
                    "helpful": helpful,
                    "harmful": harmful
                }
            })
    # Best quality first.
    final_sorted = sorted(scored_items, key=lambda x: x["quality_score"], reverse=True)
    # Trim to the requested top_k.
    result = final_sorted[:top_k]
    print(f"[Step 2: 知识质量精排] 最终选定知识: {[it['id'] for it in result]}")
    print(f"[Knowledge System] 检索结束。\n")
    return result
@tool()
async def search_knowledge(
    query: str,
    top_k: int = 5,
    min_score: int = 3,
    tags_type: Optional[List[str]] = None,
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """
    Semantic search over the atomic knowledge base.

    Args:
        query: Search query (task description).
        top_k: Number of results to return (default 5).
        min_score: Minimum score filter (default 3).
        tags_type: Filter by type (tool/usercase/definition/plan).
        context: Tool context.

    Returns:
        ToolResult with the matching knowledge entries.
    """
    try:
        # Bug fix: tags_type and context were accepted but silently dropped —
        # the underlying retriever supports both, so forward them.
        relevant_items = await _get_structured_knowledge(
            query_text=query,
            top_k=top_k,
            min_score=min_score,
            context=context,
            tags_filter=tags_type,
        )
        if not relevant_items:
            return ToolResult(
                title="🔍 未找到相关知识",
                output=f"查询: {query}\n\n知识库中暂无相关的高质量知识。建议进行调研。",
                long_term_memory=f"知识检索: 未找到相关知识 - {query[:50]}"
            )
        # Render a truncated preview of each match.
        output_lines = [f"查询: {query}\n", f"找到 {len(relevant_items)} 条相关知识:\n"]
        for idx, item in enumerate(relevant_items, 1):
            output_lines.append(f"\n### {idx}. [{item['id']}] (⭐ {item['score']})")
            output_lines.append(f"**场景**: {item['scenario'][:150]}...")
            output_lines.append(f"**内容**: {item['content'][:200]}...")
        return ToolResult(
            title="✅ 知识检索成功",
            output="\n".join(output_lines),
            long_term_memory=f"知识检索: 找到 {len(relevant_items)} 条相关知识 - {query[:50]}",
            metadata={
                "count": len(relevant_items),
                "knowledge_ids": [item["id"] for item in relevant_items],
                "items": relevant_items
            }
        )
    except Exception as e:
        logger.error(f"知识检索失败: {e}")
        return ToolResult(
            title="❌ 检索失败",
            output=f"错误: {str(e)}",
            error=str(e)
        )
@tool(description="通过两阶段检索获取最相关的历史经验(strategy 标签的知识)")
async def get_experience(
    query: str,
    k: int = 3,
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """
    Retrieve historical experiences (legacy-compatible interface).

    Delegates to the shared knowledge retriever, restricted to atoms tagged
    "strategy", with a relaxed score threshold.

    Args:
        query: Search query (task description).
        k: Number of results to return (default 3).
        context: Tool context.

    Returns:
        ToolResult with the matching experiences.
    """
    try:
        matches = await _get_structured_knowledge(
            query_text=query,
            top_k=k,
            min_score=1,  # experiences use a lower score threshold
            context=context,
            tags_filter=["strategy"]  # experiences only
        )
        if not matches:
            return ToolResult(
                title="🔍 未找到相关经验",
                output=f"查询: {query}\n\n经验库中暂无相关的经验。",
                long_term_memory=f"经验检索: 未找到相关经验 - {query[:50]}",
                metadata={"items": [], "count": 0}
            )
        # Render in the legacy output shape: rank header + content preview.
        lines = [f"查询: {query}\n", f"找到 {len(matches)} 条相关经验:\n"]
        for rank, entry in enumerate(matches, 1):
            lines.append(f"\n### {rank}. [{entry['id']}]")
            lines.append(f"{entry['content'][:300]}...")
        return ToolResult(
            title="✅ 经验检索成功",
            output="\n".join(lines),
            long_term_memory=f"经验检索: 找到 {len(matches)} 条相关经验 - {query[:50]}",
            metadata={
                "items": matches,
                "count": len(matches)
            }
        )
    except Exception as e:
        logger.error(f"经验检索失败: {e}")
        return ToolResult(
            title="❌ 检索失败",
            output=f"错误: {str(e)}",
            error=str(e)
        )
- # ===== 批量更新功能(类似经验机制)=====
async def _batch_update_knowledge(
    update_map: Dict[str, Dict[str, Any]],
    context: Optional[Any] = None
) -> int:
    """
    Internal helper: batch-update knowledge atoms (experience-compatible API).

    Args:
        update_map: {knowledge_id: {"action": "helpful"/"harmful"/"evolve"/"mixed", "feedback": "..."}}
        context: Context object (kept for experience-interface compatibility; unused here).

    Returns:
        Number of atoms successfully updated.
    """
    if not update_map:
        return 0
    knowledge_dir = Path(".cache/knowledge_atoms")
    if not knowledge_dir.exists():
        return 0
    success_count = 0
    evolution_tasks = []       # coroutines, awaited concurrently at the end
    evolution_registry = {}    # task index -> (file_path, data, is_json)
    for knowledge_id, instr in update_map.items():
        try:
            # Locate the file (JSON preferred; legacy Markdown supported).
            json_path = knowledge_dir / f"{knowledge_id}.json"
            md_path = knowledge_dir / f"{knowledge_id}.md"
            file_path = None
            is_json = False
            if json_path.exists():
                file_path = json_path
                is_json = True
            elif md_path.exists():
                file_path = md_path
                is_json = False
            else:
                continue
            with open(file_path, "r", encoding="utf-8") as f:
                content = f.read()
            if is_json:
                data = json.loads(content)
            else:
                yaml_match = re.search(r'^---\n(.*?)\n---', content, re.DOTALL)
                if not yaml_match:
                    continue
                data = yaml.safe_load(yaml_match.group(1))
            # Bug fix: the original assigned into data["metrics"][...] while
            # only reading defensively via .get(); a file without a "metrics"
            # dict raised KeyError on assignment. setdefault makes it safe.
            metrics = data.setdefault("metrics", {})
            action = instr.get("action")
            feedback = instr.get("feedback", "")
            # "mixed" means helpful-but-improvable: count helpful, then evolve.
            if action == "mixed":
                metrics["helpful"] = metrics.get("helpful", 0) + 1
                action = "evolve"
            if action == "helpful":
                metrics["helpful"] = metrics.get("helpful", 0) + 1
            elif action == "harmful":
                metrics["harmful"] = metrics.get("harmful", 0) + 1
            elif action == "evolve" and feedback:
                # Register an evolution coroutine; all of them run concurrently below.
                old_content = data.get("content", "")
                task = _evolve_knowledge_with_llm(old_content, feedback)
                evolution_tasks.append(task)
                evolution_registry[len(evolution_tasks) - 1] = (file_path, data, is_json)
                # NOTE(review): "evolve" also counts as helpful; combined with
                # the "mixed" branch this yields two helpful increments for a
                # mixed verdict — confirm that double-count is intended.
                metrics["helpful"] = metrics.get("helpful", 0) + 1
            data["updated_at"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            # Entries that do not evolve are written out immediately.
            if action != "evolve" or not feedback:
                if is_json:
                    with open(file_path, "w", encoding="utf-8") as f:
                        json.dump(data, f, ensure_ascii=False, indent=2)
                else:
                    meta_str = yaml.dump(data, allow_unicode=True).strip()
                    with open(file_path, "w", encoding="utf-8") as f:
                        f.write(f"---\n{meta_str}\n---\n")
                success_count += 1
        except Exception as e:
            logger.error(f"更新知识失败 {knowledge_id}: {e}")
            continue
    # Run all evolutions concurrently, then write back the rewritten content.
    if evolution_tasks:
        import asyncio
        print(f"🧬 并发处理 {len(evolution_tasks)} 条知识进化...")
        evolved_results = await asyncio.gather(*evolution_tasks)
        for task_idx, (file_path, data, is_json) in evolution_registry.items():
            data["content"] = evolved_results[task_idx].strip()
            if is_json:
                with open(file_path, "w", encoding="utf-8") as f:
                    json.dump(data, f, ensure_ascii=False, indent=2)
            else:
                meta_str = yaml.dump(data, allow_unicode=True).strip()
                with open(file_path, "w", encoding="utf-8") as f:
                    f.write(f"---\n{meta_str}\n---\n")
            success_count += 1
    return success_count
@tool()
async def batch_update_knowledge(
    feedback_list: List[Dict[str, Any]],
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """
    Batch-report effectiveness feedback for knowledge atoms.

    Args:
        feedback_list: List of verdicts, each containing:
            - knowledge_id: (str) knowledge ID
            - is_effective: (bool) whether the atom was effective
            - feedback: (str, optional) improvement suggestion; when the atom
              was effective and a suggestion is given, knowledge evolution
              (LLM rewrite) is triggered.
        context: Tool context.

    Returns:
        ToolResult summarizing successes and failures.
    """
    try:
        if not feedback_list:
            return ToolResult(
                title="⚠️ 反馈列表为空",
                output="未提供任何反馈",
                long_term_memory="批量更新知识: 反馈列表为空"
            )
        knowledge_dir = Path(".cache/knowledge_atoms")
        if not knowledge_dir.exists():
            return ToolResult(
                title="❌ 知识库不存在",
                output="知识库目录不存在",
                error="知识库不存在"
            )
        success_count = 0
        failed_items = []
        for item in feedback_list:
            knowledge_id = item.get("knowledge_id")
            is_effective = item.get("is_effective")
            feedback = item.get("feedback", "")
            if not knowledge_id:
                failed_items.append({"id": "unknown", "reason": "缺少 knowledge_id"})
                continue
            try:
                # Locate the file (JSON preferred; legacy Markdown supported).
                json_path = knowledge_dir / f"{knowledge_id}.json"
                md_path = knowledge_dir / f"{knowledge_id}.md"
                file_path = None
                is_json = False
                if json_path.exists():
                    file_path = json_path
                    is_json = True
                elif md_path.exists():
                    file_path = md_path
                    is_json = False
                else:
                    failed_items.append({"id": knowledge_id, "reason": "文件不存在"})
                    continue
                with open(file_path, "r", encoding="utf-8") as f:
                    content = f.read()
                if is_json:
                    data = json.loads(content)
                else:
                    yaml_match = re.search(r'^---\n(.*?)\n---', content, re.DOTALL)
                    if not yaml_match:
                        failed_items.append({"id": knowledge_id, "reason": "格式错误"})
                        continue
                    data = yaml.safe_load(yaml_match.group(1))
                # Bug fix: assignment into data["metrics"][...] raised KeyError
                # for files without a "metrics" dict even though reads used
                # .get() defensively. setdefault makes the update safe.
                metrics = data.setdefault("metrics", {})
                if is_effective:
                    metrics["helpful"] = metrics.get("helpful", 0) + 1
                    # A suggestion on an effective atom triggers evolution.
                    if feedback:
                        old_content = data.get("content", "")
                        evolved_content = await _evolve_knowledge_with_llm(old_content, feedback)
                        data["content"] = evolved_content
                else:
                    metrics["harmful"] = metrics.get("harmful", 0) + 1
                data["updated_at"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                # Write back in the same format the file was read in.
                if is_json:
                    with open(file_path, "w", encoding="utf-8") as f:
                        json.dump(data, f, ensure_ascii=False, indent=2)
                else:
                    meta_str = yaml.dump(data, allow_unicode=True).strip()
                    with open(file_path, "w", encoding="utf-8") as f:
                        f.write(f"---\n{meta_str}\n---\n")
                success_count += 1
            except Exception as e:
                failed_items.append({"id": knowledge_id, "reason": str(e)})
                continue
        output_lines = [f"成功更新 {success_count} 条知识"]
        if failed_items:
            output_lines.append(f"\n失败 {len(failed_items)} 条:")
            for item in failed_items:
                output_lines.append(f"  - {item['id']}: {item['reason']}")
        return ToolResult(
            title="✅ 批量更新完成",
            output="\n".join(output_lines),
            long_term_memory=f"批量更新知识: 成功 {success_count} 条,失败 {len(failed_items)} 条"
        )
    except Exception as e:
        logger.error(f"批量更新知识失败: {e}")
        return ToolResult(
            title="❌ 批量更新失败",
            output=f"错误: {str(e)}",
            error=str(e)
        )
- # ===== 知识库瘦身功能(类似经验机制)=====
@tool()
async def slim_knowledge(
    model: str = "anthropic/claude-sonnet-4.5",
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """
    Slim the knowledge base: ask a strong LLM to merge semantically similar atoms.

    Destructive operation: on success the old files are deleted and the merged
    set is rewritten as JSON files.

    Args:
        model: Model to use (default claude-sonnet-4.5).
        context: Tool context.

    Returns:
        ToolResult with a slimming report.
    """
    try:
        knowledge_dir = Path(".cache/knowledge_atoms")
        if not knowledge_dir.exists():
            return ToolResult(
                title="📂 知识库不存在",
                output="知识库目录不存在,无需瘦身",
                long_term_memory="知识库瘦身: 目录不存在"
            )
        # Collect every knowledge file (JSON and legacy Markdown).
        json_files = list(knowledge_dir.glob("*.json"))
        md_files = list(knowledge_dir.glob("*.md"))
        files = json_files + md_files
        if len(files) < 2:
            # Nothing to merge with fewer than two atoms.
            return ToolResult(
                title="📂 知识库过小",
                output=f"知识库仅有 {len(files)} 条,无需瘦身",
                long_term_memory=f"知识库瘦身: 仅有 {len(files)} 条"
            )
        # Parse every atom; unparseable files are skipped, not deleted later... 
        # NOTE(review): files that fail to parse here are NOT in `parsed`, yet
        # only `parsed` files are unlinked below — broken files survive slimming.
        parsed = []
        for file_path in files:
            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    content = f.read()
                if file_path.suffix == ".json":
                    data = json.loads(content)
                else:
                    yaml_match = re.search(r'^---\n(.*?)\n---', content, re.DOTALL)
                    if not yaml_match:
                        continue
                    data = yaml.safe_load(yaml_match.group(1))
                parsed.append({
                    "file_path": file_path,
                    "data": data,
                    "is_json": file_path.suffix == ".json"
                })
            except Exception as e:
                logger.error(f"解析文件失败 {file_path}: {e}")
                continue
        if len(parsed) < 2:
            return ToolResult(
                title="📂 有效知识过少",
                output=f"有效知识仅有 {len(parsed)} 条,无需瘦身",
                long_term_memory=f"知识库瘦身: 有效知识 {len(parsed)} 条"
            )
        # Build the per-atom summary text sent to the model (content truncated to 200 chars).
        entries_text = ""
        for p in parsed:
            data = p["data"]
            entries_text += f"[ID: {data.get('id')}] [Tags: {data.get('tags', {})}] "
            entries_text += f"[Metrics: {data.get('metrics', {})}] [Score: {data.get('eval', {}).get('score', 3)}]\n"
            entries_text += f"Scenario: {data.get('scenario', 'N/A')}\n"
            entries_text += f"Content: {data.get('content', '')[:200]}...\n\n"
        prompt = f"""你是一个 AI Agent 知识库管理员。以下是当前知识库的全部条目,请执行瘦身操作:
【任务】:
1. 识别语义高度相似或重复的知识,将它们合并为一条更精炼、更通用的知识。
2. 合并时保留 helpful 最高的那条的 ID 和 metrics(metrics 中 helpful/harmful 取各条之和)。
3. 对于独立的、无重复的知识,保持原样不动。
4. 保持原有的知识结构和格式。
【当前知识库】:
{entries_text}
【输出格式要求】:
严格按以下格式输出每条知识,条目之间用 === 分隔:
ID: <保留的id>
TAGS: <yaml格式的tags>
METRICS: <yaml格式的metrics>
SCORE: <评分>
SCENARIO: <场景描述>
CONTENT: <合并后的知识内容>
===
最后一行输出合并报告,格式:
REPORT: 原有 X 条,合并后 Y 条,精简了 Z 条。
禁止输出任何开场白或解释。"""
        print(f"\n[知识瘦身] 正在调用 {model} 分析 {len(parsed)} 条知识...")
        response = await openrouter_llm_call(
            messages=[{"role": "user", "content": prompt}],
            model=model
        )
        content = response.get("content", "").strip()
        if not content:
            # Abort before touching any files when the model returned nothing.
            return ToolResult(
                title="❌ 大模型返回为空",
                output="大模型返回为空,瘦身失败",
                error="大模型返回为空"
            )
        # Parse the model output: "==="-separated blocks, line-prefixed fields.
        report_line = ""
        new_entries = []
        blocks = [b.strip() for b in content.split("===") if b.strip()]
        for block in blocks:
            if block.startswith("REPORT:"):
                report_line = block
                continue
            lines = block.split("\n")
            kid, tags, metrics, score, scenario, content_lines = None, {}, {}, 3, "", []
            # current_field tracks which multi-line field continuation lines belong to.
            current_field = None
            for line in lines:
                if line.startswith("ID:"):
                    kid = line[3:].strip()
                    current_field = None
                elif line.startswith("TAGS:"):
                    try:
                        tags = yaml.safe_load(line[5:].strip()) or {}
                    except Exception:
                        tags = {}
                    current_field = None
                elif line.startswith("METRICS:"):
                    try:
                        metrics = yaml.safe_load(line[8:].strip()) or {}
                    except Exception:
                        metrics = {"helpful": 0, "harmful": 0}
                    current_field = None
                elif line.startswith("SCORE:"):
                    try:
                        score = int(line[6:].strip())
                    except Exception:
                        score = 3  # fall back to the neutral default
                    current_field = None
                elif line.startswith("SCENARIO:"):
                    scenario = line[9:].strip()
                    current_field = "scenario"
                elif line.startswith("CONTENT:"):
                    content_lines.append(line[8:].strip())
                    current_field = "content"
                elif current_field == "scenario":
                    # Continuation line of a multi-line scenario.
                    scenario += "\n" + line
                elif current_field == "content":
                    content_lines.append(line)
            # Only blocks with both an ID and some content become entries.
            if kid and content_lines:
                new_data = {
                    "id": kid,
                    "tags": tags,
                    "scenario": scenario,
                    "content": "\n".join(content_lines).strip(),
                    "metrics": metrics,
                    # eval histories are reset; merged metrics carry the signal.
                    "eval": {
                        "score": score,
                        "helpful": 0,
                        "harmful": 0,
                        "helpful_history": [],
                        "harmful_history": []
                    },
                    "updated_at": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                }
                new_entries.append(new_data)
        if not new_entries:
            # Nothing parseable — leave the knowledge base untouched.
            return ToolResult(
                title="❌ 解析失败",
                output="解析大模型输出失败,知识库未修改",
                error="解析失败"
            )
        # Delete the old files only after a successful parse.
        for p in parsed:
            try:
                p["file_path"].unlink()
            except Exception as e:
                logger.error(f"删除旧文件失败 {p['file_path']}: {e}")
        # Rewrite the merged set, normalized to JSON.
        for data in new_entries:
            file_path = knowledge_dir / f"{data['id']}.json"
            with open(file_path, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
        result = f"瘦身完成:{len(parsed)} → {len(new_entries)} 条知识"
        if report_line:
            result += f"\n{report_line}"
        print(f"[知识瘦身] {result}")
        return ToolResult(
            title="✅ 知识库瘦身完成",
            output=result,
            long_term_memory=f"知识库瘦身: {len(parsed)} → {len(new_entries)} 条"
        )
    except Exception as e:
        logger.error(f"知识库瘦身失败: {e}")
        return ToolResult(
            title="❌ 瘦身失败",
            output=f"错误: {str(e)}",
            error=str(e)
        )
|