| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487 |
- import logging
- import os
- import yaml
- import json
- import asyncio
- import re
- from typing import List, Optional, Dict, Any
- from datetime import datetime
- from ...llm.openrouter import openrouter_llm_call
- logger = logging.getLogger(__name__)
# Default experience-store path, used when no path can be resolved from context.
DEFAULT_EXPERIENCES_PATH = "./.cache/experiences_restore.md"
- def _get_experiences_path(context: Optional[Any] = None) -> str:
- """
- 从 context 中获取 experiences_path,回退到默认路径。
- context 可能包含 runner 引用,从中读取配置的路径。
- """
- if context and isinstance(context, dict):
- runner = context.get("runner")
- if runner and hasattr(runner, "experiences_path"):
- path = runner.experiences_path or DEFAULT_EXPERIENCES_PATH
- print(f"[Experience] 使用 runner 配置的路径: {runner.experiences_path}")
- return path
- print(f"[Experience] 使用默认路径: {DEFAULT_EXPERIENCES_PATH}")
- return DEFAULT_EXPERIENCES_PATH
# ===== Experience evolution rewrite =====
async def _evolve_body_with_llm(old_body: str, feedback: str) -> str:
    """
    Evolve an ACE-format experience entry by merging real-world feedback.

    Uses the same cheap model as the retrieval router (Flash Lite).  If the
    model call fails, or the reply is implausibly short, falls back to simply
    appending the feedback to the original body with a dated marker.
    """
    prompt = f"""你是一个 AI Agent 经验库管理员。请根据反馈建议,对现有的 ACE 规范经验进行重写进化。
【原经验内容】:
{old_body}
【实战反馈建议】:
{feedback}
【重写要求】:
1. 保持 ACE 规范:当 [条件/Context] 时,应该 [动作/Action](原因:[逻辑/Reason])。
2. 融合知识:将反馈中的避坑指南、新参数或修正后的选择逻辑融入原经验,使其更具通用性和准确性。
3. 语言:简洁直接,使用中文。
4. 禁止:严禁输出任何开场白、解释语或 Markdown 标题,直接返回重写后的正文。
"""
    try:
        # Same cheap model as the retrieval routing stage.
        reply = await openrouter_llm_call(
            messages=[{"role": "user", "content": prompt}],
            model="google/gemini-2.0-flash-001"
        )
        rewritten = reply.get("content", "").strip()
        # Sanity check: an empty or near-empty answer counts as a failure.
        if len(rewritten) < 5:
            raise ValueError("LLM output too short")
        return rewritten
    except Exception as exc:
        logger.warning(f"小模型进化失败,采用追加模式回退: {exc}")
        stamp = datetime.now().strftime('%Y-%m-%d')
        return f"{old_body}\n- [Update {stamp}]: {feedback}"
-
# ===== Core selection logic =====
async def _route_experiences_by_llm(query_text: str, metadata_list: List[Dict], k: int = 3) -> List[str]:
    """
    Stage one: semantic routing.

    Asks the LLM to shortlist up to ``2 * k`` experience IDs whose metadata
    looks relevant to the query.  Returns the raw ID list (possibly empty).
    """
    if not metadata_list:
        return []
    # Widen the shortlist to 2*k; stage two re-ranks it back down to k.
    routing_k = 2 * k
    candidates = [
        {"id": entry["id"], "tags": entry["tags"], "helpful": entry["metrics"]["helpful"]}
        for entry in metadata_list
    ]
    prompt = f"""
你是一个经验检索专家。根据用户的当前意图,从下列经验元数据中挑选出最相关的最多 {routing_k} 个经验 ID。
意图:"{query_text}"
可选经验列表:
{json.dumps(candidates, ensure_ascii=False, indent=1)}
请直接输出 ID 列表,用逗号分隔(例如: ex_01, ex_02)。若无相关项请输出 "None"。
"""
    try:
        print(f"\n[Step 1: 语义路由] 意图: '{query_text}' | 候选总数: {len(metadata_list)} | 目标提取数: {routing_k}")
        reply = await openrouter_llm_call(
            messages=[{"role": "user", "content": prompt}],
            model="google/gemini-2.0-flash-001"
        )
        raw = reply.get("content", "").strip()
        # Keep only tokens that look like experience IDs (ex_*).
        picked = [tok.strip() for tok in re.split(r'[,\s]+', raw) if tok.strip().startswith("ex_")]
        print(f"[Step 1: 语义路由] LLM 初选 ID ({len(picked)}个): {picked}")
        return picked
    except Exception as exc:
        logger.error(f"LLM 经验路由失败: {exc}")
        return []
async def _get_structured_experiences(query_text: str, top_k: int = 3, context: Optional[Any] = None):
    """
    Retrieve the best experiences for a query in three phases:

    1. Parse the physical experience file (YAML frontmatter blocks).
    2. Semantic routing: ask the LLM for up to ``2 * top_k`` candidate IDs.
    3. Quality re-ranking: score candidates on metrics and keep ``top_k``.

    Returns a list of ``{"id", "content", "helpful", "quality_score"}`` dicts
    (empty when the file is missing or nothing relevant is found).
    """
    # Fix: the original debug print called context.get() unconditionally and
    # raised AttributeError when context was not a dict (e.g. a ToolContext
    # object).  Guard the access like _get_experiences_path does.
    runner = context.get("runner") if isinstance(context, dict) else None
    print(f"[Experience System] runner.experiences_path: {getattr(runner, 'experiences_path', None)}")
    experiences_path = _get_experiences_path(context)
    if not os.path.exists(experiences_path):
        print(f"[Experience System] 警告: 经验文件不存在 ({experiences_path})")
        return []
    with open(experiences_path, "r", encoding="utf-8") as f:
        file_content = f.read()

    # --- Phase 1: parse.  The regex matches full frontmatter blocks so that
    # "---" sequences inside bodies cannot cause a bad split.
    pattern = r'---\n(.*?)\n---\n(.*?)(?=\n---\n|\Z)'
    matches = re.findall(pattern, file_content, re.DOTALL)
    content_map = {}
    metadata_list = []
    for yaml_str, raw_body in matches:
        try:
            metadata = yaml.safe_load(yaml_str)
            if not isinstance(metadata, dict):
                logger.error(f"跳过损坏的经验块: metadata 不是 dict,而是 {type(metadata).__name__}")
                continue
            eid = metadata.get("id")
            if not eid:
                logger.error("跳过损坏的经验块: 缺少 id 字段")
                continue
            # Fix: tolerate partially-filled metrics dicts — a block with only
            # "helpful" used to raise KeyError during scoring below.
            raw_metrics = metadata.get("metrics") or {}
            metrics = {
                "helpful": raw_metrics.get("helpful", 0),
                "harmful": raw_metrics.get("harmful", 0),
            }
            meta_item = {
                "id": eid,
                "tags": metadata.get("tags", {}),
                "metrics": metrics,
            }
            metadata_list.append(meta_item)
            content_map[eid] = {
                "content": raw_body.strip(),
                "metrics": metrics,
            }
        except Exception as e:
            logger.error(f"跳过损坏的经验块: {e}")
            continue

    # --- Phase 2: semantic routing (shortlist of up to 2*top_k IDs) ---
    candidate_ids = await _route_experiences_by_llm(query_text, metadata_list, k=top_k)

    # --- Phase 3: quality re-ranking down to top_k ---
    print(f"[Step 2: 质量精排] 正在根据 Metrics 对候选经验进行打分...")
    scored_items = []
    for eid in candidate_ids:
        if eid not in content_map:
            continue
        item = content_map[eid]
        metrics = item["metrics"]
        # Helpful adds to the score; harmful is a double-weight penalty.
        quality_score = metrics["helpful"] - (metrics["harmful"] * 2.0)
        # Hard filter: drop experiences flagged as clearly harmful.
        if quality_score < -2:
            print(f" - 剔除有害经验: {eid} (Helpful: {metrics['helpful']}, Harmful: {metrics['harmful']})")
            continue
        scored_items.append({
            "id": eid,
            "content": item["content"],
            "helpful": metrics["helpful"],
            "quality_score": quality_score,
        })
    # Sort by quality score, breaking ties on the raw helpful count.
    final_sorted = sorted(scored_items, key=lambda x: (x["quality_score"], x["helpful"]), reverse=True)
    result = final_sorted[:top_k]
    print(f"[Step 2: 质量精排] 最终选定经验: {[it['id'] for it in result]}")
    print(f"[Experience System] 检索结束。\n")
    return result
async def _batch_update_experiences(update_map: Dict[str, Dict[str, Any]], context: Optional[Any] = None):
    """
    Physical layer: apply a batch of feedback updates to the experience file.

    ``update_map`` maps experience id -> ``{"action", "feedback"}`` where
    action is one of "helpful", "harmful", "evolve", "mixed".  Evolutions are
    executed concurrently and their rewritten bodies spliced back before the
    whole file is rewritten.  Returns ``len(update_map)``.
    """
    experiences_path = _get_experiences_path(context)
    if not os.path.exists(experiences_path) or not update_map:
        return 0
    with open(experiences_path, "r", encoding="utf-8") as f:
        full_content = f.read()
    # Regex parsing keeps multi-line bodies intact (a naive "---" split would not).
    pattern = r'---\n(.*?)\n---\n(.*?)(?=\n---\n|\Z)'
    matches = re.findall(pattern, full_content, re.DOTALL)
    new_entries = []
    evolution_tasks = []
    evolution_registry = {}  # task index -> index into new_entries
    # --- Phase 1: walk every block, applying metric updates ---
    for yaml_str, body in matches:
        try:
            meta = yaml.safe_load(yaml_str)
            if not isinstance(meta, dict):
                logger.error(f"跳过损坏的经验块: metadata 不是 dict")
                continue
            eid = meta.get("id")
            if not eid:
                logger.error("跳过损坏的经验块: 缺少 id")
                continue
            if eid in update_map:
                # Fix: a block with a missing or partial metrics dict used to
                # raise KeyError here, fall into the except-skip, and thereby
                # be DELETED by the rewrite below.  Normalize metrics first.
                metrics = meta.setdefault("metrics", {})
                metrics.setdefault("helpful", 0)
                metrics.setdefault("harmful", 0)
                instr = update_map[eid]
                action = instr.get("action")
                feedback = instr.get("feedback")
                # "mixed" intermediate state = count a helpful, then evolve.
                if action == "mixed":
                    metrics["helpful"] += 1
                    action = "evolve"
                if action == "helpful":
                    metrics["helpful"] += 1
                elif action == "harmful":
                    metrics["harmful"] += 1
                elif action == "evolve" and feedback:
                    # Register the evolution task; body is replaced in phase 2.
                    evolution_tasks.append(_evolve_body_with_llm(body.strip(), feedback))
                    evolution_registry[len(evolution_tasks) - 1] = len(new_entries)
                    metrics["helpful"] += 1
                meta["updated_at"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            # Serialize every block (updated or not) so untouched entries survive.
            meta_str = yaml.dump(meta, allow_unicode=True).strip()
            new_entries.append((meta_str, body.strip()))
        except Exception as e:
            logger.error(f"跳过损坏的经验块: {e}")
            continue
    # --- Phase 2: run evolutions concurrently, splice results back by index ---
    if evolution_tasks:
        print(f"🧬 并发处理 {len(evolution_tasks)} 条经验进化...")
        evolved_results = await asyncio.gather(*evolution_tasks)
        for task_idx, entry_idx in evolution_registry.items():
            meta_str, _ = new_entries[entry_idx]
            new_entries[entry_idx] = (meta_str, evolved_results[task_idx].strip())
    # --- Phase 3: rewrite the whole file in one shot ---
    final_parts = [f"---\n{meta_str}\n---\n{body}\n" for meta_str, body in new_entries]
    final_content = "\n".join(final_parts)
    with open(experiences_path, "w", encoding="utf-8") as f:
        f.write(final_content)
    return len(update_map)
# ===== Experience-store slimming =====
async def slim_experiences(model: str = "anthropic/claude-sonnet-4.5", context: Optional[Any] = None) -> str:
    """
    Slim the experience store: ask a top-tier LLM to merge semantically
    similar entries into fewer, more general ones.

    Returns a human-readable report string.  On any failure the file is left
    unmodified and the error is described in the returned string.
    """
    experiences_path = _get_experiences_path(context)
    if not os.path.exists(experiences_path):
        return "经验文件不存在,无需瘦身。"
    with open(experiences_path, "r", encoding="utf-8") as f:
        file_content = f.read()
    # Regex parsing keeps multi-line bodies intact (avoids naive "---" splits).
    pattern = r'---\n(.*?)\n---\n(.*?)(?=\n---\n|\Z)'
    matches = re.findall(pattern, file_content, re.DOTALL)
    parsed = []
    for yaml_str, body in matches:
        try:
            meta = yaml.safe_load(yaml_str)
            if not isinstance(meta, dict):
                continue
            parsed.append({"meta": meta, "body": body.strip()})
        except Exception:
            continue
    if len(parsed) < 2:
        return f"经验库仅有 {len(parsed)} 条,无需瘦身。"
    # Build the full library listing that is handed to the big model.
    entries_text = ""
    for p in parsed:
        m = p["meta"]
        entries_text += f"[ID: {m.get('id')}] [Tags: {m.get('tags', {})}] "
        entries_text += f"[Metrics: {m.get('metrics', {})}]\n"
        entries_text += f"{p['body']}\n\n"
    prompt = f"""你是一个 AI Agent 经验库管理员。以下是当前经验库的全部条目,请执行瘦身操作:
【任务】:
1. 识别语义高度相似或重复的经验,将它们合并为一条更精炼、更通用的经验。
2. 合并时保留 helpful 最高的那条的 ID 和 metrics(metrics 中 helpful/harmful 取各条之和)。
3. 对于独立的、无重复的经验,保持原样不动。
4. 保持 ACE 规范格式:当 [条件/Context] 时,应该 [动作/Action](原因:[逻辑/Reason])。
【当前经验库】:
{entries_text}
【输出格式要求】:
严格按以下格式输出每条经验,条目之间用 === 分隔:
ID: <保留的id>
TAGS: <yaml格式的tags>
METRICS: <yaml格式的metrics>
BODY: <合并后的经验正文>
===
最后一行输出合并报告,格式:
REPORT: 原有 X 条,合并后 Y 条,精简了 Z 条。
禁止输出任何开场白或解释。"""
    try:
        print(f"\n[经验瘦身] 正在调用 {model} 分析 {len(parsed)} 条经验...")
        response = await openrouter_llm_call(
            messages=[{"role": "user", "content": prompt}],
            model=model
        )
        content = response.get("content", "").strip()
        if not content:
            return "大模型返回为空,瘦身失败。"
        # Parse the model output and rebuild the experience file.
        report_line = ""
        new_entries = []
        blocks = [b.strip() for b in content.split("===") if b.strip()]
        for block in blocks:
            if block.startswith("REPORT:"):
                report_line = block
                continue
            # Line-oriented state machine: ID/TAGS/METRICS are single-line
            # fields; everything after BODY: belongs to the body.
            lines = block.split("\n")
            eid, tags, metrics, body_lines = None, {}, {}, []
            current_field = None
            for line in lines:
                if line.startswith("ID:"):
                    eid = line[3:].strip()
                    current_field = None
                elif line.startswith("TAGS:"):
                    try:
                        tags = yaml.safe_load(line[5:].strip()) or {}
                    except Exception:
                        tags = {}
                    current_field = None
                elif line.startswith("METRICS:"):
                    try:
                        metrics = yaml.safe_load(line[8:].strip()) or {}
                    except Exception:
                        metrics = {"helpful": 0, "harmful": 0}
                    current_field = None
                elif line.startswith("BODY:"):
                    body_lines.append(line[5:].strip())
                    current_field = "body"
                elif current_field == "body":
                    body_lines.append(line)
            # Only keep blocks that parsed to at least an id plus a body.
            if eid and body_lines:
                meta = {
                    "id": eid,
                    "tags": tags,
                    "metrics": metrics,
                    "updated_at": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                }
                meta_str = yaml.dump(meta, allow_unicode=True).strip()
                body_str = "\n".join(body_lines).strip()
                new_entries.append(f"---\n{meta_str}\n---\n{body_str}\n")
        if not new_entries:
            return "解析大模型输出失败,经验库未修改。"
        # Write the rebuilt store back in place.
        final = "\n".join(new_entries)
        with open(experiences_path, "w", encoding="utf-8") as f:
            f.write(final)
        result = f"瘦身完成:{len(parsed)} → {len(new_entries)} 条经验。"
        if report_line:
            result += f"\n{report_line}"
        print(f"[经验瘦身] {result}")
        return result
    except Exception as e:
        logger.error(f"经验瘦身失败: {e}")
        return f"瘦身失败: {e}"
# ===== Public tool interface =====
- from agent.tools import tool, ToolContext
@tool(description="通过两阶段检索获取最相关的历史经验")
async def get_experience(query: str, k: int = 3, context: Optional[ToolContext] = None):
    """
    Fetch the most relevant historical experiences via two-stage retrieval.

    Stage one shortlists ``2 * k`` candidates by semantic match; stage two
    re-ranks them on quality metrics and keeps ``k``.
    """
    items = await _get_structured_experiences(
        query_text=query,
        top_k=k,
        context=context
    )
    if not items:
        return "未找到足够相关的优质经验。"
    return {"items": items, "count": len(items)}
@tool()
async def update_experiences(feedback_list: List[Dict[str, Any]], context: Optional[ToolContext] = None):
    """
    Batch-report whether previously retrieved experiences were effective.

    Args:
        feedback_list: list of dicts, each containing:
            - ex_id: (str) experience ID
            - is_effective: (bool) whether the experience helped
            - feedback: (str, optional) improvement suggestion; when present
              on an effective experience it triggers an evolution rewrite
    """
    if not feedback_list:
        return "反馈列表为空。"
    # Translate the agent-facing feedback into the low-level update map.
    update_map = {}
    for item in feedback_list:
        ex_id = item.get("ex_id")
        if not ex_id:
            # Fix: entries without an id used to be stored under the key None,
            # silently inflating the reported update count.  Skip them.
            logger.warning("update_experiences: 跳过缺少 ex_id 的反馈项")
            continue
        comment = item.get("feedback", "")
        if item.get("is_effective"):
            # Effective + concrete suggestion -> evolve; otherwise just helpful.
            action = "evolve" if comment else "helpful"
        else:
            action = "harmful"
        update_map[ex_id] = {
            "action": action,
            "feedback": comment
        }
    count = await _batch_update_experiences(update_map, context)
    return f"成功同步了 {count} 条经验的反馈。感谢你的评价!"
|