| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342 |
- import os
- import hmac
- import hashlib
- import base64
- import uuid
- import time
- import requests
- import json
- from typing import Optional, Dict, Any, List
- from dotenv import load_dotenv
- load_dotenv()
- TEMPLATE_UUID = "e10adc3949ba59abbe56e057f20f883e"
- INSTANT_ID_TEMPLATE_UUID = "7d888009f81d4252a7c458c874cd017f"
- CHECKPOINT_ID = os.getenv("LIBLIBAI_DEFAULT_MODEL", "0ea388c7eb854be3ba3c6f65aac6bfd3")
- class LibLibAIClient:
- def __init__(self):
- self.access_key = os.getenv("LIBLIBAI_ACCESS_KEY")
- self.secret_key = os.getenv("LIBLIBAI_SECRET_KEY")
- self.domain = os.getenv("LIBLIBAI_DOMAIN", "https://openapi.liblibai.cloud")
- if not self.access_key or not self.secret_key:
- raise ValueError("Missing LIBLIBAI_ACCESS_KEY or LIBLIBAI_SECRET_KEY")
-
- self.models = self._load_models_from_json()
- self.sdxl_canny = self._get_model_uuid("线稿类", "Canny(硬边缘)") or "b6806516962f4e1599a93ac4483c3d23"
- self.sdxl_softedge = self._get_model_uuid("线稿类", "SoftEdge(软边缘)") or "dda1a0c480bfab9833d9d9a1e4a71fff"
- self.sdxl_lineart = self._get_model_uuid("线稿类", "Lineart(线稿)") or "a0f01da42bf48b0ba02c86b6c26b5699"
- self.sdxl_openpose = self._get_model_uuid("姿态类", "OpenPose(姿态)") or "2fe4f992a81c5ccbdf8e9851c8c96ff2"
- self.sdxl_depth = "6349e9dae8814084bd9c1585d335c24c"
- def _load_models_from_json(self):
- base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
- json_path = os.path.join(base_dir, "data", "liblibai_controlnet_models.json")
- if os.path.exists(json_path):
- with open(json_path, "r", encoding="utf-8") as f:
- return json.load(f)
- return {}
- def _get_model_uuid(self, category, subtype, xl_only=True):
- if category in self.models and subtype in self.models[category]:
- for model in self.models[category][subtype]:
- if xl_only and model["base_algorithm"] != "基础算法 XL":
- continue
- return model["uuid"]
- return None
- def generate_auth_url(self, uri: str) -> str:
- ts = str(int(time.time() * 1000))
- nonce = uuid.uuid4().hex
- sign_str = f"{uri}&{ts}&{nonce}"
- dig = hmac.new(self.secret_key.encode(), sign_str.encode(), hashlib.sha1).digest()
- signature = base64.urlsafe_b64encode(dig).rstrip(b"=").decode()
- return f"{self.domain}{uri}?AccessKey={self.access_key}&Timestamp={ts}&SignatureNonce={nonce}&Signature={signature}"
-
- def get_upload_signature(self, filename: str = "image.png") -> Dict[str, Any]:
- uri = "/api/generate/upload/signature"
- url = self.generate_auth_url(uri)
- extension = filename.split(".")[-1] if "." in filename else "png"
- payload = {"name": filename, "extension": extension}
- resp = requests.post(url, json=payload)
- resp.raise_for_status()
- data = resp.json()
- if data.get("code") != 0:
- raise Exception(f"Get upload signature failed: {data}")
- return data["data"]
-
- def upload_image_to_oss(self, image_bytes: bytes, sig_data: Dict[str, Any]) -> str:
- data = {"key": sig_data["key"], "policy": sig_data["policy"], "x-oss-date": sig_data["xOssDate"],
- "x-oss-expires": sig_data["xOssExpires"], "x-oss-signature": sig_data["xOssSignature"],
- "x-oss-credential": sig_data["xOssCredential"], "x-oss-signature-version": sig_data["xOssSignatureVersion"]}
- files = {"file": ("image.png", image_bytes, "image/png")}
- resp = requests.post(sig_data["postUrl"], data=data, files=files)
- if resp.status_code != 204:
- raise Exception(f"Upload to OSS failed: {resp.status_code} {resp.text}")
- return f"{sig_data['postUrl']}/{sig_data['key']}"
-
- def upload_base64_image(self, base64_data: str) -> str:
- if base64_data.startswith("data:image"):
- base64_data = base64_data.split(",", 1)[1]
- image_bytes = base64.b64decode(base64_data)
- sig_data = self.get_upload_signature()
- return self.upload_image_to_oss(image_bytes, sig_data)
-
- def process_image_url(self, image: str) -> str:
- if image.startswith("http://") or image.startswith("https://"):
- # 如果本身就是 liblib 的图床,直接短路返回
- if "liblib" in image.lower() or "aliyuncs.com" in image.lower():
- return image
-
- # 否则必须先将外部公网 URL 拽下来,转存到 LibLib 的 OSS 中
- import httpx
- try:
- resp = httpx.get(image, timeout=30.0)
- resp.raise_for_status()
- sig_data = self.get_upload_signature()
- return self.upload_image_to_oss(resp.content, sig_data)
- except Exception as e:
- raise ValueError(f"Failed to fetch and upload external image URL: {e}")
-
- return self.upload_base64_image(image)
- def submit_task_payload(self, payload: dict) -> str:
- uri = "/api/generate/webui/text2img"
- url = self.generate_auth_url(uri)
- resp = requests.post(url, json=payload, timeout=15)
- data = resp.json()
- if data.get("code") != 0:
- raise Exception(f"Submit task failed: {data.get('msg', data)} (code: {data.get('code')})")
- return data["data"]["generateUuid"]
- def query_task_status(self, task_id: str) -> Dict[str, Any]:
- uri = "/api/generate/webui/status"
- url = self.generate_auth_url(uri)
- payload = {"generateUuid": task_id}
- resp = requests.post(url, json=payload)
- resp.raise_for_status()
- data = resp.json()
- if data.get("code") != 0:
- raise Exception(f"Query task failed: {data}")
- return data["data"]
-
- def wait_for_result(self, task_id: str, timeout: int = 300) -> Dict[str, Any]:
- start_time = time.time()
- while time.time() - start_time < timeout:
- task_data = self.query_task_status(task_id)
- status = task_data.get("generateStatus")
- if status == 5:
- images = [img["imageUrl"] for img in task_data.get("images", [])]
- return {"images": images, "task_id": task_id, "status": "success"}
- elif status in [6, 7]:
- return {"images": [], "task_id": task_id, "status": "failed", "detail": f"Status {status}"}
- time.sleep(5)
- return {"images": [], "task_id": task_id, "status": "timeout"}
- def get_model_version_info(self, version_uuid: str) -> dict:
- uri = "/api/model/version/get"
- auth_url = self.generate_auth_url(uri)
- payload = {"versionUuid": version_uuid}
- try:
- resp = requests.post(auth_url, json=payload, timeout=10)
- data = resp.json()
- if data.get("code") == 0:
- return data.get("data", {})
- return {}
- except Exception:
- return {}
- def search_models(self, keyword: str) -> Dict[str, Any]:
- url = "http://crawler.aiddit.com/crawler/liblib/keyword"
- payload = {"keyword": keyword}
- headers = {
- "Content-Type": "application/json",
- "Cookie": "_xsrf=2|c9d0a1bf|891f10d6ea5abc19d58be0d2fac84e6a|1774447752"
- }
- resp = requests.post(url, json=payload, headers=headers, timeout=15)
- resp.raise_for_status()
- return resp.json()
- def get_model_detail(self, content_link: str = None, uuid: str = None, version_uuid: str = None) -> Dict[str, Any]:
- url = "http://crawler.aiddit.com/crawler/liblib/detail"
- if not content_link:
- if not uuid or not version_uuid:
- raise ValueError("Must provide either content_link or uuid and version_uuid")
- content_link = f"https://www.liblib.art/modelinfo/{uuid}?from=search&versionUuid={version_uuid}"
- payload = {"content_link": content_link}
- headers = {
- "Content-Type": "application/json",
- "Cookie": "_xsrf=2|c9d0a1bf|891f10d6ea5abc19d58be0d2fac84e6a|1774447752"
- }
- resp = requests.post(url, json=payload, headers=headers, timeout=15)
- resp.raise_for_status()
- return resp.json()
- def generate_advanced(self, mode: str, prompt: str, image: Optional[str] = None,
- mask_image: Optional[str] = None, pose_image: Optional[str] = None,
- control_nets: Optional[List[Dict[str, Any]]] = None,
- negative_prompt: str = "lowres, bad anatomy, error",
- width: int = 512, height: int = 512, steps: int = 20,
- cfg_scale: float = 7.0, img_count: int = 1,
- base_model_uuid: Optional[str] = None) -> Dict[str, Any]:
-
- # Base shared params
- generate_params = {
- "checkPointId": base_model_uuid if base_model_uuid else CHECKPOINT_ID,
- "prompt": prompt,
- "negativePrompt": negative_prompt,
- "sampler": 15,
- "steps": steps,
- "cfgScale": float(cfg_scale),
- "width": width,
- "height": height,
- "imgCount": img_count
- }
- payload = {
- "templateUuid": TEMPLATE_UUID,
- "generateParams": generate_params
- }
- # text2img does not need modifications
- if mode == "text2img":
- pass
-
- elif mode == "img2img":
- if not image:
- raise ValueError("Image is required for img2img mode")
- img_url = self.process_image_url(image)
- generate_params["sourceImage"] = img_url
- generate_params["denoisingStrength"] = 0.5
-
- elif mode in ["canny", "softedge", "lineart", "openpose", "depth", "controlnet"]:
- cnets_to_process = []
- if control_nets and len(control_nets) > 0:
- cnets_to_process = control_nets
- else:
- if not image:
- raise ValueError(f"Image is required for {mode} mode")
- cnets_to_process = [{"mode": mode, "image": image}]
-
- final_cnet_configs = []
-
- for idx, cnet in enumerate(cnets_to_process):
- cnet_mode = cnet.get("mode")
- cnet_img = cnet.get("image")
- cnet_weight = float(cnet.get("weight", 1.0))
-
- if not cnet_img:
- continue
-
- img_url = self.process_image_url(cnet_img)
-
- cnet_config = {
- "unitOrder": idx + 1,
- "sourceImage": img_url,
- "width": width,
- "height": height,
- "controlWeight": cnet_weight,
- "startingControlStep": 0.0,
- "endingControlStep": 1.0,
- "pixelPerfect": 1,
- "controlMode": 0
- }
- if cnet_mode == "canny":
- cnet_config.update({
- "preprocessor": 1,
- "model": self.sdxl_canny,
- "annotationParameters": {
- "canny": {"preprocessorResolution": 512, "lowThreshold": 100, "highThreshold": 200}
- }
- })
- elif cnet_mode == "softedge":
- cnet_config.update({
- "preprocessor": 5,
- "model": self.sdxl_softedge,
- "annotationParameters": {
- "hed": {"preprocessorResolution": 512}
- }
- })
- elif cnet_mode == "lineart":
- cnet_config.update({
- "preprocessor": 32,
- "model": self.sdxl_lineart,
- "annotationParameters": {
- "lineart": {"preprocessorResolution": 512}
- }
- })
- elif cnet_mode == "openpose":
- cnet_config.update({
- "preprocessor": 14,
- "model": self.sdxl_openpose,
- "annotationParameters": {
- "openposeFull": {"preprocessorResolution": 512}
- }
- })
- elif cnet_mode == "depth":
- cnet_config.update({
- # Assuming Midas depth (9) or Zoe depth (39). Usually 9 is safe
- "preprocessor": 9,
- "model": self.sdxl_depth,
- "annotationParameters": {
- "depthMidas": {"preprocessorResolution": 512}
- }
- })
-
- final_cnet_configs.append(cnet_config)
-
- if not final_cnet_configs:
- raise ValueError("No valid control_nets processed.")
-
- generate_params["controlNet"] = final_cnet_configs
-
- elif mode == "inpaint":
- if not image or not mask_image:
- raise ValueError("Both image and mask_image are required for inpaint mode")
- img_url = self.process_image_url(image)
- mask_url = self.process_image_url(mask_image)
-
- generate_params["mode"] = 4
- generate_params["sourceImage"] = img_url
- generate_params["denoisingStrength"] = 0.5
- generate_params["inpaintParam"] = {
- "maskImage": mask_url,
- "maskBlur": 4,
- "inpaintArea": 0
- }
-
- elif mode == "instantid":
- if not image or not pose_image:
- raise ValueError("Both face image (image) and pose_image are required for instantid")
-
- payload["templateUuid"] = INSTANT_ID_TEMPLATE_UUID
- generate_params["sampler"] = 20
-
- face_img_url = self.process_image_url(image)
- pose_img_url = self.process_image_url(pose_image)
-
- generate_params["controlNet"] = [
- {
- "unitOrder": 1,
- "sourceImage": face_img_url,
- "width": 1080,
- "height": 1432
- },
- {
- "unitOrder": 2,
- "sourceImage": pose_img_url,
- "width": 1024,
- "height": 1024
- }
- ]
- else:
- raise ValueError(f"Unknown mode: {mode}")
- task_id = self.submit_task_payload(payload)
- return self.wait_for_result(task_id)
- # Legacy method for backwards compatibility
- def generate_image(self, *args, **kwargs):
- raise NotImplementedError("generate_image is deprecated. Use generate_advanced with mode='canny'")
|