""" Phase 2.1.2: 工序匹配度打分 读取 blueprint.json,对每个 blueprint 工序类进行与需求的匹配度打分, 输出到 process.json """ import asyncio import json import re from pathlib import Path from typing import Any, Dict from examples.process_pipeline.script.llm_helper import call_llm_with_retry from examples.process_pipeline.script.validate_schema import validate_process, List, Optional def load_prompt_template(prompt_name: str) -> str: """从 prompts 目录加载 prompt 模板""" base_dir = Path(__file__).parent.parent prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt" with open(prompt_path, "r", encoding="utf-8") as f: content = f.read() if content.startswith("---"): parts = content.split("---", 2) if len(parts) >= 3: content = parts[2] content = content.replace("$system$", "").replace("$user$", "") return content.strip() async def score_blueprints( blueprint_file: Path, output_file: Path, requirement: str, llm_call, model: str = "anthropic/claude-sonnet-4-6", ) -> Dict[str, Any]: """ 对 blueprint_temp.json 中的每个工序聚类进行匹配度打分 Returns: 统计信息 """ with open(blueprint_file, "r", encoding="utf-8") as f: blueprint_data = json.load(f) clusters = blueprint_data.get("clusters", []) if not clusters: return {"error": "No clusters found", "scored": 0, "total_cost": 0.0} # 构造 prompt try: prompt_template = load_prompt_template("score_processes") clusters_text = json.dumps(clusters, ensure_ascii=False, indent=2) prompt = prompt_template.replace("%requirement%", requirement) prompt = prompt.replace("%clusters_data%", clusters_text) except Exception: clusters_text = json.dumps(clusters, ensure_ascii=False, indent=2) prompt = f"""对以下工序聚类进行与需求的匹配度打分。 需求:{requirement} 工序聚类: {clusters_text} 直接输出 JSON: {{"scored_clusters": [{{"cluster_id": "A", "cluster_name": "...", "score": 0.85, "explanation": "评分理由"}}]}}""" messages = [{"role": "user", "content": prompt}] def _validate_scored_output(parsed): scored = parsed.get("scored_clusters", []) if not isinstance(scored, list): return "'scored_clusters' is not a list" if len(scored) == 0: return "'scored_clusters' is empty" for i, item in enumerate(scored): if "score" not in item: return f"scored_clusters[{i}] missing 'score'" if not isinstance(item["score"], (int, float)): return f"scored_clusters[{i}].score must be a number" if "explanation" not in item or not (item.get("explanation") or "").strip(): return f"scored_clusters[{i}] missing or empty 'explanation'" return None scored_data, total_cost = await call_llm_with_retry( llm_call=llm_call, messages=messages, model=model, temperature=0.1, max_tokens=4000, max_retries=3, validate_fn=_validate_scored_output, task_name="P2.1.2_ScoreProcesses", ) if scored_data is None: scored_data = {"scored_clusters": []} # 把 score 和 explanation 合并回原始 clusters scored_map = {} for sc in scored_data.get("scored_clusters", []): cid = sc.get("cluster_id") if cid: scored_map[cid] = sc merged_clusters = [] for cl in clusters: cid = cl.get("cluster_id") if cid in scored_map: cl["score"] = scored_map[cid].get("score", 0) cl["explanation"] = scored_map[cid].get("explanation", "") merged_clusters.append(cl) output_data = { "requirement": requirement, "clusters": merged_clusters, } output_file.parent.mkdir(parents=True, exist_ok=True) with open(output_file, "w", encoding="utf-8") as f: json.dump(output_data, f, ensure_ascii=False, indent=2) return { "scored": len(scored_map), "total_cost": total_cost, }