| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340 |
- #!/usr/bin/env python3
- """RunComfy Server API + ComfyUI Backend API
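- Flow: launch machine → upload files from the input/ directory → submit workflow → listen on WebSocket → download results → shut down
- Layout of the input/ directory (every upload uses ComfyUI type="input", so model files land in subfolders of ComfyUI's input directory, not under models/):
- input/
- ├── images/ → uploaded to ComfyUI input/ (used by LoadImage nodes)
- ├── loras/ → uploaded to ComfyUI input/loras/
- ├── checkpoints/ → uploaded to ComfyUI input/checkpoints/
- ├── vae/ → uploaded to ComfyUI input/vae/
- └── (other files) → uploaded to ComfyUI input/
- Usage: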
- python run_workflow.py workflow_api.json
- python run_workflow.py workflow_api.json --input-dir ./input
- python run_workflow.py workflow_api.json --input-dir ./input --server-type large
- """
- import argparse
- import json
- import os
- import sys
- import time
- import urllib.parse
- import uuid
- from pathlib import Path
- import requests
- import websocket
- from dotenv import load_dotenv
- from check_workflow import analyze, check_files_exist
# Load environment variables from the ".env" file located two directories
# above the directory that contains this script. This must run before the
# os.getenv() calls below.
load_dotenv(Path(__file__).parent.parent.parent / ".env")
# Base URL of the RunComfy server-management API.
BASE_URL = "https://beta-api.runcomfy.net/prod/api"
# Credentials read from the environment; either may be None when unset
# (main() checks both before any request is made).
USER_ID = os.getenv("RUNCOMFY_USER_ID")
API_TOKEN = os.getenv("API_TOKEN")
# Headers sent with every RunComfy API request. Built once at import time, so
# the token value is whatever the environment held at that moment.
HEADERS = {
    "Authorization": f"Bearer {API_TOKEN}",
    "Content-Type": "application/json",
}
DEFAULT_VERSION_ID = "90f77137-ba75-400d-870f-204c614ae8a3"  # RunComfy/ComfyUI-NodesLoaded
# Maps an input/ subdirectory name to the "type" and "subfolder" form fields
# sent with each upload.
# type="input" corresponds to ComfyUI's input directory.
# type="model" has no official support yet, so LoRAs and other model files are
# kept apart by subfolder instead.
SUBDIR_UPLOAD_MAP = {
    "images": {"type": "input", "subfolder": ""},
    "loras": {"type": "input", "subfolder": "loras"},
    "checkpoints": {"type": "input", "subfolder": "checkpoints"},
    "vae": {"type": "input", "subfolder": "vae"},
    "controlnet": {"type": "input", "subfolder": "controlnet"},
    "upscale": {"type": "input", "subfolder": "upscale_models"},
}
- # ── 机器管理 ──────────────────────────────────────────────
def launch_machine(
    version_id: str,
    server_type: str = "medium",
    duration: int = 3600,
    *,
    request_timeout: float = 60.0,
) -> str:
    """Create a RunComfy server and return its id.

    Args:
        version_id: Sent as ``workflow_version_id`` in the request payload.
        server_type: Sent as ``server_type`` in the request payload.
        duration: Sent as ``estimated_duration`` in the request payload.
        request_timeout: Seconds to wait for the HTTP call before giving up,
            so an unresponsive API cannot block the script forever.

    Returns:
        The ``server_id`` field of the JSON response.

    Raises:
        requests.HTTPError: The API answered with a non-2xx status.
        requests.Timeout: No answer arrived within ``request_timeout``.
        KeyError: The response carries no ``server_id`` field.
    """
    payload = {
        "workflow_version_id": version_id,
        "server_type": server_type,
        "estimated_duration": duration,
    }
    resp = requests.post(
        f"{BASE_URL}/users/{USER_ID}/servers",
        headers=HEADERS,
        json=payload,
        timeout=request_timeout,
    )
    if not resp.ok:
        # HTTPError alone hides the response body, so show it first.
        print(f" HTTP {resp.status_code}: {resp.text}")
    resp.raise_for_status()

    # Decode the body once and reuse it for both the log line and the lookup.
    body = resp.json()
    print(f" 响应: {body}")
    server_id = body["server_id"]
    print(f"机器已创建: {server_id} (type={server_type})")
    return server_id
def wait_for_ready(
    server_id: str,
    timeout: int = 300,
    *,
    poll_interval: float = 5.0,
    request_timeout: float = 30.0,
) -> str:
    """Poll the server status until it is "Ready" and return its service URL.

    Args:
        server_id: Id of the RunComfy server to poll.
        timeout: Maximum number of seconds to keep polling.
        poll_interval: Seconds to sleep between two status requests.
        request_timeout: Seconds to wait for each individual HTTP request.

    Returns:
        The ``main_service_url`` of the server with trailing slashes removed.

    Raises:
        RuntimeError: The server reported status "Error" or "Failed".
        TimeoutError: The server was not ready within ``timeout`` seconds.
        requests.HTTPError: A status request returned a non-2xx response.
    """
    print("等待机器就绪...")
    # A monotonic clock keeps the deadline correct even if the system wall
    # clock is adjusted while we are waiting.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        resp = requests.get(
            f"{BASE_URL}/users/{USER_ID}/servers/{server_id}",
            headers=HEADERS,
            timeout=request_timeout,
        )
        resp.raise_for_status()
        data = resp.json()
        status = data.get("current_status", "")
        print(f" 状态: {status}")
        if status == "Ready":
            url = data["main_service_url"].rstrip("/")
            print(f" 就绪: {url}")
            return url
        if status in ("Error", "Failed"):
            raise RuntimeError(f"机器启动失败: {status}")

        # Never sleep past the deadline.
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(min(poll_interval, remaining))
    raise TimeoutError(f"等待超时 ({timeout}s)")
def stop_machine(server_id: str, *, request_timeout: float = 30.0) -> None:
    """Delete (shut down) the RunComfy server identified by ``server_id``.

    Args:
        server_id: Id of the server to shut down.
        request_timeout: Seconds to wait for the HTTP call, so a hung request
            surfaces as an error instead of blocking forever.

    Raises:
        requests.HTTPError: The API answered with a non-2xx status.
        requests.Timeout: No answer arrived within ``request_timeout``.
    """
    resp = requests.delete(
        f"{BASE_URL}/users/{USER_ID}/servers/{server_id}",
        headers=HEADERS,
        timeout=request_timeout,
    )
    resp.raise_for_status()
    print(f"机器已关闭: {server_id}")
- # ── 文件上传 ──────────────────────────────────────────────
def upload_file(comfy_url: str, file_path: Path, file_type: str = "input", subfolder: str = "") -> str:
    """Upload one file to ComfyUI and return the name the server stored it under.

    The file is posted to ``{comfy_url}/upload/image`` as the multipart field
    ``image``; ``overwrite``, ``type`` and ``subfolder`` are sent as form
    fields alongside it.
    """
    endpoint = f"{comfy_url}/upload/image"
    form_fields = {"overwrite": "true", "type": file_type, "subfolder": subfolder}

    # The handle must remain open while requests streams the multipart body.
    with open(file_path, "rb") as handle:
        multipart = [("image", (file_path.name, handle, "application/octet-stream"))]
        resp = requests.post(endpoint, data=form_fields, files=multipart)
    resp.raise_for_status()

    server_name = resp.json()["name"]
    if subfolder:
        destination = f" → {subfolder}/{server_name}"
    else:
        destination = f" → {server_name}"
    print(f" 上传: {file_path.name}{destination}")
    return server_name
def upload_input_dir(comfy_url: str, input_dir: Path) -> dict[str, str]:
    """Upload every supported file found in ``input_dir`` and its direct subdirectories.

    Files directly inside ``input_dir`` are uploaded with type "input" and an
    empty subfolder. Files inside a direct subdirectory use the entry for that
    subdirectory name in ``SUBDIR_UPLOAD_MAP``; unknown subdirectories fall
    back to type "input" with the subdirectory name as subfolder. Only files
    with an image, video or model extension are uploaded, and nested
    subdirectories are not descended into.

    Args:
        comfy_url: Base URL of the ComfyUI service.
        input_dir: Local directory to scan.

    Returns:
        ``{local file name: server file name}``. Empty when ``input_dir`` is
        missing or is not a directory. Files that share a name in different
        subdirectories overwrite each other in this mapping (the last one
        uploaded wins).
    """
    # is_dir() also rejects a regular file passed by mistake, which would
    # otherwise make iterdir() raise NotADirectoryError.
    if not input_dir.is_dir():
        print(f" input 目录不存在: {input_dir}")
        return {}

    image_exts = {".png", ".jpg", ".jpeg", ".webp", ".bmp"}
    video_exts = {".mp4", ".avi", ".mov", ".webm"}
    model_exts = {".safetensors", ".ckpt", ".pt", ".pth", ".gguf"}
    uploadable_exts = image_exts | video_exts | model_exts

    def is_uploadable(path: Path) -> bool:
        return path.is_file() and path.suffix.lower() in uploadable_exts

    # List the directory once; sorting makes the upload order (and therefore
    # the winner of a file-name collision) deterministic.
    entries = sorted(input_dir.iterdir())
    uploaded: dict[str, str] = {}

    # Files directly in the root go to the input directory.
    for path in entries:
        if is_uploadable(path):
            uploaded[path.name] = upload_file(comfy_url, path, "input", "")

    # Files in subdirectories go to the mapped type/subfolder.
    for subdir in entries:
        if not subdir.is_dir():
            continue
        mapping = SUBDIR_UPLOAD_MAP.get(subdir.name, {"type": "input", "subfolder": subdir.name})
        for path in sorted(subdir.iterdir()):
            if is_uploadable(path):
                uploaded[path.name] = upload_file(
                    comfy_url, path, mapping["type"], mapping["subfolder"]
                )
    return uploaded
- # ── 提交 workflow ─────────────────────────────────────────
def submit_prompt(
    comfy_url: str,
    workflow_api: dict,
    client_id: str,
    *,
    request_timeout: float = 60.0,
) -> str:
    """Queue a workflow on ComfyUI and return the resulting prompt id.

    Args:
        comfy_url: Base URL of the ComfyUI service.
        workflow_api: Workflow in API format, sent as the ``prompt`` field.
        client_id: Client identifier, sent as the ``client_id`` field.
        request_timeout: Seconds to wait for the HTTP call before giving up.

    Returns:
        The ``prompt_id`` field of the JSON response.

    Raises:
        requests.HTTPError: The server answered with a non-2xx status.
        requests.Timeout: No answer arrived within ``request_timeout``.
        KeyError: The response carries no ``prompt_id`` field.
    """
    payload = {"prompt": workflow_api, "client_id": client_id}
    resp = requests.post(f"{comfy_url}/prompt", json=payload, timeout=request_timeout)
    if not resp.ok:
        # Print the body before raising: HTTPError does not include it, and
        # the body is where a rejected workflow's details would be reported.
        print(f" HTTP {resp.status_code}: {resp.text}")
    resp.raise_for_status()

    data = resp.json()
    if data.get("node_errors"):
        print(f" 节点错误: {data['node_errors']}")
    prompt_id = data["prompt_id"]
    print(f"任务已提交: {prompt_id}")
    return prompt_id
- # ── WebSocket 监听 ────────────────────────────────────────
def _prompt_in_history(comfy_url: str, prompt_id: str, request_timeout: float = 30.0) -> bool:
    """Return True when ``/history/{prompt_id}`` already holds an entry for the prompt.

    The lookup is best effort: a failed or undecodable request counts as
    "not finished", so the caller simply falls back to listening.

    Raises:
        RuntimeError: The history entry reports an error status.
    """
    try:
        resp = requests.get(f"{comfy_url}/history/{prompt_id}", timeout=request_timeout)
        resp.raise_for_status()
        entry = resp.json().get(prompt_id)
    except (requests.RequestException, ValueError):
        return False
    if not entry:
        return False
    # NOTE(review): assumes ComfyUI history entries carry
    # status = {"status_str": "success" | "error", "messages": [...]} — confirm
    # against the deployed ComfyUI version.
    status = entry.get("status") or {}
    if status.get("status_str") == "error":
        raise RuntimeError(f"执行错误: {status.get('messages') or 'unknown'}")
    return True


def wait_for_completion(comfy_url: str, client_id: str, prompt_id: str, timeout: int = 600):
    """Block until ComfyUI reports that ``prompt_id`` has finished executing.

    Connects to ``{comfy_url}/ws`` with the given client id and reads
    messages until an ``executing`` message for this prompt arrives with
    ``node`` set to ``None``.

    Args:
        comfy_url: Base URL of the ComfyUI service (http or https).
        client_id: Client id the prompt was submitted with.
        prompt_id: Id of the prompt to wait for.
        timeout: Socket timeout in seconds. It applies to each individual
            receive, not to the total waiting time.

    Raises:
        RuntimeError: An ``execution_error`` message arrived for this prompt,
            or its history entry reports an error.
    """
    scheme = "wss" if comfy_url.startswith("https") else "ws"
    ws_url = f"{scheme}://{comfy_url.split('://', 1)[-1]}/ws?clientId={client_id}"
    print("WebSocket 监听中...")
    ws = websocket.WebSocket()
    ws.settimeout(timeout)
    ws.connect(ws_url)
    try:
        # The socket is opened only after the prompt was submitted. A prompt
        # that finished before we connected will never produce the message
        # awaited below, so look it up in the history first.
        if _prompt_in_history(comfy_url, prompt_id):
            print(" 执行完成")
            return

        while True:
            out = ws.recv()
            # Skip empty frames and binary frames; only JSON text is handled.
            if not out or isinstance(out, bytes):
                continue
            msg = json.loads(out)
            msg_type = msg.get("type", "")
            data = msg.get("data", {})
            if msg_type == "executing":
                node = data.get("node")
                # node=None for our prompt marks the end of execution.
                if data.get("prompt_id") == prompt_id and node is None:
                    print(" 执行完成")
                    break
                if node:
                    print(f" 执行节点: {node}")
            elif msg_type == "progress":
                value = data.get("value", 0)
                max_val = data.get("max", 1)
                print(f" 进度: {value}/{max_val}")
            elif msg_type == "execution_error":
                if data.get("prompt_id") == prompt_id:
                    raise RuntimeError(f"执行错误: {data.get('exception_message', 'unknown')}")
    finally:
        ws.close()
- # ── 下载结果 ──────────────────────────────────────────────
def _download_view_file(
    comfy_url: str,
    params: dict,
    filename: str,
    output_dir: Path,
    request_timeout: float,
) -> Path:
    """Fetch one file from ``/view`` and write it into ``output_dir``."""
    resp = requests.get(
        f"{comfy_url}/view?{urllib.parse.urlencode(params)}",
        timeout=request_timeout,
    )
    resp.raise_for_status()
    # The file name comes from the server: keep only its final component so
    # a value such as "../../x" cannot be written outside output_dir.
    out_path = output_dir / Path(filename).name
    out_path.write_bytes(resp.content)
    return out_path


def download_outputs(
    comfy_url: str,
    prompt_id: str,
    output_dir: Path,
    *,
    request_timeout: float = 120.0,
) -> list[str]:
    """Download every image and video recorded in the history of ``prompt_id``.

    Reads ``/history/{prompt_id}``, then fetches each entry listed under a
    node's ``images`` and ``gifs`` keys through ``/view`` and stores it in
    ``output_dir`` (created if missing).

    Args:
        comfy_url: Base URL of the ComfyUI service.
        prompt_id: Id of the finished prompt.
        output_dir: Local directory that receives the files.
        request_timeout: Seconds to wait for each individual HTTP request.

    Returns:
        Paths of the saved files as strings, in download order.

    Raises:
        requests.HTTPError: A request returned a non-2xx response.
    """
    resp = requests.get(f"{comfy_url}/history/{prompt_id}", timeout=request_timeout)
    resp.raise_for_status()
    outputs = resp.json().get(prompt_id, {}).get("outputs", {})
    output_dir.mkdir(parents=True, exist_ok=True)

    saved: list[str] = []
    for node_output in outputs.values():
        for image in node_output.get("images", ()):
            params = {
                "filename": image["filename"],
                "subfolder": image.get("subfolder", ""),
                "type": image.get("temp") or image.get("type", "output"),
            }
            out_path = _download_view_file(
                comfy_url, params, image["filename"], output_dir, request_timeout
            )
            print(f" 图片: {out_path}")
            saved.append(str(out_path))
        for video in node_output.get("gifs", ()):
            params = {
                "filename": video["filename"],
                "subfolder": video.get("subfolder", ""),
                "format": video.get("format", "mp4"),
            }
            out_path = _download_view_file(
                comfy_url, params, video["filename"], output_dir, request_timeout
            )
            print(f" 视频: {out_path}")
            saved.append(str(out_path))
    return saved
- # ── 主流程 ────────────────────────────────────────────────
def main():
    """Command-line entry point: validate, run a workflow remotely, fetch results.

    Steps: parse arguments, check credentials, load and validate the workflow
    JSON against the local input directory, then launch (or reuse) a machine,
    upload inputs, submit the workflow, wait for completion, download the
    outputs and stop the machine unless ``--keep-alive`` is given.

    Exits with status 1 on missing credentials, a missing workflow file, a
    failed workflow check, or any exception during the remote run. After a
    failed run the machine is deliberately left running for manual handling.
    """
    parser = argparse.ArgumentParser(description="RunComfy workflow runner")
    parser.add_argument("workflow", help="workflow_api.json 路径")
    parser.add_argument("--input-dir", default="input", metavar="DIR",
                        help="输入文件目录,默认 input/。子目录 images/loras/checkpoints/vae/ 自动上传到对应位置")
    parser.add_argument("--version-id", default=DEFAULT_VERSION_ID, help="RunComfy workflow version_id")
    parser.add_argument("--server-type", default="medium",
                        choices=["medium", "large", "extra-large", "2x-large", "2xl-turbo"])
    parser.add_argument("--duration", type=int, default=3600, help="预估运行时长(秒),默认3600")
    parser.add_argument("--keep-alive", action="store_true", help="完成后不自动关机")
    parser.add_argument("--server-id", metavar="ID", help="复用已有机器,跳过启动步骤")
    parser.add_argument("--skip-upload", action="store_true", help="跳过文件上传,直接提交 workflow")
    parser.add_argument("--output-dir", default="output", metavar="DIR", help="结果下载目录,默认 output/")
    args = parser.parse_args()

    if not USER_ID or not API_TOKEN:
        print("ERROR: 请设置 RUNCOMFY_USER_ID 和 API_TOKEN 环境变量")
        sys.exit(1)
    print(f"USER_ID : {USER_ID}")
    print(f"API_TOKEN: {API_TOKEN[:8]}...")

    workflow_path = Path(args.workflow)
    if not workflow_path.exists():
        print(f"ERROR: 文件不存在: {workflow_path}")
        sys.exit(1)
    with open(workflow_path, "r", encoding="utf-8") as f:
        workflow_api = json.load(f)

    # Pre-submit check: analyse the workflow and make sure the files it
    # references are present under the local input directory.
    input_dir = Path(args.input_dir)
    all_input_files = list(input_dir.rglob("*")) if input_dir.exists() else []
    result = analyze(workflow_api)
    missing = check_files_exist(result["file_inputs"], all_input_files)
    if result["issues"] or missing:
        print("❌ workflow 检查未通过:")
        for issue in result["issues"]:
            print(f" {issue}")
        for fi in missing:
            print(f" 缺少文件: {fi['filename']} (节点 [{fi['node_id']}] {fi['class_type']})")
            print(f" 请将该文件放入 {input_dir}/ 目录")
        sys.exit(1)
    if result["widget_params"]:
        print("⚠️ 存在 widget_* 占位参数,参数名可能不准确,继续运行...")
    else:
        print("✓ workflow 检查通过")

    client_id = str(uuid.uuid4())
    server_id = args.server_id  # None unless --server-id was given
    try:
        # 1. Launch a machine, or reuse the one given on the command line.
        if server_id:
            print(f"复用已有机器: {server_id}")
        else:
            server_id = launch_machine(args.version_id, args.server_type, args.duration)
        comfy_url = wait_for_ready(server_id)

        # 2. Upload the input directory.
        if args.skip_upload:
            print("跳过文件上传 (--skip-upload)")
        else:
            print(f"\n上传 input 目录: {input_dir}")
            upload_input_dir(comfy_url, input_dir)

        # 3. Submit the workflow.
        print("\n提交 workflow...")
        prompt_id = submit_prompt(comfy_url, workflow_api, client_id)

        # 4. Follow execution progress.
        wait_for_completion(comfy_url, client_id, prompt_id)

        # 5. Download the results.
        print("\n下载结果...")
        saved = download_outputs(comfy_url, prompt_id, Path(args.output_dir))
        print(f"\n完成,共 {len(saved)} 个文件")

        if args.keep_alive:
            print(f"\n--keep-alive 模式,机器保持运行: {server_id}")
            print(f"ComfyUI URL: {comfy_url}")
        else:
            print("\n关闭机器...")
            stop_machine(server_id)
    except Exception as e:
        print(f"\n错误: {e}")
        # Only warn about a running machine when one exists: if the launch
        # itself failed there is no server id and nothing to clean up.
        if server_id:
            print(f"机器 {server_id} 未自动关闭,请手动处理")
        sys.exit(1)
- if __name__ == "__main__":
- main()
|