"""Batch-trigger creation of the three atomic RunComfy tools.

Usage:
    uv run python tests/test_create_runcomfy_atomic.py
"""

import json
import sys
import time
from pathlib import Path

import httpx

BASE_URL = "http://127.0.0.1:8001"
TASKS_DIR = Path(__file__).parent / "tasks"

# The three atomic task specs we just wrote.
TASKS = [
    "runcomfy_launch_env",
    "runcomfy_run_only",
    "runcomfy_stop_env",
]

# Statuses after which a task is no longer polled.
_TERMINAL_STATUSES = ("completed", "failed")


def check_connection() -> None:
    """Exit with a hint if the service at ``BASE_URL`` cannot be reached.

    Issues ``GET {BASE_URL}/health`` with a 3 second timeout. Only
    reachability is checked; the response status code is not inspected.
    On any transport-level failure the process exits with code 1.
    """
    try:
        httpx.get(f"{BASE_URL}/health", timeout=3)
    except httpx.TransportError:
        # TransportError covers ConnectError as well as connect/read
        # timeouts, so a hung service gets the same hint as a stopped one.
        print(f"ERROR: Cannot connect to {BASE_URL}")
        print("Please start the service first:")
        print("  uv run python -m tool_agent")
        sys.exit(1)


def submit_task(task_name: str) -> str:
    """Submit one task spec to the service and return the new task id.

    Args:
        task_name: Stem of the JSON file under ``TASKS_DIR`` to submit.

    Returns:
        The ``task_id`` field of the service's JSON response.

    Raises:
        httpx.HTTPStatusError: If the service answers with a 4xx/5xx status.

    Exits the process with code 1 if the task file does not exist.
    """
    task_file = TASKS_DIR / f"{task_name}.json"
    if not task_file.exists():
        print(f"ERROR: Task file not found: {task_file}")
        sys.exit(1)

    with open(task_file, "r", encoding="utf-8") as f:
        task_data = json.load(f)

    print(f"\n[{task_name}] Submitting...")
    resp = httpx.post(f"{BASE_URL}/create_tool", json=task_data, timeout=30)
    resp.raise_for_status()

    task_id = resp.json()["task_id"]
    print(f"[{task_name}] Task ID: {task_id}")
    return task_id


def poll_tasks(task_ids: dict[str, str], timeout: int = 900) -> None:
    """Poll the submitted tasks until all finish or ``timeout`` elapses.

    Every 10 seconds each still-pending task is queried via
    ``GET {BASE_URL}/tasks/{task_id}``. A task stops being polled once its
    status is ``"completed"`` or ``"failed"``. Progress for unfinished tasks
    is printed every 30 seconds of nominal elapsed time.

    Args:
        task_ids: Mapping of task name to task id, as returned by
            ``submit_task``.
        timeout: Nominal polling budget in seconds. It is converted into
            ``timeout // 10`` polling rounds; time spent in the HTTP
            requests themselves is not counted.
    """
    print("\n=== Polling Tasks ===")
    pending = set(task_ids.values())
    interval = 10
    steps = timeout // interval

    for i in range(steps):
        if not pending:
            break

        time.sleep(interval)
        elapsed = (i + 1) * interval

        for task_name, task_id in task_ids.items():
            if task_id not in pending:
                continue

            try:
                resp = httpx.get(f"{BASE_URL}/tasks/{task_id}", timeout=30)
                resp.raise_for_status()
            except httpx.HTTPError as exc:
                # One failed poll should not abort a long wait; the task
                # stays pending and is queried again on the next round.
                print(f"[{elapsed}s] {task_name}: poll failed ({exc}), will retry")
                continue

            status = resp.json()["status"]
            if status in _TERMINAL_STATUSES:
                print(f"\n[{task_name}] Finished with status: {status}")
                pending.remove(task_id)
            elif elapsed % 30 == 0:
                print(f"[{elapsed}s] {task_name}: {status}")

    # Report the outcome after the loop so that a task finishing on the very
    # last round is still announced as finished.
    if pending:
        print(f"\nTimeout! Still pending: {pending}")
    else:
        print("\nAll tasks finished!")


def main() -> None:
    """Check the service, submit every task in ``TASKS``, then poll them."""
    check_connection()

    # 1. Submit all tasks up front.
    task_ids = {task_name: submit_task(task_name) for task_name in TASKS}

    # 2. Poll them together.
    poll_tasks(task_ids)


if __name__ == "__main__":
    main()