| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474 |
- import os
- import json
- import requests
- import warnings
- from pathlib import Path
- from dotenv import load_dotenv
- load_dotenv()
# ---------------------------------------------------------------------------
# Tag annotations for example workflows — maps filename stems to a brief
# human-readable description so that agents can quickly decide which example
# to load. Maintained manually; new scraped files should be added here.
# NOTE: the description values are user-facing strings (some intentionally in
# Chinese) and are returned verbatim by the API — do not translate them.
# Stems are derived by stripping the trailing "_api.json" from the filename
# (see list_example_workflows / _read_example below).
# ---------------------------------------------------------------------------
_WORKFLOW_ANNOTATIONS = {
    # --- flux ---
    "flux_dev_example": "Flux Dev txt2img (UNETLoader + SamplerCustomAdvanced + ModelSamplingFlux)",
    "flux_dev_checkpoint_example": "Flux Dev txt2img via CheckpointLoaderSimple (简化版, KSampler cfg=1)",
    "flux_schnell_example": "Flux Schnell 4-step fast txt2img (UNETLoader)",
    "flux_schnell_checkpoint_example": "Flux Schnell 4-step via CheckpointLoaderSimple",
    "flux_controlnet_example": "Flux + ControlNet (Canny) via ControlNetApplySD3",
    "flux_canny_model_example": "Flux Canny 内建模型 (InstructPixToPixConditioning)",
    "flux_depth_lora_example": "Flux Depth LoRA (LoraLoaderModelOnly + InstructPixToPixConditioning)",
    "flux_fill_inpaint_example": "Flux Fill inpaint (DifferentialDiffusion + InpaintModelConditioning)",
    "flux_fill_outpaint_example": "Flux Fill outpaint (ImagePadForOutpaint + InpaintModelConditioning)",
    "flux_redux_model_example": "Flux Redux 图像风格迁移 (CLIPVisionLoader + StyleModelApply)",
    "flux_kontext_example": "Flux Kontext 图像编辑/角色一致性 (FluxKontextImageScale + ReferenceLatent)",
    # --- flux2 ---
    "flux2_example": "Flux 2 txt2img",
    # --- sdxl ---
    "sdxl_simple_example": "SDXL Base+Refiner 两阶段 txt2img (KSamplerAdvanced)",
    "sdxl_refiner_prompt_example": "SDXL Base+Refiner 各自独立提示词",
    "sdxl_revision_text_prompts": "SDXL Revision 文本提示融合",
    "sdxl_revision_zero_positive": "SDXL Revision zero-positive 风格",
    # --- sd3 ---
    "sd3.5_simple_example": "SD3.5 基础 txt2img",
    "sd3.5_text_encoders_example": "SD3.5 三编码器 (clip_g + clip_l + t5xxl)",
    "sd3.5_large_canny_controlnet_example": "SD3.5 Large + Canny ControlNet",
    # --- controlnet ---
    "controlnet_example": "SD1.5 ControlNet (scribble) 基础用法",
    "depth_controlnet": "SD1.5 Depth ControlNet",
    "depth_t2i_adapter": "SD1.5 Depth T2I-Adapter",
    "mixing_controlnets": "SD1.5 混合双 ControlNet (openpose + scribble)",
    "2_pass_pose_worship": "SD1.5 两阶段 ControlNet + LatentUpscale Hi-Res Fix",
    # --- img2img ---
    "img2img_workflow": "SD1.5 图生图 (VAEEncode, denoise<1)",
    # --- inpaint ---
    "inpaint_example": "SD1.5 Inpaint (VAEEncodeForInpaint + 专用 checkpoint)",
    "inpain_model_cat": "SD1.5 Inpaint 猫咪涂抹",
    "inpain_model_woman": "SD1.5 Inpaint 女性涂抹",
    "inpain_model_outpainting": "SD1.5 Outpaint (ImagePadForOutpaint + VAEEncodeForInpaint)",
    "inpaint_anythingv3_woman": "SD1.5 Inpaint AnythingV3",
    "yosemite_outpaint_example": "SD1.5 Outpaint Yosemite 扩展画布",
    # --- lora ---
    "lora": "SD1.5 单 LoRA (LoraLoader)",
    "lora_multiple": "SD1.5 多 LoRA 链式堆叠",
    # --- upscale ---
    "esrgan_example": "ESRGAN 超分辨率 (UpscaleModelLoader + ImageUpscaleWithModel)",
    # --- area_composition ---
    "square_area_for_subject": "区域化构图 (ConditioningSetArea)",
    "workflow_night_evening_day_morning": "四时段区域化构图",
    # --- others ---
    "aura_flow_0.1_example": "AuraFlow 0.1 txt2img",
    "aura_flow_0.2_example": "AuraFlow 0.2 txt2img",
    "chroma_example": "Chroma 模型 txt2img",
    "cosmos_predict2_2b_t2i_example": "Cosmos Predict2 2B txt2img",
    "sdxl_edit_model": "SDXL Edit Model (InstructPixToPixConditioning)",
    "gligen_textbox_example": "GLIGEN 文本框定位 (GLIGENTextBoxApply)",
    "hidream_dev_example": "HiDream Dev txt2img",
    "hidream_e1.1_example": "HiDream E1.1 txt2img",
    "hidream_full_example": "HiDream Full txt2img",
    "hunyuan_dit_1.2_example": "HunyuanDiT 1.2 txt2img",
    "hunyuan_image_example": "Hunyuan Image txt2img",
    "hypernetwork_example": "Hypernetwork 示例",
    "lcm_basic_example": "LCM 快速采样",
    "lumina2_basic_example": "Lumina2 txt2img",
    "model_merging_basic": "模型合并 基础 (ModelMergeSimple)",
    "model_merging_3_checkpoints": "模型合并 三模型",
    "model_merging_cosxl": "模型合并 CosXL",
    "model_merging_inpaint": "模型合并 Inpaint",
    "model_merging_lora": "模型合并 LoRA",
    "noisy_latents_3_subjects": "噪声潜空间构图 三主体",
    # NOTE(review): trailing-underscore key below looks like a scraped
    # filename variant, not a typo — confirm against data/comfyui_examples.
    "noisy_latents_3_subjects_": "噪声潜空间构图 三主体 (变体)",
    "omnigen2_example": "OmniGen2 txt2img",
    "qwen_image_basic_example": "Qwen Image 基础 txt2img",
    "qwen_image_edit_basic_example": "Qwen Image Edit 基础编辑",
    "qwen_image_edit_2509_basic_example": "Qwen Image Edit 2509 编辑",
    "sdxlturbo_example": "SDXL Turbo 快速采样",
    "stable_cascade__text_to_image": "Stable Cascade txt2img",
    "stable_cascade__image_to_image": "Stable Cascade img2img",
    "stable_cascade__canny_controlnet": "Stable Cascade Canny ControlNet",
    "stable_cascade__inpaint_controlnet": "Stable Cascade Inpaint ControlNet",
    "stable_cascade__image_remixing": "Stable Cascade 图像混合",
    "stable_cascade__image_remixing_multiple": "Stable Cascade 多图混合",
    "embedding_example": "Textual Inversion (embedding) 示例",
    "unclip_example": "UnCLIP 基础",
    "unclip_2pass": "UnCLIP 两阶段",
    "unclip_example_multiple": "UnCLIP 多图输入",
    "z_image_turbo_example": "Z-Image Turbo txt2img",
}
class RunComfySchemaInspector:
    """Inspect a remote ComfyUI server's node registry and example library.

    On construction the inspector tries to pull a fresh ``/object_info``
    schema from the configured server, caching it to a JSON file next to
    this module; when the server is unreachable it falls back to that cache.
    It also locates the bundled ``data/comfyui_examples`` directory so agents
    can browse and load example workflows.
    """

    def __init__(self, server_url=None):
        """
        Args:
            server_url: Base URL of the ComfyUI server. When None, a
                dedicated testing instance is substituted in _hot_reload().
        """
        self.server_url = server_url
        # class_type -> node definition, exactly as served by /object_info.
        self.object_info = {}
        # We store a backup copy inside this directory in case the cloud
        # machine is asleep/offline.
        self.cache_path = os.path.join(os.path.dirname(__file__), "object_info_cache.json")
        self._examples_dir = self._locate_examples_dir()
        self._hot_reload()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _locate_examples_dir() -> str:
        """Walk up from this file to find the project root containing `data/comfyui_examples`."""
        anchor = Path(__file__).resolve()
        # Include the anchor itself in case this module sits at the repo root.
        for parent in [anchor] + list(anchor.parents):
            candidate = parent / "data" / "comfyui_examples"
            if candidate.is_dir():
                return str(candidate)
        # Fallback: assume standard repo layout (this file is 3 levels deep under repo root)
        fallback = Path(__file__).resolve().parents[3] / "data" / "comfyui_examples"
        return str(fallback)

    def _hot_reload(self):
        """Attempts to fetch fresh object_info from the server. Falls back to cached JSON if offline."""
        if not self.server_url:
            # Fallback to standard generic environment if none provided
            # Find an active server if possible, or use the dedicated testing ID
            self.server_url = "https://90f77137-ba75-400d-870f-204c614ae8a3-comfyui.runcomfy.com"
        print(f"[SchemaInspector] Attempting hot-reload from {self.server_url}/object_info...")
        try:
            resp = requests.get(f"{self.server_url}/object_info", timeout=10)
            if resp.status_code == 200:
                self.object_info = resp.json()
                # Update local cache so future offline runs still have a schema.
                with open(self.cache_path, "w", encoding="utf-8") as f:
                    json.dump(self.object_info, f)
                print("[SchemaInspector] Successfully updated schema from remote server.")
                return
            # FIX: a non-200 response previously fell through without any
            # message; surface it before trying the cache fallback below.
            print(f"[SchemaInspector] Warning: server returned HTTP {resp.status_code}.")
        except Exception as e:
            print(f"[SchemaInspector] Warning: Hot-reload failed ({e}). Machine might be offline.")
        if os.path.exists(self.cache_path):
            print("[SchemaInspector] Loading schema from local cache.")
            with open(self.cache_path, "r", encoding="utf-8") as f:
                self.object_info = json.load(f)
        else:
            warnings.warn("No active remote connection and no local cache found! Some tools will fail.")

    # ------------------------------------------------------------------
    # Schema / Model inspection (existing)
    # ------------------------------------------------------------------
    def get_node_schema(self, class_type: str) -> dict:
        """Returns the rigorous Required and Optional properties for a specific ComfyUI Node."""
        if class_type not in self.object_info:
            return {"error": f"Node '{class_type}' not found in the environment registry."}

        node_def = self.object_info[class_type]
        schema = {
            "name": class_type,
            "inputs": {
                "required": node_def.get("input", {}).get("required", {}),
                "optional": node_def.get("input", {}).get("optional", {})
            },
            # "output_name" holds human-readable output socket labels.
            "outputs": node_def.get("output_name", [])
        }
        return schema

    def search_models(self, category: str = "checkpoints", keyword: str = "") -> list:
        """Case-insensitive substring search over the models installed on the server.

        Args:
            category: 'checkpoints', 'loras', 'vaes', 'controlnets'
            keyword: Substring filter; empty string returns everything.
        Returns:
            Matching model filenames; [] for unknown category / malformed schema.
        """
        # Each category maps to (loader node, property whose first slot holds
        # the enum of installed filenames) in the /object_info schema.
        target_keys = {
            "checkpoints": ("CheckpointLoaderSimple", "ckpt_name"),
            "loras": ("LoraLoader", "lora_name"),
            "vaes": ("VAELoader", "vae_name"),
            "controlnets": ("ControlNetLoader", "control_net_name")
        }

        if category not in target_keys:
            return []

        node_type, prop = target_keys[category]
        if node_type not in self.object_info:
            return []

        try:
            items = self.object_info[node_type]["input"]["required"][prop][0]
            kw = keyword.lower()
            return [x for x in items if kw in x.lower()]
        # FIX: also guard against malformed schema entries (non-iterable slot,
        # non-string items) instead of letting Attribute/TypeError escape.
        except (KeyError, IndexError, TypeError, AttributeError):
            return []

    def verify_workflow(self, api_json: dict) -> dict:
        """
        Validates an LLM-generated API JSON against the dynamic schema.
        Returns a dict containing {"valid": bool, "errors": list_of_strings}
        """
        errors = []
        if not isinstance(api_json, dict):
            return {"valid": False, "errors": ["workflow_api 必须是一个字典对象"]}

        for node_id, node in api_json.items():
            # FIX: a non-dict node value previously crashed with
            # AttributeError on .get(); report it as a validation error.
            if not isinstance(node, dict):
                errors.append(f"Node '{node_id}' must be a dict, got {type(node).__name__}.")
                continue

            ctype = node.get("class_type")
            if not ctype:
                errors.append(f"Node '{node_id}' is missing a class_type.")
                continue

            if ctype not in self.object_info:
                errors.append(f"Node '{node_id}' requests non-existent class '{ctype}'.")
                continue

            expected_req = self.object_info[ctype].get("input", {}).get("required", {})
            actual_inputs = node.get("inputs", {})

            for req_key in expected_req.keys():
                if req_key not in actual_inputs:
                    errors.append(f"Node '{node_id}' ({ctype}) is missing REQUIRED input '{req_key}'.")

            # Link validity: a [node_id, slot] pair must point at an existing node.
            for input_key, input_value in actual_inputs.items():
                if isinstance(input_value, list) and len(input_value) >= 2:
                    target_node = str(input_value[0])
                    if target_node not in api_json:
                        errors.append(f"Node '{node_id}' ({ctype}) 的 '{input_key}' 连向了不存在的节点: '{target_node}'")
        return {
            "valid": len(errors) == 0,
            "errors": errors
        }

    # ------------------------------------------------------------------
    # Example workflow browsing & loading (NEW)
    # ------------------------------------------------------------------
    def list_example_workflows(self, category: str = None, keyword: str = "") -> dict:
        """
        Browse the built-in example workflow library.
        Args:
            category: Filter by subdirectory name (e.g. 'flux', 'controlnet', 'inpaint').
                      Pass None to list ALL categories and their workflows.
            keyword: Optional keyword filter on filename (case-insensitive).
        Returns:
            {
              "examples_dir": str,
              "categories": {
                  "flux": [
                      {"name": "flux_dev_example", "file": "flux_dev_example_api.json",
                       "description": "Flux Dev txt2img ...", "path": "flux/flux_dev_example_api.json"},
                      ...
                  ],
                  ...
              }
            }
        """
        base = Path(self._examples_dir)
        if not base.is_dir():
            return {"error": f"Examples directory not found: {self._examples_dir}"}
        kw = keyword.lower()
        result_categories = {}
        for cat_dir in sorted(base.iterdir()):
            if not cat_dir.is_dir():
                continue
            cat_name = cat_dir.name
            if category and cat_name != category:
                continue
            entries = []
            for json_file in sorted(cat_dir.glob("*.json")):
                fname = json_file.name
                # Strip the trailing _api.json to get the annotation-lookup stem.
                stem = fname.replace("_api.json", "").replace("_api.", ".")
                if kw and kw not in stem.lower():
                    continue
                desc = _WORKFLOW_ANNOTATIONS.get(stem, "")
                entries.append({
                    "name": stem,
                    "file": fname,
                    "description": desc,
                    "path": f"{cat_name}/{fname}",
                })
            # Omit categories emptied out by the filters.
            if entries:
                result_categories[cat_name] = entries
        return {
            "examples_dir": self._examples_dir,
            "categories": result_categories,
        }

    def load_example_workflow(self, name: str) -> dict:
        """
        Load a specific example workflow as a Python dict.
        Args:
            name: Can be any of the following forms:
                  - Full relative path: "flux/flux_dev_example_api.json"
                  - Stem (auto-resolved): "flux_dev_example"
                  - Partial keyword: "flux_dev" (picks first match)
        Returns:
            {"name": str, "path": str, "description": str, "workflow": dict}
            or {"error": str} if not found.
        """
        base = Path(self._examples_dir)
        # Strategy 1: exact relative path
        full_path = base / name
        if full_path.is_file():
            return self._read_example(full_path)
        # Strategy 2: exact stem match anywhere under the examples tree
        for json_file in base.rglob("*.json"):
            stem = json_file.name.replace("_api.json", "").replace("_api.", ".")
            if stem == name:
                return self._read_example(json_file)
        # Strategy 3: partial keyword match; sorted() keeps the "first hit"
        # deterministic across filesystems.
        name_lower = name.lower()
        for json_file in sorted(base.rglob("*.json")):
            if name_lower in json_file.stem.lower():
                return self._read_example(json_file)
        return {"error": f"No example workflow matching '{name}' found in {self._examples_dir}"}

    def _read_example(self, filepath: Path) -> dict:
        """Read a single example JSON and return annotated result."""
        base = Path(self._examples_dir)
        rel = filepath.relative_to(base)
        stem = filepath.name.replace("_api.json", "").replace("_api.", ".")
        desc = _WORKFLOW_ANNOTATIONS.get(stem, "")
        with open(filepath, "r", encoding="utf-8") as f:
            wf = json.load(f)
        # Quick summary of node classes used, for agent triage.
        node_types = sorted(set(
            n.get("class_type", "?") for n in wf.values() if isinstance(n, dict)
        ))
        return {
            "name": stem,
            "path": str(rel),
            "description": desc,
            "node_types_used": node_types,
            "node_count": len(wf),
            "workflow": wf,
        }
- # ===========================================================================
- # FastAPI 服务层 — 统一 /query 端点,通过 action 参数分发
- # ===========================================================================
- import argparse
- from typing import Any, Optional
- from fastapi import FastAPI, HTTPException
- from pydantic import BaseModel, Field
- import uvicorn
# FastAPI application exposing the inspector over HTTP.
app = FastAPI(title="RunComfy Workflow Builder", version="1.0")

# Lazily-initialized singleton inspector. Created on first request so server
# startup is not blocked by the remote /object_info fetch in __init__.
_inspector: RunComfySchemaInspector | None = None

def _get_inspector() -> RunComfySchemaInspector:
    """Return the process-wide inspector, constructing it on first use."""
    global _inspector
    if _inspector is None:
        _inspector = RunComfySchemaInspector()
    return _inspector
class QueryRequest(BaseModel):
    """Request body for the unified POST `/` endpoint.

    Only `action` is required; the remaining fields are interpreted
    per-action and may be omitted when unused.
    """

    # Which operation to dispatch to (see query() below).
    action: str = Field(..., description=(
        "要执行的操作: "
        "search_models | get_node_schema | verify_workflow | "
        "list_examples | load_example | read_skill"
    ))
    # The fields below are consumed per-action; omit the ones you don't need.
    category: Optional[str] = Field(None, description="模型分类(checkpoints/loras/vaes/controlnets) 或示例分类(flux/controlnet/...)")
    keyword: Optional[str] = Field(None, description="搜索关键词")
    class_type: Optional[str] = Field(None, description="ComfyUI 节点类型名 (get_node_schema 用)")
    name: Optional[str] = Field(None, description="示例工作流名称 (load_example 用)")
    workflow: Optional[dict[str, Any]] = Field(None, description="待验证的 API JSON (verify_workflow 用)")
@app.get("/health")
def health():
    """Liveness probe — always reports the service as up."""
    return dict(status="ok")
@app.post("/")
def query(req: QueryRequest):
    """Unified entry point — dispatches to the inspector method named by `req.action`.

    Raises:
        HTTPException(400): missing parameter for the chosen action, or unknown action.
        HTTPException(404): requested example workflow or skill file not found.
    """
    inspector = _get_inspector()
    # ---------- search_models ----------
    if req.action == "search_models":
        results = inspector.search_models(
            category=req.category or "checkpoints",
            keyword=req.keyword or ""
        )
        return {"action": "search_models", "count": len(results), "models": results}
    # ---------- get_node_schema ----------
    elif req.action == "get_node_schema":
        if not req.class_type:
            raise HTTPException(400, "get_node_schema 需要 class_type 参数")
        schema = inspector.get_node_schema(req.class_type)
        return {"action": "get_node_schema", **schema}
    # ---------- verify_workflow ----------
    elif req.action == "verify_workflow":
        # FIX: `if not req.workflow` also rejected an explicitly supplied
        # empty dict ({}) because {} is falsy; only a truly absent (None)
        # workflow is a client error.
        if req.workflow is None:
            raise HTTPException(400, "verify_workflow 需要 workflow 参数 (API JSON dict)")
        result = inspector.verify_workflow(req.workflow)
        return {"action": "verify_workflow", **result}
    # ---------- list_examples ----------
    elif req.action == "list_examples":
        result = inspector.list_example_workflows(
            category=req.category,
            keyword=req.keyword or ""
        )
        return {"action": "list_examples", **result}
    # ---------- load_example ----------
    elif req.action == "load_example":
        if not req.name:
            raise HTTPException(400, "load_example 需要 name 参数")
        result = inspector.load_example_workflow(req.name)
        if "error" in result:
            raise HTTPException(404, result["error"])
        return {"action": "load_example", **result}
    # ---------- read_skill ----------
    elif req.action == "read_skill":
        # Serves the agent-facing instructions that ship beside this module.
        skill_path = Path(__file__).parent / "skill.md"
        if not skill_path.exists():
            raise HTTPException(404, "skill.md not found")
        content = skill_path.read_text(encoding="utf-8")
        return {"action": "read_skill", "content": content}
    else:
        raise HTTPException(400, (
            f"未知 action: '{req.action}'。"
            "支持: search_models, get_node_schema, verify_workflow, "
            "list_examples, load_example, read_skill"
        ))
- if __name__ == "__main__":
- parser = argparse.ArgumentParser()
- parser.add_argument("--port", type=int, default=8010)
- args = parser.parse_args()
- uvicorn.run(app, host="0.0.0.0", port=args.port)
|