#!/usr/bin/env python3 """检查 workflow_api.json 需要哪些外部输入 分析所有 LoadImage / LoadVideo 等输入节点,列出需要提供的文件。 检查所有节点连接是否完整,发现缺失的输入。 用法: python check_workflow.py workflow_api.json python check_workflow.py workflow_api.json --images ref.png pose.jpg """ import argparse import json from pathlib import Path # 需要外部文件输入的节点类型,以及对应的文件参数名 FILE_INPUT_NODES = { "LoadImage": ["image"], "LoadVideo": ["video"], "LoadVideoPath": ["video"], "LoadImageMask": ["image"], "LoadImageOutput": ["image"], # 从 output 目录加载,特殊处理 "VHS_LoadVideo": ["video"], "VHS_LoadImages": ["directory"], } # 纯 UI / 注释节点,无实际输入 SKIP_NODES = {"Note", "MarkdownNote", "PrimitiveNode"} def analyze(workflow: dict) -> dict: node_ids = set(workflow.keys()) # 收集所有被其他节点引用的 [node_id, slot] 连接 referenced = set() for node_id, node in workflow.items(): for key, val in node.get("inputs", {}).items(): if isinstance(val, list) and len(val) == 2 and isinstance(val[0], str): referenced.add(val[0]) issues = [] file_inputs = [] # 需要上传的文件 widget_params = [] # widget_* 占位参数(转换不准确的) for node_id, node in workflow.items(): class_type = node.get("class_type", "") if class_type in SKIP_NODES: continue inputs = node.get("inputs", {}) # 检查连接引用是否存在 for key, val in inputs.items(): if isinstance(val, list) and len(val) == 2 and isinstance(val[0], str): ref_node = val[0] if ref_node not in node_ids: issues.append(f"节点 [{node_id}] {class_type}: 输入 '{key}' 引用了不存在的节点 [{ref_node}]") # 检查文件输入节点 if class_type in FILE_INPUT_NODES: param_names = FILE_INPUT_NODES[class_type] for param in param_names: # 可能是真实参数名,也可能被转成了 widget_* value = inputs.get(param) if value is None: # 找 widget_* 里的值 widget_vals = {k: v for k, v in inputs.items() if k.startswith("widget_")} value = next(iter(widget_vals.values()), None) if widget_vals else None is_output_node = class_type == "LoadImageOutput" file_inputs.append({ "node_id": node_id, "class_type": class_type, "param": param, "current_value": value, "is_output": is_output_node, }) # 标记 widget_* 占位参数(说明转换不准确) widget_keys = [k for k in inputs if k.startswith("widget_")] if widget_keys and class_type not in FILE_INPUT_NODES: widget_params.append({ "node_id": node_id, "class_type": class_type, "params": {k: inputs[k] for k in widget_keys}, }) return { "file_inputs": file_inputs, "widget_params": widget_params, "issues": issues, } def check_files_exist(file_inputs: list, provided: list[Path]) -> list: provided_names = {p.name for p in provided if p.exists()} missing = [] for fi in file_inputs: if fi["is_output"]: continue # LoadImageOutput 从服务器 output 目录读,不需要上传 val = fi["current_value"] if val and isinstance(val, str): # 去掉 [output] 等后缀 filename = val.split(" ")[0] if filename not in provided_names: missing.append({**fi, "filename": filename}) return missing def main(): parser = argparse.ArgumentParser(description="检查 workflow_api.json 所需输入") parser.add_argument("workflow", help="workflow_api.json 路径") parser.add_argument("--input-dir", default="input", metavar="DIR", help="输入文件目录,默认 input/") args = parser.parse_args() workflow_path = Path(args.workflow) if not workflow_path.exists(): print(f"ERROR: 文件不存在: {workflow_path}") return 1 with open(workflow_path, "r", encoding="utf-8") as f: workflow = json.load(f) input_dir = Path(args.input_dir) all_input_files = list(input_dir.rglob("*")) if input_dir.exists() else [] print(f"=== 检查 {workflow_path.name} ===\n") print(f"节点总数: {len(workflow)}") print(f"input 目录: {input_dir} ({'存在' if input_dir.exists() else '不存在'})") if all_input_files: print(f" 已有文件:") for f in sorted(all_input_files): if f.is_file(): print(f" - {f.relative_to(input_dir)}") result = analyze(workflow) # ── 文件输入 ── print(f"\n── 文件输入节点 ({len(result['file_inputs'])} 个) ──") if result["file_inputs"]: for fi in result["file_inputs"]: tag = "[output目录]" if fi["is_output"] else "[需上传]" print(f" [{fi['node_id']}] {fi['class_type']}.{fi['param']}") print(f" 当前值: {fi['current_value']} {tag}") else: print(" 无") # ── widget_* 警告 ── if result["widget_params"]: print(f"\n── ⚠️ widget_* 占位参数(参数名不准确,建议用 ComfyUI 导出 API 格式)──") for wp in result["widget_params"]: print(f" [{wp['node_id']}] {wp['class_type']}") for k, v in wp["params"].items(): print(f" {k}: {v}") # ── 连接问题 ── if result["issues"]: print(f"\n── ❌ 连接问题 ({len(result['issues'])} 个) ──") for issue in result["issues"]: print(f" {issue}") # ── 文件缺失检查 ── missing = check_files_exist(result["file_inputs"], all_input_files) print(f"\n── 文件准备状态 ──") needs_upload = [fi for fi in result["file_inputs"] if not fi["is_output"]] if not needs_upload: print(" 无需上传文件") else: for fi in needs_upload: filename = (fi["current_value"] or "").split(" ")[0] found = any(f.is_file() and f.name == filename for f in all_input_files) status = "✓ 已提供" if found else "✗ 缺失" print(f" {status} {filename} (节点 [{fi['node_id']}] {fi['class_type']})") # ── 最终结论 ── print() has_error = bool(result["issues"]) or bool(missing) has_warn = bool(result["widget_params"]) if has_error: print("❌ 检查未通过,请将缺失文件放入 input/ 目录后再提交") return 1 elif has_warn: print("⚠️ 存在 widget_* 占位参数,建议用 ComfyUI 导出正确的 API 格式,否则可能运行出错") return 0 else: print("✓ 检查通过,可以提交") return 0 if __name__ == "__main__": exit(main())