# experience.py — two-stage experience retrieval, feedback-driven evolution,
# and LLM-based slimming for the agent's on-disk experience store.
  1. import logging
  2. import os
  3. import yaml
  4. import json
  5. import asyncio
  6. import re
  7. from typing import List, Optional, Dict, Any
  8. from datetime import datetime
  9. from ...llm.openrouter import openrouter_llm_call
# Module-wide logger for retrieval / evolution diagnostics.
logger = logging.getLogger(__name__)
# Fixed on-disk location of the experience store: a markdown file of
# YAML front-matter blocks and bodies separated by "---" fences.
EXPERIENCES_PATH = "./.cache/experiences.md"
# ===== Experience evolution / rewriting =====
  14. async def _evolve_body_with_llm(old_body: str, feedback: str) -> str:
  15. """
  16. 使用检索级别的小模型 (Flash Lite) 执行经验进化重写。
  17. """
  18. prompt = f"""你是一个 AI Agent 经验库管理员。请根据反馈建议,对现有的 ACE 规范经验进行重写进化。
  19. 【原经验内容】:
  20. {old_body}
  21. 【实战反馈建议】:
  22. {feedback}
  23. 【重写要求】:
  24. 1. 保持 ACE 规范:当 [条件/Context] 时,应该 [动作/Action](原因:[逻辑/Reason])。
  25. 2. 融合知识:将反馈中的避坑指南、新参数或修正后的选择逻辑融入原经验,使其更具通用性和准确性。
  26. 3. 语言:简洁直接,使用中文。
  27. 4. 禁止:严禁输出任何开场白、解释语或 Markdown 标题,直接返回重写后的正文。
  28. """
  29. try:
  30. # 调用与检索路由相同的廉价模型
  31. response = await openrouter_llm_call(
  32. messages=[{"role": "user", "content": prompt}],
  33. model="google/gemini-2.0-flash-001"
  34. )
  35. evolved_content = response.get("content", "").strip()
  36. # 简单安全校验:如果 LLM 返回太短或为空,回退到原内容+追加
  37. if len(evolved_content) < 5:
  38. raise ValueError("LLM output too short")
  39. return evolved_content
  40. except Exception as e:
  41. logger.warning(f"小模型进化失败,采用追加模式回退: {e}")
  42. timestamp = datetime.now().strftime('%Y-%m-%d')
  43. return f"{old_body}\n- [Update {timestamp}]: {feedback}"
  44. # ===== 核心挑选逻辑 =====
  45. async def _route_experiences_by_llm(query_text: str, metadata_list: List[Dict], k: int = 3) -> List[str]:
  46. """
  47. 第一阶段:语义路由。
  48. 让 LLM 挑选出 2*k 个语义相关的 ID。
  49. """
  50. if not metadata_list:
  51. return []
  52. # 扩大筛选范围到 2*k
  53. routing_k = k * 2
  54. routing_data = [
  55. {
  56. "id": m["id"],
  57. "tags": m["tags"],
  58. "helpful": m["metrics"]["helpful"]
  59. } for m in metadata_list
  60. ]
  61. prompt = f"""
  62. 你是一个经验检索专家。根据用户的当前意图,从下列经验元数据中挑选出最相关的最多 {routing_k} 个经验 ID。
  63. 意图:"{query_text}"
  64. 可选经验列表:
  65. {json.dumps(routing_data, ensure_ascii=False, indent=1)}
  66. 请直接输出 ID 列表,用逗号分隔(例如: ex_01, ex_02)。若无相关项请输出 "None"。
  67. """
  68. try:
  69. print(f"\n[Step 1: 语义路由] 意图: '{query_text}' | 候选总数: {len(metadata_list)} | 目标提取数: {routing_k}")
  70. response = await openrouter_llm_call(
  71. messages=[{"role": "user", "content": prompt}],
  72. model="google/gemini-2.0-flash-001"
  73. )
  74. content = response.get("content", "").strip()
  75. selected_ids = [idx.strip() for idx in re.split(r'[,\s]+', content) if idx.strip().startswith("ex_")]
  76. print(f"[Step 1: 语义路由] LLM 初选 ID ({len(selected_ids)}个): {selected_ids}")
  77. return selected_ids
  78. except Exception as e:
  79. logger.error(f"LLM 经验路由失败: {e}")
  80. return []
  81. async def _get_structured_experiences(query_text: str, top_k: int = 3):
  82. """
  83. 1. 解析物理文件
  84. 2. 语义路由:提取 2*k 个 ID
  85. 3. 质量精排:基于 Metrics 筛选出最终的 k 个
  86. """
  87. if not os.path.exists(EXPERIENCES_PATH):
  88. print(f"[Experience System] 警告: 经验文件不存在 ({EXPERIENCES_PATH})")
  89. return []
  90. with open(EXPERIENCES_PATH, "r", encoding="utf-8") as f:
  91. file_content = f.read()
  92. # --- 阶段 1: 解析 ---
  93. entries = file_content.split("---")
  94. content_map = {}
  95. metadata_list = []
  96. for i in range(1, len(entries), 2):
  97. try:
  98. metadata = yaml.safe_load(entries[i])
  99. raw_body = entries[i+1].strip()
  100. eid = metadata.get("id")
  101. meta_item = {
  102. "id": eid,
  103. "tags": metadata.get("tags", {}),
  104. "metrics": metadata.get("metrics", {"helpful": 0, "harmful": 0}),
  105. }
  106. metadata_list.append(meta_item)
  107. content_map[eid] = {
  108. "content": raw_body,
  109. "metrics": meta_item["metrics"]
  110. }
  111. except Exception:
  112. continue
  113. # --- 阶段 2: 语义路由 (取 2*k) ---
  114. candidate_ids = await _route_experiences_by_llm(query_text, metadata_list, k=top_k)
  115. # --- 阶段 3: 质量精排 (根据 Metrics 选出最终的 k) ---
  116. print(f"[Step 2: 质量精排] 正在根据 Metrics 对候选经验进行打分...")
  117. scored_items = []
  118. for eid in candidate_ids:
  119. if eid in content_map:
  120. item = content_map[eid]
  121. metrics = item["metrics"]
  122. # 计算综合分:Helpful 是正分,Harmful 是双倍惩罚扣分
  123. quality_score = metrics["helpful"] - (metrics["harmful"] * 2.0)
  124. # 过滤门槛:如果被标记为严重有害(score < -2),直接丢弃
  125. if quality_score < -2:
  126. print(f" - 剔除有害经验: {eid} (Helpful: {metrics['helpful']}, Harmful: {metrics['harmful']})")
  127. continue
  128. scored_items.append({
  129. "id": eid,
  130. "content": item["content"],
  131. "helpful": metrics["helpful"],
  132. "quality_score": quality_score
  133. })
  134. # 按照质量分排序,质量分相同时按 helpful 排序
  135. final_sorted = sorted(scored_items, key=lambda x: (x["quality_score"], x["helpful"]), reverse=True)
  136. # 截取最终的 top_k
  137. result = final_sorted[:top_k]
  138. print(f"[Step 2: 质量精排] 最终选定经验: {[it['id'] for it in result]}")
  139. print(f"[Experience System] 检索结束。\n")
  140. return result
  141. async def _batch_update_experiences(update_map: Dict[str, Dict[str, Any]]):
  142. """
  143. 物理层:批量更新经验。
  144. 修正点:正确使用 new_sections 集合,确保文件结构的完整性与并发进化的同步。
  145. """
  146. if not os.path.exists(EXPERIENCES_PATH) or not update_map:
  147. return 0
  148. with open(EXPERIENCES_PATH, "r", encoding="utf-8") as f:
  149. full_content = f.read()
  150. sections = full_content.split("---")
  151. # new_sections 用于存放最终要写回的所有块
  152. new_sections = [sections[0]]
  153. evolution_tasks = []
  154. # 记录哪些 new_sections 的索引需要填充进化后的 Body
  155. # 注意:这里的 index 指的是 new_sections 里的位置
  156. evolution_registry = {}
  157. # --- 第一阶段:处理所有块,填充 new_sections ---
  158. for i in range(1, len(sections), 2):
  159. try:
  160. meta = yaml.safe_load(sections[i])
  161. body = sections[i+1]
  162. eid = meta.get("id")
  163. if eid in update_map:
  164. instr = update_map[eid]
  165. action = instr.get("action")
  166. feedback = instr.get("feedback")
  167. # 处理 mixed 中间态
  168. if action == "mixed":
  169. meta["metrics"]["helpful"] += 1
  170. action = "evolve"
  171. if action == "helpful":
  172. meta["metrics"]["helpful"] += 1
  173. elif action == "harmful":
  174. meta["metrics"]["harmful"] += 1
  175. elif action == "evolve" and feedback:
  176. # 注册进化任务
  177. task = _evolve_body_with_llm(body.strip(), feedback)
  178. evolution_tasks.append(task)
  179. # 记录该任务对应 new_sections 列表中的位置
  180. # 此时 new_sections 已经存了 [0], 接下来要存 [meta, body]
  181. # 所以 meta 在 len(new_sections), body 在 len(new_sections) + 1
  182. evolution_registry[len(evolution_tasks) - 1] = len(new_sections) + 1
  183. meta["metrics"]["helpful"] += 1
  184. meta["updated_at"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  185. # 无论是否更新,都将其序列化并加入 new_sections
  186. meta_str = "\n" + yaml.dump(meta, allow_unicode=True).strip() + "\n"
  187. new_sections.append(meta_str)
  188. new_sections.append(body) # 先放旧 Body,进化后再替换
  189. except Exception as e:
  190. logger.error(f"跳过损坏的经验块: {e}")
  191. continue
  192. # --- 第二阶段:并发进化 ---
  193. if evolution_tasks:
  194. print(f"🧬 并发处理 {len(evolution_tasks)} 条经验进化...")
  195. evolved_results = await asyncio.gather(*evolution_tasks)
  196. # 精准回填
  197. for task_idx, section_idx in evolution_registry.items():
  198. new_sections[section_idx] = f"\n{evolved_results[task_idx].strip()}\n"
  199. # --- 第三阶段:原子化写回 ---
  200. # 使用 new_sections 构建最终文本
  201. final_content = "---".join(new_sections)
  202. with open(EXPERIENCES_PATH, "w", encoding="utf-8") as f:
  203. f.write(final_content)
  204. return len(update_map)
  205. # ===== 经验库瘦身 =====
async def slim_experiences(model: str = "anthropic/claude-sonnet-4.5") -> str:
    """Slim the experience store: ask a top-tier LLM to merge semantically
    similar entries into fewer, more general ones, then rewrite the file.

    Returns a human-readable report string (also returned on failure —
    this function never raises).
    """
    if not os.path.exists(EXPERIENCES_PATH):
        return "经验文件不存在,无需瘦身。"
    with open(EXPERIENCES_PATH, "r", encoding="utf-8") as f:
        file_content = f.read()
    # Parse all entries: split("---") alternates metadata / body blocks.
    entries = file_content.split("---")
    parsed = []
    for i in range(1, len(entries), 2):
        try:
            meta = yaml.safe_load(entries[i])
            body = entries[i + 1].strip()
            parsed.append({"meta": meta, "body": body})
        except Exception:
            # Malformed block — skip; slimming works on what parses.
            continue
    if len(parsed) < 2:
        return f"经验库仅有 {len(parsed)} 条,无需瘦身。"
    # Build the flat text dump of the whole library for the model.
    entries_text = ""
    for p in parsed:
        m = p["meta"]
        entries_text += f"[ID: {m.get('id')}] [Tags: {m.get('tags', {})}] "
        entries_text += f"[Metrics: {m.get('metrics', {})}]\n"
        entries_text += f"{p['body']}\n\n"
    prompt = f"""你是一个 AI Agent 经验库管理员。以下是当前经验库的全部条目,请执行瘦身操作:
【任务】:
1. 识别语义高度相似或重复的经验,将它们合并为一条更精炼、更通用的经验。
2. 合并时保留 helpful 最高的那条的 ID 和 metrics(metrics 中 helpful/harmful 取各条之和)。
3. 对于独立的、无重复的经验,保持原样不动。
4. 保持 ACE 规范格式:当 [条件/Context] 时,应该 [动作/Action](原因:[逻辑/Reason])。
【当前经验库】:
{entries_text}
【输出格式要求】:
严格按以下格式输出每条经验,条目之间用 === 分隔:
ID: <保留的id>
TAGS: <yaml格式的tags>
METRICS: <yaml格式的metrics>
BODY: <合并后的经验正文>
===
最后一行输出合并报告,格式:
REPORT: 原有 X 条,合并后 Y 条,精简了 Z 条。
禁止输出任何开场白或解释。"""
    try:
        print(f"\n[经验瘦身] 正在调用 {model} 分析 {len(parsed)} 条经验...")
        response = await openrouter_llm_call(
            messages=[{"role": "user", "content": prompt}],
            model=model
        )
        content = response.get("content", "").strip()
        if not content:
            return "大模型返回为空,瘦身失败。"
        # Parse the model's output and rebuild the experience file.
        report_line = ""
        new_entries = []
        # Entries are separated by "===" per the prompt's output contract.
        blocks = [b.strip() for b in content.split("===") if b.strip()]
        for block in blocks:
            if block.startswith("REPORT:"):
                report_line = block
                continue
            lines = block.split("\n")
            eid, tags, metrics, body_lines = None, {}, {}, []
            # current_field tracks multi-line BODY continuation lines.
            current_field = None
            for line in lines:
                if line.startswith("ID:"):
                    eid = line[3:].strip()
                    current_field = None
                elif line.startswith("TAGS:"):
                    try:
                        # Single-line YAML expected (e.g. a flow mapping).
                        tags = yaml.safe_load(line[5:].strip()) or {}
                    except Exception:
                        tags = {}
                    current_field = None
                elif line.startswith("METRICS:"):
                    try:
                        metrics = yaml.safe_load(line[8:].strip()) or {}
                    except Exception:
                        metrics = {"helpful": 0, "harmful": 0}
                    current_field = None
                elif line.startswith("BODY:"):
                    body_lines.append(line[5:].strip())
                    current_field = "body"
                elif current_field == "body":
                    body_lines.append(line)
            if eid and body_lines:
                meta = {
                    "id": eid,
                    "tags": tags,
                    "metrics": metrics,
                    "updated_at": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                }
                meta_str = yaml.dump(meta, allow_unicode=True).strip()
                body_str = "\n".join(body_lines).strip()
                # Serialize in the same "---" fenced layout the readers expect.
                new_entries.append(f"---\n{meta_str}\n---\n{body_str}\n")
        if not new_entries:
            # Refuse to overwrite the store with an unparseable result.
            return "解析大模型输出失败,经验库未修改。"
        # Write the slimmed library back to disk.
        final = "\n".join(new_entries)
        with open(EXPERIENCES_PATH, "w", encoding="utf-8") as f:
            f.write(final)
        result = f"瘦身完成:{len(parsed)} → {len(new_entries)} 条经验。"
        if report_line:
            result += f"\n{report_line}"
        print(f"[经验瘦身] {result}")
        return result
    except Exception as e:
        logger.error(f"经验瘦身失败: {e}")
        return f"瘦身失败: {e}"
  317. # ===== 对外 Tool 接口 =====
  318. from agent.tools import tool
  319. @tool(description="通过两阶段检索获取最相关的历史经验")
  320. async def get_experience(query: str, k: int = 3):
  321. """
  322. 通过两阶段检索获取最相关的历史经验。
  323. 第一阶段语义匹配(2*k),第二阶段质量精排(k)。
  324. """
  325. relevant_items = await _get_structured_experiences(
  326. query_text=query,
  327. top_k=k
  328. )
  329. if not relevant_items:
  330. return "未找到足够相关的优质经验。"
  331. return {
  332. "items": relevant_items,
  333. "count": len(relevant_items)
  334. }
  335. @tool()
  336. async def update_experiences(feedback_list: List[Dict[str, Any]]):
  337. """
  338. 批量反馈历史经验的有效性。
  339. Args:
  340. feedback_list: 评价列表,每个元素包含:
  341. - ex_id: (str) 经验 ID
  342. - is_effective: (bool) 是否有效
  343. - feedback: (str, optional) 改进建议,若有效且有建议则触发经验进化
  344. """
  345. if not feedback_list:
  346. return "反馈列表为空。"
  347. # 将 Agent 的输入转换为底层函数需要的映射表格式
  348. update_map = {}
  349. for item in feedback_list:
  350. ex_id = item.get("ex_id")
  351. is_effective = item.get("is_effective")
  352. comment = item.get("feedback", "")
  353. action = "helpful" if is_effective else "harmful"
  354. if is_effective and comment:
  355. action = "evolve"
  356. update_map[ex_id] = {
  357. "action": action,
  358. "feedback": comment
  359. }
  360. count = await _batch_update_experiences(update_map)
  361. return f"成功同步了 {count} 条经验的反馈。感谢你的评价!"