extract_capabilities_workflow.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
"""
Phase 2.2.1: preliminary capability clustering (workflow form).

Reads case_detailed.json + source.json directly, calls an LLM to extract
atomic capabilities, and writes the result to capabilities_temp.json.
"""
  6. import asyncio
  7. import json
  8. from pathlib import Path
  9. from typing import Any, Dict
  10. from examples.process_pipeline.script.llm_helper import call_llm_with_retry
  11. from examples.process_pipeline.script.validate_schema import validate_capabilities_temp
  12. def load_prompt_template(prompt_name: str) -> str:
  13. base_dir = Path(__file__).parent.parent
  14. prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt"
  15. with open(prompt_path, "r", encoding="utf-8") as f:
  16. content = f.read()
  17. if content.startswith("---"):
  18. parts = content.split("---", 2)
  19. if len(parts) >= 3:
  20. content = parts[2]
  21. content = content.replace("$system$", "").replace("$user$", "")
  22. return content.strip()
  23. async def extract_capabilities_workflow(
  24. detailed_file: Path,
  25. source_file: Path,
  26. output_file: Path,
  27. requirement: str,
  28. llm_call,
  29. model: str = "anthropic/claude-sonnet-4-6",
  30. ) -> Dict[str, Any]:
  31. """
  32. 读取 case_detailed.json + source.json,调用 LLM 提取原子能力
  33. """
  34. detailed_data = {}
  35. source_data = {}
  36. if detailed_file.exists():
  37. with open(detailed_file, "r", encoding="utf-8") as f:
  38. detailed_data = json.load(f)
  39. if source_file.exists():
  40. with open(source_file, "r", encoding="utf-8") as f:
  41. source_data = json.load(f)
  42. # 构造输入数据摘要
  43. cases_summary = []
  44. for case in detailed_data.get("cases", []):
  45. case_id = case.get("case_id", f"{case.get('platform', '')}_{case.get('channel_content_id', '')}")
  46. workflow = case.get("workflow")
  47. steps_text = []
  48. if workflow and "工序步骤" in workflow:
  49. steps = workflow["工序步骤"]
  50. steps_text = [s.get("步骤描述", "") for s in steps]
  51. # 从 source.json 获取原始帖子信息(用于辅助理解)
  52. body_text = ""
  53. for src in source_data.get("sources", []):
  54. src_id = f"{src.get('platform', '')}_{src.get('channel_content_id', '')}"
  55. if src_id == case_id:
  56. body_text = src.get("post", {}).get("body_text", "")[:500]
  57. break
  58. cases_summary.append({
  59. "case_id": case_id,
  60. "title": case.get("title", ""),
  61. "workflow_steps": steps_text,
  62. "body_preview": body_text,
  63. })
  64. cases_json = json.dumps(cases_summary, ensure_ascii=False, indent=2)
  65. print(f"Loaded {len(cases_summary)} cases for capability extraction", flush=True)
  66. # 加载 prompt
  67. try:
  68. prompt_template = load_prompt_template("extract_capabilities")
  69. prompt = prompt_template + f"\n\n需求:{requirement}\n\n案例数据:\n```json\n{cases_json}\n```"
  70. except Exception:
  71. prompt = f"""你是一个 AI 创作工作流分析师。请从以下案例中跨案例统一提取可复用的核心能力。
  72. 需求:{requirement}
  73. 案例数据:
  74. {cases_json}
  75. 输出 JSON 格式:
  76. {{"abilities": [{{"ability_id": "AB-01", "ability_name": "能力名称(4-10字)", "ability_description": "一句话描述该能力的核心动作和目的", "关联案例": ["bili_BV1xxx", "xhs_694e17e9000000001e006669"]}}]}}"""
  77. messages = [{"role": "user", "content": prompt}]
  78. capabilities_data, total_cost = await call_llm_with_retry(
  79. llm_call=llm_call,
  80. messages=messages,
  81. model=model,
  82. temperature=0.1,
  83. max_tokens=4000,
  84. max_retries=3,
  85. validate_fn=validate_capabilities_temp,
  86. task_name="P2.2.1_ExtractCapabilities",
  87. )
  88. if capabilities_data is None:
  89. capabilities_data = {"abilities": []}
  90. output_file.parent.mkdir(parents=True, exist_ok=True)
  91. with open(output_file, "w", encoding="utf-8") as f:
  92. json.dump(capabilities_data, f, ensure_ascii=False, indent=2)
  93. return {
  94. "capabilities": len(capabilities_data.get("abilities", [])),
  95. "total_cost": total_cost,
  96. }