#!/usr/bin/env python3
"""Run a ComfyUI workflow on a RunComfy machine.

Uses the RunComfy Server API to manage the machine and the ComfyUI backend
API to run the workflow.

Flow: launch machine -> upload the files under input/ -> submit the workflow
-> follow progress over WebSocket -> download results -> stop the machine.

Layout of the input/ directory (see SUBDIR_UPLOAD_MAP):

    input/
    ├── images/       -> uploaded with type="input", no subfolder
    ├── loras/        -> uploaded with type="input", subfolder "loras"
    ├── checkpoints/  -> uploaded with type="input", subfolder "checkpoints"
    ├── vae/          -> uploaded with type="input", subfolder "vae"
    └── (other files) -> uploaded with type="input", no subfolder

NOTE(review): every upload goes through ComfyUI's /upload/image endpoint with
type="input"; nothing in this script places files under ComfyUI's models/
tree, so model files only end up in sub-folders of the input area.

Usage:
    python run_workflow.py workflow_api.json
    python run_workflow.py workflow_api.json --input-dir ./input
    python run_workflow.py workflow_api.json --input-dir ./input --server-type large
"""

import argparse
import json
import os
import sys
import time
import urllib.parse
import uuid
from pathlib import Path

import requests
import websocket
from dotenv import load_dotenv

from check_workflow import analyze, check_files_exist

load_dotenv(Path(__file__).parent.parent.parent / ".env")

BASE_URL = "https://beta-api.runcomfy.net/prod/api"
USER_ID = os.getenv("RUNCOMFY_USER_ID")
API_TOKEN = os.getenv("API_TOKEN")

HEADERS = {
    "Authorization": f"Bearer {API_TOKEN}",
    "Content-Type": "application/json",
}

DEFAULT_VERSION_ID = "90f77137-ba75-400d-870f-204c614ae8a3"  # RunComfy/ComfyUI-NodesLoaded

# requests has no default timeout; without one a stalled connection blocks
# the script forever. Values are (connect, read) seconds.
API_TIMEOUT = (10, 120)
# File transfers may legitimately be slow, so allow a longer read gap.
TRANSFER_TIMEOUT = (10, 600)

# Maps a sub-directory of input/ to the ComfyUI upload "type" and "subfolder".
# type="input" targets ComfyUI's input directory. There is no officially
# supported type="model", so LoRAs and other models are told apart by subfolder.
SUBDIR_UPLOAD_MAP = {
    "images": {"type": "input", "subfolder": ""},
    "loras": {"type": "input", "subfolder": "loras"},
    "checkpoints": {"type": "input", "subfolder": "checkpoints"},
    "vae": {"type": "input", "subfolder": "vae"},
    "controlnet": {"type": "input", "subfolder": "controlnet"},
    "upscale": {"type": "input", "subfolder": "upscale_models"},
}

# File extensions that upload_input_dir() is willing to upload.
IMAGE_EXTS = {".png", ".jpg", ".jpeg", ".webp", ".bmp"}
VIDEO_EXTS = {".mp4", ".avi", ".mov", ".webm"}
MODEL_EXTS = {".safetensors", ".ckpt", ".pt", ".pth", ".gguf"}
ALL_EXTS = IMAGE_EXTS | VIDEO_EXTS | MODEL_EXTS


# ── Machine management ────────────────────────────────────


def launch_machine(version_id: str, server_type: str = "medium", duration: int = 3600) -> str:
    """Create a RunComfy server and return its ``server_id``.

    Args:
        version_id: RunComfy workflow version id to boot the machine with.
        server_type: Machine size sent as ``server_type``.
        duration: Value sent as ``estimated_duration`` (seconds).

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    payload = {
        "workflow_version_id": version_id,
        "server_type": server_type,
        "estimated_duration": duration,
    }
    resp = requests.post(
        f"{BASE_URL}/users/{USER_ID}/servers",
        headers=HEADERS,
        json=payload,
        timeout=API_TIMEOUT,
    )
    if not resp.ok:
        # Show the body first: raise_for_status() only reports the status line.
        print(f" HTTP {resp.status_code}: {resp.text}")
    resp.raise_for_status()
    data = resp.json()
    print(f" 响应: {data}")
    server_id = data["server_id"]
    print(f"机器已创建: {server_id} (type={server_type})")
    return server_id


def wait_for_ready(server_id: str, timeout: int = 300) -> str:
    """Poll the server every 5 seconds until it is ``Ready``.

    Returns:
        The server's ``main_service_url`` without a trailing slash.

    Raises:
        RuntimeError: If the server reports status ``Error`` or ``Failed``.
        TimeoutError: If the server is not ready within ``timeout`` seconds.
        requests.HTTPError: If a status request fails.
    """
    print("等待机器就绪...")
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        resp = requests.get(
            f"{BASE_URL}/users/{USER_ID}/servers/{server_id}",
            headers=HEADERS,
            timeout=API_TIMEOUT,
        )
        resp.raise_for_status()
        data = resp.json()
        status = data.get("current_status", "")
        print(f" 状态: {status}")
        if status == "Ready":
            url = data["main_service_url"].rstrip("/")
            print(f" 就绪: {url}")
            return url
        if status in ("Error", "Failed"):
            raise RuntimeError(f"机器启动失败: {status}")
        time.sleep(5)
    raise TimeoutError(f"等待超时 ({timeout}s)")


def stop_machine(server_id: str):
    """Delete (shut down) the RunComfy server ``server_id``.

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    resp = requests.delete(
        f"{BASE_URL}/users/{USER_ID}/servers/{server_id}",
        headers=HEADERS,
        timeout=API_TIMEOUT,
    )
    resp.raise_for_status()
    print(f"机器已关闭: {server_id}")


# ── File upload ───────────────────────────────────────────


def upload_file(comfy_url: str, file_path: Path, file_type: str = "input", subfolder: str = "") -> str:
    """Upload one file to ComfyUI and return the name the server stored it under.

    The file is posted to ``/upload/image`` with ``overwrite=true``; the
    returned name is the ``name`` field of the JSON response.

    Raises:
        requests.HTTPError: If the upload is rejected.
    """
    with open(file_path, "rb") as f:
        files = [("image", (file_path.name, f, "application/octet-stream"))]
        data = {"overwrite": "true", "type": file_type, "subfolder": subfolder}
        resp = requests.post(
            f"{comfy_url}/upload/image",
            data=data,
            files=files,
            timeout=TRANSFER_TIMEOUT,
        )
    resp.raise_for_status()
    server_name = resp.json()["name"]
    subfolder_str = f" → {subfolder}/{server_name}" if subfolder else f" → {server_name}"
    print(f" 上传: {file_path.name}{subfolder_str}")
    return server_name


def upload_input_dir(comfy_url: str, input_dir: Path) -> dict[str, str]:
    """Upload the files under ``input_dir`` and return ``{local name: server name}``.

    Files directly in ``input_dir`` are uploaded without a subfolder. Files
    one level down are uploaded according to SUBDIR_UPLOAD_MAP; a
    sub-directory that is not in the map uses its own name as the subfolder.
    Only files whose extension is in ALL_EXTS are uploaded, and deeper
    nesting is not scanned.

    The mapping is keyed by bare file name, so two files with the same name
    in different sub-directories overwrite each other's entry.
    """
    if not input_dir.exists():
        print(f" input 目录不存在: {input_dir}")
        return {}

    uploaded = {}

    # Files in the root of input_dir go straight to the input area.
    for f in input_dir.iterdir():
        if f.is_file() and f.suffix.lower() in ALL_EXTS:
            uploaded[f.name] = upload_file(comfy_url, f, "input", "")

    # Files inside sub-directories.
    for subdir in input_dir.iterdir():
        if not subdir.is_dir():
            continue
        mapping = SUBDIR_UPLOAD_MAP.get(subdir.name, {"type": "input", "subfolder": subdir.name})
        for f in subdir.iterdir():
            if f.is_file() and f.suffix.lower() in ALL_EXTS:
                uploaded[f.name] = upload_file(comfy_url, f, mapping["type"], mapping["subfolder"])

    return uploaded


# ── Workflow submission ───────────────────────────────────


def submit_prompt(comfy_url: str, workflow_api: dict, client_id: str) -> str:
    """Queue the workflow on ComfyUI and return its ``prompt_id``.

    Raises:
        requests.HTTPError: If ComfyUI rejects the request.
    """
    payload = {"prompt": workflow_api, "client_id": client_id}
    resp = requests.post(f"{comfy_url}/prompt", json=payload, timeout=API_TIMEOUT)
    if not resp.ok:
        # Print the body before raising, as launch_machine() does; otherwise
        # the server's explanation of the rejection is lost.
        print(f" HTTP {resp.status_code}: {resp.text}")
    resp.raise_for_status()
    data = resp.json()
    if data.get("node_errors"):
        print(f" 节点错误: {data['node_errors']}")
    prompt_id = data["prompt_id"]
    print(f"任务已提交: {prompt_id}")
    return prompt_id


# ── WebSocket monitoring ──────────────────────────────────


def wait_for_completion(comfy_url: str, client_id: str, prompt_id: str, timeout: int = 600):
    """Block until ComfyUI reports that ``prompt_id`` has finished.

    Completion is an ``executing`` message for this prompt whose ``node`` is
    ``None``. Binary frames and empty messages are ignored.

    Args:
        timeout: Socket timeout in seconds. It applies to each individual
            receive, i.e. it is an idle timeout, not a limit on the total
            run time of the workflow.

    Raises:
        RuntimeError: On an ``execution_error`` message for this prompt.
    """
    scheme = "wss" if comfy_url.startswith("https") else "ws"
    ws_url = f"{scheme}://{comfy_url.split('://', 1)[-1]}/ws?clientId={client_id}"
    print("WebSocket 监听中...")

    ws = websocket.WebSocket()
    ws.settimeout(timeout)
    try:
        # Connect inside the try so the socket is closed even if the
        # handshake fails half-way.
        ws.connect(ws_url)
        while True:
            out = ws.recv()
            if not out or isinstance(out, bytes):
                continue
            msg = json.loads(out)
            msg_type = msg.get("type", "")
            data = msg.get("data", {})

            if msg_type == "executing":
                node = data.get("node")
                if data.get("prompt_id") == prompt_id and node is None:
                    print(" 执行完成")
                    break
                if node:
                    print(f" 执行节点: {node}")
            elif msg_type == "progress":
                value = data.get("value", 0)
                max_val = data.get("max", 1)
                print(f" 进度: {value}/{max_val}")
            elif msg_type == "execution_error":
                if data.get("prompt_id") == prompt_id:
                    raise RuntimeError(f"执行错误: {data.get('exception_message', 'unknown')}")
    finally:
        ws.close()


# ── Result download ───────────────────────────────────────


def _safe_output_path(output_dir: Path, filename: str) -> Path:
    """Return ``output_dir / basename(filename)``.

    The file name comes from the server, so directory components are dropped
    to keep the download inside ``output_dir``.
    """
    name = Path(str(filename).replace("\\", "/")).name
    if name in ("", ".", ".."):
        raise ValueError(f"unsafe output filename: {filename!r}")
    return output_dir / name


def _download_view(comfy_url: str, params: dict, out_path: Path) -> None:
    """Fetch one file from ComfyUI's ``/view`` endpoint and write it to ``out_path``."""
    resp = requests.get(
        f"{comfy_url}/view?{urllib.parse.urlencode(params)}",
        timeout=TRANSFER_TIMEOUT,
    )
    resp.raise_for_status()
    out_path.write_bytes(resp.content)


def download_outputs(comfy_url: str, prompt_id: str, output_dir: Path) -> list[str]:
    """Download every image and video listed in the prompt's history.

    Reads ``/history/{prompt_id}`` and fetches the entries under each node's
    ``images`` and ``gifs`` keys into ``output_dir``, which is created if
    needed. Files are stored flat under their base name, so outputs sharing a
    name overwrite each other.

    Returns:
        The paths of the saved files, as strings.

    Raises:
        requests.HTTPError: If the history or a file request fails.
        ValueError: If the server reports an unusable file name.
    """
    resp = requests.get(f"{comfy_url}/history/{prompt_id}", timeout=API_TIMEOUT)
    resp.raise_for_status()
    data = resp.json().get(prompt_id, {})
    outputs = data.get("outputs", {})

    output_dir.mkdir(parents=True, exist_ok=True)
    saved = []

    for node_output in outputs.values():
        for image in node_output.get("images", ()):
            params = {
                "filename": image["filename"],
                "subfolder": image.get("subfolder", ""),
                # NOTE(review): the "temp" lookup is kept from the original;
                # confirm whether any node actually emits that key.
                "type": image.get("temp") or image.get("type", "output"),
            }
            out_path = _safe_output_path(output_dir, image["filename"])
            _download_view(comfy_url, params, out_path)
            print(f" 图片: {out_path}")
            saved.append(str(out_path))

        for video in node_output.get("gifs", ()):
            params = {
                "filename": video["filename"],
                "subfolder": video.get("subfolder", ""),
                # Forward the storage type like the image branch does, so
                # entries not stored under "output" can still be fetched.
                "type": video.get("type", "output"),
                "format": video.get("format", "mp4"),
            }
            out_path = _safe_output_path(output_dir, video["filename"])
            _download_view(comfy_url, params, out_path)
            print(f" 视频: {out_path}")
            saved.append(str(out_path))

    return saved


# ── Main flow ─────────────────────────────────────────────


def main():
    """Command-line entry point: validate the workflow, then run it end to end.

    Exits with status 1 on configuration errors, failed workflow checks or
    any runtime error, and with status 130 when interrupted with Ctrl+C. On
    failure the machine is deliberately left running and the user is told to
    handle it manually.
    """
    parser = argparse.ArgumentParser(description="RunComfy workflow runner")
    parser.add_argument("workflow", help="workflow_api.json 路径")
    parser.add_argument("--input-dir", default="input", metavar="DIR",
                        help="输入文件目录,默认 input/。子目录 images/loras/checkpoints/vae/ 自动上传到对应位置")
    parser.add_argument("--version-id", default=DEFAULT_VERSION_ID,
                        help="RunComfy workflow version_id")
    parser.add_argument("--server-type", default="medium",
                        choices=["medium", "large", "extra-large", "2x-large", "2xl-turbo"])
    parser.add_argument("--duration", type=int, default=3600,
                        help="预估运行时长(秒),默认3600")
    parser.add_argument("--keep-alive", action="store_true",
                        help="完成后不自动关机")
    parser.add_argument("--server-id", metavar="ID",
                        help="复用已有机器,跳过启动步骤")
    parser.add_argument("--skip-upload", action="store_true",
                        help="跳过文件上传,直接提交 workflow")
    parser.add_argument("--output-dir", default="output", metavar="DIR",
                        help="结果下载目录,默认 output/")
    args = parser.parse_args()

    if not USER_ID or not API_TOKEN:
        print("ERROR: 请设置 RUNCOMFY_USER_ID 和 API_TOKEN 环境变量")
        sys.exit(1)

    print(f"USER_ID : {USER_ID}")
    print(f"API_TOKEN: {API_TOKEN[:8]}...")

    workflow_path = Path(args.workflow)
    if not workflow_path.exists():
        print(f"ERROR: 文件不存在: {workflow_path}")
        sys.exit(1)

    with open(workflow_path, "r", encoding="utf-8") as f:
        workflow_api = json.load(f)

    # Pre-submission check: fail before a machine is started.
    input_dir = Path(args.input_dir)
    all_input_files = list(input_dir.rglob("*")) if input_dir.exists() else []
    result = analyze(workflow_api)
    missing = check_files_exist(result["file_inputs"], all_input_files)

    if result["issues"] or missing:
        print("❌ workflow 检查未通过:")
        for issue in result["issues"]:
            print(f" {issue}")
        for fi in missing:
            print(f" 缺少文件: {fi['filename']} (节点 [{fi['node_id']}] {fi['class_type']})")
            print(f" 请将该文件放入 {input_dir}/ 目录")
        sys.exit(1)

    if result["widget_params"]:
        print("⚠️ 存在 widget_* 占位参数,参数名可能不准确,继续运行...")
    else:
        print("✓ workflow 检查通过")

    client_id = str(uuid.uuid4())
    server_id = args.server_id  # None if not provided

    def warn_machine_left_running():
        # Nothing to warn about if no machine was ever created or reused.
        if server_id:
            print(f"机器 {server_id} 未自动关闭,请手动处理")

    try:
        # 1. Launch a machine, or reuse an existing one.
        if server_id:
            print(f"复用已有机器: {server_id}")
            comfy_url = wait_for_ready(server_id)
        else:
            server_id = launch_machine(args.version_id, args.server_type, args.duration)
            comfy_url = wait_for_ready(server_id)

        # 2. Upload the input directory.
        if args.skip_upload:
            print("跳过文件上传 (--skip-upload)")
        else:
            print(f"\n上传 input 目录: {input_dir}")
            upload_input_dir(comfy_url, input_dir)

        # 3. Submit the workflow.
        print("\n提交 workflow...")
        prompt_id = submit_prompt(comfy_url, workflow_api, client_id)

        # 4. Follow execution progress.
        wait_for_completion(comfy_url, client_id, prompt_id)

        # 5. Download the results.
        print("\n下载结果...")
        saved = download_outputs(comfy_url, prompt_id, Path(args.output_dir))
        print(f"\n完成,共 {len(saved)} 个文件")

        if args.keep_alive:
            print(f"\n--keep-alive 模式,机器保持运行: {server_id}")
            print(f"ComfyUI URL: {comfy_url}")
        else:
            print("\n关闭机器...")
            stop_machine(server_id)

    except KeyboardInterrupt:
        # Ctrl+C is not an Exception subclass; without this handler the user
        # would get no reminder that a billed machine may still be running.
        print()
        warn_machine_left_running()
        sys.exit(130)
    except Exception as e:
        print(f"\n错误: {e}")
        warn_machine_left_running()
        sys.exit(1)


if __name__ == "__main__":
    main()