cluster_processes.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. """
  2. Phase 2.1.1: 工序聚类(workflow 形式)
  3. 直接读取 source.json + case_detailed.json,调用 LLM 生成 blueprint,
  4. 输出到 blueprint_temp.json
  5. """
import asyncio
import json
from pathlib import Path
from typing import Any, Dict, Optional

from examples.process_pipeline.script.llm_helper import call_llm_with_retry
from examples.process_pipeline.script.validate_schema import validate_blueprint_temp
  12. def load_prompt_template(prompt_name: str) -> str:
  13. base_dir = Path(__file__).parent.parent
  14. prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt"
  15. with open(prompt_path, "r", encoding="utf-8") as f:
  16. content = f.read()
  17. if content.startswith("---"):
  18. parts = content.split("---", 2)
  19. if len(parts) >= 3:
  20. content = parts[2]
  21. content = content.replace("$system$", "").replace("$user$", "")
  22. return content.strip()
  23. async def cluster_processes(
  24. source_file: Path,
  25. detailed_file: Path,
  26. output_file: Path,
  27. requirement: str,
  28. llm_call,
  29. model: str = "anthropic/claude-sonnet-4-6",
  30. ) -> Dict[str, Any]:
  31. """
  32. 读取 source.json + case_detailed.json,调用 LLM 生成 blueprint
  33. """
  34. # 读取输入文件
  35. source_data = {}
  36. detailed_data = {}
  37. if source_file.exists():
  38. with open(source_file, "r", encoding="utf-8") as f:
  39. source_data = json.load(f)
  40. if detailed_file.exists():
  41. with open(detailed_file, "r", encoding="utf-8") as f:
  42. detailed_data = json.load(f)
  43. # 构造输入数据摘要
  44. cases_summary = []
  45. for case in detailed_data.get("cases", []):
  46. case_id = case.get("case_id", f"{case.get('platform', '')}_{case.get('channel_content_id', '')}")
  47. workflow = case.get("workflow")
  48. steps_text = ""
  49. if workflow and "工序步骤" in workflow:
  50. steps = workflow["工序步骤"]
  51. steps_text = " → ".join([s.get("步骤描述", "") for s in steps])
  52. # 从 source.json 获取原始帖子信息
  53. body_text = ""
  54. for src in source_data.get("sources", []):
  55. src_id = f"{src.get('platform', '')}_{src.get('channel_content_id', '')}"
  56. if src_id == case_id:
  57. body_text = src.get("post", {}).get("body_text", "")[:500]
  58. break
  59. cases_summary.append({
  60. "case_id": case_id,
  61. "title": case.get("title", ""),
  62. "source_url": case.get("source_url", ""),
  63. "steps": steps_text,
  64. "body_preview": body_text,
  65. })
  66. cases_json = json.dumps(cases_summary, ensure_ascii=False, indent=2)
  67. print(f"Loaded {len(cases_summary)} cases for process clustering")
  68. # 加载 prompt
  69. try:
  70. prompt_template = load_prompt_template("process_cluster")
  71. prompt = prompt_template.replace("%requirement%", requirement)
  72. prompt = prompt.replace("%cases_data%", cases_json)
  73. except Exception:
  74. prompt = f"""你是一个 AI 创作工作流分析师。请按照核心创作手段对以下案例进行聚类。
  75. 需求:{requirement}
  76. 案例数据:
  77. {cases_json}
  78. 输出 JSON 格式:
  79. {{"clusters": [{{"cluster_id": "A", "cluster_name": "...", "工序步骤": [{{"步骤序号": 1, "步骤描述": "..."}}], "关联案例": ["bili_BV1xxx"]}}]}}"""
  80. messages = [{"role": "user", "content": prompt}]
  81. cluster_data, total_cost = await call_llm_with_retry(
  82. llm_call=llm_call,
  83. messages=messages,
  84. model=model,
  85. temperature=0.2,
  86. max_tokens=4000,
  87. max_retries=3,
  88. validate_fn=validate_blueprint_temp,
  89. task_name="P2.1.1_ClusterProcesses",
  90. )
  91. if cluster_data is None:
  92. cluster_data = {"clusters": []}
  93. output_file.parent.mkdir(parents=True, exist_ok=True)
  94. with open(output_file, "w", encoding="utf-8") as f:
  95. json.dump(cluster_data, f, ensure_ascii=False, indent=2)
  96. return {
  97. "clusters": len(cluster_data.get("clusters", [])),
  98. "total_cost": total_cost,
  99. }