| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- import os
- import sys
- import json
- import time
- import requests
- import asyncio
- # Add the project root directory to the Python path so the `agent` package can be imported
- sys.path.insert(0, str(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
- from agent.tools.builtin.toolhub import _preprocess_params
- ROUTER_URL = "http://127.0.0.1:8001/run_tool"
def call_tool(tool_id, params):
    """Invoke one tool through the local router and return its result.

    The parameters are first passed through ``_preprocess_params`` (per the
    original author's note, this mirrors the real agent flow and turns local
    file paths into CDN links), then posted to ``ROUTER_URL`` as JSON.

    Args:
        tool_id: Identifier of the tool to run; sent as ``tool_id``.
        params: Parameter mapping for the tool; sent (after preprocessing)
            as ``params``.

    Returns:
        The ``result`` entry of the router's JSON reply, or an empty dict
        when the reply has no ``result`` key.

    Raises:
        requests.HTTPError: If the router answers with an HTTP error status.
        Exception: If the reply's ``status`` field equals ``"error"``.
    """
    print(f"\n[{tool_id}] Calling...")

    # Run the async preprocessing step to completion before sending anything.
    ready_params = asyncio.run(_preprocess_params(params))

    payload = {"tool_id": tool_id, "params": ready_params}
    response = requests.post(ROUTER_URL, json=payload)
    response.raise_for_status()

    body = response.json()
    tool_failed = body.get("status") == "error"
    if tool_failed:
        raise Exception(f"Tool error: {body.get('error')}")

    print(f"[{tool_id}] Success")
    return body.get("result", {})
def _extract_base64_payload(image):
    """Return the inline base64 text of one executor image entry, or None.

    An image entry may be a dict (with ``url`` and/or ``data`` keys) or a
    plain string that is either base64 text or a URL. Only inline base64
    text can be decoded locally, so URLs and empty/non-string payloads
    yield ``None``. A ``data:`` URI prefix, if present, is stripped.
    """
    payload = image.get("data") if isinstance(image, dict) else image
    if not isinstance(payload, str) or not payload:
        return None
    if payload.startswith(("http://", "https://")):
        # A link to a hosted image, not inline data.
        return None
    if payload.startswith("data:"):
        # "data:image/png;base64,<payload>" -> keep only what follows the comma.
        payload = payload.partition(",")[2]
    return payload or None


def test_full_workflow():
    """Run the RunComfy workflow end to end against an existing machine.

    Steps:
        0. Query ``runcomfy_check_status`` and pick a server: the first one
           whose status is ``Ready``, otherwise the first one that is
           ``Starting`` or ``Creating``.
        1-2. Load ``flux_depth_controlnet_workflow.json`` from this script's
           directory.
        3. Call ``runcomfy_workflow_executor`` with the workflow and four
           input images from the ``input`` directory, then report the result
           and, when the first image is inline base64, save it to
           ``output/test_output.png``.
        4. Deliberately skip stopping the machine so it can be inspected.

    Raises:
        Exception: If no Ready/Starting/Creating server is found, or if
            ``call_tool`` reports a tool error.
    """
    print("=== RunComfy Workflow E2E Test ===")

    # 0. Check existing machines
    print("\n0. Checking for existing ComfyUI Instances...")
    status_res = call_tool("runcomfy_check_status", {})
    servers = status_res.get("servers", [])

    server_id = None
    for s in servers:
        status = s.get("current_status")
        if status == "Ready":
            # A Ready machine always wins, even over an earlier Starting one.
            server_id = s.get("server_id")
            print(f"Found Ready machine: {server_id}")
            break
        if status in ("Starting", "Creating") and not server_id:
            # Remember the first booting machine as a fallback and keep looking.
            server_id = s.get("server_id")
            print(f"Found Starting machine: {server_id}. Note: You might need to wait for it to be Ready.")
    if not server_id:
        raise Exception("\nNo active machine found. Expecting an already running machine. Aborting.")

    print("\n1. Proceeding with existing machine...")
    # All fixture and output paths are resolved relative to this script.
    base_dir = os.path.dirname(__file__)
    try:
        print("\n2. Loading custom workflow from JSON...")
        workflow_path = os.path.join(base_dir, "flux_depth_controlnet_workflow.json")
        with open(workflow_path, "r", encoding="utf-8") as f:
            workflow_api = json.load(f)
        print("\n3. Executing workflow with ControlNet inputs...")

        # Collect the input image list for the executor. Each "url" holds a
        # local path here; call_tool runs the params through
        # _preprocess_params, which (per the original author's note) swaps
        # local paths for real CDN links.
        input_dir = os.path.join(base_dir, "input")
        input_names = (
            "depth_map.png",
            "background_bokeh_img2.png",
            "character_ref_back.png",
            "easel_blank_canvas_img4.png",
        )
        input_files = [
            {
                "filename": name,
                "type": "images",
                "url": os.path.join(input_dir, name),
            }
            for name in input_names
        ]
        exec_res = call_tool("runcomfy_workflow_executor", {
            "server_id": server_id,
            "workflow_api": workflow_api,
            "input_files": input_files,
        })

        print("Execution finished! Prompt ID:", exec_res.get("prompt_id"))

        images = exec_res.get("images", [])
        print("Generated images count:", len(images))
        if images:
            # The entry is either base64 data or a CDN url.
            img_data = images[0]
            if isinstance(img_data, dict):
                # `or ""` guards against an explicit None under "data".
                print("First image info:", img_data.get("url") or ((img_data.get("data") or "")[:50] + "..."))
            else:
                print("First image info (raw):", img_data[:50] + "...")

            # Only inline base64 can be written to disk; a dict or URL must
            # not be fed to b64decode.
            payload = _extract_base64_payload(img_data)
            if payload is None:
                print("First image is not inline base64 data; skipping local save.")
            else:
                import base64
                output_dir = os.path.join(base_dir, "output")
                os.makedirs(output_dir, exist_ok=True)
                output_path = os.path.join(output_dir, "test_output.png")

                try:
                    # Decode before opening the file so a bad payload does
                    # not truncate a previously saved image.
                    image_bytes = base64.b64decode(payload)
                    with open(output_path, "wb") as fh:
                        fh.write(image_bytes)
                    print(f"🎉 成功!生成的图片已保存至: {output_path}")
                except (ValueError, OSError) as e:
                    # Saving is best-effort: report and continue.
                    print(f"Failed to save image: {e}")

    finally:
        # 4. Cleanup
        print("\n4. Keep machine alive (Skip cleanup so you can check runcomfy)")
        # Stopping is skipped for now to make troubleshooting easier.
        # call_tool("runcomfy_stop_env", {"server_id": server_id})
# Script entry point: run the end-to-end workflow check only when this file
# is executed directly, not when it is imported.
if __name__ == "__main__":
    test_full_workflow()
|