"""即梦(或兼容的异步任务)上游 HTTP 客户端 — 与 liblibai_client 同类,无本地状态。""" from __future__ import annotations import os from typing import Any, Literal import requests from dotenv import load_dotenv load_dotenv() class JiMengClient: """通过环境变量配置上游 base URL 与路径,将请求原样转发并返回 JSON。""" def __init__(self) -> None: self.base = (os.getenv("JI_MENG_API_BASE") or "https://crawler.aiddit.com/crawler/ji_meng").rstrip("/") if not self.base: raise ValueError("缺少环境变量 JI_MENG_API_BASE(上游服务根 URL,如 https://api.example.com)") self.add_path = os.getenv("JI_MENG_ADD_TASK_PATH", "/add_task") self.query_path = os.getenv("JI_MENG_QUERY_TASK_PATH", "/query_task") self.api_key = os.getenv("JI_MENG_API_KEY", "").strip() def _headers(self) -> dict[str, str]: h = {"Content-Type": "application/json"} if self.api_key: h["Authorization"] = f"Bearer {self.api_key}" return h def submit_task(self, task_type: Literal['image', 'video'], prompt: str, image_url: str | None = None) -> dict[str, Any]: url = f"{self.base}{self.add_path if self.add_path.startswith('/') else '/' + self.add_path}" body: dict[str, Any] = {"task_type": task_type, "prompt": prompt} if image_url: body["image_url"] = image_url resp = requests.post(url, json=body, headers=self._headers(), timeout=120) resp.raise_for_status() return resp.json() def query_task(self, task_id: str) -> dict[str, Any]: url = f"{self.base}{self.query_path if self.query_path.startswith('/') else '/' + self.query_path}" resp = requests.post( url, json={"task_id": task_id}, headers=self._headers(), timeout=60, ) resp.raise_for_status() return resp.json()