""" Phase 2.1.1: 工序聚类(workflow 形式) 直接读取 source.json + case_detailed.json,调用 LLM 生成 blueprint, 输出到 blueprint_temp.json """ import asyncio import json from pathlib import Path from typing import Any, Dict from examples.process_pipeline.script.llm_helper import call_llm_with_retry from examples.process_pipeline.script.validate_schema import validate_blueprint_temp def load_prompt_template(prompt_name: str) -> str: base_dir = Path(__file__).parent.parent prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt" with open(prompt_path, "r", encoding="utf-8") as f: content = f.read() if content.startswith("---"): parts = content.split("---", 2) if len(parts) >= 3: content = parts[2] content = content.replace("$system$", "").replace("$user$", "") return content.strip() async def cluster_processes( source_file: Path, detailed_file: Path, output_file: Path, requirement: str, llm_call, model: str = "anthropic/claude-sonnet-4-6", ) -> Dict[str, Any]: """ 读取 source.json + case_detailed.json,调用 LLM 生成 blueprint """ # 读取输入文件 source_data = {} detailed_data = {} if source_file.exists(): with open(source_file, "r", encoding="utf-8") as f: source_data = json.load(f) if detailed_file.exists(): with open(detailed_file, "r", encoding="utf-8") as f: detailed_data = json.load(f) # 构造输入数据摘要 cases_summary = [] for case in detailed_data.get("cases", []): case_id = case.get("case_id", f"{case.get('platform', '')}_{case.get('channel_content_id', '')}") workflow = case.get("workflow") steps_text = "" if workflow and "工序步骤" in workflow: steps = workflow["工序步骤"] steps_text = " → ".join([s.get("步骤描述", "") for s in steps]) # 从 source.json 获取原始帖子信息 body_text = "" for src in source_data.get("sources", []): src_id = f"{src.get('platform', '')}_{src.get('channel_content_id', '')}" if src_id == case_id: body_text = src.get("post", {}).get("body_text", "")[:500] break cases_summary.append({ "case_id": case_id, "title": case.get("title", ""), "source_url": case.get("source_url", ""), "steps": steps_text, "body_preview": body_text, }) cases_json = json.dumps(cases_summary, ensure_ascii=False, indent=2) print(f"Loaded {len(cases_summary)} cases for process clustering") # 加载 prompt try: prompt_template = load_prompt_template("process_cluster") prompt = prompt_template.replace("%requirement%", requirement) prompt = prompt.replace("%cases_data%", cases_json) except Exception: prompt = f"""你是一个 AI 创作工作流分析师。请按照核心创作手段对以下案例进行聚类。 需求:{requirement} 案例数据: {cases_json} 输出 JSON 格式: {{"clusters": [{{"cluster_id": "A", "cluster_name": "...", "工序步骤": [{{"步骤序号": 1, "步骤描述": "..."}}], "关联案例": ["bili_BV1xxx"]}}]}}""" messages = [{"role": "user", "content": prompt}] cluster_data, total_cost = await call_llm_with_retry( llm_call=llm_call, messages=messages, model=model, temperature=0.2, max_tokens=4000, max_retries=3, validate_fn=validate_blueprint_temp, task_name="P2.1.1_ClusterProcesses", ) if cluster_data is None: cluster_data = {"clusters": []} output_file.parent.mkdir(parents=True, exist_ok=True) with open(output_file, "w", encoding="utf-8") as f: json.dump(cluster_data, f, ensure_ascii=False, indent=2) return { "clusters": len(cluster_data.get("clusters", [])), "total_cost": total_cost, }