| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177 |
- """测试 BFL FLUX 异步生图 — 通过 Router POST /run_tool
- 官方流程:先 POST 提交任务拿到 id + polling_url,再轮询 polling_url 直至 Ready。
- 文档: https://docs.bfl.ai/quick_start/generating_images
- 用法:
- 1. 配置 tools/local/flux/.env:BFL_API_KEY
- 2. uv run python -m tool_agent
- 3. uv run python tests/test_flux.py
- 模型切换:
- FLUX_TEST_MODEL=flux-2-max uv run python tests/test_flux.py
- (model 为路径段,如 flux-2-pro-preview、flux-2-pro、flux-dev 等,见官方 Available Endpoints)
- 环境变量:
- TOOL_AGENT_ROUTER_URL 默认 http://127.0.0.1:8001
- FLUX_SUBMIT_TOOL_ID 默认 flux_submit
- FLUX_QUERY_TOOL_ID 默认 flux_query
- FLUX_TEST_MODEL 默认 flux-2-pro-preview
- FLUX_TEST_PROMPT 覆盖默认短提示词
- FLUX_POLL_INTERVAL_S 默认 1.0
- FLUX_POLL_MAX_WAIT_S 默认 300
- """
- from __future__ import annotations
- import io
- import os
- import sys
- import time
- from typing import Any
- if sys.platform == "win32":
- _out = sys.stdout
- if isinstance(_out, io.TextIOWrapper):
- _out.reconfigure(encoding="utf-8")
- import httpx
- ROUTER_URL = os.environ.get("TOOL_AGENT_ROUTER_URL", "http://127.0.0.1:8001")
- SUBMIT_TOOL = os.environ.get("FLUX_SUBMIT_TOOL_ID", "flux_submit")
- QUERY_TOOL = os.environ.get("FLUX_QUERY_TOOL_ID", "flux_query")
- FLUX_MODEL = os.environ.get("FLUX_TEST_MODEL", "flux-2-pro-preview").strip()
- TEST_PROMPT = os.environ.get(
- "FLUX_TEST_PROMPT",
- "A tiny red apple on white background, simple product photo, minimal",
- )
- POLL_INTERVAL_S = float(os.environ.get("FLUX_POLL_INTERVAL_S", "1.0"))
- POLL_MAX_WAIT_S = float(os.environ.get("FLUX_POLL_MAX_WAIT_S", "300"))
- def run_tool(tool_id: str, params: dict[str, Any], timeout: float = 120.0) -> dict[str, Any]:
- resp = httpx.post(
- f"{ROUTER_URL}/run_tool",
- json={"tool_id": tool_id, "params": params},
- timeout=timeout,
- )
- resp.raise_for_status()
- body = resp.json()
- if body.get("status") != "success":
- raise RuntimeError(body.get("error") or str(body))
- result = body.get("result")
- if isinstance(result, dict) and result.get("status") == "error":
- raise RuntimeError(result.get("error", str(result)))
- return result if isinstance(result, dict) else {}
- def _poll_terminal_success(data: dict[str, Any]) -> bool:
- s = str(data.get("status") or "").strip()
- return s.lower() == "ready"
- def _poll_terminal_failure(data: dict[str, Any]) -> bool:
- s = str(data.get("status") or "").strip().lower()
- return s in ("error", "failed")
- def _sample_url(data: dict[str, Any]) -> str | None:
- r = data.get("result")
- if isinstance(r, dict):
- u = r.get("sample")
- if isinstance(u, str) and u.startswith("http"):
- return u
- return None
- def main() -> None:
- print("=" * 50)
- print("测试 FLUX(BFL 异步 API + 模型可切换)")
- print("=" * 50)
- print(f"ROUTER_URL: {ROUTER_URL}")
- print(f"model: {FLUX_MODEL}")
- try:
- r = httpx.get(f"{ROUTER_URL}/health", timeout=3)
- print(f"Router 状态: {r.json()}")
- except httpx.ConnectError:
- print(f"无法连接 Router ({ROUTER_URL}),请先: uv run python -m tool_agent")
- sys.exit(1)
- print("\n--- 校验工具已注册 ---")
- tr = httpx.get(f"{ROUTER_URL}/tools", timeout=30)
- tr.raise_for_status()
- tools = tr.json().get("tools", [])
- ids = {t["tool_id"] for t in tools}
- for tid in (SUBMIT_TOOL, QUERY_TOOL):
- if tid not in ids:
- print(f"错误: {tid!r} 不在 GET /tools 中。示例 id: {sorted(ids)[:20]}...")
- sys.exit(1)
- meta = next(t for t in tools if t["tool_id"] == tid)
- print(f" {tid}: {meta.get('name', '')} (state={meta.get('state')})")
- props = (next(t for t in tools if t["tool_id"] == SUBMIT_TOOL).get("input_schema") or {}).get(
- "properties"
- ) or {}
- if "model" in props:
- print(" flux_submit input_schema 已声明 model")
- else:
- print(" 提示: flux_submit 宜在注册表中声明 model 以便切换端点")
- print("\n--- flux_submit ---")
- submit_params: dict[str, Any] = {
- "model": FLUX_MODEL,
- "prompt": TEST_PROMPT,
- "width": 512,
- "height": 512,
- }
- try:
- sub = run_tool(SUBMIT_TOOL, submit_params, timeout=120.0)
- except (RuntimeError, httpx.HTTPError) as e:
- print(f"错误: {e}")
- sys.exit(1)
- print(f"提交返回 keys: {list(sub.keys())}")
- req_id = sub.get("id") or sub.get("request_id")
- poll_url = sub.get("polling_url")
- if not req_id or not poll_url:
- print(f"错误: 缺少 id 或 polling_url: {sub}")
- sys.exit(1)
- print(f"request id: {req_id}")
- print(f"polling_url: {poll_url[:80]}...")
- print("\n--- flux_query 轮询 ---")
- deadline = time.monotonic() + POLL_MAX_WAIT_S
- last: dict[str, Any] = {}
- while time.monotonic() < deadline:
- time.sleep(POLL_INTERVAL_S)
- try:
- last = run_tool(
- QUERY_TOOL,
- {"polling_url": str(poll_url), "request_id": str(req_id)},
- timeout=60.0,
- )
- except (RuntimeError, httpx.HTTPError) as e:
- print(f"轮询错误: {e}")
- sys.exit(1)
- st = last.get("status")
- print(f" status: {st}")
- if _poll_terminal_failure(last):
- print(f"生成失败: {last}")
- sys.exit(1)
- if _poll_terminal_success(last):
- url = _sample_url(last)
- if url:
- print(f"\n图片 URL(signed,约 10 分钟内有效): {url[:100]}...")
- print("\n测试通过!")
- return
- print(f"\n等待超时 ({POLL_MAX_WAIT_S}s),最后一次: {last}")
- sys.exit(1)
- if __name__ == "__main__":
- main()
|