| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- """
- Phase 2.1.2: 工序匹配度打分
- 读取 blueprint.json,对每个 blueprint 工序类进行与需求的匹配度打分,
- 输出到 process.json
- """
- import asyncio
- import json
- import re
- from pathlib import Path
- from typing import Any, Dict
- from examples.process_pipeline.script.llm_helper import call_llm_with_retry
- from examples.process_pipeline.script.validate_schema import validate_process, List, Optional
def load_prompt_template(prompt_name: str) -> str:
    """Load a prompt template from the package's ``prompts`` directory.

    Reads ``<package>/prompts/<prompt_name>.prompt``, strips an optional
    ``---``-delimited front-matter header, removes the ``$system$`` and
    ``$user$`` role markers, and returns the trimmed template text.
    """
    prompts_dir = Path(__file__).parent.parent / "prompts"
    template_path = prompts_dir / f"{prompt_name}.prompt"
    text = template_path.read_text(encoding="utf-8")
    # Drop a leading "---\n...\n---" front-matter section when present;
    # everything after the second delimiter is the template body.
    if text.startswith("---"):
        pieces = text.split("---", 2)
        if len(pieces) >= 3:
            text = pieces[2]
    # Role markers are placeholders, not content — strip them out.
    for marker in ("$system$", "$user$"):
        text = text.replace(marker, "")
    return text.strip()
async def score_blueprints(
    blueprint_file: Path,
    output_file: Path,
    requirement: str,
    llm_call,
    model: str = "anthropic/claude-sonnet-4-6",
) -> Dict[str, Any]:
    """
    Score every process cluster in the blueprint file for how well it
    matches the requirement, merge the scores back into the clusters,
    and write the result to ``output_file``.

    Args:
        blueprint_file: Path to the blueprint JSON (expects a "clusters" list).
        output_file: Destination JSON path; parent dirs are created as needed.
        requirement: The requirement text the clusters are scored against.
        llm_call: Async LLM callable forwarded to ``call_llm_with_retry``.
        model: Model identifier passed through to the LLM call.

    Returns:
        Summary statistics: number of clusters scored and total LLM cost
        (or an "error" entry when the blueprint has no clusters).
    """
    blueprint_data = json.loads(blueprint_file.read_text(encoding="utf-8"))
    clusters = blueprint_data.get("clusters", [])
    if not clusters:
        return {"error": "No clusters found", "scored": 0, "total_cost": 0.0}

    # Build the scoring prompt; fall back to an inline template if the
    # prompt file cannot be loaded for any reason (best-effort).
    try:
        template = load_prompt_template("score_processes")
        serialized = json.dumps(clusters, ensure_ascii=False, indent=2)
        prompt = template.replace("%requirement%", requirement).replace(
            "%clusters_data%", serialized
        )
    except Exception:
        serialized = json.dumps(clusters, ensure_ascii=False, indent=2)
        prompt = f"""对以下工序聚类进行与需求的匹配度打分。
需求:{requirement}
工序聚类:
{serialized}
直接输出 JSON:
{{"scored_clusters": [{{"cluster_id": "A", "cluster_name": "...", "score": 0.85, "explanation": "评分理由"}}]}}"""

    messages = [{"role": "user", "content": prompt}]

    def _validate_scored_output(parsed):
        """Return an error string when the LLM reply is malformed, else None."""
        entries = parsed.get("scored_clusters", [])
        if not isinstance(entries, list):
            return "'scored_clusters' is not a list"
        if not entries:
            return "'scored_clusters' is empty"
        for idx, entry in enumerate(entries):
            if "score" not in entry:
                return f"scored_clusters[{idx}] missing 'score'"
            if not isinstance(entry["score"], (int, float)):
                return f"scored_clusters[{idx}].score must be a number"
            if "explanation" not in entry or not (entry.get("explanation") or "").strip():
                return f"scored_clusters[{idx}] missing or empty 'explanation'"
        return None

    scored_data, total_cost = await call_llm_with_retry(
        llm_call=llm_call,
        messages=messages,
        model=model,
        temperature=0.1,
        max_tokens=4000,
        max_retries=3,
        validate_fn=_validate_scored_output,
        task_name="P2.1.2_ScoreProcesses",
    )
    if scored_data is None:
        # All retries failed — proceed with no scores rather than crashing.
        scored_data = {"scored_clusters": []}

    # Index the LLM scores by cluster_id (skipping entries without one)
    # so they can be merged back into the original cluster records.
    by_id = {
        entry["cluster_id"]: entry
        for entry in scored_data.get("scored_clusters", [])
        if entry.get("cluster_id")
    }

    merged_clusters = []
    for cluster in clusters:
        scored = by_id.get(cluster.get("cluster_id"))
        if scored is not None:
            cluster["score"] = scored.get("score", 0)
            cluster["explanation"] = scored.get("explanation", "")
        merged_clusters.append(cluster)

    output_data = {
        "requirement": requirement,
        "clusters": merged_clusters,
    }
    output_file.parent.mkdir(parents=True, exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(output_data, f, ensure_ascii=False, indent=2)

    return {
        "scored": len(by_id),
        "total_cost": total_cost,
    }
|