| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223 |
- """RunComfy Workflow HTTP API"""
- import base64
- import json
- import os
- import uuid
- from typing import Optional
- import requests
- import websocket
- from dotenv import load_dotenv
- from fastapi import FastAPI, HTTPException
- from pydantic import BaseModel
# Load environment variables from a local .env file before reading config.
load_dotenv()

app = FastAPI(title="RunComfy Workflow API")

# RunComfy management-plane endpoint and credentials (from the environment).
BASE_URL = "https://beta-api.runcomfy.net/prod/api"
USER_ID = os.getenv("RUNCOMFY_USER_ID")
API_TOKEN = os.getenv("API_TOKEN")

# Auth headers for every RunComfy management API call.
HEADERS = {
    "Authorization": f"Bearer {API_TOKEN}",
    "Content-Type": "application/json",
}

# Maps a logical input-file "type" to the ComfyUI upload destination
# (upload type + subfolder). Types not listed here fall back to a
# subfolder named after the type itself (see run_workflow).
SUBDIR_UPLOAD_MAP = {
    "images": {"type": "input", "subfolder": ""},
    "loras": {"type": "input", "subfolder": "loras"},
    "checkpoints": {"type": "input", "subfolder": "checkpoints"},
    "vae": {"type": "input", "subfolder": "vae"},
}
class InputFile(BaseModel):
    """One input file to upload to the ComfyUI server before running."""

    filename: str  # target filename on the ComfyUI server
    type: str  # logical category; key into SUBDIR_UPLOAD_MAP (e.g. "images", "loras")
    base64_data: Optional[str] = None  # inline file content, base64-encoded
    url: Optional[str] = None  # alternatively, a URL the content is downloaded from
class WorkflowRequest(BaseModel):
    """Request body for POST /run."""

    server_id: str  # RunComfy server to run on (must be in "Ready" state)
    workflow_api: dict  # ComfyUI workflow graph in API (prompt) format
    input_files: Optional[list[InputFile]] = None  # files to upload before submission
class WorkflowResponse(BaseModel):
    """Response body for POST /run."""

    prompt_id: str  # ComfyUI prompt id assigned to the submitted workflow
    images: list[str]  # /view URLs of every image the workflow produced
    status: str  # always "Success" on the non-error path
    server_id: str  # echoed back from the request
def get_server_url(server_id: str) -> str:
    """Resolve a RunComfy server id to its ComfyUI base URL (no trailing slash).

    Raises:
        requests.HTTPError: if the management API call fails.
        RuntimeError: if the server is not in the "Ready" state.
    """
    resp = requests.get(
        f"{BASE_URL}/users/{USER_ID}/servers/{server_id}",
        headers=HEADERS,
        timeout=30,  # FIX: fail fast instead of hanging forever on a dead API
    )
    resp.raise_for_status()
    data = resp.json()
    if data.get("current_status") != "Ready":
        raise RuntimeError(f"机器未就绪: {data.get('current_status')}")
    # Normalize so callers can append paths with "/" safely.
    return data["main_service_url"].rstrip("/")
def upload_file_bytes(comfy_url: str, filename: str, file_bytes: bytes, file_type: str, subfolder: str):
    """Upload raw bytes to ComfyUI via /upload/image; return the stored name.

    NOTE(review): this service routes non-image assets (loras, checkpoints,
    vae — see SUBDIR_UPLOAD_MAP) through the same endpoint; `file_type` and
    `subfolder` select the destination directory on the server.
    """
    files = [("image", (filename, file_bytes, "application/octet-stream"))]
    data = {"overwrite": "true", "type": file_type, "subfolder": subfolder}
    # FIX: bounded timeout so a wedged server can't hang the caller forever;
    # generous (300s) because model files can be large.
    resp = requests.post(f"{comfy_url}/upload/image", data=data, files=files, timeout=300)
    resp.raise_for_status()
    return resp.json()["name"]
def verify_workflow_api(workflow_api: dict):
    """Sanity-check a ComfyUI workflow graph (API format) before submission.

    Verifies that the top level is a dict of nodes, each node is a dict
    carrying a 'class_type', each node's 'inputs' is a dict, and every
    link-style input ([source_node_id, port_index]) references a node that
    actually exists in the graph.

    Raises:
        ValueError: on the first structural problem found.
    """
    if not isinstance(workflow_api, dict):
        raise ValueError("workflow_api 必须是一个字典对象存放各个节点")

    for nid, node in workflow_api.items():
        if not isinstance(node, dict):
            raise ValueError(f"节点 {nid} 数据异常:必须是一个字典")
        if "class_type" not in node:
            raise ValueError(f"节点 {nid} 数据异常:缺少必填字段 'class_type'")

        node_inputs = node.get("inputs", {})
        if not isinstance(node_inputs, dict):
            raise ValueError(f"节点 {nid} (class_type: {node['class_type']}) 数据异常:inputs 必须是一个字典")

        for key, value in node_inputs.items():
            # A list of length >= 2 denotes a graph edge: [source_id, port].
            # Anything else is a literal value and needs no check.
            if not (isinstance(value, list) and len(value) >= 2):
                continue
            source = str(value[0])
            if source not in workflow_api:
                raise ValueError(f"节点 {nid} ({node['class_type']}) 的输入 '{key}' 引用了不存在的节点 ID: {source}。图存在断线!")
def submit_prompt(comfy_url: str, workflow_api: dict, client_id: str) -> str:
    """Submit a workflow graph to ComfyUI's /prompt endpoint.

    Returns:
        The prompt_id assigned by ComfyUI.

    Raises:
        Exception: if ComfyUI rejects the workflow; the message embeds the
            server's diagnostic body to aid debugging.
    """
    payload = {"prompt": workflow_api, "client_id": client_id}
    resp = requests.post(f"{comfy_url}/prompt", json=payload, timeout=60)
    try:
        resp.raise_for_status()
    except requests.exceptions.HTTPError as e:
        # FIX: the old message hard-coded "400 Bad Request" even when the
        # server returned some other status; report the real status, and
        # chain the original HTTPError for full tracebacks.
        error_details = resp.text
        raise Exception(
            f"提交工作流失败: {resp.status_code} {resp.reason}. 你的 workflow_api JSON 结构可能存在致命错误。\nComfyUI 返回的详细诊断信息: {error_details}"
        ) from e
    return resp.json()["prompt_id"]
def wait_for_completion(comfy_url: str, client_id: str, prompt_id: str, timeout: int = 1200):
    """Poll ComfyUI until *prompt_id* completes, dies, or *timeout* elapses.

    Completion is detected by the prompt appearing in /history. If the prompt
    vanishes from /queue without reaching /history, a node likely errored and
    an exception is raised. `client_id` is currently unused; kept for
    interface compatibility (presumably with a websocket-based variant —
    TODO confirm).

    Raises:
        Exception: task aborted (gone from queue, not in history), too many
            consecutive connection failures, or overall timeout.
    """
    import time
    import requests
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry

    start_time = time.time()
    session = requests.Session()
    try:
        # Retry transient 5xx responses at the transport layer.
        retry_strategy = Retry(total=3, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("https://", adapter)
        session.mount("http://", adapter)

        error_count = 0  # consecutive connection failures; reset on any clean poll
        while time.time() - start_time < timeout:
            try:
                # 1. Completed prompts appear in /history keyed by prompt_id.
                resp = session.get(f"{comfy_url}/history/{prompt_id}", timeout=15)
                if resp.status_code == 200:
                    data = resp.json()
                    if prompt_id in data:
                        return  # successfully completed

                # 2. Check the queue to make sure the task didn't die silently.
                resp_q = session.get(f"{comfy_url}/queue", timeout=15)
                if resp_q.status_code == 200:
                    q_data = resp_q.json()
                    running = q_data.get("queue_running", [])
                    pending = q_data.get("queue_pending", [])

                    # Queue entries are typically [id, prompt_id, prompt, ...].
                    is_active = any(
                        len(task) > 1 and str(task[1]) == str(prompt_id)
                        for task in running + pending
                    )

                    if not is_active:
                        # Gone from the queue but not (yet) in history: either
                        # it just finished (race) or a node errored. Wait a
                        # moment and re-check history once before failing.
                        time.sleep(2)
                        resp_cf = session.get(f"{comfy_url}/history/{prompt_id}", timeout=15)
                        if resp_cf.status_code == 200 and prompt_id in resp_cf.json():
                            return
                        raise Exception("执行中止:任务已从队列消失且未成功写入历史(可能某个节点出错。请检查工作流输入参数。)")

                error_count = 0
            except requests.exceptions.RequestException as e:
                error_count += 1
                print(f"Polling HTTP Error (count={error_count}): {e}")
                if error_count > 10:
                    raise Exception(f"与 ComfyUI 服务器断开连接次数达到上限: {e}")

            # Sleep before polling again.
            time.sleep(5)

        raise Exception(f"任务执行超时 (未在 {timeout} 秒内完成)")
    finally:
        # FIX: the Session (and its pooled connections) was never closed —
        # close it on every exit path.
        session.close()
def get_comfy_image_urls(comfy_url: str, prompt_id: str) -> list[str]:
    """Collect /view URLs for every image a finished prompt produced.

    Reads the prompt's outputs from /history and builds one URL per image,
    preserving its filename/subfolder/type so ComfyUI can serve it back.
    """
    import urllib.parse  # hoisted: was imported mid-function

    # FIX: bounded timeout so a wedged server can't hang the caller.
    resp = requests.get(f"{comfy_url}/history/{prompt_id}", timeout=30)
    resp.raise_for_status()
    outputs = resp.json().get(prompt_id, {}).get("outputs", {})
    images = []
    for node_output in outputs.values():
        for img in node_output.get("images", []):
            params = {
                "filename": img["filename"],
                "subfolder": img.get("subfolder", ""),
                "type": img.get("type", "output"),
            }
            images.append(f"{comfy_url}/view?{urllib.parse.urlencode(params)}")
    return images
@app.post("/run", response_model=WorkflowResponse)
async def run_workflow(request: WorkflowRequest):
    """Run a ComfyUI workflow on a RunComfy server; return output image URLs.

    Pipeline: resolve the server URL, validate the workflow graph, upload any
    input files, submit the prompt, block until completion, collect image
    URLs. Any failure surfaces as HTTP 500 with the error text as detail.
    """
    try:
        comfy_url = get_server_url(request.server_id)
        client_id = str(uuid.uuid4())

        # FIX: validate the graph *before* uploading files, so a broken
        # workflow fails fast instead of after potentially large uploads.
        verify_workflow_api(request.workflow_api)

        for file in request.input_files or []:
            # Unknown types fall back to a subfolder named after the type.
            mapping = SUBDIR_UPLOAD_MAP.get(file.type, {"type": "input", "subfolder": file.type})

            if file.url:
                # Download from CDN; FIX: bounded so a dead CDN can't hang us.
                resp = requests.get(file.url, timeout=120)
                resp.raise_for_status()
                file_bytes = resp.content
            elif file.base64_data:
                file_bytes = base64.b64decode(file.base64_data)
            else:
                raise Exception(f"Input file {file.filename} must have either 'url' or 'base64_data'")

            upload_file_bytes(comfy_url, file.filename, file_bytes,
                              mapping["type"], mapping["subfolder"])

        prompt_id = submit_prompt(comfy_url, request.workflow_api, client_id)
        wait_for_completion(comfy_url, client_id, prompt_id)
        images = get_comfy_image_urls(comfy_url, prompt_id)
        return WorkflowResponse(prompt_id=prompt_id, images=images,
                                status="Success", server_id=request.server_id)
    except Exception as e:
        # Surface every failure as a 500 with the message as detail; chain
        # the cause so server logs keep the full traceback.
        raise HTTPException(status_code=500, detail=str(e)) from e
if __name__ == "__main__":
    # Script entry point: serve the FastAPI app on the requested port.
    import argparse

    import uvicorn

    cli = argparse.ArgumentParser()
    cli.add_argument("--port", type=int, default=8000)
    opts = cli.parse_args()
    uvicorn.run(app, host="0.0.0.0", port=opts.port)
|