import logging
import os
import yaml
import json
import asyncio
import re
from typing import List, Optional, Dict, Any, Tuple
from datetime import datetime
from ...llm.openrouter import openrouter_llm_call

logger = logging.getLogger(__name__)

# Default experience-store path (used when none can be read from context).
DEFAULT_EXPERIENCES_PATH = "./.cache/experiences_0.md"

# Shared frontmatter pattern: a `---`-fenced YAML header followed by a free-text
# body, repeated through the file. Non-greedy match + lookahead keeps a stray
# `---` inside a body from mis-splitting entries.
_ENTRY_PATTERN = r'---\n(.*?)\n---\n(.*?)(?=\n---\n|\Z)'


def _get_experiences_path(context: Optional[Any] = None) -> str:
    """Resolve the experiences file path from *context*, falling back to default.

    ``context`` may be a dict carrying a ``runner`` object whose configured
    ``experiences_path`` attribute takes precedence over the default.
    """
    if context and isinstance(context, dict):
        runner = context.get("runner")
        if runner and hasattr(runner, "experiences_path"):
            # An empty/None configured path still falls back to the default.
            path = runner.experiences_path or DEFAULT_EXPERIENCES_PATH
            print(f"[Experience] 使用 runner 配置的路径: {runner.experiences_path}")
            return path
    print(f"[Experience] 使用默认路径: {DEFAULT_EXPERIENCES_PATH}")
    return DEFAULT_EXPERIENCES_PATH


def _parse_experience_blocks(file_content: str) -> List[Tuple[Dict[str, Any], str]]:
    """Parse ``---``-fenced YAML frontmatter blocks into ``(meta, body)`` pairs.

    Corrupt blocks (invalid YAML, or metadata that is not a dict) are logged and
    skipped so that one bad entry cannot break the whole store.
    """
    parsed: List[Tuple[Dict[str, Any], str]] = []
    for yaml_str, raw_body in re.findall(_ENTRY_PATTERN, file_content, re.DOTALL):
        try:
            meta = yaml.safe_load(yaml_str)
        except Exception as e:
            logger.error(f"跳过损坏的经验块: {e}")
            continue
        if not isinstance(meta, dict):
            logger.error(f"跳过损坏的经验块: metadata 不是 dict,而是 {type(meta).__name__}")
            continue
        parsed.append((meta, raw_body.strip()))
    return parsed


def _normalized_metrics(meta: Dict[str, Any]) -> Dict[str, int]:
    """Return a metrics dict guaranteed to carry ``helpful`` and ``harmful``.

    Guards against YAML entries whose ``metrics`` is missing, not a dict, or
    missing one of the two counters (direct indexing would raise KeyError).
    """
    raw = meta.get("metrics")
    if not isinstance(raw, dict):
        raw = {}
    return {"helpful": raw.get("helpful", 0), "harmful": raw.get("harmful", 0)}


def _write_experiences_file(path: str, content: str) -> None:
    """Atomically replace the experience file (temp file + ``os.replace``).

    A plain truncating write would destroy the store if the process died
    mid-write; ``os.replace`` is atomic on POSIX and Windows.
    """
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        f.write(content)
    os.replace(tmp_path, path)


# ===== Experience evolution (rewrite) =====
async def _evolve_body_with_llm(old_body: str, feedback: str) -> str:
    """Rewrite an experience body using the cheap retrieval-tier model.

    Falls back to appending the feedback to the original body when the LLM
    call fails or returns an implausibly short result.
    """
    prompt = f"""你是一个 AI Agent 经验库管理员。请根据反馈建议,对现有的 ACE 规范经验进行重写进化。

【原经验内容】:
{old_body}

【实战反馈建议】:
{feedback}

【重写要求】:
1. 保持 ACE 规范:当 [条件/Context] 时,应该 [动作/Action](原因:[逻辑/Reason])。
2. 融合知识:将反馈中的避坑指南、新参数或修正后的选择逻辑融入原经验,使其更具通用性和准确性。
3. 语言:简洁直接,使用中文。
4. 禁止:严禁输出任何开场白、解释语或 Markdown 标题,直接返回重写后的正文。
"""
    try:
        # Same cheap model as the retrieval router.
        response = await openrouter_llm_call(
            messages=[{"role": "user", "content": prompt}],
            model="google/gemini-2.0-flash-001"
        )
        evolved_content = response.get("content", "").strip()
        # Safety check: reject empty / too-short outputs and fall back.
        if len(evolved_content) < 5:
            raise ValueError("LLM output too short")
        return evolved_content
    except Exception as e:
        logger.warning(f"小模型进化失败,采用追加模式回退: {e}")
        timestamp = datetime.now().strftime('%Y-%m-%d')
        return f"{old_body}\n- [Update {timestamp}]: {feedback}"


# ===== Core selection logic =====
async def _route_experiences_by_llm(query_text: str, metadata_list: List[Dict], k: int = 3) -> List[str]:
    """Stage 1 — semantic routing: ask the LLM to pick up to ``2*k`` IDs.

    Returns the raw ID list (possibly empty on failure); quality re-ranking
    happens later in ``_get_structured_experiences``.
    """
    if not metadata_list:
        return []
    # Widen the candidate pool to 2*k for the later quality re-rank.
    routing_k = k * 2
    routing_data = [
        {
            "id": m["id"],
            "tags": m["tags"],
            # .get() guards entries whose metrics lack the "helpful" key.
            "helpful": m["metrics"].get("helpful", 0),
        }
        for m in metadata_list
    ]
    prompt = f"""
你是一个经验检索专家。根据用户的当前意图,从下列经验元数据中挑选出最相关的最多 {routing_k} 个经验 ID。

意图:"{query_text}"

可选经验列表:
{json.dumps(routing_data, ensure_ascii=False, indent=1)}

请直接输出 ID 列表,用逗号分隔(例如: ex_01, ex_02)。若无相关项请输出 "None"。
"""
    try:
        print(f"\n[Step 1: 语义路由] 意图: '{query_text}' | 候选总数: {len(metadata_list)} | 目标提取数: {routing_k}")
        response = await openrouter_llm_call(
            messages=[{"role": "user", "content": prompt}],
            model="google/gemini-2.0-flash-001"
        )
        content = response.get("content", "").strip()
        # Only tokens that look like experience IDs survive ("None" is dropped).
        selected_ids = [
            idx.strip()
            for idx in re.split(r'[,\s]+', content)
            if idx.strip().startswith("ex_")
        ]
        print(f"[Step 1: 语义路由] LLM 初选 ID ({len(selected_ids)}个): {selected_ids}")
        return selected_ids
    except Exception as e:
        logger.error(f"LLM 经验路由失败: {e}")
        return []


async def _get_structured_experiences(query_text: str, top_k: int = 3, context: Optional[Any] = None):
    """Two-stage retrieval pipeline.

    1. Parse the physical experience file.
    2. Semantic routing: extract up to ``2*top_k`` candidate IDs via LLM.
    3. Quality re-rank on metrics and return the final ``top_k``.
    """
    # Guard: context may be a ToolContext (non-dict) — calling .get() on it
    # unconditionally would raise AttributeError.
    runner = context.get("runner") if isinstance(context, dict) else None
    print(f"[Experience System] runner.experiences_path: {getattr(runner, 'experiences_path', None)}")
    experiences_path = _get_experiences_path(context)
    if not os.path.exists(experiences_path):
        print(f"[Experience System] 警告: 经验文件不存在 ({experiences_path})")
        return []
    with open(experiences_path, "r", encoding="utf-8") as f:
        file_content = f.read()

    # --- Stage 1: parse ---
    content_map: Dict[str, Dict[str, Any]] = {}
    metadata_list: List[Dict[str, Any]] = []
    for metadata, body in _parse_experience_blocks(file_content):
        eid = metadata.get("id")
        if not eid:
            logger.error("跳过损坏的经验块: 缺少 id 字段")
            continue
        metrics = _normalized_metrics(metadata)
        metadata_list.append({
            "id": eid,
            "tags": metadata.get("tags", {}),
            "metrics": metrics,
        })
        content_map[eid] = {"content": body, "metrics": metrics}

    # --- Stage 2: semantic routing (take 2*k) ---
    candidate_ids = await _route_experiences_by_llm(query_text, metadata_list, k=top_k)

    # --- Stage 3: quality re-rank (pick the final k by metrics) ---
    print("[Step 2: 质量精排] 正在根据 Metrics 对候选经验进行打分...")
    scored_items = []
    seen_ids = set()
    for eid in candidate_ids:
        # Skip unknown IDs and LLM-emitted duplicates.
        if eid not in content_map or eid in seen_ids:
            continue
        seen_ids.add(eid)
        item = content_map[eid]
        metrics = item["metrics"]
        # Composite score: helpful counts positively, harmful is doubly penalized.
        quality_score = metrics["helpful"] - (metrics["harmful"] * 2.0)
        # Hard filter: drop experiences flagged as seriously harmful (score < -2).
        if quality_score < -2:
            print(f"  - 剔除有害经验: {eid} (Helpful: {metrics['helpful']}, Harmful: {metrics['harmful']})")
            continue
        scored_items.append({
            "id": eid,
            "content": item["content"],
            "helpful": metrics["helpful"],
            "quality_score": quality_score
        })

    # Sort by quality score; ties broken by raw helpful count.
    final_sorted = sorted(scored_items, key=lambda x: (x["quality_score"], x["helpful"]), reverse=True)
    result = final_sorted[:top_k]
    print(f"[Step 2: 质量精排] 最终选定经验: {[it['id'] for it in result]}")
    print("[Experience System] 检索结束。\n")
    return result


async def _batch_update_experiences(update_map: Dict[str, Dict[str, Any]], context: Optional[Any] = None):
    """Physical layer: batch-update experience entries and rewrite the file.

    Metric bumps are applied inline; "evolve" rewrites are gathered and run
    concurrently, then back-filled into the serialized entries before the
    atomic write-back.
    """
    experiences_path = _get_experiences_path(context)
    if not os.path.exists(experiences_path) or not update_map:
        return 0
    with open(experiences_path, "r", encoding="utf-8") as f:
        full_content = f.read()

    new_entries: List[Tuple[str, str]] = []   # (serialized meta, body)
    evolution_tasks = []
    evolution_registry: Dict[int, int] = {}   # task index -> new_entries index

    # --- Phase 1: walk every block, applying instructions where they match ---
    for meta, body in _parse_experience_blocks(full_content):
        eid = meta.get("id")
        if not eid:
            logger.error("跳过损坏的经验块: 缺少 id")
            continue
        # Ensure the counters exist before any "+= 1" below (sparse YAML
        # entries would otherwise raise KeyError).
        meta["metrics"] = _normalized_metrics(meta)
        if eid in update_map:
            instr = update_map[eid]
            action = instr.get("action")
            feedback = instr.get("feedback")
            # "mixed" intermediate state: credit helpful, then also evolve.
            # NOTE(review): this makes "mixed" bump helpful twice (here and in
            # the evolve branch) — confirm the double credit is intended.
            if action == "mixed":
                meta["metrics"]["helpful"] += 1
                action = "evolve"
            if action == "helpful":
                meta["metrics"]["helpful"] += 1
            elif action == "harmful":
                meta["metrics"]["harmful"] += 1
            elif action == "evolve" and feedback:
                # Register the rewrite; the body is back-filled in phase 2.
                evolution_tasks.append(_evolve_body_with_llm(body, feedback))
                evolution_registry[len(evolution_tasks) - 1] = len(new_entries)
                meta["metrics"]["helpful"] += 1
            meta["updated_at"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        # Serialize every block (updated or not) so the file stays complete.
        meta_str = yaml.dump(meta, allow_unicode=True).strip()
        new_entries.append((meta_str, body))

    # --- Phase 2: run all evolution rewrites concurrently ---
    if evolution_tasks:
        print(f"🧬 并发处理 {len(evolution_tasks)} 条经验进化...")
        evolved_results = await asyncio.gather(*evolution_tasks)
        # Precise back-fill: swap in the evolved body for each registered entry.
        for task_idx, entry_idx in evolution_registry.items():
            meta_str, _ = new_entries[entry_idx]
            new_entries[entry_idx] = (meta_str, evolved_results[task_idx].strip())

    # --- Phase 3: atomic write-back ---
    final_content = "\n".join(
        f"---\n{meta_str}\n---\n{body}\n" for meta_str, body in new_entries
    )
    _write_experiences_file(experiences_path, final_content)
    return len(update_map)


# ===== Experience-store slimming =====
async def slim_experiences(model: str = "anthropic/claude-sonnet-4.5", context: Optional[Any] = None) -> str:
    """Slim the experience store: a top-tier model merges semantically similar entries.

    Returns a human-readable report string; the file is only rewritten when the
    model's output parses successfully.
    """
    experiences_path = _get_experiences_path(context)
    if not os.path.exists(experiences_path):
        return "经验文件不存在,无需瘦身。"
    with open(experiences_path, "r", encoding="utf-8") as f:
        file_content = f.read()

    parsed = [
        {"meta": meta, "body": body}
        for meta, body in _parse_experience_blocks(file_content)
    ]
    if len(parsed) < 2:
        return f"经验库仅有 {len(parsed)} 条,无需瘦身。"

    # Flatten every entry into the prompt payload.
    entries_text = ""
    for p in parsed:
        m = p["meta"]
        entries_text += f"[ID: {m.get('id')}] [Tags: {m.get('tags', {})}] "
        entries_text += f"[Metrics: {m.get('metrics', {})}]\n"
        entries_text += f"{p['body']}\n\n"

    prompt = f"""你是一个 AI Agent 经验库管理员。以下是当前经验库的全部条目,请执行瘦身操作:

【任务】:
1. 识别语义高度相似或重复的经验,将它们合并为一条更精炼、更通用的经验。
2. 合并时保留 helpful 最高的那条的 ID 和 metrics(metrics 中 helpful/harmful 取各条之和)。
3. 对于独立的、无重复的经验,保持原样不动。
4. 保持 ACE 规范格式:当 [条件/Context] 时,应该 [动作/Action](原因:[逻辑/Reason])。

【当前经验库】:
{entries_text}

【输出格式要求】:
严格按以下格式输出每条经验,条目之间用 === 分隔:
ID: <保留的id>
TAGS:
METRICS:
BODY: <合并后的经验正文>
===

最后一行输出合并报告,格式: REPORT: 原有 X 条,合并后 Y 条,精简了 Z 条。
禁止输出任何开场白或解释。"""
    try:
        print(f"\n[经验瘦身] 正在调用 {model} 分析 {len(parsed)} 条经验...")
        response = await openrouter_llm_call(
            messages=[{"role": "user", "content": prompt}],
            model=model
        )
        content = response.get("content", "").strip()
        if not content:
            return "大模型返回为空,瘦身失败。"

        # Parse the model's structured output and rebuild the file.
        report_line = ""
        new_entries = []
        blocks = [b.strip() for b in content.split("===") if b.strip()]
        for block in blocks:
            if block.startswith("REPORT:"):
                report_line = block
                continue
            lines = block.split("\n")
            eid, tags, metrics, body_lines = None, {}, {}, []
            current_field = None
            for line in lines:
                if line.startswith("ID:"):
                    eid = line[3:].strip()
                    current_field = None
                elif line.startswith("TAGS:"):
                    try:
                        tags = yaml.safe_load(line[5:].strip()) or {}
                    except Exception:
                        tags = {}
                    current_field = None
                elif line.startswith("METRICS:"):
                    try:
                        metrics = yaml.safe_load(line[8:].strip()) or {}
                    except Exception:
                        metrics = {"helpful": 0, "harmful": 0}
                    current_field = None
                elif line.startswith("BODY:"):
                    body_lines.append(line[5:].strip())
                    current_field = "body"
                elif current_field == "body":
                    # Continuation lines of a multi-line body.
                    body_lines.append(line)
            if eid and body_lines:
                meta = {
                    "id": eid,
                    "tags": tags,
                    "metrics": metrics,
                    "updated_at": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                }
                meta_str = yaml.dump(meta, allow_unicode=True).strip()
                body_str = "\n".join(body_lines).strip()
                new_entries.append(f"---\n{meta_str}\n---\n{body_str}\n")

        if not new_entries:
            return "解析大模型输出失败,经验库未修改。"

        # Atomic write-back — only reached when parsing produced entries.
        _write_experiences_file(experiences_path, "\n".join(new_entries))
        result = f"瘦身完成:{len(parsed)} → {len(new_entries)} 条经验。"
        if report_line:
            result += f"\n{report_line}"
        print(f"[经验瘦身] {result}")
        return result
    except Exception as e:
        logger.error(f"经验瘦身失败: {e}")
        return f"瘦身失败: {e}"


# ===== Public tool interface =====
# NOTE(review): imported mid-file rather than at the top — presumably to avoid
# a circular import with agent.tools; confirm before moving it.
from agent.tools import tool, ToolContext


@tool(description="通过两阶段检索获取最相关的历史经验")
async def get_experience(query: str, k: int = 3, context: Optional[ToolContext] = None):
    """Retrieve the most relevant historical experiences via two-stage search.

    Stage 1 semantic matching (2*k candidates), stage 2 quality re-rank (k).
    """
    relevant_items = await _get_structured_experiences(
        query_text=query,
        top_k=k,
        context=context
    )
    if not relevant_items:
        return "未找到足够相关的优质经验。"
    return {
        "items": relevant_items,
        "count": len(relevant_items)
    }


@tool()
async def update_experiences(feedback_list: List[Dict[str, Any]], context: Optional[ToolContext] = None):
    """Batch-report the effectiveness of historical experiences.

    Args:
        feedback_list: list of evaluations, each containing:
            - ex_id: (str) experience ID
            - is_effective: (bool) whether the experience was effective
            - feedback: (str, optional) improvement suggestion; when effective
              and present, triggers experience evolution
    """
    if not feedback_list:
        return "反馈列表为空。"
    # Convert the agent's input into the mapping the physical layer expects.
    update_map = {}
    for item in feedback_list:
        ex_id = item.get("ex_id")
        if not ex_id:
            # Skip malformed entries — a None key could never match a block.
            continue
        is_effective = item.get("is_effective")
        comment = item.get("feedback", "")
        action = "helpful" if is_effective else "harmful"
        if is_effective and comment:
            action = "evolve"
        update_map[ex_id] = {
            "action": action,
            "feedback": comment
        }
    count = await _batch_update_experiences(update_map, context)
    return f"成功同步了 {count} 条经验的反馈。感谢你的评价!"