"""RunComfy schema inspector and its FastAPI service layer.

The module offers two things:

* ``RunComfySchemaInspector`` - loads the ComfyUI ``/object_info`` registry
  (remote first, local JSON cache as a fallback) and exposes helpers to
  inspect node schemas, search model names, validate API-format workflows
  and browse/load the bundled example workflows.
* A small FastAPI app that dispatches a single POST endpoint to those
  helpers based on an ``action`` field.
"""

import argparse
import json
import os
import warnings
from pathlib import Path
from typing import Any, Optional

import requests
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field

load_dotenv()

# ---------------------------------------------------------------------------
# Tag annotations for example workflows — maps filename stems to a brief
# human-readable description so that agents can quickly decide which example
# to load. Maintained manually; new scraped files should be added here.
# ---------------------------------------------------------------------------
_WORKFLOW_ANNOTATIONS = {
    # --- flux ---
    "flux_dev_example": "Flux Dev txt2img (UNETLoader + SamplerCustomAdvanced + ModelSamplingFlux)",
    "flux_dev_checkpoint_example": "Flux Dev txt2img via CheckpointLoaderSimple (简化版, KSampler cfg=1)",
    "flux_schnell_example": "Flux Schnell 4-step fast txt2img (UNETLoader)",
    "flux_schnell_checkpoint_example": "Flux Schnell 4-step via CheckpointLoaderSimple",
    "flux_controlnet_example": "Flux + ControlNet (Canny) via ControlNetApplySD3",
    "flux_canny_model_example": "Flux Canny 内建模型 (InstructPixToPixConditioning)",
    "flux_depth_lora_example": "Flux Depth LoRA (LoraLoaderModelOnly + InstructPixToPixConditioning)",
    "flux_fill_inpaint_example": "Flux Fill inpaint (DifferentialDiffusion + InpaintModelConditioning)",
    "flux_fill_outpaint_example": "Flux Fill outpaint (ImagePadForOutpaint + InpaintModelConditioning)",
    "flux_redux_model_example": "Flux Redux 图像风格迁移 (CLIPVisionLoader + StyleModelApply)",
    "flux_kontext_example": "Flux Kontext 图像编辑/角色一致性 (FluxKontextImageScale + ReferenceLatent)",
    # --- flux2 ---
    "flux2_example": "Flux 2 txt2img",
    # --- sdxl ---
    "sdxl_simple_example": "SDXL Base+Refiner 两阶段 txt2img (KSamplerAdvanced)",
    "sdxl_refiner_prompt_example": "SDXL Base+Refiner 各自独立提示词",
    "sdxl_revision_text_prompts": "SDXL Revision 文本提示融合",
    "sdxl_revision_zero_positive": "SDXL Revision zero-positive 风格",
    # --- sd3 ---
    "sd3.5_simple_example": "SD3.5 基础 txt2img",
    "sd3.5_text_encoders_example": "SD3.5 三编码器 (clip_g + clip_l + t5xxl)",
    "sd3.5_large_canny_controlnet_example": "SD3.5 Large + Canny ControlNet",
    # --- controlnet ---
    "controlnet_example": "SD1.5 ControlNet (scribble) 基础用法",
    "depth_controlnet": "SD1.5 Depth ControlNet",
    "depth_t2i_adapter": "SD1.5 Depth T2I-Adapter",
    "mixing_controlnets": "SD1.5 混合双 ControlNet (openpose + scribble)",
    "2_pass_pose_worship": "SD1.5 两阶段 ControlNet + LatentUpscale Hi-Res Fix",
    # --- img2img ---
    "img2img_workflow": "SD1.5 图生图 (VAEEncode, denoise<1)",
    # --- inpaint ---
    "inpaint_example": "SD1.5 Inpaint (VAEEncodeForInpaint + 专用 checkpoint)",
    "inpain_model_cat": "SD1.5 Inpaint 猫咪涂抹",
    "inpain_model_woman": "SD1.5 Inpaint 女性涂抹",
    "inpain_model_outpainting": "SD1.5 Outpaint (ImagePadForOutpaint + VAEEncodeForInpaint)",
    "inpaint_anythingv3_woman": "SD1.5 Inpaint AnythingV3",
    "yosemite_outpaint_example": "SD1.5 Outpaint Yosemite 扩展画布",
    # --- lora ---
    "lora": "SD1.5 单 LoRA (LoraLoader)",
    "lora_multiple": "SD1.5 多 LoRA 链式堆叠",
    # --- upscale ---
    "esrgan_example": "ESRGAN 超分辨率 (UpscaleModelLoader + ImageUpscaleWithModel)",
    # --- area_composition ---
    "square_area_for_subject": "区域化构图 (ConditioningSetArea)",
    "workflow_night_evening_day_morning": "四时段区域化构图",
    # --- others ---
    "aura_flow_0.1_example": "AuraFlow 0.1 txt2img",
    "aura_flow_0.2_example": "AuraFlow 0.2 txt2img",
    "chroma_example": "Chroma 模型 txt2img",
    "cosmos_predict2_2b_t2i_example": "Cosmos Predict2 2B txt2img",
    "sdxl_edit_model": "SDXL Edit Model (InstructPixToPixConditioning)",
    "gligen_textbox_example": "GLIGEN 文本框定位 (GLIGENTextBoxApply)",
    "hidream_dev_example": "HiDream Dev txt2img",
    "hidream_e1.1_example": "HiDream E1.1 txt2img",
    "hidream_full_example": "HiDream Full txt2img",
    "hunyuan_dit_1.2_example": "HunyuanDiT 1.2 txt2img",
    "hunyuan_image_example": "Hunyuan Image txt2img",
    "hypernetwork_example": "Hypernetwork 示例",
    "lcm_basic_example": "LCM 快速采样",
    "lumina2_basic_example": "Lumina2 txt2img",
    "model_merging_basic": "模型合并 基础 (ModelMergeSimple)",
    "model_merging_3_checkpoints": "模型合并 三模型",
    "model_merging_cosxl": "模型合并 CosXL",
    "model_merging_inpaint": "模型合并 Inpaint",
    "model_merging_lora": "模型合并 LoRA",
    "noisy_latents_3_subjects": "噪声潜空间构图 三主体",
    "noisy_latents_3_subjects_": "噪声潜空间构图 三主体 (变体)",
    "omnigen2_example": "OmniGen2 txt2img",
    "qwen_image_basic_example": "Qwen Image 基础 txt2img",
    "qwen_image_edit_basic_example": "Qwen Image Edit 基础编辑",
    "qwen_image_edit_2509_basic_example": "Qwen Image Edit 2509 编辑",
    "sdxlturbo_example": "SDXL Turbo 快速采样",
    "stable_cascade__text_to_image": "Stable Cascade txt2img",
    "stable_cascade__image_to_image": "Stable Cascade img2img",
    "stable_cascade__canny_controlnet": "Stable Cascade Canny ControlNet",
    "stable_cascade__inpaint_controlnet": "Stable Cascade Inpaint ControlNet",
    "stable_cascade__image_remixing": "Stable Cascade 图像混合",
    "stable_cascade__image_remixing_multiple": "Stable Cascade 多图混合",
    "embedding_example": "Textual Inversion (embedding) 示例",
    "unclip_example": "UnCLIP 基础",
    "unclip_2pass": "UnCLIP 两阶段",
    "unclip_example_multiple": "UnCLIP 多图输入",
    "z_image_turbo_example": "Z-Image Turbo txt2img",
}


class RunComfySchemaInspector:
    """Inspect a ComfyUI node registry and the bundled example workflows.

    On construction the inspector tries to download ``/object_info`` from
    ``server_url`` and falls back to a JSON cache stored next to this file.
    """

    # Used when the caller does not supply a server URL.
    _DEFAULT_SERVER_URL = "https://90f77137-ba75-400d-870f-204c614ae8a3-comfyui.runcomfy.com"

    def __init__(self, server_url: str | None = None):
        """Create the inspector and immediately load the node registry.

        Args:
            server_url: Base URL of the ComfyUI server. When falsy, the
                built-in default URL is used.
        """
        self.server_url = server_url
        self.object_info: dict = {}
        # A backup copy is kept in this directory in case the cloud machine
        # is asleep or offline.
        self.cache_path = os.path.join(os.path.dirname(__file__), "object_info_cache.json")
        self._examples_dir = self._locate_examples_dir()
        self._hot_reload()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _locate_examples_dir() -> str:
        """Walk up from this file to find a `data/comfyui_examples` directory.

        Returns:
            The first existing ``<ancestor>/data/comfyui_examples`` directory,
            or a fallback path (which may not exist) derived from the
            ancestor three levels above this file.
        """
        anchor = Path(__file__).resolve()
        for parent in (anchor, *anchor.parents):
            candidate = parent / "data" / "comfyui_examples"
            if candidate.is_dir():
                return str(candidate)
        # Fallback: assume the standard repo layout (this file three levels
        # below the repo root). Clamp the index so a shallow install location
        # cannot raise IndexError.
        ancestors = anchor.parents
        root = ancestors[min(3, len(ancestors) - 1)]
        return str(root / "data" / "comfyui_examples")

    @staticmethod
    def _example_stem(filename: str) -> str:
        """Return the annotation key for an example file name.

        Strips a trailing ``.json`` extension and then a trailing ``_api``
        marker, so both ``foo_api.json`` and ``foo.json`` map to ``foo``.
        """
        stem = filename[:-5] if filename.lower().endswith(".json") else filename
        return stem.removesuffix("_api")

    def _hot_reload(self):
        """Fetch fresh object_info from the server; fall back to the cache.

        A successful download replaces ``self.object_info`` and refreshes the
        local cache. Any download problem leaves the download result unused
        and loads the cached copy instead (warning if there is none).
        """
        if not self.server_url:
            # No URL provided: use the dedicated default environment.
            self.server_url = self._DEFAULT_SERVER_URL

        # Tolerate a trailing slash so the request URL never contains "//".
        endpoint = f"{self.server_url.rstrip('/')}/object_info"
        print(f"[SchemaInspector] Attempting hot-reload from {endpoint}...")

        fresh = self._fetch_remote_schema(endpoint)
        if fresh is not None:
            self.object_info = fresh
            # A failed cache write must not discard the schema just fetched.
            self._write_cache(fresh)
            print("[SchemaInspector] Successfully updated schema from remote server.")
            return

        self._load_cache()

    @staticmethod
    def _fetch_remote_schema(endpoint: str) -> dict | None:
        """Download and decode the registry; return None on any failure."""
        try:
            resp = requests.get(endpoint, timeout=10)
        except requests.RequestException as e:
            print(f"[SchemaInspector] Warning: Hot-reload failed ({e}). Machine might be offline.")
            return None

        if resp.status_code != 200:
            print(f"[SchemaInspector] Warning: Hot-reload failed (HTTP {resp.status_code}).")
            return None

        try:
            payload = resp.json()
        except ValueError as e:
            print(f"[SchemaInspector] Warning: Hot-reload returned invalid JSON ({e}).")
            return None

        if not isinstance(payload, dict):
            print("[SchemaInspector] Warning: Hot-reload returned an unexpected payload type.")
            return None
        return payload

    def _write_cache(self, payload: dict) -> None:
        """Persist the registry to the cache file (best effort, atomic)."""
        tmp_path = f"{self.cache_path}.tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(payload, f)
            # Replace in one step so a crash never leaves a half-written cache.
            os.replace(tmp_path, self.cache_path)
        except OSError as e:
            print(f"[SchemaInspector] Warning: Could not update local cache ({e}).")

    def _load_cache(self) -> None:
        """Load the registry from the cache file, warning when unusable."""
        if not os.path.exists(self.cache_path):
            warnings.warn("No active remote connection and no local cache found! Some tools will fail.")
            return

        print("[SchemaInspector] Loading schema from local cache.")
        try:
            with open(self.cache_path, "r", encoding="utf-8") as f:
                cached = json.load(f)
        except (OSError, ValueError) as e:
            warnings.warn(f"Local schema cache could not be read ({e}). Some tools will fail.")
            return

        if isinstance(cached, dict):
            self.object_info = cached
        else:
            warnings.warn("Local schema cache has an unexpected format. Some tools will fail.")

    # ------------------------------------------------------------------
    # Schema / Model inspection
    # ------------------------------------------------------------------
    def get_node_schema(self, class_type: str) -> dict:
        """Return the required/optional inputs and output names of a node.

        Args:
            class_type: Node class name to look up in the loaded registry.

        Returns:
            ``{"name", "inputs": {"required", "optional"}, "outputs"}`` or
            ``{"error": str}`` when the class is not in the registry.
        """
        if class_type not in self.object_info:
            return {"error": f"Node '{class_type}' not found in the environment registry."}

        node_def = self.object_info[class_type]
        inputs = node_def.get("input", {})
        return {
            "name": class_type,
            "inputs": {
                "required": inputs.get("required", {}),
                "optional": inputs.get("optional", {}),
            },
            "outputs": node_def.get("output_name", []),
        }

    def search_models(self, category: str = "checkpoints", keyword: str = "") -> list:
        """List model names of a category whose name contains ``keyword``.

        Args:
            category: One of 'checkpoints', 'loras', 'vaes', 'controlnets'.
            keyword: Case-insensitive substring filter; empty matches all.

        Returns:
            Matching model names, or an empty list when the category is
            unknown or the loader node / option list is unavailable.
        """
        target_keys = {
            "checkpoints": ("CheckpointLoaderSimple", "ckpt_name"),
            "loras": ("LoraLoader", "lora_name"),
            "vaes": ("VAELoader", "vae_name"),
            "controlnets": ("ControlNetLoader", "control_net_name"),
        }
        if category not in target_keys:
            return []

        node_type, prop = target_keys[category]
        if node_type not in self.object_info:
            return []

        try:
            spec = self.object_info[node_type]["input"]["required"][prop]
            options = spec[0]
            if isinstance(options, str):
                # NOTE(review): some registries appear to describe combos as
                # ["COMBO", {"options": [...]}] rather than [[...], {...}];
                # confirm against the target server. Without this branch the
                # code would iterate the characters of the type name.
                options = spec[1].get("options", [])
        except (KeyError, IndexError, TypeError, AttributeError):
            return []

        needle = (keyword or "").lower()
        return [x for x in options if isinstance(x, str) and needle in x.lower()]

    def verify_workflow(self, api_json: dict) -> dict:
        """Validate an LLM-generated API JSON against the loaded registry.

        Checks, per node: the node is an object, has a known ``class_type``,
        supplies every required input, and every link-style input (a list of
        at least two items) points at a node id that exists in the workflow.

        Returns:
            ``{"valid": bool, "errors": list[str]}``.
        """
        if not isinstance(api_json, dict):
            return {"valid": False, "errors": ["workflow_api 必须是一个字典对象"]}

        errors: list[str] = []
        for node_id, node in api_json.items():
            # Malformed nodes are reported instead of raising AttributeError.
            if not isinstance(node, dict):
                errors.append(f"Node '{node_id}' must be an object, got {type(node).__name__}.")
                continue

            ctype = node.get("class_type")
            if not ctype or not isinstance(ctype, str):
                errors.append(f"Node '{node_id}' is missing a class_type.")
                continue

            if ctype not in self.object_info:
                errors.append(f"Node '{node_id}' requests non-existent class '{ctype}'.")
                continue

            expected_req = self.object_info[ctype].get("input", {}).get("required", {})
            actual_inputs = node.get("inputs", {})
            if actual_inputs is None:
                actual_inputs = {}
            elif not isinstance(actual_inputs, dict):
                errors.append(f"Node '{node_id}' ({ctype}) has a non-object 'inputs' field.")
                actual_inputs = {}

            for req_key in expected_req:
                if req_key not in actual_inputs:
                    errors.append(f"Node '{node_id}' ({ctype}) is missing REQUIRED input '{req_key}'.")

            # Check link validity (detect dangling connections).
            for input_key, input_value in actual_inputs.items():
                if isinstance(input_value, list) and len(input_value) >= 2:
                    target_node = str(input_value[0])
                    if target_node not in api_json:
                        errors.append(
                            f"Node '{node_id}' ({ctype}) 的 '{input_key}' 连向了不存在的节点: '{target_node}'"
                        )

        return {"valid": not errors, "errors": errors}

    # ------------------------------------------------------------------
    # Example workflow browsing & loading
    # ------------------------------------------------------------------
    def list_example_workflows(self, category: str | None = None, keyword: str = "") -> dict:
        """Browse the built-in example workflow library.

        Args:
            category: Filter by subdirectory name (e.g. 'flux', 'controlnet',
                'inpaint'). Pass None to list ALL categories.
            keyword: Optional case-insensitive filter on the file stem.

        Returns:
            ``{"examples_dir": str, "categories": {cat: [entry, ...]}}`` where
            each entry has ``name``, ``file``, ``description`` and ``path``
            (``"<cat>/<file>"``); categories without matches are omitted.
            ``{"error": str}`` when the examples directory does not exist.
        """
        base = Path(self._examples_dir)
        if not base.is_dir():
            return {"error": f"Examples directory not found: {self._examples_dir}"}

        needle = (keyword or "").lower()
        result_categories = {}

        for cat_dir in sorted(base.iterdir()):
            if not cat_dir.is_dir():
                continue
            cat_name = cat_dir.name
            if category and cat_name != category:
                continue

            entries = []
            for json_file in sorted(cat_dir.glob("*.json")):
                fname = json_file.name
                stem = self._example_stem(fname)
                if needle and needle not in stem.lower():
                    continue
                entries.append({
                    "name": stem,
                    "file": fname,
                    "description": _WORKFLOW_ANNOTATIONS.get(stem, ""),
                    "path": f"{cat_name}/{fname}",
                })

            if entries:
                result_categories[cat_name] = entries

        return {
            "examples_dir": self._examples_dir,
            "categories": result_categories,
        }

    def load_example_workflow(self, name: str) -> dict:
        """Load a specific example workflow as a Python dict.

        Args:
            name: Any of the following forms:
                - Full relative path: "flux/flux_dev_example_api.json"
                - Stem (auto-resolved): "flux_dev_example"
                - Partial keyword: "flux_dev" (picks the first match in
                  sorted order)

        Returns:
            The annotated workflow produced by ``_read_example`` or
            ``{"error": str}`` if nothing matches.
        """
        not_found = {"error": f"No example workflow matching '{name}' found in {self._examples_dir}"}
        if not name or not isinstance(name, str):
            return not_found

        base = Path(self._examples_dir)

        # Strategy 1: exact relative path. `name` may come from an HTTP
        # request, so the resolved target must stay inside the examples
        # directory (rejects "../" segments and absolute paths).
        try:
            base_resolved = base.resolve()
            candidate = (base / name).resolve()
        except (OSError, ValueError):
            candidate = None
        if (
            candidate is not None
            and candidate.is_file()
            and candidate.is_relative_to(base_resolved)
        ):
            return self._read_example(candidate)

        # Sorted once so both fallback strategies are deterministic.
        json_files = sorted(base.rglob("*.json"))

        # Strategy 2: exact stem -> category/stem_api.json
        for json_file in json_files:
            if self._example_stem(json_file.name) == name:
                return self._read_example(json_file)

        # Strategy 3: partial keyword match (first hit)
        name_lower = name.lower()
        for json_file in json_files:
            if name_lower in json_file.stem.lower():
                return self._read_example(json_file)

        return not_found

    def _read_example(self, filepath: Path) -> dict:
        """Read a single example JSON and return it with annotations.

        Returns:
            ``{"name", "path", "description", "node_types_used",
            "node_count", "workflow"}``, or ``{"error": str}`` when the file
            cannot be read, is not valid JSON, or is not a JSON object.
        """
        base = Path(self._examples_dir)
        try:
            rel = filepath.relative_to(base)
        except ValueError:
            # The caller may pass a resolved path while the base is not.
            try:
                rel = filepath.resolve().relative_to(base.resolve())
            except (OSError, ValueError):
                rel = Path(filepath.name)

        stem = self._example_stem(filepath.name)

        try:
            with open(filepath, "r", encoding="utf-8") as f:
                wf = json.load(f)
        except (OSError, ValueError) as e:
            return {"error": f"Example workflow '{rel.as_posix()}' could not be read: {e}"}

        if not isinstance(wf, dict):
            return {"error": f"Example workflow '{rel.as_posix()}' is not a JSON object."}

        # Quick summary of the node classes used by the workflow.
        node_types = sorted({
            n.get("class_type", "?")
            for n in wf.values()
            if isinstance(n, dict)
        })

        return {
            "name": stem,
            # Forward slashes, matching the "path" of list_example_workflows.
            "path": rel.as_posix(),
            "description": _WORKFLOW_ANNOTATIONS.get(stem, ""),
            "node_types_used": node_types,
            "node_count": len(wf),
            "workflow": wf,
        }


# ===========================================================================
# FastAPI service layer — one unified query endpoint, dispatched by `action`
# ===========================================================================

app = FastAPI(title="RunComfy Workflow Builder", version="1.0")

# Lazily created inspector (built on the first request so that start-up is
# not blocked by the remote schema download).
_inspector: RunComfySchemaInspector | None = None


def _get_inspector() -> RunComfySchemaInspector:
    """Return the shared inspector, creating it on first use."""
    global _inspector
    if _inspector is None:
        _inspector = RunComfySchemaInspector()
    return _inspector


class QueryRequest(BaseModel):
    """Request body of the unified query endpoint."""

    action: str = Field(..., description=(
        "要执行的操作: "
        "search_models | get_node_schema | verify_workflow | "
        "list_examples | load_example | read_skill"
    ))
    # The parameters below are used per action; omit the ones not needed.
    category: Optional[str] = Field(None, description="模型分类(checkpoints/loras/vaes/controlnets) 或示例分类(flux/controlnet/...)")
    keyword: Optional[str] = Field(None, description="搜索关键词")
    class_type: Optional[str] = Field(None, description="ComfyUI 节点类型名 (get_node_schema 用)")
    name: Optional[str] = Field(None, description="示例工作流名称 (load_example 用)")
    workflow: Optional[dict[str, Any]] = Field(None, description="待验证的 API JSON (verify_workflow 用)")


@app.get("/health")
def health():
    """Liveness probe."""
    return {"status": "ok"}


# "/" is the original route; "/query" is added as an alias because the
# section header documents the unified endpoint under that path.
@app.post("/")
@app.post("/query")
def query(req: QueryRequest):
    """Unified entry point: dispatch to the inspector method named by `action`.

    Raises:
        HTTPException: 400 for a missing parameter or unknown action, 404
            when the requested example or skill file is not found.
    """
    inspector = _get_inspector()

    # ---------- search_models ----------
    if req.action == "search_models":
        results = inspector.search_models(
            category=req.category or "checkpoints",
            keyword=req.keyword or "",
        )
        return {"action": "search_models", "count": len(results), "models": results}

    # ---------- get_node_schema ----------
    if req.action == "get_node_schema":
        if not req.class_type:
            raise HTTPException(400, "get_node_schema 需要 class_type 参数")
        schema = inspector.get_node_schema(req.class_type)
        return {"action": "get_node_schema", **schema}

    # ---------- verify_workflow ----------
    if req.action == "verify_workflow":
        if not req.workflow:
            raise HTTPException(400, "verify_workflow 需要 workflow 参数 (API JSON dict)")
        result = inspector.verify_workflow(req.workflow)
        return {"action": "verify_workflow", **result}

    # ---------- list_examples ----------
    if req.action == "list_examples":
        result = inspector.list_example_workflows(
            category=req.category,
            keyword=req.keyword or "",
        )
        return {"action": "list_examples", **result}

    # ---------- load_example ----------
    if req.action == "load_example":
        if not req.name:
            raise HTTPException(400, "load_example 需要 name 参数")
        result = inspector.load_example_workflow(req.name)
        if "error" in result:
            raise HTTPException(404, result["error"])
        return {"action": "load_example", **result}

    # ---------- read_skill ----------
    if req.action == "read_skill":
        skill_path = Path(__file__).parent / "skill.md"
        if not skill_path.exists():
            raise HTTPException(404, "skill.md not found")
        content = skill_path.read_text(encoding="utf-8")
        return {"action": "read_skill", "content": content}

    raise HTTPException(400, (
        f"未知 action: '{req.action}'。"
        "支持: search_models, get_node_schema, verify_workflow, "
        "list_examples, load_example, read_skill"
    ))


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    # Default host is unchanged; the flag only makes the bind address configurable.
    parser.add_argument("--host", default="0.0.0.0")
    parser.add_argument("--port", type=int, default=8010)
    args = parser.parse_args()
    uvicorn.run(app, host=args.host, port=args.port)