"""End-to-end smoke test for executing a ComfyUI workflow via the RunComfy tools.

The script talks to a locally running tool router (see ``ROUTER_URL``), picks an
already-existing machine, loads a workflow JSON that sits next to this file,
patches its prompt, executes it with four input images and stores the first
generated image under ``output/test_output.png``.
"""
import asyncio
import base64
import json
import os
import sys
import time

import requests

# Add the project root (three directory levels above this file) to the Python
# path so that the ``agent`` package can be imported when run as a script.
_PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.insert(0, _PROJECT_ROOT)

from agent.tools.builtin.toolhub import _preprocess_params  # noqa: E402

ROUTER_URL = "http://127.0.0.1:8001/run_tool"

# Upper bound for downloading a generated image; the tool call itself is left
# without a timeout because a workflow execution may legitimately run long.
_DOWNLOAD_TIMEOUT_SECONDS = 60

_LANDSCAPE_PROMPT = (
    "A back-view of a person standing in front of a wooden painting easel, "
    "looking out at a beautiful landscape with majestic mountains and a serene cyan lake. "
    "Highly detailed, masterpiece, realistic photography, cinematic lighting, 8k resolution"
)

_INPUT_IMAGE_NAMES = (
    "depth_map.png",
    "background_bokeh_img2.png",
    "character_ref_back.png",
    "easel_blank_canvas_img4.png",
)


def call_tool(tool_id, params):
    """Invoke one tool through the router and return its ``result`` payload.

    Args:
        tool_id: Identifier of the tool, sent as ``tool_id`` in the request body.
        params: Tool parameters. They are first passed through
            ``_preprocess_params``, mirroring the real agent flow (per the
            original note, this turns local file paths into CDN links).

    Returns:
        The ``result`` field of the router's JSON response, or ``{}`` when the
        field is absent.

    Raises:
        requests.HTTPError: If the router answers with an HTTP error status.
        RuntimeError: If the response body reports ``status == "error"``.
    """
    print(f"\n[{tool_id}] Calling...")
    processed_params = asyncio.run(_preprocess_params(params))
    resp = requests.post(ROUTER_URL, json={"tool_id": tool_id, "params": processed_params})
    resp.raise_for_status()
    res = resp.json()
    if res.get("status") == "error":
        raise RuntimeError(f"Tool error: {res.get('error')}")
    print(f"[{tool_id}] Success")
    return res.get("result", {})


def _pick_server(servers):
    """Return the id of the first Ready server, else the first Starting/Creating one."""
    fallback_id = None
    for server in servers:
        status = server.get("current_status")
        if status == "Ready":
            ready_id = server.get("server_id")
            print(f"Found Ready machine: {ready_id}")
            return ready_id
        if status in ("Starting", "Creating") and not fallback_id:
            fallback_id = server.get("server_id")
            print(f"Found Starting machine: {fallback_id}. Note: You might need to wait for it to be Ready.")
    return fallback_id


def _inject_prompt(workflow_api):
    """Replace the text of every CLIPTextEncode node whose prompt mentions "landscape"."""
    for node_id, node in workflow_api.items():
        if node.get("class_type") != "CLIPTextEncode":
            continue
        if "landscape" in node["inputs"].get("text", ""):
            node["inputs"]["text"] = _LANDSCAPE_PROMPT
            print(f" -> Successfully updated prompt in node {node_id}")


def _build_input_files(input_dir):
    """Describe the input images; ``url`` holds a local path that call_tool preprocesses."""
    return [
        {"filename": name, "type": "images", "url": os.path.join(input_dir, name)}
        for name in _INPUT_IMAGE_NAMES
    ]


def _save_image(img_data, output_path):
    """Write one generated image to ``output_path``.

    ``img_data`` may be a plain string (an http(s) URL or base64 text) or a
    dict carrying that string under ``url`` or ``data``.

    Raises:
        ValueError: If no usable string payload is present or base64 is invalid.
        requests.RequestException: If downloading the image fails.
        OSError: If the output file cannot be written.
    """
    payload = img_data
    if isinstance(img_data, dict):
        # The executor may return a dict; unwrap it instead of calling string
        # methods on the dict itself.
        payload = img_data.get("url") or img_data.get("data")
    if not isinstance(payload, str) or not payload:
        raise ValueError("image entry has no usable 'url' or 'data' string")

    if payload.startswith(("http://", "https://")):
        img_resp = requests.get(payload, timeout=_DOWNLOAD_TIMEOUT_SECONDS)
        img_resp.raise_for_status()
        content = img_resp.content
    else:
        # Tolerate a "data:<mime>;base64," prefix in front of the base64 text.
        if payload.startswith("data:") and "," in payload:
            payload = payload.split(",", 1)[1]
        content = base64.b64decode(payload)

    # Decode/download first so a failure never leaves a truncated file behind.
    with open(output_path, "wb") as fh:
        fh.write(content)


def test_full_workflow():
    """Run the full check-status -> load -> patch -> execute -> save sequence.

    Raises:
        RuntimeError: If no Ready/Starting/Creating machine is reported, or a
            tool call reports an error.
    """
    print("=== RunComfy Workflow E2E Test ===")

    # 0. Check existing machines
    print("\n0. Checking for existing ComfyUI Instances...")
    status_res = call_tool("runcomfy_check_status", {})
    server_id = _pick_server(status_res.get("servers", []))
    if not server_id:
        raise RuntimeError("\nNo active machine found. Expecting an already running machine. Aborting.")

    print("\n1. Proceeding with existing machine...")
    script_dir = os.path.dirname(__file__)

    try:
        print("\n2. Loading custom workflow from JSON...")
        workflow_path = os.path.join(script_dir, "flux_depth_controlnet_workflow.json")
        with open(workflow_path, "r", encoding="utf-8") as f:
            workflow_api = json.load(f)

        print("\n3. Injecting correct prompt to fix ControlNet mismatch...")
        _inject_prompt(workflow_api)

        print("\n4. Executing workflow with ControlNet inputs...")
        # Collect the input image list handed to the executor. The "url" fields
        # hold local paths here; call_tool runs ToolHub's _preprocess_params,
        # which swaps those local paths for real CDN links.
        input_files = _build_input_files(os.path.join(script_dir, "input"))

        exec_res = call_tool("runcomfy_workflow_executor", {
            "server_id": server_id,
            "workflow_api": workflow_api,
            "input_files": input_files,
        })

        print("Execution finished! Prompt ID:", exec_res.get("prompt_id"))
        images = exec_res.get("images", [])
        print("Generated images count:", len(images))

        if images:
            # Each entry is either base64 text or a CDN url (possibly in a dict).
            img_data = images[0]
            if isinstance(img_data, dict):
                print("First image info:", img_data.get("url") or (img_data.get("data", "")[:50] + "..."))
            else:
                print("First image info (raw):", img_data[:50] + "...")

            output_dir = os.path.join(script_dir, "output")
            os.makedirs(output_dir, exist_ok=True)
            output_path = os.path.join(output_dir, "test_output.png")

            # Saving is best-effort: report the failure but do not fail the run.
            try:
                _save_image(img_data, output_path)
            except (requests.RequestException, OSError, ValueError) as e:
                print(f"Failed to save image: {e}")
            else:
                print(f"🎉 成功!生成的图片已保存至: {output_path}")
    finally:
        # Cleanup step: stopping the machine is skipped for now so the run can
        # be inspected on RunComfy afterwards.
        print("\n4. Keep machine alive (Skip cleanup so you can check runcomfy)")
        # call_tool("runcomfy_stop_env", {"server_id": server_id})


if __name__ == "__main__":
    test_full_workflow()
    # NOTE(review): hard exit skips normal interpreter shutdown; presumably this
    # avoids hanging on leftover background threads/loops -- confirm.
    os._exit(0)