#!/usr/bin/env python3 """ 原子能力提取工作流 - 自动化版本 使用 openrouter (claude-sonnet) 逐个读取工具文档,迭代式提取和融合原子能力 """ import asyncio import json import os import re import sys from pathlib import Path # 添加项目根目录 sys.path.insert(0, str(Path(__file__).parent.parent.parent)) from dotenv import load_dotenv load_dotenv() from agent.llm.openrouter import openrouter_llm_call # ===== 配置 ===== BASE_DIR = Path(__file__).parent TOOL_RESULTS_DIR = BASE_DIR / "tool_results" OUTPUT_FILE = BASE_DIR / "atomic_capabilities.md" PROMPT_FILE = BASE_DIR / "extract_atomic_capabilities.prompt" # ===== Prompt 加载(复用 match_nodes.py 的模式) ===== def load_prompt(filepath: str) -> dict: """加载 .prompt 文件,解析 frontmatter 和 $role$ 分段""" text = Path(filepath).read_text(encoding="utf-8") config = {} if text.startswith("---"): _, fm, text = text.split("---", 2) for line in fm.strip().splitlines(): if ":" in line: k, v = line.split(":", 1) k, v = k.strip(), v.strip() if v.replace(".", "", 1).isdigit(): v = float(v) if "." in v else int(v) config[k] = v messages = [] parts = re.split(r'^\$(\w+)\$\s*$', text.strip(), flags=re.MULTILINE) for i in range(1, len(parts), 2): role = parts[i].strip() content = parts[i + 1].strip() if i + 1 < len(parts) else "" messages.append({"role": role, "content": content}) return {"config": config, "messages": messages} def render_messages(prompt_data: dict, variables: dict) -> list[dict]: """用变量替换 prompt 模板中的 {var} 占位符""" rendered = [] for msg in prompt_data["messages"]: content = msg["content"] for k, v in variables.items(): content = content.replace(f"{{{k}}}", str(v)) rendered.append({"role": msg["role"], "content": content}) return rendered # ===== 文件读取 ===== def get_all_tool_dirs(): """获取所有工具目录""" dirs = sorted([d for d in TOOL_RESULTS_DIR.iterdir() if d.is_dir()]) return dirs def read_file(file_path): """读取文件内容""" with open(file_path, 'r', encoding='utf-8') as f: return f.read() def read_tool_files(tool_dir): """读取工具的使用介绍和实际用例""" usage_file = tool_dir / "使用介绍.md" case_file = tool_dir / "实际用例.md" content = "" if usage_file.exists(): content += "# 使用介绍\n\n" + read_file(usage_file) + "\n\n" if case_file.exists(): content += "# 实际用例\n\n" + read_file(case_file) return content # ===== 构建 user prompt ===== def build_user_prompt(file_content, tool_name, existing_capabilities=""): """构建每轮迭代的 user prompt""" if existing_capabilities: user_prompt = f"""## 当前状态 ### 已提取的原子能力 {existing_capabilities} ## 你的工作 1. 仔细阅读下面的工具文档(使用介绍 + 实际用例) 2. 从中识别出**面向需求的原子能力**(注意:不是工具的技术操作) 3. 与已有能力对比: - 如果是全新的能力 → 添加,并说明来源 - 如果已有能力可由新工具实现 → 融合,在「实现方式」中补充该工具 - 如果是多个已有能力的组合 → 不添加,但在「发现的能力组合」中记录 4. 对于来源依据,要说明从哪个用例/帖子/文档章节提炼的,并简述其大概内容 """ else: user_prompt = """## 当前状态 这是第一次提取,当前没有已有能力。 ## 你的工作 1. 仔细阅读下面的工具文档(使用介绍 + 实际用例) 2. 从中识别出**面向需求的原子能力**(注意:不是工具的技术操作) 3. 对于来源依据,要说明从哪个用例/帖子/文档章节提炼的,并简述其大概内容 """ user_prompt += f""" ## 当前要处理的工具 **工具名称**: {tool_name} **文档内容**(包含使用介绍和实际用例): {file_content} ## 输出要求 请按以下格式输出: # 原子能力清单(更新后) ## 本轮分析 简要说明从 {tool_name} 中发现了哪些能力,哪些是新的,哪些与已有能力融合了。 ## 新增能力 [列出本次新增的能力,使用上述格式,每个能力都要有来源依据] ## 融合能力 [列出本次融合/更新的能力,说明新增了哪些实现方式] ## 发现的能力组合 [列出发现的能力组合关系,例如:能力A + 能力B + 能力C = 完成「电商产品图批量生成」] ## 完整能力清单 [输出完整的、更新后的原子能力清单,包含所有能力(新增 + 已有 + 融合后的)] """ return user_prompt # ===== LLM 调用 ===== async def extract_capabilities_from_tool(prompt_data, tool_dir, existing_capabilities=""): """从工具目录提取原子能力""" tool_name = tool_dir.name print(f"\n📖 正在处理: {tool_name}") # 读取使用介绍和实际用例 content = read_tool_files(tool_dir) # 构建 user prompt user_prompt = build_user_prompt(content, tool_name, existing_capabilities) # 渲染 prompt 模板 messages = render_messages(prompt_data, {"user_prompt": user_prompt}) # 从 prompt 文件读取配置 model = prompt_data["config"].get("model", "anthropic/claude-sonnet-4-20250514") temperature = prompt_data["config"].get("temperature", 0.3) max_tokens = prompt_data["config"].get("max_tokens", 16000) try: result = await openrouter_llm_call( messages, model=model, temperature=temperature, max_tokens=max_tokens ) response = result["content"] # 打印 token 用量 pt = result.get("prompt_tokens", 0) ct = result.get("completion_tokens", 0) cost = result.get("cost", 0) print(f" tokens: {pt} prompt + {ct} completion | cost: ${cost:.4f}") # 提取"完整能力清单"部分 if "## 完整能力清单" in response: complete_list = response.split("## 完整能力清单")[1].strip() else: complete_list = response print(f"✅ {tool_name} 处理完成") return response, complete_list except Exception as e: print(f"❌ {tool_name} 处理失败: {e}") return None, existing_capabilities async def generate_json_index(prompt_data, capabilities_md): """把 markdown 格式的能力清单转成简洁 JSON""" prompt = f"""请把以下原子能力清单转成 JSON 数组,每个能力包含以下字段: ```json [ {{ "id": "能力ID", "name": "能力名称", "description": "一句话功能描述", "criteria": "判定标准(简洁)", "tools": ["支持的工具/方案1", "支持的工具/方案2"], "scenarios": ["典型场景1", "典型场景2"], "source_summary": "来源依据的简要概括" }} ] ``` 要求: - 只输出 JSON,不要任何其他文字 - 保持所有能力,不要遗漏 - description 控制在 30 字以内 - criteria 控制在 30 字以内 原子能力清单: {capabilities_md} """ model = prompt_data["config"].get("model", "anthropic/claude-sonnet-4-20250514") messages = [{"role": "user", "content": prompt}] try: result = await openrouter_llm_call(messages, model=model, temperature=0.1, max_tokens=8000) content = result["content"].strip() # 清理 markdown 代码块包裹 if content.startswith("```"): content = content.split("\n", 1)[1] content = content.rsplit("```", 1)[0] # 验证是合法 JSON json.loads(content) return content except Exception as e: print(f"❌ JSON 索引生成失败: {e}") return None # ===== 主流程 ===== async def main(): print("🚀 开始提取原子能力...") print() # 加载 prompt 模板 prompt_data = load_prompt(PROMPT_FILE) model = prompt_data["config"].get("model", "anthropic/claude-sonnet-4-20250514") print(f"🤖 使用模型: {model}") print() # 获取所有工具目录 tool_dirs = get_all_tool_dirs() print(f"📁 找到 {len(tool_dirs)} 个工具:") for d in tool_dirs: files = list(d.glob("*.md")) print(f" - {d.name} ({len(files)} 个文件)") print() # 迭代处理每个工具 existing_capabilities = "" all_responses = [] for i, tool_dir in enumerate(tool_dirs, 1): print(f"{'='*60}") print(f"进度: [{i}/{len(tool_dirs)}]") response, complete_list = await extract_capabilities_from_tool( prompt_data, tool_dir, existing_capabilities ) if response: all_responses.append({ "tool": tool_dir.name, "response": response }) existing_capabilities = complete_list # 保存最终结果 print(f"\n{'='*60}") print("💾 保存结果...") # 保存完整能力清单(markdown) OUTPUT_FILE.write_text(existing_capabilities, encoding='utf-8') print(f"✅ 原子能力清单已保存到: {OUTPUT_FILE}") # 保存详细过程 detail_file = BASE_DIR / "atomic_capabilities_detail.json" with open(detail_file, 'w', encoding='utf-8') as f: json.dump(all_responses, f, ensure_ascii=False, indent=2) print(f"✅ 详细过程已保存到: {detail_file}") # 最终一轮:让 LLM 把完整能力清单转成简洁 JSON print(f"\n{'='*60}") print("📋 生成简洁 JSON 索引...") json_result = await generate_json_index(prompt_data, existing_capabilities) if json_result: json_index_file = BASE_DIR / "atomic_capabilities_index.json" with open(json_index_file, 'w', encoding='utf-8') as f: f.write(json_result) print(f"✅ JSON 索引已保存到: {json_index_file}") print("\n🎉 所有文件处理完成!") if __name__ == "__main__": os.environ.setdefault("no_proxy", "*") asyncio.run(main())