#!/usr/bin/env python3 """LibLibAI ComfyUI Workflow Runner 用法: python liblibai_comfyui_runner.py workflow.json python liblibai_comfyui_runner.py workflow.json --output result.png """ import argparse import base64 import hashlib import hmac import json import os import sys import time import uuid from pathlib import Path import requests from dotenv import load_dotenv load_dotenv() DOMAIN = os.getenv("LIBLIBAI_DOMAIN", "https://openapi.liblibai.cloud") ACCESS_KEY = os.getenv("LIBLIBAI_ACCESS_KEY") SECRET_KEY = os.getenv("LIBLIBAI_SECRET_KEY") DEFAULT_TEMPLATE = "4df2efa0f18d46dc9758803e478eb51c" def generate_auth_url(uri: str) -> str: """生成带 HMAC-SHA1 签名的完整 URL""" ts = str(int(time.time() * 1000)) nonce = uuid.uuid4().hex sign_str = f"{uri}&{ts}&{nonce}" dig = hmac.new(SECRET_KEY.encode(), sign_str.encode(), hashlib.sha1).digest() signature = base64.urlsafe_b64encode(dig).rstrip(b"=").decode() return f"{DOMAIN}{uri}?AccessKey={ACCESS_KEY}&Timestamp={ts}&SignatureNonce={nonce}&Signature={signature}" def submit_workflow(workflow_data: dict) -> str: """提交 ComfyUI workflow 生图任务""" uri = "/api/generate/comfyui/app" url = generate_auth_url(uri) payload = { "templateUuid": DEFAULT_TEMPLATE, "generateParams": workflow_data } print(f"提交任务...") resp = requests.post(url, json=payload, headers={"Content-Type": "application/json"}) resp.raise_for_status() data = resp.json() if data.get("code") != 0: raise Exception(f"提交失败: {data.get('msg', 'unknown error')}") generate_uuid = data["data"]["generateUuid"] print(f"任务 ID: {generate_uuid}") return generate_uuid def poll_result(generate_uuid: str, timeout: int = 600) -> dict: """轮询任务结果""" uri = "/api/generate/comfy/status" url = generate_auth_url(uri) start_time = time.time() print(f"轮询结果 (超时 {timeout}s)...") while time.time() - start_time < timeout: resp = requests.post(url, json={"generateUuid": generate_uuid}) resp.raise_for_status() data = resp.json() if data.get("code") != 0: raise Exception(f"查询失败: {data.get('msg', 'unknown error')}") result = data["data"] status = result["generateStatus"] # 1=等待 2=执行中 3=已生图 4=审核中 5=成功 6=失败 if status in [1, 2, 3, 4]: status_text = {1: "等待", 2: "执行中", 3: "已生图", 4: "审核中"}[status] print(f" 状态: {status_text} 进度: {result.get('percentCompleted', 0):.0%}") time.sleep(5) continue if status == 5: print(f"✓ 任务成功") print(f" 消耗积分: {result.get('pointsCost', 0)}") print(f" 剩余积分: {result.get('accountBalance', 0)}") return result if status == 6: raise Exception(f"任务失败: {result.get('generateMsg', 'unknown')}") raise TimeoutError(f"轮询超时 ({timeout}s)") def save_results(result: dict, output_dir: Path): """保存生成的图片和视频""" output_dir.mkdir(parents=True, exist_ok=True) images = result.get("images", []) videos = result.get("videos", []) print(f"\n生成结果:") print(f" 图片: {len(images)} 张") print(f" 视频: {len(videos)} 个") for i, img in enumerate(images): if img.get("auditStatus") != 3: print(f" 图片 {i+1}: 审核未通过 (status={img.get('auditStatus')})") continue url = img["imageUrl"] filename = f"image_{i+1}.png" filepath = output_dir / filename print(f" 下载: {filename}") resp = requests.get(url) resp.raise_for_status() filepath.write_bytes(resp.content) print(f" -> {filepath}") for i, vid in enumerate(videos): if vid.get("auditStatus") != 3: print(f" 视频 {i+1}: 审核未通过 (status={vid.get('auditStatus')})") continue url = vid["videoUrl"] filename = f"video_{i+1}.mp4" filepath = output_dir / filename print(f" 下载: {filename}") resp = requests.get(url) resp.raise_for_status() filepath.write_bytes(resp.content) print(f" -> {filepath}") def main(): parser = argparse.ArgumentParser(description="LibLibAI ComfyUI Workflow Runner") parser.add_argument("workflow", help="workflow JSON 文件路径") parser.add_argument("--output", "-o", default="output", help="输出目录,默认 output/") parser.add_argument("--timeout", "-t", type=int, default=600, help="轮询超时秒数,默认 600") args = parser.parse_args() if not ACCESS_KEY or not SECRET_KEY: print("ERROR: 请设置环境变量 LIBLIBAI_ACCESS_KEY 和 LIBLIBAI_SECRET_KEY") sys.exit(1) workflow_path = Path(args.workflow) if not workflow_path.exists(): print(f"ERROR: 文件不存在: {workflow_path}") sys.exit(1) print(f"加载 workflow: {workflow_path}") with open(workflow_path, "r", encoding="utf-8") as f: workflow_data = json.load(f) try: generate_uuid = submit_workflow(workflow_data) result = poll_result(generate_uuid, timeout=args.timeout) save_results(result, Path(args.output)) print("\n✓ 完成") except Exception as e: print(f"\n✗ 错误: {e}") sys.exit(1) if __name__ == "__main__": main()