"""
Phase 2.2.1: Initial capability clustering (workflow form).

Reads case_detailed.json + source.json, calls the LLM to extract atomic
capabilities, and writes the result to capabilities_temp.json.
"""
import asyncio
import json
from pathlib import Path
from typing import Any, Dict

from examples.process_pipeline.script.llm_helper import call_llm_with_retry
from examples.process_pipeline.script.validate_schema import validate_capabilities_temp


def load_prompt_template(prompt_name: str) -> str:
    """Load and clean a prompt template from <package>/prompts/<prompt_name>.prompt.

    Strips an optional ``---``-delimited front-matter header and removes the
    ``$system$`` / ``$user$`` role markers, returning the bare prompt text.

    Raises:
        OSError: if the prompt file cannot be read.
    """
    base_dir = Path(__file__).parent.parent
    prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt"
    with open(prompt_path, "r", encoding="utf-8") as f:
        content = f.read()
    # Well-formed front matter ("---\n<meta>\n---\n<body>") splits into
    # ["", "<meta>", "<body>"]; keep only the body.
    if content.startswith("---"):
        parts = content.split("---", 2)
        if len(parts) >= 3:
            content = parts[2]
    content = content.replace("$system$", "").replace("$user$", "")
    return content.strip()


def _load_json_if_exists(path: Path) -> Dict[str, Any]:
    """Return the parsed JSON content of *path*, or {} when the file is absent."""
    if path.exists():
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    return {}


def _build_cases_summary(
    detailed_data: Dict[str, Any], source_data: Dict[str, Any]
) -> list:
    """Build the per-case summaries fed to the LLM.

    Each summary pairs the case's workflow steps (from case_detailed.json)
    with a 500-char preview of the original post body (from source.json),
    matched on "<platform>_<channel_content_id>".
    """
    # Index source posts by id once instead of rescanning the sources list
    # per case (was O(cases * sources)). setdefault keeps the FIRST source
    # with a given id, matching the original first-match-then-break scan.
    posts_by_id: Dict[str, Any] = {}
    for src in source_data.get("sources", []):
        src_id = f"{src.get('platform', '')}_{src.get('channel_content_id', '')}"
        posts_by_id.setdefault(src_id, src)

    cases_summary = []
    for case in detailed_data.get("cases", []):
        # Fall back to "<platform>_<channel_content_id>" when case_id is missing.
        case_id = case.get(
            "case_id",
            f"{case.get('platform', '')}_{case.get('channel_content_id', '')}",
        )
        workflow = case.get("workflow")
        steps_text = []
        if workflow and "工序步骤" in workflow:
            steps_text = [s.get("步骤描述", "") for s in workflow["工序步骤"]]

        # Original post body preview (used to aid the LLM's understanding).
        body_text = ""
        src = posts_by_id.get(case_id)
        if src is not None:
            body_text = src.get("post", {}).get("body_text", "")[:500]

        cases_summary.append({
            "case_id": case_id,
            "title": case.get("title", ""),
            "workflow_steps": steps_text,
            "body_preview": body_text,
        })
    return cases_summary


def _build_prompt(requirement: str, cases_json: str) -> str:
    """Assemble the extraction prompt from the template file, or a built-in fallback."""
    try:
        prompt_template = load_prompt_template("extract_capabilities")
        return prompt_template + f"\n\n需求:{requirement}\n\n案例数据:\n```json\n{cases_json}\n```"
    except Exception:
        # Deliberate best-effort fallback: if the prompt file is missing or
        # unreadable, use an inline prompt rather than failing the workflow.
        return f"""你是一个 AI 创作工作流分析师。请从以下案例中跨案例统一提取可复用的核心能力。

需求:{requirement}

案例数据:
{cases_json}

输出 JSON 格式:
{{"abilities": [{{"ability_id": "AB-01", "ability_name": "能力名称(4-10字)", "ability_description": "一句话描述该能力的核心动作和目的", "关联案例": ["bili_BV1xxx", "xhs_694e17e9000000001e006669"]}}]}}"""


async def extract_capabilities_workflow(
    detailed_file: Path,
    source_file: Path,
    output_file: Path,
    requirement: str,
    llm_call,
    model: str = "anthropic/claude-sonnet-4-6",
) -> Dict[str, Any]:
    """Read case_detailed.json + source.json and extract atomic capabilities via LLM.

    Args:
        detailed_file: Path to case_detailed.json (may be absent → treated as empty).
        source_file: Path to source.json (may be absent → treated as empty).
        output_file: Destination for capabilities_temp.json (parents created).
        requirement: The requirement text embedded in the prompt.
        llm_call: Async LLM callable forwarded to ``call_llm_with_retry``.
        model: Model identifier passed to the LLM helper.

    Returns:
        Summary dict with the number of extracted "capabilities" and "total_cost".
    """
    detailed_data = _load_json_if_exists(detailed_file)
    source_data = _load_json_if_exists(source_file)

    cases_summary = _build_cases_summary(detailed_data, source_data)
    cases_json = json.dumps(cases_summary, ensure_ascii=False, indent=2)
    print(f"Loaded {len(cases_summary)} cases for capability extraction", flush=True)

    prompt = _build_prompt(requirement, cases_json)
    messages = [{"role": "user", "content": prompt}]
    capabilities_data, total_cost = await call_llm_with_retry(
        llm_call=llm_call,
        messages=messages,
        model=model,
        temperature=0.1,
        max_tokens=4000,
        max_retries=3,
        validate_fn=validate_capabilities_temp,
        task_name="P2.2.1_ExtractCapabilities",
    )
    # The retry helper returns None when all attempts fail validation;
    # persist an empty ability list so downstream phases still have a file.
    if capabilities_data is None:
        capabilities_data = {"abilities": []}

    output_file.parent.mkdir(parents=True, exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(capabilities_data, f, ensure_ascii=False, indent=2)

    return {
        "capabilities": len(capabilities_data.get("abilities", [])),
        "total_cost": total_cost,
    }