| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- """
- Phase 2.2.1: 能力初步聚类(workflow 形式)
- 直接读取 case_detailed.json + source.json,调用 LLM 提取原子能力,
- 输出到 capabilities_temp.json
- """
- import asyncio
- import json
- from pathlib import Path
- from typing import Any, Dict
- from examples.process_pipeline.script.llm_helper import call_llm_with_retry
- from examples.process_pipeline.script.validate_schema import validate_capabilities_temp
def load_prompt_template(prompt_name: str) -> str:
    """Load a prompt template from the sibling ``prompts/`` directory.

    Strips a leading ``---``-delimited frontmatter section if present,
    removes the ``$system$`` / ``$user$`` role markers, and returns the
    whitespace-trimmed body.
    """
    prompt_path = Path(__file__).parent.parent / "prompts" / f"{prompt_name}.prompt"
    text = prompt_path.read_text(encoding="utf-8")
    # Frontmatter looks like "---\n<meta>\n---\n<body>"; keep only <body>.
    if text.startswith("---"):
        pieces = text.split("---", 2)
        if len(pieces) >= 3:
            text = pieces[2]
    for marker in ("$system$", "$user$"):
        text = text.replace(marker, "")
    return text.strip()
def _read_json_if_exists(path: Path) -> Dict[str, Any]:
    """Load a JSON file, returning an empty dict when the file is absent."""
    if path.exists():
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    return {}


async def extract_capabilities_workflow(
    detailed_file: Path,
    source_file: Path,
    output_file: Path,
    requirement: str,
    llm_call,
    model: str = "anthropic/claude-sonnet-4-6",
) -> Dict[str, Any]:
    """Phase 2.2.1: extract reusable atomic capabilities across cases.

    Reads case_detailed.json and source.json, builds a compact per-case
    summary, asks the LLM to cluster cross-case capabilities, validates the
    response, and writes it to ``output_file``.

    Args:
        detailed_file: Path to case_detailed.json (missing file -> no cases).
        source_file: Path to source.json (missing file -> no body previews).
        output_file: Destination path for capabilities_temp.json.
        requirement: User requirement text injected into the prompt.
        llm_call: Async LLM callable forwarded to call_llm_with_retry.
        model: Model identifier for the LLM call.

    Returns:
        Dict with ``capabilities`` (count of extracted abilities) and
        ``total_cost`` (accumulated LLM cost from the retry helper).
    """
    detailed_data = _read_json_if_exists(detailed_file)
    source_data = _read_json_if_exists(source_file)

    # Index sources by "<platform>_<channel_content_id>" once so the per-case
    # lookup below is O(1) instead of rescanning all sources for every case.
    # setdefault keeps the FIRST occurrence, matching the original
    # first-match-then-break behavior if ids ever collide.
    sources_by_id: Dict[str, Dict[str, Any]] = {}
    for src in source_data.get("sources", []):
        src_id = f"{src.get('platform', '')}_{src.get('channel_content_id', '')}"
        sources_by_id.setdefault(src_id, src)

    # Build the per-case summary fed to the LLM.
    cases_summary = []
    for case in detailed_data.get("cases", []):
        case_id = case.get("case_id", f"{case.get('platform', '')}_{case.get('channel_content_id', '')}")
        workflow = case.get("workflow")
        steps_text = []
        if workflow and "工序步骤" in workflow:
            steps_text = [s.get("步骤描述", "") for s in workflow["工序步骤"]]
        # Original post body (truncated to 500 chars) gives the LLM context
        # beyond the distilled workflow steps.
        matched_src = sources_by_id.get(case_id, {})
        body_text = matched_src.get("post", {}).get("body_text", "")[:500]
        cases_summary.append({
            "case_id": case_id,
            "title": case.get("title", ""),
            "workflow_steps": steps_text,
            "body_preview": body_text,
        })

    cases_json = json.dumps(cases_summary, ensure_ascii=False, indent=2)
    print(f"Loaded {len(cases_summary)} cases for capability extraction", flush=True)

    # Prefer the on-disk prompt template; fall back to an inline prompt when
    # loading fails for any reason.
    try:
        prompt_template = load_prompt_template("extract_capabilities")
        prompt = prompt_template + f"\n\n需求:{requirement}\n\n案例数据:\n```json\n{cases_json}\n```"
    except Exception:
        prompt = f"""你是一个 AI 创作工作流分析师。请从以下案例中跨案例统一提取可复用的核心能力。
需求:{requirement}
案例数据:
{cases_json}
输出 JSON 格式:
{{"abilities": [{{"ability_id": "AB-01", "ability_name": "能力名称(4-10字)", "ability_description": "一句话描述该能力的核心动作和目的", "关联案例": ["bili_BV1xxx", "xhs_694e17e9000000001e006669"]}}]}}"""

    messages = [{"role": "user", "content": prompt}]
    capabilities_data, total_cost = await call_llm_with_retry(
        llm_call=llm_call,
        messages=messages,
        model=model,
        temperature=0.1,
        max_tokens=4000,
        max_retries=3,
        validate_fn=validate_capabilities_temp,
        task_name="P2.2.1_ExtractCapabilities",
    )

    # A still-invalid response after all retries yields None; persist an
    # empty ability list so downstream phases see a well-formed file.
    if capabilities_data is None:
        capabilities_data = {"abilities": []}

    output_file.parent.mkdir(parents=True, exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(capabilities_data, f, ensure_ascii=False, indent=2)

    return {
        "capabilities": len(capabilities_data.get("abilities", [])),
        "total_cost": total_cost,
    }
|