experience.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487
  1. import logging
  2. import os
  3. import yaml
  4. import json
  5. import asyncio
  6. import re
  7. from typing import List, Optional, Dict, Any
  8. from datetime import datetime
  9. from ...llm.openrouter import openrouter_llm_call
  10. logger = logging.getLogger(__name__)
  11. # 默认经验存储路径(当无法从 context 获取时使用)
  12. DEFAULT_EXPERIENCES_PATH = "./.cache/experiences_restore.md"
  13. def _get_experiences_path(context: Optional[Any] = None) -> str:
  14. """
  15. 从 context 中获取 experiences_path,回退到默认路径。
  16. context 可能包含 runner 引用,从中读取配置的路径。
  17. """
  18. if context and isinstance(context, dict):
  19. runner = context.get("runner")
  20. if runner and hasattr(runner, "experiences_path"):
  21. path = runner.experiences_path or DEFAULT_EXPERIENCES_PATH
  22. print(f"[Experience] 使用 runner 配置的路径: {runner.experiences_path}")
  23. return path
  24. print(f"[Experience] 使用默认路径: {DEFAULT_EXPERIENCES_PATH}")
  25. return DEFAULT_EXPERIENCES_PATH
  26. # ===== 经验进化重写 =====
  27. async def _evolve_body_with_llm(old_body: str, feedback: str) -> str:
  28. """
  29. 使用检索级别的小模型 (Flash Lite) 执行经验进化重写。
  30. """
  31. prompt = f"""你是一个 AI Agent 经验库管理员。请根据反馈建议,对现有的 ACE 规范经验进行重写进化。
  32. 【原经验内容】:
  33. {old_body}
  34. 【实战反馈建议】:
  35. {feedback}
  36. 【重写要求】:
  37. 1. 保持 ACE 规范:当 [条件/Context] 时,应该 [动作/Action](原因:[逻辑/Reason])。
  38. 2. 融合知识:将反馈中的避坑指南、新参数或修正后的选择逻辑融入原经验,使其更具通用性和准确性。
  39. 3. 语言:简洁直接,使用中文。
  40. 4. 禁止:严禁输出任何开场白、解释语或 Markdown 标题,直接返回重写后的正文。
  41. """
  42. try:
  43. # 调用与检索路由相同的廉价模型
  44. response = await openrouter_llm_call(
  45. messages=[{"role": "user", "content": prompt}],
  46. model="google/gemini-2.0-flash-001"
  47. )
  48. evolved_content = response.get("content", "").strip()
  49. # 简单安全校验:如果 LLM 返回太短或为空,回退到原内容+追加
  50. if len(evolved_content) < 5:
  51. raise ValueError("LLM output too short")
  52. return evolved_content
  53. except Exception as e:
  54. logger.warning(f"小模型进化失败,采用追加模式回退: {e}")
  55. timestamp = datetime.now().strftime('%Y-%m-%d')
  56. return f"{old_body}\n- [Update {timestamp}]: {feedback}"
  57. # ===== 核心挑选逻辑 =====
  58. async def _route_experiences_by_llm(query_text: str, metadata_list: List[Dict], k: int = 3) -> List[str]:
  59. """
  60. 第一阶段:语义路由。
  61. 让 LLM 挑选出 2*k 个语义相关的 ID。
  62. """
  63. if not metadata_list:
  64. return []
  65. # 扩大筛选范围到 2*k
  66. routing_k = k * 2
  67. routing_data = [
  68. {
  69. "id": m["id"],
  70. "tags": m["tags"],
  71. "helpful": m["metrics"]["helpful"]
  72. } for m in metadata_list
  73. ]
  74. prompt = f"""
  75. 你是一个经验检索专家。根据用户的当前意图,从下列经验元数据中挑选出最相关的最多 {routing_k} 个经验 ID。
  76. 意图:"{query_text}"
  77. 可选经验列表:
  78. {json.dumps(routing_data, ensure_ascii=False, indent=1)}
  79. 请直接输出 ID 列表,用逗号分隔(例如: ex_01, ex_02)。若无相关项请输出 "None"。
  80. """
  81. try:
  82. print(f"\n[Step 1: 语义路由] 意图: '{query_text}' | 候选总数: {len(metadata_list)} | 目标提取数: {routing_k}")
  83. response = await openrouter_llm_call(
  84. messages=[{"role": "user", "content": prompt}],
  85. model="google/gemini-2.0-flash-001"
  86. )
  87. content = response.get("content", "").strip()
  88. selected_ids = [idx.strip() for idx in re.split(r'[,\s]+', content) if idx.strip().startswith("ex_")]
  89. print(f"[Step 1: 语义路由] LLM 初选 ID ({len(selected_ids)}个): {selected_ids}")
  90. return selected_ids
  91. except Exception as e:
  92. logger.error(f"LLM 经验路由失败: {e}")
  93. return []
  94. async def _get_structured_experiences(query_text: str, top_k: int = 3, context: Optional[Any] = None):
  95. """
  96. 1. 解析物理文件
  97. 2. 语义路由:提取 2*k 个 ID
  98. 3. 质量精排:基于 Metrics 筛选出最终的 k 个
  99. """
  100. print(f"[Experience System] runner.experiences_path: {context.get('runner').experiences_path if context and context.get('runner') else None}")
  101. experiences_path = _get_experiences_path(context)
  102. if not os.path.exists(experiences_path):
  103. print(f"[Experience System] 警告: 经验文件不存在 ({experiences_path})")
  104. return []
  105. with open(experiences_path, "r", encoding="utf-8") as f:
  106. file_content = f.read()
  107. # --- 阶段 1: 解析 ---
  108. # 使用正则表达式匹配 YAML frontmatter 块,避免误分割
  109. pattern = r'---\n(.*?)\n---\n(.*?)(?=\n---\n|\Z)'
  110. matches = re.findall(pattern, file_content, re.DOTALL)
  111. content_map = {}
  112. metadata_list = []
  113. for yaml_str, raw_body in matches:
  114. try:
  115. metadata = yaml.safe_load(yaml_str)
  116. # 检查 metadata 类型
  117. if not isinstance(metadata, dict):
  118. logger.error(f"跳过损坏的经验块: metadata 不是 dict,而是 {type(metadata).__name__}")
  119. continue
  120. eid = metadata.get("id")
  121. if not eid:
  122. logger.error("跳过损坏的经验块: 缺少 id 字段")
  123. continue
  124. meta_item = {
  125. "id": eid,
  126. "tags": metadata.get("tags", {}),
  127. "metrics": metadata.get("metrics", {"helpful": 0, "harmful": 0}),
  128. }
  129. metadata_list.append(meta_item)
  130. content_map[eid] = {
  131. "content": raw_body.strip(),
  132. "metrics": meta_item["metrics"]
  133. }
  134. except Exception as e:
  135. logger.error(f"跳过损坏的经验块: {e}")
  136. continue
  137. # --- 阶段 2: 语义路由 (取 2*k) ---
  138. candidate_ids = await _route_experiences_by_llm(query_text, metadata_list, k=top_k)
  139. # --- 阶段 3: 质量精排 (根据 Metrics 选出最终的 k) ---
  140. print(f"[Step 2: 质量精排] 正在根据 Metrics 对候选经验进行打分...")
  141. scored_items = []
  142. for eid in candidate_ids:
  143. if eid in content_map:
  144. item = content_map[eid]
  145. metrics = item["metrics"]
  146. # 计算综合分:Helpful 是正分,Harmful 是双倍惩罚扣分
  147. quality_score = metrics["helpful"] - (metrics["harmful"] * 2.0)
  148. # 过滤门槛:如果被标记为严重有害(score < -2),直接丢弃
  149. if quality_score < -2:
  150. print(f" - 剔除有害经验: {eid} (Helpful: {metrics['helpful']}, Harmful: {metrics['harmful']})")
  151. continue
  152. scored_items.append({
  153. "id": eid,
  154. "content": item["content"],
  155. "helpful": metrics["helpful"],
  156. "quality_score": quality_score
  157. })
  158. # 按照质量分排序,质量分相同时按 helpful 排序
  159. final_sorted = sorted(scored_items, key=lambda x: (x["quality_score"], x["helpful"]), reverse=True)
  160. # 截取最终的 top_k
  161. result = final_sorted[:top_k]
  162. print(f"[Step 2: 质量精排] 最终选定经验: {[it['id'] for it in result]}")
  163. print(f"[Experience System] 检索结束。\n")
  164. return result
  165. async def _batch_update_experiences(update_map: Dict[str, Dict[str, Any]], context: Optional[Any] = None):
  166. """
  167. 物理层:批量更新经验。
  168. 修正点:正确使用 new_sections 集合,确保文件结构的完整性与并发进化的同步。
  169. """
  170. experiences_path = _get_experiences_path(context)
  171. if not os.path.exists(experiences_path) or not update_map:
  172. return 0
  173. with open(experiences_path, "r", encoding="utf-8") as f:
  174. full_content = f.read()
  175. # 使用正则表达式解析,避免误分割
  176. pattern = r'---\n(.*?)\n---\n(.*?)(?=\n---\n|\Z)'
  177. matches = re.findall(pattern, full_content, re.DOTALL)
  178. new_entries = []
  179. evolution_tasks = []
  180. evolution_registry = {} # task_idx -> entry_idx
  181. # --- 第一阶段:处理所有块 ---
  182. for yaml_str, body in matches:
  183. try:
  184. meta = yaml.safe_load(yaml_str)
  185. if not isinstance(meta, dict):
  186. logger.error(f"跳过损坏的经验块: metadata 不是 dict")
  187. continue
  188. eid = meta.get("id")
  189. if not eid:
  190. logger.error("跳过损坏的经验块: 缺少 id")
  191. continue
  192. if eid in update_map:
  193. instr = update_map[eid]
  194. action = instr.get("action")
  195. feedback = instr.get("feedback")
  196. # 处理 mixed 中间态
  197. if action == "mixed":
  198. meta["metrics"]["helpful"] += 1
  199. action = "evolve"
  200. if action == "helpful":
  201. meta["metrics"]["helpful"] += 1
  202. elif action == "harmful":
  203. meta["metrics"]["harmful"] += 1
  204. elif action == "evolve" and feedback:
  205. # 注册进化任务
  206. task = _evolve_body_with_llm(body.strip(), feedback)
  207. evolution_tasks.append(task)
  208. # 记录该任务对应的 entry 索引
  209. evolution_registry[len(evolution_tasks) - 1] = len(new_entries)
  210. meta["metrics"]["helpful"] += 1
  211. meta["updated_at"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  212. # 序列化并加入 new_entries
  213. meta_str = yaml.dump(meta, allow_unicode=True).strip()
  214. new_entries.append((meta_str, body.strip()))
  215. except Exception as e:
  216. logger.error(f"跳过损坏的经验块: {e}")
  217. continue
  218. # --- 第二阶段:并发进化 ---
  219. if evolution_tasks:
  220. print(f"🧬 并发处理 {len(evolution_tasks)} 条经验进化...")
  221. evolved_results = await asyncio.gather(*evolution_tasks)
  222. # 精准回填:替换对应 entry 的 body
  223. for task_idx, entry_idx in evolution_registry.items():
  224. meta_str, _ = new_entries[entry_idx]
  225. new_entries[entry_idx] = (meta_str, evolved_results[task_idx].strip())
  226. # --- 第三阶段:原子化写回 ---
  227. final_parts = []
  228. for meta_str, body in new_entries:
  229. final_parts.append(f"---\n{meta_str}\n---\n{body}\n")
  230. final_content = "\n".join(final_parts)
  231. with open(experiences_path, "w", encoding="utf-8") as f:
  232. f.write(final_content)
  233. return len(update_map)
# ===== Experience-base slimming =====
async def slim_experiences(model: str = "anthropic/claude-sonnet-4.5", context: Optional[Any] = None) -> str:
    """
    Slim the experience base: call a top-tier large model to merge
    semantically similar experiences into fewer, more general entries,
    then rewrite the file in place.
    Returns a human-readable slimming report string (also used for all
    error/no-op outcomes).
    """
    experiences_path = _get_experiences_path(context)
    if not os.path.exists(experiences_path):
        return "经验文件不存在,无需瘦身。"
    with open(experiences_path, "r", encoding="utf-8") as f:
        file_content = f.read()
    # Regex parse of "---\n<yaml>\n---\n<body>" frontmatter blocks,
    # avoiding naive splitting on stray "---" inside bodies.
    pattern = r'---\n(.*?)\n---\n(.*?)(?=\n---\n|\Z)'
    matches = re.findall(pattern, file_content, re.DOTALL)
    parsed = []
    for yaml_str, body in matches:
        try:
            meta = yaml.safe_load(yaml_str)
            # Silently drop corrupted blocks; slimming is best-effort.
            if not isinstance(meta, dict):
                continue
            parsed.append({"meta": meta, "body": body.strip()})
        except Exception:
            continue
    if len(parsed) < 2:
        # Nothing to merge with fewer than two entries.
        return f"经验库仅有 {len(parsed)} 条,无需瘦身。"
    # Build the flattened entry listing sent to the large model.
    entries_text = ""
    for p in parsed:
        m = p["meta"]
        entries_text += f"[ID: {m.get('id')}] [Tags: {m.get('tags', {})}] "
        entries_text += f"[Metrics: {m.get('metrics', {})}]\n"
        entries_text += f"{p['body']}\n\n"
    prompt = f"""你是一个 AI Agent 经验库管理员。以下是当前经验库的全部条目,请执行瘦身操作:
【任务】:
1. 识别语义高度相似或重复的经验,将它们合并为一条更精炼、更通用的经验。
2. 合并时保留 helpful 最高的那条的 ID 和 metrics(metrics 中 helpful/harmful 取各条之和)。
3. 对于独立的、无重复的经验,保持原样不动。
4. 保持 ACE 规范格式:当 [条件/Context] 时,应该 [动作/Action](原因:[逻辑/Reason])。
【当前经验库】:
{entries_text}
【输出格式要求】:
严格按以下格式输出每条经验,条目之间用 === 分隔:
ID: <保留的id>
TAGS: <yaml格式的tags>
METRICS: <yaml格式的metrics>
BODY: <合并后的经验正文>
===
最后一行输出合并报告,格式:
REPORT: 原有 X 条,合并后 Y 条,精简了 Z 条。
禁止输出任何开场白或解释。"""
    try:
        print(f"\n[经验瘦身] 正在调用 {model} 分析 {len(parsed)} 条经验...")
        response = await openrouter_llm_call(
            messages=[{"role": "user", "content": prompt}],
            model=model
        )
        content = response.get("content", "").strip()
        if not content:
            return "大模型返回为空,瘦身失败。"
        # Parse the model's "==="-delimited output and rebuild the file.
        report_line = ""
        new_entries = []
        blocks = [b.strip() for b in content.split("===") if b.strip()]
        for block in blocks:
            # The trailing REPORT line is captured for the return message,
            # not written to the file.
            if block.startswith("REPORT:"):
                report_line = block
                continue
            lines = block.split("\n")
            eid, tags, metrics, body_lines = None, {}, {}, []
            # current_field tracks whether we are inside a multi-line BODY.
            current_field = None
            for line in lines:
                if line.startswith("ID:"):
                    eid = line[3:].strip()
                    current_field = None
                elif line.startswith("TAGS:"):
                    try:
                        # Inline YAML after the label; empty/garbled -> {}.
                        tags = yaml.safe_load(line[5:].strip()) or {}
                    except Exception:
                        tags = {}
                    current_field = None
                elif line.startswith("METRICS:"):
                    try:
                        metrics = yaml.safe_load(line[8:].strip()) or {}
                    except Exception:
                        metrics = {"helpful": 0, "harmful": 0}
                    current_field = None
                elif line.startswith("BODY:"):
                    body_lines.append(line[5:].strip())
                    current_field = "body"
                elif current_field == "body":
                    # Continuation lines of a multi-line body.
                    body_lines.append(line)
            # Only keep blocks that yielded both an ID and some body text.
            if eid and body_lines:
                meta = {
                    "id": eid,
                    "tags": tags,
                    "metrics": metrics,
                    "updated_at": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                }
                meta_str = yaml.dump(meta, allow_unicode=True).strip()
                body_str = "\n".join(body_lines).strip()
                new_entries.append(f"---\n{meta_str}\n---\n{body_str}\n")
        if not new_entries:
            # Unparseable output: leave the original file untouched.
            return "解析大模型输出失败,经验库未修改。"
        # Write the rebuilt experience file back to disk.
        final = "\n".join(new_entries)
        with open(experiences_path, "w", encoding="utf-8") as f:
            f.write(final)
        result = f"瘦身完成:{len(parsed)} → {len(new_entries)} 条经验。"
        if report_line:
            result += f"\n{report_line}"
        print(f"[经验瘦身] {result}")
        return result
    except Exception as e:
        logger.error(f"经验瘦身失败: {e}")
        return f"瘦身失败: {e}"
  349. # ===== 对外 Tool 接口 =====
  350. from agent.tools import tool, ToolContext
  351. @tool(description="通过两阶段检索获取最相关的历史经验")
  352. async def get_experience(query: str, k: int = 3, context: Optional[ToolContext] = None):
  353. """
  354. 通过两阶段检索获取最相关的历史经验。
  355. 第一阶段语义匹配(2*k),第二阶段质量精排(k)。
  356. """
  357. relevant_items = await _get_structured_experiences(
  358. query_text=query,
  359. top_k=k,
  360. context=context
  361. )
  362. if not relevant_items:
  363. return "未找到足够相关的优质经验。"
  364. return {
  365. "items": relevant_items,
  366. "count": len(relevant_items)
  367. }
  368. @tool()
  369. async def update_experiences(feedback_list: List[Dict[str, Any]], context: Optional[ToolContext] = None):
  370. """
  371. 批量反馈历史经验的有效性。
  372. Args:
  373. feedback_list: 评价列表,每个元素包含:
  374. - ex_id: (str) 经验 ID
  375. - is_effective: (bool) 是否有效
  376. - feedback: (str, optional) 改进建议,若有效且有建议则触发经验进化
  377. """
  378. if not feedback_list:
  379. return "反馈列表为空。"
  380. # 将 Agent 的输入转换为底层函数需要的映射表格式
  381. update_map = {}
  382. for item in feedback_list:
  383. ex_id = item.get("ex_id")
  384. is_effective = item.get("is_effective")
  385. comment = item.get("feedback", "")
  386. action = "helpful" if is_effective else "harmful"
  387. if is_effective and comment:
  388. action = "evolve"
  389. update_map[ex_id] = {
  390. "action": action,
  391. "feedback": comment
  392. }
  393. count = await _batch_update_experiences(update_map, context)
  394. return f"成功同步了 {count} 条经验的反馈。感谢你的评价!"