test.py 1.1 KB

1234567891011121314151617181920212223242526272829303132333435
  1. import oss2
  2. import requests
  3. import urllib.parse
  4. OSS_ACCESS_KEY_ID = "LTAIP6x1l3DXfSxm"
  5. OSS_ACCESS_KEY_SECRET = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
  6. # OSS_BUCKET_ENDPOINT = "oss-cn-hangzhou-internal.aliyuncs.com" # 内网地址
  7. OSS_BUCKET_ENDPOINT = "oss-cn-hangzhou.aliyuncs.com" # 外网地址
  8. def upload_to_oss(video_id):
  9. """
  10. Uploads video file to OSS
  11. :param video_id:
  12. :return:
  13. """
  14. # print("start download video...")
  15. # await download_video(video_url, "temp.mp4")
  16. # print("video download successfully done")
  17. oss_object_key = f'single_video/{video_id}'
  18. # print(key)
  19. auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
  20. bucket = oss2.Bucket(auth, OSS_BUCKET_ENDPOINT, "art-pubbucket")
  21. response = bucket.put_object_from_file(oss_object_key, "temp.mp4")
  22. print("Upload completed successfully.")
  23. if 'Content-Length' in response.headers:
  24. return {
  25. 'status': response.status,
  26. 'oss_object_key': oss_object_key}
  27. raise AssertionError(f'OSS上传失败,请求ID: \n{response.headers["x-oss-request-id"]}')
  28. upload_to_oss(video_id="123456789")