| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- """批量触发生成三个 RunComfy 原子化工具
- 用法:
- uv run python tests/test_create_runcomfy_atomic.py
- """
- import sys
- import time
- from pathlib import Path
- import httpx
# Base address of the locally running service that this script talks to.
BASE_URL = "http://127.0.0.1:8001"

# Directory holding the task-spec JSON files; a "tasks" folder next to this file.
TASKS_DIR = Path(__file__).with_name("tasks")

# Names (without the ".json" suffix) of the three atomic task specs to submit.
TASKS = [
    "runcomfy_launch_env",
    "runcomfy_run_only",
    "runcomfy_stop_env",
]
def check_connection():
    """Verify that the service at BASE_URL is reachable, or exit.

    Sends a GET to ``{BASE_URL}/health`` with a 3 second timeout. The response
    itself is not inspected; only transport-level failures are handled.

    Raises:
        SystemExit: With exit code 1 when the request fails at the transport
            level (connection refused, connect/read timeout, ...).
    """
    try:
        httpx.get(f"{BASE_URL}/health", timeout=3)
    except httpx.TransportError:
        # TransportError covers ConnectError *and* the timeout exceptions, so
        # a service that hangs gets the same friendly hint as one that is down.
        print(f"ERROR: Cannot connect to {BASE_URL}")
        print("Please start the service first:")
        print(" uv run python -m tool_agent")
        sys.exit(1)
def submit_task(task_name: str) -> str:
    """Post the JSON spec for *task_name* to the service and return its task id.

    The spec is read from ``TASKS_DIR / "<task_name>.json"`` and sent as the
    JSON body of a POST to ``{BASE_URL}/create_tool``.

    Args:
        task_name: Base name of the spec file (without the ``.json`` suffix).

    Returns:
        The value of the ``"task_id"`` key in the JSON response.

    Raises:
        SystemExit: With exit code 1 when the spec file does not exist.
        httpx.HTTPStatusError: When the service answers with an error status.
    """
    spec_path = TASKS_DIR / f"{task_name}.json"
    if not spec_path.exists():
        print(f"ERROR: Task file not found: {spec_path}")
        sys.exit(1)

    # Imported locally because the module does not import json at file level.
    import json

    payload = json.loads(spec_path.read_text(encoding="utf-8"))

    print(f"\n[{task_name}] Submitting...")
    response = httpx.post(f"{BASE_URL}/create_tool", json=payload, timeout=30)
    response.raise_for_status()

    task_id = response.json()["task_id"]
    print(f"[{task_name}] Task ID: {task_id}")
    return task_id
def poll_tasks(task_ids: dict[str, str], timeout: int = 900) -> None:
    """Poll the service until every task is finished or the time budget is used.

    Every 10 seconds each still-pending task is queried via
    ``GET {BASE_URL}/tasks/<task_id>``. A task counts as finished once its
    reported ``"status"`` is ``"completed"`` or ``"failed"``. The status of
    unfinished tasks is printed on every third round (every 30 counted seconds).

    Args:
        task_ids: Mapping of task name to the task id returned on submission.
        timeout: Polling budget in seconds. It is counted in 10 second sleep
            steps (``timeout // 10`` rounds); time spent inside the HTTP
            requests is not included.
    """
    print("\n=== Polling Tasks ===")
    pending = set(task_ids.values())
    interval = 10
    elapsed = 0

    # Same number of rounds as ``timeout // interval``, but the loop ends as
    # soon as nothing is pending so the final summary below is always accurate.
    while pending and elapsed + interval <= timeout:
        time.sleep(interval)
        elapsed += interval

        for task_name, task_id in task_ids.items():
            if task_id not in pending:
                continue

            try:
                resp = httpx.get(f"{BASE_URL}/tasks/{task_id}", timeout=30)
                resp.raise_for_status()
            except httpx.HTTPError as exc:
                # One failed status check must not abort a long polling run;
                # the task stays pending and is queried again next round.
                print(f"[{task_name}] Status check failed, will retry: {exc}")
                continue

            status = resp.json()["status"]
            if status in ("completed", "failed"):
                print(f"\n[{task_name}] Finished with status: {status}")
                pending.remove(task_id)
            elif elapsed % 30 == 0:
                print(f"[{elapsed}s] {task_name}: {status}")

    # Report the outcome exactly once, including when the last task finished
    # during the final round or when there was nothing to wait for.
    if pending:
        print(f"\nTimeout! Still pending: {pending}")
    else:
        print("\nAll tasks finished!")
def main():
    """Check the service, submit every task in TASKS, then poll them all."""
    check_connection()

    # 1. Submit all tasks up front, remembering the id assigned to each name.
    task_ids = {task_name: submit_task(task_name) for task_name in TASKS}

    # 2. Poll all submitted tasks together.
    poll_tasks(task_ids)
# Run the submit-and-poll workflow only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|