| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- """
- Phase 2.1.1: 工序聚类(workflow 形式)
- 直接读取 source.json + case_detailed.json,调用 LLM 生成 blueprint,
- 输出到 blueprint_temp.json
- """
- import asyncio
- import json
- from pathlib import Path
- from typing import Any, Dict
- from examples.process_pipeline.script.llm_helper import call_llm_with_retry
- from examples.process_pipeline.script.validate_schema import validate_blueprint_temp
def load_prompt_template(prompt_name: str) -> str:
    """Load the named prompt template, stripping front matter and role markers.

    Reads ``<package>/prompts/<prompt_name>.prompt``, drops a leading
    ``---``-delimited front-matter section if present, removes the
    ``$system$`` / ``$user$`` role placeholders, and returns the trimmed body.
    """
    prompts_dir = Path(__file__).parent.parent / "prompts"
    text = (prompts_dir / f"{prompt_name}.prompt").read_text(encoding="utf-8")
    # Front matter looks like "---\n...\n---\n<body>"; keep only the body.
    if text.startswith("---"):
        pieces = text.split("---", 2)
        if len(pieces) >= 3:
            text = pieces[2]
    # Strip role placeholder markers wherever they appear.
    for marker in ("$system$", "$user$"):
        text = text.replace(marker, "")
    return text.strip()
def _load_json_if_exists(path: Path) -> Dict[str, Any]:
    """Load JSON from *path*, returning {} when the file does not exist."""
    if not path.exists():
        return {}
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def _build_cases_summary(
    source_data: Dict[str, Any], detailed_data: Dict[str, Any]
) -> list:
    """Join each case's workflow steps with a preview of its source post body."""
    # Index sources by "<platform>_<channel_content_id>" once, instead of
    # rescanning the whole sources list for every case (was O(cases x sources)).
    # setdefault keeps the FIRST source per id, matching the original
    # first-match-then-break behavior.
    sources_by_id: Dict[str, Dict[str, Any]] = {}
    for src in source_data.get("sources", []):
        src_id = f"{src.get('platform', '')}_{src.get('channel_content_id', '')}"
        sources_by_id.setdefault(src_id, src)

    cases_summary = []
    for case in detailed_data.get("cases", []):
        # Fall back to "<platform>_<channel_content_id>" when case_id is absent.
        case_id = case.get("case_id", f"{case.get('platform', '')}_{case.get('channel_content_id', '')}")
        workflow = case.get("workflow")
        steps_text = ""
        if workflow and "工序步骤" in workflow:
            steps_text = " → ".join(s.get("步骤描述", "") for s in workflow["工序步骤"])
        # Preview of the original post body, capped at 500 chars.
        src = sources_by_id.get(case_id, {})
        body_text = src.get("post", {}).get("body_text", "")[:500]
        cases_summary.append({
            "case_id": case_id,
            "title": case.get("title", ""),
            "source_url": case.get("source_url", ""),
            "steps": steps_text,
            "body_preview": body_text,
        })
    return cases_summary


async def cluster_processes(
    source_file: Path,
    detailed_file: Path,
    output_file: Path,
    requirement: str,
    llm_call,
    model: str = "anthropic/claude-sonnet-4-6",
) -> Dict[str, Any]:
    """Phase 2.1.1: cluster case workflows by core creative technique.

    Reads source.json + case_detailed.json, builds a compact per-case summary,
    asks the LLM to cluster the cases, and writes the resulting blueprint to
    *output_file*.

    Args:
        source_file: path to source.json (original post data); missing file -> {}.
        detailed_file: path to case_detailed.json (per-case workflows); missing file -> {}.
        output_file: destination path for blueprint_temp.json (parents created).
        requirement: user requirement text substituted into the prompt.
        llm_call: async LLM callable forwarded to call_llm_with_retry.
        model: LLM model identifier.

    Returns:
        {"clusters": <number of clusters produced>, "total_cost": <LLM cost>}.
    """
    source_data = _load_json_if_exists(source_file)
    detailed_data = _load_json_if_exists(detailed_file)

    cases_summary = _build_cases_summary(source_data, detailed_data)
    cases_json = json.dumps(cases_summary, ensure_ascii=False, indent=2)
    print(f"Loaded {len(cases_summary)} cases for process clustering")

    # Prefer the on-disk prompt template; fall back to an inline prompt if
    # loading or substitution fails for any reason (best-effort by design).
    try:
        prompt_template = load_prompt_template("process_cluster")
        prompt = prompt_template.replace("%requirement%", requirement)
        prompt = prompt.replace("%cases_data%", cases_json)
    except Exception:
        prompt = f"""你是一个 AI 创作工作流分析师。请按照核心创作手段对以下案例进行聚类。
需求:{requirement}
案例数据:
{cases_json}
输出 JSON 格式:
{{"clusters": [{{"cluster_id": "A", "cluster_name": "...", "工序步骤": [{{"步骤序号": 1, "步骤描述": "..."}}], "关联案例": ["bili_BV1xxx"]}}]}}"""

    messages = [{"role": "user", "content": prompt}]
    cluster_data, total_cost = await call_llm_with_retry(
        llm_call=llm_call,
        messages=messages,
        model=model,
        temperature=0.2,
        max_tokens=4000,
        max_retries=3,
        validate_fn=validate_blueprint_temp,
        task_name="P2.1.1_ClusterProcesses",
    )
    # A None result means all retries failed; persist an empty blueprint so
    # downstream phases still have a well-formed file to read.
    if cluster_data is None:
        cluster_data = {"clusters": []}

    output_file.parent.mkdir(parents=True, exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(cluster_data, f, ensure_ascii=False, indent=2)

    return {
        "clusters": len(cluster_data.get("clusters", [])),
        "total_cost": total_cost,
    }
|