| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325 |
- #!/usr/bin/env python3
- """
- 原子能力提取工作流 - 自动化版本
- 使用 openrouter (claude-sonnet) 逐个读取工具文档,迭代式提取和融合原子能力
- """
- import asyncio
- import json
- import os
- import re
- import sys
- from pathlib import Path
- # 添加项目根目录
- sys.path.insert(0, str(Path(__file__).parent.parent.parent))
- from dotenv import load_dotenv
- load_dotenv()
- from agent.llm.openrouter import openrouter_llm_call
- # ===== 配置 =====
- BASE_DIR = Path(__file__).parent
- TOOL_RESULTS_DIR = BASE_DIR / "tool_results"
- OUTPUT_FILE = BASE_DIR / "atomic_capabilities.md"
- PROMPT_FILE = BASE_DIR / "extract_atomic_capabilities.prompt"
- # ===== Prompt 加载(复用 match_nodes.py 的模式) =====
- def load_prompt(filepath: str) -> dict:
- """加载 .prompt 文件,解析 frontmatter 和 $role$ 分段"""
- text = Path(filepath).read_text(encoding="utf-8")
- config = {}
- if text.startswith("---"):
- _, fm, text = text.split("---", 2)
- for line in fm.strip().splitlines():
- if ":" in line:
- k, v = line.split(":", 1)
- k, v = k.strip(), v.strip()
- if v.replace(".", "", 1).isdigit():
- v = float(v) if "." in v else int(v)
- config[k] = v
- messages = []
- parts = re.split(r'^\$(\w+)\$\s*$', text.strip(), flags=re.MULTILINE)
- for i in range(1, len(parts), 2):
- role = parts[i].strip()
- content = parts[i + 1].strip() if i + 1 < len(parts) else ""
- messages.append({"role": role, "content": content})
- return {"config": config, "messages": messages}
- def render_messages(prompt_data: dict, variables: dict) -> list[dict]:
- """用变量替换 prompt 模板中的 {var} 占位符"""
- rendered = []
- for msg in prompt_data["messages"]:
- content = msg["content"]
- for k, v in variables.items():
- content = content.replace(f"{{{k}}}", str(v))
- rendered.append({"role": msg["role"], "content": content})
- return rendered
- # ===== 文件读取 =====
- def get_all_tool_dirs():
- """获取所有工具目录"""
- dirs = sorted([d for d in TOOL_RESULTS_DIR.iterdir() if d.is_dir()])
- return dirs
- def read_file(file_path):
- """读取文件内容"""
- with open(file_path, 'r', encoding='utf-8') as f:
- return f.read()
- def read_tool_files(tool_dir):
- """读取工具的使用介绍和实际用例"""
- usage_file = tool_dir / "使用介绍.md"
- case_file = tool_dir / "实际用例.md"
- content = ""
- if usage_file.exists():
- content += "# 使用介绍\n\n" + read_file(usage_file) + "\n\n"
- if case_file.exists():
- content += "# 实际用例\n\n" + read_file(case_file)
- return content
- # ===== 构建 user prompt =====
- def build_user_prompt(file_content, tool_name, existing_capabilities=""):
- """构建每轮迭代的 user prompt"""
- if existing_capabilities:
- user_prompt = f"""## 当前状态
- ### 已提取的原子能力
- {existing_capabilities}
- ## 你的工作
- 1. 仔细阅读下面的工具文档(使用介绍 + 实际用例)
- 2. 从中识别出**面向需求的原子能力**(注意:不是工具的技术操作)
- 3. 与已有能力对比:
- - 如果是全新的能力 → 添加,并说明来源
- - 如果已有能力可由新工具实现 → 融合,在「实现方式」中补充该工具
- - 如果是多个已有能力的组合 → 不添加,但在「发现的能力组合」中记录
- 4. 对于来源依据,要说明从哪个用例/帖子/文档章节提炼的,并简述其大概内容
- """
- else:
- user_prompt = """## 当前状态
- 这是第一次提取,当前没有已有能力。
- ## 你的工作
- 1. 仔细阅读下面的工具文档(使用介绍 + 实际用例)
- 2. 从中识别出**面向需求的原子能力**(注意:不是工具的技术操作)
- 3. 对于来源依据,要说明从哪个用例/帖子/文档章节提炼的,并简述其大概内容
- """
- user_prompt += f"""
- ## 当前要处理的工具
- **工具名称**: {tool_name}
- **文档内容**(包含使用介绍和实际用例):
- {file_content}
- ## 输出要求
- 请按以下格式输出:
- # 原子能力清单(更新后)
- ## 本轮分析
- 简要说明从 {tool_name} 中发现了哪些能力,哪些是新的,哪些与已有能力融合了。
- ## 新增能力
- [列出本次新增的能力,使用上述格式,每个能力都要有来源依据]
- ## 融合能力
- [列出本次融合/更新的能力,说明新增了哪些实现方式]
- ## 发现的能力组合
- [列出发现的能力组合关系,例如:能力A + 能力B + 能力C = 完成「电商产品图批量生成」]
- ## 完整能力清单
- [输出完整的、更新后的原子能力清单,包含所有能力(新增 + 已有 + 融合后的)]
- """
- return user_prompt
- # ===== LLM 调用 =====
- async def extract_capabilities_from_tool(prompt_data, tool_dir, existing_capabilities=""):
- """从工具目录提取原子能力"""
- tool_name = tool_dir.name
- print(f"\n📖 正在处理: {tool_name}")
- # 读取使用介绍和实际用例
- content = read_tool_files(tool_dir)
- # 构建 user prompt
- user_prompt = build_user_prompt(content, tool_name, existing_capabilities)
- # 渲染 prompt 模板
- messages = render_messages(prompt_data, {"user_prompt": user_prompt})
- # 从 prompt 文件读取配置
- model = prompt_data["config"].get("model", "anthropic/claude-sonnet-4-20250514")
- temperature = prompt_data["config"].get("temperature", 0.3)
- max_tokens = prompt_data["config"].get("max_tokens", 16000)
- try:
- result = await openrouter_llm_call(
- messages, model=model, temperature=temperature, max_tokens=max_tokens
- )
- response = result["content"]
- # 打印 token 用量
- pt = result.get("prompt_tokens", 0)
- ct = result.get("completion_tokens", 0)
- cost = result.get("cost", 0)
- print(f" tokens: {pt} prompt + {ct} completion | cost: ${cost:.4f}")
- # 提取"完整能力清单"部分
- if "## 完整能力清单" in response:
- complete_list = response.split("## 完整能力清单")[1].strip()
- else:
- complete_list = response
- print(f"✅ {tool_name} 处理完成")
- return response, complete_list
- except Exception as e:
- print(f"❌ {tool_name} 处理失败: {e}")
- return None, existing_capabilities
- async def generate_json_index(prompt_data, capabilities_md):
- """把 markdown 格式的能力清单转成简洁 JSON"""
- prompt = f"""请把以下原子能力清单转成 JSON 数组,每个能力包含以下字段:
- ```json
- [
- {{
- "id": "能力ID",
- "name": "能力名称",
- "description": "一句话功能描述",
- "criteria": "判定标准(简洁)",
- "tools": ["支持的工具/方案1", "支持的工具/方案2"],
- "scenarios": ["典型场景1", "典型场景2"],
- "source_summary": "来源依据的简要概括"
- }}
- ]
- ```
- 要求:
- - 只输出 JSON,不要任何其他文字
- - 保持所有能力,不要遗漏
- - description 控制在 30 字以内
- - criteria 控制在 30 字以内
- 原子能力清单:
- {capabilities_md}
- """
- model = prompt_data["config"].get("model", "anthropic/claude-sonnet-4-20250514")
- messages = [{"role": "user", "content": prompt}]
- try:
- result = await openrouter_llm_call(messages, model=model, temperature=0.1, max_tokens=8000)
- content = result["content"].strip()
- # 清理 markdown 代码块包裹
- if content.startswith("```"):
- content = content.split("\n", 1)[1]
- content = content.rsplit("```", 1)[0]
- # 验证是合法 JSON
- json.loads(content)
- return content
- except Exception as e:
- print(f"❌ JSON 索引生成失败: {e}")
- return None
- # ===== 主流程 =====
- async def main():
- print("🚀 开始提取原子能力...")
- print()
- # 加载 prompt 模板
- prompt_data = load_prompt(PROMPT_FILE)
- model = prompt_data["config"].get("model", "anthropic/claude-sonnet-4-20250514")
- print(f"🤖 使用模型: {model}")
- print()
- # 获取所有工具目录
- tool_dirs = get_all_tool_dirs()
- print(f"📁 找到 {len(tool_dirs)} 个工具:")
- for d in tool_dirs:
- files = list(d.glob("*.md"))
- print(f" - {d.name} ({len(files)} 个文件)")
- print()
- # 迭代处理每个工具
- existing_capabilities = ""
- all_responses = []
- for i, tool_dir in enumerate(tool_dirs, 1):
- print(f"{'='*60}")
- print(f"进度: [{i}/{len(tool_dirs)}]")
- response, complete_list = await extract_capabilities_from_tool(
- prompt_data, tool_dir, existing_capabilities
- )
- if response:
- all_responses.append({
- "tool": tool_dir.name,
- "response": response
- })
- existing_capabilities = complete_list
- # 保存最终结果
- print(f"\n{'='*60}")
- print("💾 保存结果...")
- # 保存完整能力清单(markdown)
- OUTPUT_FILE.write_text(existing_capabilities, encoding='utf-8')
- print(f"✅ 原子能力清单已保存到: {OUTPUT_FILE}")
- # 保存详细过程
- detail_file = BASE_DIR / "atomic_capabilities_detail.json"
- with open(detail_file, 'w', encoding='utf-8') as f:
- json.dump(all_responses, f, ensure_ascii=False, indent=2)
- print(f"✅ 详细过程已保存到: {detail_file}")
- # 最终一轮:让 LLM 把完整能力清单转成简洁 JSON
- print(f"\n{'='*60}")
- print("📋 生成简洁 JSON 索引...")
- json_result = await generate_json_index(prompt_data, existing_capabilities)
- if json_result:
- json_index_file = BASE_DIR / "atomic_capabilities_index.json"
- with open(json_index_file, 'w', encoding='utf-8') as f:
- f.write(json_result)
- print(f"✅ JSON 索引已保存到: {json_index_file}")
- print("\n🎉 所有文件处理完成!")
- if __name__ == "__main__":
- os.environ.setdefault("no_proxy", "*")
- asyncio.run(main())
|