"""RunComfy Workflow HTTP API""" import base64 import json import os import uuid from typing import Optional import requests import websocket from dotenv import load_dotenv from fastapi import FastAPI, HTTPException from pydantic import BaseModel load_dotenv() app = FastAPI(title="RunComfy Workflow API") BASE_URL = "https://beta-api.runcomfy.net/prod/api" USER_ID = os.getenv("RUNCOMFY_USER_ID") API_TOKEN = os.getenv("API_TOKEN") HEADERS = { "Authorization": f"Bearer {API_TOKEN}", "Content-Type": "application/json", } SUBDIR_UPLOAD_MAP = { "images": {"type": "input", "subfolder": ""}, "loras": {"type": "input", "subfolder": "loras"}, "checkpoints": {"type": "input", "subfolder": "checkpoints"}, "vae": {"type": "input", "subfolder": "vae"}, } class InputFile(BaseModel): filename: str type: str base64_data: Optional[str] = None url: Optional[str] = None class WorkflowRequest(BaseModel): server_id: str workflow_api: dict input_files: Optional[list[InputFile]] = None class WorkflowResponse(BaseModel): prompt_id: str images: list[str] status: str server_id: str def get_server_url(server_id: str) -> str: resp = requests.get(f"{BASE_URL}/users/{USER_ID}/servers/{server_id}", headers=HEADERS) resp.raise_for_status() data = resp.json() if data.get("current_status") != "Ready": raise Exception(f"机器未就绪: {data.get('current_status')}") return data["main_service_url"].rstrip("/") def upload_file_bytes(comfy_url: str, filename: str, file_bytes: bytes, file_type: str, subfolder: str): files = [("image", (filename, file_bytes, "application/octet-stream"))] data = {"overwrite": "true", "type": file_type, "subfolder": subfolder} resp = requests.post(f"{comfy_url}/upload/image", data=data, files=files) resp.raise_for_status() return resp.json()["name"] def verify_workflow_api(workflow_api: dict): if not isinstance(workflow_api, dict): raise ValueError("workflow_api 必须是一个字典对象存放各个节点") for node_id, node_data in workflow_api.items(): if not isinstance(node_data, dict): raise ValueError(f"节点 {node_id} 数据异常:必须是一个字典") if "class_type" not in node_data: raise ValueError(f"节点 {node_id} 数据异常:缺少必填字段 'class_type'") inputs = node_data.get("inputs", {}) if not isinstance(inputs, dict): raise ValueError(f"节点 {node_id} (class_type: {node_data['class_type']}) 数据异常:inputs 必须是一个字典") for input_key, input_value in inputs.items(): # 检测连线格式:必须是 [node_id, port_index] 且 node_id 存在于 workflow_api 中 if isinstance(input_value, list) and len(input_value) >= 2: target_node = str(input_value[0]) if target_node not in workflow_api: raise ValueError(f"节点 {node_id} ({node_data['class_type']}) 的输入 '{input_key}' 引用了不存在的节点 ID: {target_node}。图存在断线!") def submit_prompt(comfy_url: str, workflow_api: dict, client_id: str) -> str: payload = {"prompt": workflow_api, "client_id": client_id} resp = requests.post(f"{comfy_url}/prompt", json=payload) try: resp.raise_for_status() except requests.exceptions.HTTPError as e: error_details = resp.text raise Exception(f"提交工作流失败: 400 Bad Request. 你的 workflow_api JSON 结构可能存在致命错误。\nComfyUI 返回的详细诊断信息: {error_details}") return resp.json()["prompt_id"] def wait_for_completion(comfy_url: str, client_id: str, prompt_id: str, timeout: int = 1200): import time import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry start_time = time.time() session = requests.Session() retry_strategy = Retry(total=3, backoff_factor=1, status_forcelist=[500, 502, 503, 504]) adapter = HTTPAdapter(max_retries=retry_strategy) session.mount("https://", adapter) session.mount("http://", adapter) error_count = 0 while time.time() - start_time < timeout: try: # 1. Check history to see if done resp = session.get(f"{comfy_url}/history/{prompt_id}", timeout=15) if resp.status_code == 200: data = resp.json() if prompt_id in data: return # Successfully completed # 2. Check queue to make sure it didn't error out completely resp_q = session.get(f"{comfy_url}/queue", timeout=15) if resp_q.status_code == 200: q_data = resp_q.json() running = q_data.get("queue_running", []) pending = q_data.get("queue_pending", []) is_active = False for task in running + pending: # task elements are usually: [id, prompt_id, prompt, extra_data, prompt_out] if len(task) > 1 and str(task[1]) == str(prompt_id): is_active = True break if not is_active: # It disappeared from queue but isn't in history! # Give it a tiny sleep to avoid race conditions time.sleep(2) resp_cf = session.get(f"{comfy_url}/history/{prompt_id}", timeout=15) if resp_cf.status_code == 200 and prompt_id in resp_cf.json(): return raise Exception("执行中止:任务已从队列消失且未成功写入历史(可能某个节点出错。请检查工作流输入参数。)") error_count = 0 except requests.exceptions.RequestException as e: error_count += 1 print(f"Polling HTTP Error (count={error_count}): {e}") if error_count > 10: raise Exception(f"与 ComfyUI 服务器断开连接次数达到上限: {e}") # Sleep before polling again time.sleep(5) raise Exception(f"任务执行超时 (未在 {timeout} 秒内完成)") def get_comfy_image_urls(comfy_url: str, prompt_id: str) -> list[str]: resp = requests.get(f"{comfy_url}/history/{prompt_id}") resp.raise_for_status() outputs = resp.json().get(prompt_id, {}).get("outputs", {}) images = [] import urllib.parse for node_output in outputs.values(): if "images" in node_output: for img in node_output["images"]: params = {"filename": img["filename"], "subfolder": img.get("subfolder", ""), "type": img.get("type", "output")} query = urllib.parse.urlencode(params) url = f"{comfy_url}/view?{query}" images.append(url) return images @app.post("/run", response_model=WorkflowResponse) async def run_workflow(request: WorkflowRequest): try: comfy_url = get_server_url(request.server_id) client_id = str(uuid.uuid4()) if request.input_files: for file in request.input_files: mapping = SUBDIR_UPLOAD_MAP.get(file.type, {"type": "input", "subfolder": file.type}) file_bytes = None if file.url: # 从 CDN 下载 resp = requests.get(file.url) resp.raise_for_status() file_bytes = resp.content elif file.base64_data: # Base64 解码 file_bytes = base64.b64decode(file.base64_data) if file_bytes: upload_file_bytes(comfy_url, file.filename, file_bytes, mapping["type"], mapping["subfolder"]) else: raise Exception(f"Input file {file.filename} must have either 'url' or 'base64_data'") verify_workflow_api(request.workflow_api) prompt_id = submit_prompt(comfy_url, request.workflow_api, client_id) wait_for_completion(comfy_url, client_id, prompt_id) images = get_comfy_image_urls(comfy_url, prompt_id) return WorkflowResponse(prompt_id=prompt_id, images=images, status="Success", server_id=request.server_id) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) if __name__ == "__main__": import uvicorn import argparse parser = argparse.ArgumentParser() parser.add_argument("--port", type=int, default=8000) args = parser.parse_args() uvicorn.run(app, host="0.0.0.0", port=args.port)