"""测试 OSS 上传 + CDN 可访问性""" import asyncio import json import os import tempfile import httpx async def test(): from cyber_sdk.ali_oss import upload_localfile # 创建测试图片 (1x1 PNG) tmp = os.path.join(tempfile.gettempdir(), "toolhub_test.png") with open(tmp, "wb") as f: f.write( b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01" b"\x00\x00\x00\x01\x08\x06\x00\x00\x00\x1f\x15\xc4\x89" b"\x00\x00\x00\nIDATx\x9cc\x00\x01\x00\x00\x05\x00\x01" b"\r\n-\xb4\x00\x00\x00\x00IEND\xaeB`\x82" ) safe_path = os.path.abspath(tmp).replace("\\", "/") print(f"Uploading {safe_path} ...") result = await upload_localfile( file_path=safe_path, bucket_path="toolhub_images", bucket_name="aigc-admin", ) print(f"Upload result: {json.dumps(result, ensure_ascii=False)}") oss_key = result.get("oss_object_key") if oss_key: cdn_url = f"https://res.cybertogether.net/{oss_key}" print(f"CDN URL: {cdn_url}") async with httpx.AsyncClient(timeout=10) as client: resp = await client.head(cdn_url) print(f"CDN HEAD status: {resp.status_code}") print(f"Content-Type: {resp.headers.get('content-type')}") print(f"Content-Length: {resp.headers.get('content-length')}") # 测试 Qwen 是否能访问(模拟:从不同网络 GET) async with httpx.AsyncClient(timeout=10) as client: resp = await client.get(cdn_url) print(f"CDN GET status: {resp.status_code}, size: {len(resp.content)} bytes") print("[PASS]" if resp.status_code == 200 else "[FAIL]") else: print("Upload failed: no oss_object_key") print("[FAIL]") os.unlink(tmp) if __name__ == "__main__": asyncio.run(test())