| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- #!/usr/bin/env python3
- """检查 workflow_api.json 需要哪些外部输入
- 分析所有 LoadImage / LoadVideo 等输入节点,列出需要提供的文件。
- 检查所有节点连接是否完整,发现缺失的输入。
- 用法:
- python check_workflow.py workflow_api.json
- python check_workflow.py workflow_api.json --images ref.png pose.jpg
- """
- import argparse
- import json
- from pathlib import Path
- # 需要外部文件输入的节点类型,以及对应的文件参数名
- FILE_INPUT_NODES = {
- "LoadImage": ["image"],
- "LoadVideo": ["video"],
- "LoadVideoPath": ["video"],
- "LoadImageMask": ["image"],
- "LoadImageOutput": ["image"], # 从 output 目录加载,特殊处理
- "VHS_LoadVideo": ["video"],
- "VHS_LoadImages": ["directory"],
- }
- # 纯 UI / 注释节点,无实际输入
- SKIP_NODES = {"Note", "MarkdownNote", "PrimitiveNode"}
- def analyze(workflow: dict) -> dict:
- node_ids = set(workflow.keys())
- # 收集所有被其他节点引用的 [node_id, slot] 连接
- referenced = set()
- for node_id, node in workflow.items():
- for key, val in node.get("inputs", {}).items():
- if isinstance(val, list) and len(val) == 2 and isinstance(val[0], str):
- referenced.add(val[0])
- issues = []
- file_inputs = [] # 需要上传的文件
- widget_params = [] # widget_* 占位参数(转换不准确的)
- for node_id, node in workflow.items():
- class_type = node.get("class_type", "")
- if class_type in SKIP_NODES:
- continue
- inputs = node.get("inputs", {})
- # 检查连接引用是否存在
- for key, val in inputs.items():
- if isinstance(val, list) and len(val) == 2 and isinstance(val[0], str):
- ref_node = val[0]
- if ref_node not in node_ids:
- issues.append(f"节点 [{node_id}] {class_type}: 输入 '{key}' 引用了不存在的节点 [{ref_node}]")
- # 检查文件输入节点
- if class_type in FILE_INPUT_NODES:
- param_names = FILE_INPUT_NODES[class_type]
- for param in param_names:
- # 可能是真实参数名,也可能被转成了 widget_*
- value = inputs.get(param)
- if value is None:
- # 找 widget_* 里的值
- widget_vals = {k: v for k, v in inputs.items() if k.startswith("widget_")}
- value = next(iter(widget_vals.values()), None) if widget_vals else None
- is_output_node = class_type == "LoadImageOutput"
- file_inputs.append({
- "node_id": node_id,
- "class_type": class_type,
- "param": param,
- "current_value": value,
- "is_output": is_output_node,
- })
- # 标记 widget_* 占位参数(说明转换不准确)
- widget_keys = [k for k in inputs if k.startswith("widget_")]
- if widget_keys and class_type not in FILE_INPUT_NODES:
- widget_params.append({
- "node_id": node_id,
- "class_type": class_type,
- "params": {k: inputs[k] for k in widget_keys},
- })
- return {
- "file_inputs": file_inputs,
- "widget_params": widget_params,
- "issues": issues,
- }
- def check_files_exist(file_inputs: list, provided: list[Path]) -> list:
- provided_names = {p.name for p in provided if p.exists()}
- missing = []
- for fi in file_inputs:
- if fi["is_output"]:
- continue # LoadImageOutput 从服务器 output 目录读,不需要上传
- val = fi["current_value"]
- if val and isinstance(val, str):
- # 去掉 [output] 等后缀
- filename = val.split(" ")[0]
- if filename not in provided_names:
- missing.append({**fi, "filename": filename})
- return missing
- def main():
- parser = argparse.ArgumentParser(description="检查 workflow_api.json 所需输入")
- parser.add_argument("workflow", help="workflow_api.json 路径")
- parser.add_argument("--input-dir", default="input", metavar="DIR", help="输入文件目录,默认 input/")
- args = parser.parse_args()
- workflow_path = Path(args.workflow)
- if not workflow_path.exists():
- print(f"ERROR: 文件不存在: {workflow_path}")
- return 1
- with open(workflow_path, "r", encoding="utf-8") as f:
- workflow = json.load(f)
- input_dir = Path(args.input_dir)
- all_input_files = list(input_dir.rglob("*")) if input_dir.exists() else []
- print(f"=== 检查 {workflow_path.name} ===\n")
- print(f"节点总数: {len(workflow)}")
- print(f"input 目录: {input_dir} ({'存在' if input_dir.exists() else '不存在'})")
- if all_input_files:
- print(f" 已有文件:")
- for f in sorted(all_input_files):
- if f.is_file():
- print(f" - {f.relative_to(input_dir)}")
- result = analyze(workflow)
- # ── 文件输入 ──
- print(f"\n── 文件输入节点 ({len(result['file_inputs'])} 个) ──")
- if result["file_inputs"]:
- for fi in result["file_inputs"]:
- tag = "[output目录]" if fi["is_output"] else "[需上传]"
- print(f" [{fi['node_id']}] {fi['class_type']}.{fi['param']}")
- print(f" 当前值: {fi['current_value']} {tag}")
- else:
- print(" 无")
- # ── widget_* 警告 ──
- if result["widget_params"]:
- print(f"\n── ⚠️ widget_* 占位参数(参数名不准确,建议用 ComfyUI 导出 API 格式)──")
- for wp in result["widget_params"]:
- print(f" [{wp['node_id']}] {wp['class_type']}")
- for k, v in wp["params"].items():
- print(f" {k}: {v}")
- # ── 连接问题 ──
- if result["issues"]:
- print(f"\n── ❌ 连接问题 ({len(result['issues'])} 个) ──")
- for issue in result["issues"]:
- print(f" {issue}")
- # ── 文件缺失检查 ──
- missing = check_files_exist(result["file_inputs"], all_input_files)
- print(f"\n── 文件准备状态 ──")
- needs_upload = [fi for fi in result["file_inputs"] if not fi["is_output"]]
- if not needs_upload:
- print(" 无需上传文件")
- else:
- for fi in needs_upload:
- filename = (fi["current_value"] or "").split(" ")[0]
- found = any(f.is_file() and f.name == filename for f in all_input_files)
- status = "✓ 已提供" if found else "✗ 缺失"
- print(f" {status} {filename} (节点 [{fi['node_id']}] {fi['class_type']})")
- # ── 最终结论 ──
- print()
- has_error = bool(result["issues"]) or bool(missing)
- has_warn = bool(result["widget_params"])
- if has_error:
- print("❌ 检查未通过,请将缺失文件放入 input/ 目录后再提交")
- return 1
- elif has_warn:
- print("⚠️ 存在 widget_* 占位参数,建议用 ComfyUI 导出正确的 API 格式,否则可能运行出错")
- return 0
- else:
- print("✓ 检查通过,可以提交")
- return 0
- if __name__ == "__main__":
- exit(main())
|