"""Per-case workflow extraction (v5).

Reads case.json, walks the cases by their ``index``, calls the LLM to extract a
workflow for each case, and writes each result back into case.json in place
(matched by ``index``).

v5 architecture:
- Structured inputs/outputs (role, modality, artifact_type, ... 10 dimensions).
- Actions are objects: ``{main_action, mechanism}`` (replacing the old
  free-form ``method`` string).
- Stage 1 emits ``apply_to_draft`` (natural language) to prepare for the
  Stage 2 content-tree mapping.
- The strategy's top-level fields (method, inputs, outputs, tools, stage) are
  derived by this script from the steps rather than produced by the LLM.
"""

import asyncio
import json
import re  # noqa: F401  (currently unused in this module)
from pathlib import Path
from typing import Any, Dict, List, Optional

from examples.process_pipeline.script.llm_helper import call_llm_with_retry

# Location of the v5 structured vocabulary file.
SCRIPT_DIR = Path(__file__).resolve().parent
METHOD_VOCAB_PATH = SCRIPT_DIR / "resource" / "method_vocab_v5.json"

# Fallback vocabulary used when METHOD_VOCAB_PATH is missing or unusable.
DEFAULT_METHOD_VOCAB = {
    "流程角色": [
        "生成指令", "编辑指令", "约束条件", "参考素材", "控制信号", "区域控制",
        "参数配置", "模型资源", "源素材", "中间产物", "成品", "模板", "评估结果",
    ],
    "模态": ["文本", "图片", "视频", "音频", "特征点", "参数", "模型", "向量", "表格"],
    "主动作": [
        "生成", "编辑", "提取", "改写", "合成", "修复", "增强", "训练", "评估",
        "剪辑", "模板化", "排版", "转写", "配音", "匹配", "扩展", "导出",
    ],
    "动作方式": [
        "直接生成", "一致性保持", "结构约束", "质量收束", "局部重绘", "扩图",
        "换背景", "提示词反推", "模板化", "多图融合", "清晰化", "风格迁移",
        "常规编辑", "变体生成", "动画化", "镜头延展", "换主体", "换装", "擦除",
        "调色", "前后景融合", "图文合成", "音画合成", "分层叠加", "特征提取",
        "蒙版提取", "关键帧提取", "字幕提取", "风格提取", "片段拼接", "节奏压缩",
        "转场编排", "字幕对齐", "音画同步", "降噪", "补帧", "超分", "稳定化",
        "质感增强", "结构抽象", "变量抽象", "版式套用", "格式转换", "压缩导出",
    ],
}

# Stage-inference tables. Tuples rather than sets on purpose: LLM output may put
# an unhashable value (e.g. a list) in main_action/mechanism, and tuple
# membership tolerates that where set membership would raise TypeError.
_PREPROCESS_MAIN_ACTIONS = ("提取", "改写", "模板化", "训练", "评估")
_REFINE_MAIN_ACTIONS = ("编辑", "修复", "增强", "剪辑", "排版")
_REFINE_MECHANISMS = (
    "局部重绘", "扩图", "换背景", "换主体", "换装", "擦除", "调色",
    "前后景融合", "降噪", "补帧", "超分", "稳定化", "质感增强",
)


def load_method_vocab() -> Dict[str, list]:
    """Load the structured (v5) vocabulary from METHOD_VOCAB_PATH.

    Falls back to DEFAULT_METHOD_VOCAB when the file does not exist, cannot be
    read or parsed, or does not hold a JSON object (the renderer needs a
    mapping of key -> list of terms).
    """
    if METHOD_VOCAB_PATH.exists():
        try:
            with open(METHOD_VOCAB_PATH, "r", encoding="utf-8") as f:
                vocab = json.load(f)
        except (OSError, ValueError) as e:  # ValueError covers JSON/Unicode decode errors
            print(f"Warning: Failed to load {METHOD_VOCAB_PATH.name}: {e}, using default")
        else:
            if isinstance(vocab, dict):
                return vocab
            print(f"Warning: {METHOD_VOCAB_PATH.name} is not a JSON object, using default")
    return DEFAULT_METHOD_VOCAB


def load_prompt_template(prompt_name: str) -> str:
    """Return the body of ``prompts/<prompt_name>.prompt`` (one level above this dir).

    A leading ``---``-delimited front-matter section is dropped, the
    ``$system$`` / ``$user$`` markers are removed, and the result is stripped.

    Raises:
        OSError: if the prompt file cannot be read.
    """
    base_dir = Path(__file__).parent.parent
    prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt"
    with open(prompt_path, "r", encoding="utf-8") as f:
        content = f.read()
    if content.startswith("---"):
        parts = content.split("---", 2)
        if len(parts) >= 3:
            content = parts[2]
    content = content.replace("$system$", "").replace("$user$", "")
    return content.strip()


def render_method_vocab_block(vocab: Dict[str, list]) -> str:
    """Render the v5 structured-interface vocabulary as a prompt section."""
    lines = [
        "\n# 结构化接口词库(v5,必须遵守)",
        "只输出结构化 inputs / outputs / action。",
        "- `role/流程角色` 只写接口职责,不写具体内容 what。",
        "- `modality/模态` 只写媒介或数据形态;统一用 `图片`,不要写 `图像`;统一用 `文本`,不要写 `文字`。",
        "- `artifact_type/工件类型` 写该模态下的具体工件,如 `正向提示词`、`蒙版`。",
        "- `action.main_action` 写主动作;`action.mechanism` 写动作内部机制。",
        "- 只有词库确实不够时才新增术语;新增术语也必须抽象、短、可复用。",
        "",
        "当前词库:",
    ]
    for key, values in vocab.items():
        # str() guards against non-string terms in a hand-edited vocab file.
        lines.append(f"- {key}:{'、'.join(str(v) for v in values)}")
    return "\n".join(lines)


def _infer_stage_from_action(action_obj: dict) -> str:
    """Map a step's ``{main_action, mechanism}`` to "preprocess", "refine" or "generate".

    main_action decides first; a refine-type mechanism upgrades an otherwise
    generative step to "refine"; everything else is "generate".
    """
    main_action = action_obj.get("main_action", "")
    mechanism = action_obj.get("mechanism", "")
    if main_action in _PREPROCESS_MAIN_ACTIONS:
        return "preprocess"
    if main_action in _REFINE_MAIN_ACTIONS or mechanism in _REFINE_MECHANISMS:
        return "refine"
    return "generate"


def derive_strategy_rollup(strategy: dict) -> None:
    """Fill a strategy's top-level summary fields from its steps, in place (v5).

    Sets:
      - ``method``: the steps' string ``action.main_action`` values joined with
        "-" (left untouched when no step provides one);
      - ``inputs``: the first step's ``inputs`` (``[]`` if not a list);
      - ``outputs``: the last step's ``outputs`` (``[]`` if not a list);
      - ``tools``: order-preserving de-duplicated union of every step's tools;
      - ``stage``: de-duplicated stages inferred from each step's ``action``,
        defaulting to ``["generate"]``.

    Steps are taken in integer ``order`` (missing/non-int order sorts last);
    non-dict steps are ignored. Nothing changes when there are no steps.
    """
    steps = [s for s in (strategy.get("steps") or []) if isinstance(s, dict)]
    if not steps:
        return
    # Only this local list is sorted; strategy["steps"] keeps its original order.
    steps.sort(key=lambda s: s.get("order") if isinstance(s.get("order"), int) else 9999)

    action_objs = [s["action"] for s in steps if isinstance(s.get("action"), dict)]

    main_actions = [a.get("main_action") for a in action_objs]
    main_actions = [m for m in main_actions if isinstance(m, str) and m]
    if main_actions:
        strategy["method"] = "-".join(main_actions)

    first_inputs = steps[0].get("inputs")
    strategy["inputs"] = first_inputs if isinstance(first_inputs, list) else []

    last_outputs = steps[-1].get("outputs")
    strategy["outputs"] = last_outputs if isinstance(last_outputs, list) else []

    tools: List[str] = []
    for step in steps:
        step_tools = step.get("tools") or []
        if isinstance(step_tools, str):
            # A bare string is one tool, not a sequence of one-character tools.
            step_tools = [step_tools]
        for tool in step_tools:
            if isinstance(tool, str) and tool and tool not in tools:
                tools.append(tool)
    strategy["tools"] = tools

    stages: List[str] = []
    for action_obj in action_objs:
        stage = _infer_stage_from_action(action_obj)
        if stage not in stages:
            stages.append(stage)
    strategy["stage"] = stages or ["generate"]


def _build_prompt_from_template(context: str, vocab_block: str) -> str:
    """Render the ``extract_workflow`` prompt template with the case context and vocab."""
    template = load_prompt_template("extract_workflow")
    # Fill the vocab slot in the template *before* inserting the case context,
    # so a literal "{interface_vocab}" inside the post text is left untouched.
    has_vocab_slot = "{interface_vocab}" in template
    if has_vocab_slot:
        template = template.replace("{interface_vocab}", vocab_block)
    if "%context%" in template:
        prompt = template.replace("%context%", context)
    else:
        prompt = template + f"\n\n## 帖子内容\n{context}"
    if not has_vocab_slot and vocab_block not in prompt:
        # Template carries no vocab section: append it at the end.
        prompt = prompt + "\n" + vocab_block
    return prompt


def _build_fallback_prompt(context: str, vocab_block: str) -> str:
    """Inline prompt used when the ``extract_workflow`` template cannot be loaded."""
    return f"""将以下帖子内容总结为AI图片生成的工序,以JSON格式输出。

# 工序提取规则(v5)
- 步骤粒度是"做了什么",而非"怎么做"
- 以"触发生成 / 处理的动作"为步骤边界
- 若本质上只有一步,也输出一步,不要返回 strategy=null
- 本阶段严禁生成 apply_to,只生成 apply_to_draft

# 输出格式(v5)
{{
  "skip": false,
  "skip_reason": "",
  "strategy": {{
    "steps": [
      {{
        "order": 1,
        "action": {{"main_action": "生成", "mechanism": "直接生成"}},
        "body": "string | null",
        "inputs": [
          {{
            "role": "生成指令",
            "modality": "文本",
            "artifact_type": "正向提示词",
            "control_target": ["主体", "场景"],
            "target_scope": ["整图"],
            "constraint_strength": "硬约束",
            "source": "原帖文本",
            "lifecycle": "原始输入",
            "description": "用于触发图片生成的完整提示词"
          }}
        ],
        "outputs": [...],
        "tools": []
      }}
    ],
    "effects": ["实现 XX 效果"],
    "criterion": null,
    "apply_to_draft": {{"实质": ["相关 what"], "形式": ["相关呈现方式"]}},
    "unstructured_what": []
  }}
}}

{vocab_block}

## 帖子内容
{context}

请严格按照上述格式输出JSON,不要包含其他内容。"""


def _build_messages(prompt: str, images: Any) -> List[Dict[str, Any]]:
    """Build the single user message; attach http(s) URLs among the first 9 images."""
    candidates = images[:9] if isinstance(images, (list, tuple)) else []
    image_urls = [img for img in candidates if isinstance(img, str) and img.startswith("http")]
    if not image_urls:
        return [{"role": "user", "content": prompt}]
    content: List[Dict[str, Any]] = [{"type": "text", "text": prompt}]
    content.extend({"type": "image_url", "image_url": {"url": url}} for url in image_urls)
    return [{"role": "user", "content": content}]


async def extract_workflow_from_case(
    case_item: Dict[str, Any],
    llm_call: Any,
    model: str = "anthropic/claude-sonnet-4-5",
) -> tuple[Optional[Dict[str, Any]], float]:
    """Extract the workflow of a single case with the LLM (v5).

    The case (minus ``images``, ``_raw``, ``workflow`` and ``capabilities``) is
    serialized as JSON context; image URLs are attached as image parts. The
    LLM's ``strategy`` object gets its top-level fields filled by
    derive_strategy_rollup.

    Returns:
        ``(workflow, cost)``. ``workflow`` is None when the case has no content,
        the LLM returned nothing / a non-object, or it asked to skip. ``cost`` is
        whatever call_llm_with_retry reported (0.0 when no call was made).
    """
    images = case_item.get("images", [])
    case_copy = dict(case_item)
    for key in ("images", "_raw", "workflow", "capabilities"):
        case_copy.pop(key, None)
    if not case_copy and not images:
        return None, 0.0

    title = str(case_item.get("title") or "")[:20] or "untitled"
    context = json.dumps(case_copy, ensure_ascii=False, indent=2)

    vocab_block = render_method_vocab_block(load_method_vocab())
    try:
        prompt = _build_prompt_from_template(context, vocab_block)
    except Exception as e:  # deliberate best-effort: fall back to the inline prompt
        print(f"Warning: Failed to load prompt template: {e}, using fallback")
        prompt = _build_fallback_prompt(context, vocab_block)

    result_data, cost = await call_llm_with_retry(
        llm_call=llm_call,
        messages=_build_messages(prompt, images),
        model=model,
        temperature=0.1,
        max_tokens=8000,  # generous budget for long multi-step outputs
        max_retries=3,
        schema_name="extract_workflow",
        task_name=f"Workflow_{title}",
    )

    # Stage 1 response shape: {"skip": bool, "skip_reason": str, "strategy": {...}}.
    if not isinstance(result_data, dict) or not result_data or result_data.get("skip"):
        return None, cost

    workflow_data = result_data.get("strategy")
    if isinstance(workflow_data, dict):
        derive_strategy_rollup(workflow_data)
    return workflow_data, cost


async def extract_workflow(
    case_file: Path,
    llm_call: Any,
    model: str = "anthropic/claude-sonnet-4-5",
    max_concurrent: int = 3,
    case_indices: Optional[List[int]] = None,
) -> Dict[str, Any]:
    """Extract workflows for the cases in case.json and write them back in place.

    Args:
        case_file: path to case.json (expects a top-level ``cases`` list).
        llm_call: LLM call function forwarded to call_llm_with_retry.
        model: model identifier.
        max_concurrent: maximum number of concurrent LLM extractions.
        case_indices: optional list of case ``index`` values to process; when
            None every case is processed. Unselected cases are kept unchanged.

    Returns:
        Summary dict: ``total`` (cases processed in this run), ``success``,
        ``failed``, ``total_cost`` and ``output_file``.
    """
    with open(case_file, "r", encoding="utf-8") as f:
        case_data = json.load(f)

    cases = case_data.get("cases", [])

    if case_indices is not None:
        wanted = set(case_indices)  # O(1) membership instead of scanning the list
        cases_to_process = [c for c in cases if c.get("index") in wanted]
        print(f"Extracting workflow from {len(cases_to_process)} cases (filtered by indices: {case_indices})...")
    else:
        cases_to_process = cases
        print(f"Extracting workflow from {len(cases)} cases...")

    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_with_semaphore(case_item):
        async with semaphore:
            index = case_item.get("index", 0)
            raw = case_item.get("_raw")
            case_id = raw.get("case_id", "unknown") if isinstance(raw, dict) else "unknown"
            title = str(case_item.get("title") or "")
            print(f" -> [{index}] [{case_id}] extracting workflow: {title[:60]}")
            workflow, cost = await extract_workflow_from_case(case_item, llm_call, model)
            status = "ok" if workflow else "null"
            print(f" <- [{index}] [{case_id}] workflow {status}")
            result = dict(case_item)
            result["workflow"] = workflow
            return result, cost

    results_with_costs = await asyncio.gather(
        *(process_with_semaphore(case) for case in cases_to_process)
    )
    results = [result for result, _ in results_with_costs]
    total_cost = sum(cost for _, cost in results_with_costs)

    # Count before merging so total == success + failed in partial mode too.
    processed_count = len(results)
    success_count = sum(1 for r in results if r.get("workflow"))
    failed_count = processed_count - success_count

    if case_indices is not None:
        # Partial update: splice the processed cases back into the full list.
        result_map = {r.get("index"): r for r in results}
        for i, case in enumerate(cases):
            if case.get("index") in result_map:
                cases[i] = result_map[case.get("index")]
        results = cases

    results.sort(key=lambda x: x.get("index", 0))
    case_data["cases"] = results

    # Write to a sibling temp file and swap it in, so an interrupted write
    # cannot leave case.json truncated.
    case_file.parent.mkdir(parents=True, exist_ok=True)
    tmp_file = case_file.with_name(case_file.name + ".tmp")
    with open(tmp_file, "w", encoding="utf-8") as f:
        json.dump(case_data, f, ensure_ascii=False, indent=2)
    tmp_file.replace(case_file)

    return {
        "total": processed_count,
        "success": success_count,
        "failed": failed_count,
        "total_cost": total_cost,
        "output_file": str(case_file),
    }


if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        print("Usage: python extract_workflow.py <output_dir>")
        sys.exit(1)

    output_dir = Path(sys.argv[1])
    case_file = output_dir / "case.json"

    if not case_file.exists():
        print(f"Error: {case_file} not found")
        sys.exit(1)

    print("Please use this module through run_pipeline.py")