"""Client helpers for the Midjourney web API and an OSS image upload helper."""

import random
import string

import oss2
from curl_cffi import requests

from api.error import system_error
from api.error import unauthorized_error

# Browser fingerprint that curl_cffi impersonates on every outgoing request.
_IMPERSONATE = "chrome101"


def thomas_jobs(cookie, user_id, page_size=50):
    """Query the list of all historical jobs of a user.

    Args:
        cookie: Value sent as the ``cookie`` request header.
        user_id: Interpolated into the ``user_id`` query parameter.
        page_size: Interpolated into the ``page_size`` query parameter.

    Returns:
        The decoded JSON body of the response.

    Raises:
        unauthorized_error: The server answered with HTTP 401.
        system_error: The server answered with any other non-OK status.
    """
    url = f"https://www.midjourney.com/api/pg/thomas-jobs?user_id={user_id}&page_size={page_size}"
    response = requests.get(url, headers=nec_headers(cookie), impersonate=_IMPERSONATE)
    return response_catch(response)


def query_job_status(job_id, cookie):
    """Query the status of one specific job.

    Args:
        job_id: Job identifier; sent as the single element of ``jobIds``.
        cookie: Value sent as the ``cookie`` request header.

    Returns:
        The decoded JSON body of the response.

    Raises:
        unauthorized_error: The server answered with HTTP 401.
        system_error: The server answered with any other non-OK status.
    """
    url = "https://www.midjourney.com/api/app/job-status"
    response = requests.post(
        url=url,
        headers=nec_headers(cookie),
        impersonate=_IMPERSONATE,
        json={"jobIds": [job_id]},
    )
    return response_catch(response)


def query_job_queue(cookie, user_id):
    """Query the job queue of a user.

    Args:
        cookie: Value sent as the ``cookie`` request header.
        user_id: Interpolated into the ``userId`` query parameter.

    Returns:
        The decoded JSON body of the response.

    Raises:
        unauthorized_error: The server answered with HTTP 401.
        system_error: The server answered with any other non-OK status.
    """
    url = f"https://www.midjourney.com/api/app/users/queue?userId={user_id}"
    response = requests.get(url, headers=nec_headers(cookie), impersonate=_IMPERSONATE)
    return response_catch(response)


def submit_job(cookie, prompt, user_id, mode):
    """Submit an ``imagine`` job.

    Args:
        cookie: Value sent as the ``cookie`` request header.
        prompt: Prompt text placed in the request payload.
        user_id: Used to build the ``singleplayer_<user_id>`` channel id.
        mode: Generation mode, ``"relaxed"`` or ``"fast"``; any falsy value
            falls back to ``"fast"``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        unauthorized_error: The server answered with HTTP 401.
        system_error: The server answered with any other non-OK status.
    """
    mode = mode or "fast"
    request_data = {
        "f": {
            "mode": mode,
            "private": False,
        },
        "channelId": f"singleplayer_{user_id}",
        "roomId": None,
        "metadata": {
            "imagePrompts": 1,
            "imageReferences": 0,
            "characterReferences": 0,
        },
        "t": "imagine",
        "prompt": prompt,
    }
    url = "https://www.midjourney.com/api/app/submit-jobs"
    response = requests.post(
        url=url,
        headers=nec_headers(cookie),
        impersonate=_IMPERSONATE,
        json=request_data,
    )
    return response_catch(response)


def generate_random_string(length=15):
    """Return ``length`` random ASCII letters and digits.

    Uses the ``random`` module, so the result is not suitable as a secret.
    """
    characters = string.ascii_letters + string.digits
    return ''.join(random.choice(characters) for _ in range(length))


def upload_image_to_oss(job_id, image_num=4):
    """Copy the images of a job from the Midjourney CDN into the OSS bucket.

    For each index ``i`` in ``range(image_num)`` the image
    ``https://cdn.midjourney.com/<job_id>/0_<i>.png`` is downloaded and, when
    the download succeeds, stored under a random ``saas/mjImage/*.png`` key.
    Images whose download does not succeed are skipped, so the result may
    hold fewer than ``image_num`` entries.

    Args:
        job_id: Job identifier used to build the CDN URLs.
        image_num: Number of image indices to try, starting at 0.

    Returns:
        A list of ``https://res.aiddit.com/<object_name>`` URLs, one per
        uploaded image, in index order.
    """
    # SECURITY(review): credentials are embedded in source. They should be
    # rotated and loaded from configuration or the environment instead; the
    # values are left untouched here so existing deployments keep working.
    OSS_ACCESS_KEY_ID = 'LTAI5tEYvefc4U3fyU5du225'
    OSS_ACCESS_KEY_SECRET = 'Z1gZtAGe8NwRtXgPzVgRMkRez4Ex4K'
    # NOTE(review): this is an "-internal" endpoint host, presumably only
    # reachable from inside the cloud network -- confirm before running
    # this elsewhere.
    OSS_ENDPOINT = 'oss-ap-southeast-1-internal.aliyuncs.com'
    OSS_BUCKET_NAME = 'aigc-admin'

    results = []
    # The bucket client does not depend on the loop variable, so it is built
    # once (on the first successful download) and reused for every upload.
    bucket = None
    for i in range(image_num):
        object_name = 'saas/mjImage/' + generate_random_string(32) + ".png"
        # Download the image from the CDN (no cookie is sent).
        response = requests.get(
            f"https://cdn.midjourney.com/{job_id}/0_{i}.png",
            headers=nec_headers(""),
            impersonate=_IMPERSONATE,
        )
        if not response.ok:
            continue
        if bucket is None:
            # Connect to OSS.
            auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
            bucket = oss2.Bucket(auth, OSS_ENDPOINT, OSS_BUCKET_NAME)
        # Upload the image bytes, then record the URL built from the key.
        bucket.put_object(object_name, response.content)
        results.append(f"https://res.aiddit.com/{object_name}")
    return results


def response_catch(response):
    """Return the JSON body of a successful response or raise an API error.

    Args:
        response: Response object exposing ``ok``, ``status_code`` and
            ``json()``.

    Returns:
        ``response.json()`` when ``response.ok`` is true.

    Raises:
        unauthorized_error: The status code is 401.
        system_error: Any other non-OK status code.
    """
    if response.ok:
        return response.json()
    if response.status_code == 401:
        raise unauthorized_error("unauthorized access (401)")
    raise system_error(f"system_error ({response.status_code})")


def nec_headers(cookie):
    """Build the request headers sent with every call.

    Args:
        cookie: Value placed in the ``cookie`` header.

    Returns:
        A new dict of header names to header values.
    """
    headers = {
        'accept': '*/*',
        'accept-language': 'zh-CN,zh;q=0.9',
        'cache-control': 'no-cache',
        'cookie': cookie,
        'pragma': 'no-cache',
        'priority': 'u=1, i',
        'referer': 'https://www.midjourney.com/explore',
        'sec-ch-ua': '"Google Chrome";v="129", "Not=A?Brand";v="8", "Chromium";v="129"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'same-origin',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36',
        'x-requested-with': 'XMLHttpRequest',
        # Header values are text; a str avoids relying on the HTTP client to
        # stringify an int. The bytes on the wire are unchanged.
        'x-csrf-protection': '1',
    }
    return headers