| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- import requests
- import os
- import sys
- import uuid
- import hmac
- import time
- import hashlib
- import base64
- from collections import OrderedDict
- from dotenv import load_dotenv
- # 加载环境变量(AK/SK)
- load_dotenv()
- # ========== 核心配置(替换成你的信息) ==========
- LIBLIBAI_AK = os.getenv("LIBLIBAI_ACCESS_KEY") # 从.env读取或直接写死
- LIBLIBAI_SK = os.getenv("LIBLIBAI_SECRET_KEY")
- LOCAL_IMAGE_PATH = r"C:\Users\11304\gitlab\cybertogether\tool_agent\control_reference.png" # 你的本地图片路径
- LIBLIBAI_DOMAIN = "https://openapi.liblibai.cloud"
- # ========== 核心工具函数 ==========
- def generate_auth_url(uri, ak, sk):
- """生成带签名的请求URL(仅用于获取OSS签名)"""
- ts = str(int(time.time() * 1000))
- nonce = uuid.uuid4().hex
- sign_str = f"{uri}&{ts}&{nonce}"
- # 生成签名
- dig = hmac.new(sk.encode(), sign_str.encode(), hashlib.sha1).digest()
- sig = base64.urlsafe_b64encode(dig).rstrip(b"=").decode()
- # 拼接URL
- return f"{LIBLIBAI_DOMAIN}{uri}?AccessKey={ak}&Timestamp={ts}&SignatureNonce={nonce}&Signature={sig}"
- def get_oss_upload_signature(file_name, ak, sk):
- """获取OSS上传签名"""
- # 1. 提取文件扩展名
- ext = os.path.splitext(file_name)[1].lstrip('.').lower()
- if ext not in ['jpg', 'jpeg', 'png']:
- print(f"❌ 不支持的文件格式: {ext},仅支持jpg/png/jpeg")
- return None
-
- # 2. 调用签名接口
- uri = "/api/generate/upload/signature"
- auth_url = generate_auth_url(uri, ak, sk)
- headers = {'Content-Type': 'application/json'}
- payload = {
- "name": file_name, # 完整文件名(含扩展名)
- "extension": ext
- }
- try:
- print(f"📡 正在请求OSS签名... URL: {auth_url[:100]}...")
- resp = requests.post(auth_url, headers=headers, json=payload, timeout=10)
- resp.raise_for_status()
- result = resp.json()
-
- if result.get("code") != 0:
- print(f"❌ 获取签名失败: {result.get('msg')}")
- return None
-
- sig_data = result.get("data")
- print(f"✅ 获取签名成功!")
- print(f" - postUrl: {sig_data['postUrl']}")
- print(f" - key: {sig_data['key']}")
- return sig_data
- except Exception as e:
- print(f"❌ 请求签名接口异常: {str(e)}")
- return None
- def upload_to_oss(sig_data, local_file_path):
- """上传本地文件到OSS"""
- # 1. 校验文件
- if not os.path.exists(local_file_path):
- print(f"❌ 本地文件不存在: {local_file_path}")
- return None
-
- file_name = os.path.basename(local_file_path)
- ext = os.path.splitext(local_file_path)[1].lower()
- content_type = 'image/jpeg' if ext in ['.jpg', '.jpeg'] else 'image/png'
- # 2. 构造表单参数(必须用OrderedDict保证顺序,file放最后)
- form_data = OrderedDict([
- ('key', sig_data['key']),
- ('policy', sig_data['policy']),
- ('x-oss-date', sig_data['xOssDate']),
- ('x-oss-expires', str(sig_data['xOssExpires'])),
- ('x-oss-signature', sig_data['xOssSignature']),
- ('x-oss-credential', sig_data['xOssCredential']),
- ('x-oss-signature-version', sig_data['xOssSignatureVersion'])
- ])
- # 3. 读取文件并上传
- try:
- print(f"📤 正在上传文件到OSS...")
- print(f" - 本地文件: {local_file_path}")
- print(f" - OSS地址: {sig_data['postUrl']}")
- with open(local_file_path, 'rb') as f:
- files = {
- 'file': (file_name, f, content_type) # file必须是最后一个字段
- }
- # 发送上传请求
- resp = requests.post(
- sig_data['postUrl'],
- data=form_data,
- files=files,
- timeout=30,
- headers={'User-Agent': 'Mozilla/5.0'}
- )
-
- # 4. 输出上传结果
- print(f"\n📋 上传响应信息:")
- print(f" - 状态码: {resp.status_code}")
- print(f" - 响应头: {dict(resp.headers)}")
- print(f" - 响应体: {resp.text[:500] if resp.text else '空'}")
- if resp.status_code in [200, 204]:
- # 拼接最终的OSS URL(仅用于LiblibAI API调用,浏览器访问会403)
- oss_url = f"{sig_data['postUrl']}/{sig_data['key']}"
- print(f"\n✅ 上传成功!OSS URL: {oss_url}")
- print(f"⚠️ 注意:该URL仅能用于LiblibAI API调用,浏览器直接访问会返回403(私有读权限)")
- return oss_url
- else:
- print(f"\n❌ 上传失败!状态码: {resp.status_code}")
- return None
- except Exception as e:
- print(f"\n❌ 上传过程异常: {str(e)}")
- return None
- # ========== 主执行逻辑 ==========
- if __name__ == "__main__":
- # 1. 基础校验
- if not LIBLIBAI_AK or not LIBLIBAI_SK:
- print("❌ 请先配置LIBLIBAI_ACCESS_KEY和LIBLIBAI_SECRET_KEY(.env文件或直接写死)")
- sys.exit(1)
-
- if not os.path.exists(LOCAL_IMAGE_PATH):
- print(f"❌ 本地图片不存在: {LOCAL_IMAGE_PATH}")
- sys.exit(1)
-
- # 2. 获取OSS签名
- file_name = os.path.basename(LOCAL_IMAGE_PATH)
- sig_data = get_oss_upload_signature(file_name, LIBLIBAI_AK, LIBLIBAI_SK)
- if not sig_data:
- sys.exit(1)
-
- # 3. 上传图片到OSS
- upload_to_oss(sig_data, LOCAL_IMAGE_PATH)
|