aliyun_oss_uploading.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. # -*- coding: utf-8 -*-
  2. # @Time: 2023/12/26
import os
from datetime import datetime
from typing import Dict, Any, Optional

import oss2
import requests
  7. OSS_ACCESS_KEY_ID = "LTAIP6x1l3DXfSxm"
  8. OSS_ACCESS_KEY_SECRET = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
  9. OSS_BUCKET_ENDPOINT = "oss-cn-hangzhou-internal.aliyuncs.com"# 内网地址
  10. # OSS_BUCKET_ENDPOINT = "oss-cn-hangzhou.aliyuncs.com" # 外网地址
  11. class Oss():
  12. # 视频发送到art-pubbucket
  13. @classmethod
  14. def video_sync_upload_oss(cls, src_url: str,
  15. video_id: str) -> Dict[str, Any]:
  16. oss_object_key = f'single_video/{video_id}'
  17. auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
  18. bucket = oss2.Bucket(auth, OSS_BUCKET_ENDPOINT, "art-pubbucket")
  19. response = bucket.put_object_from_file(oss_object_key, src_url)
  20. if 'Content-Length' in response.headers:
  21. return {
  22. 'status': response.status,
  23. 'oss_object_key': oss_object_key,
  24. 'save_oss_timestamp': int(datetime.now().timestamp() * 1000),
  25. }
  26. raise AssertionError(f'OSS上传失败,请求ID: \n{response.headers["x-oss-request-id"]}')
  27. @classmethod
  28. def video_url_upload_oss(cls, src_url: str,
  29. video_id: str,
  30. referer: Optional[str] = None) -> Dict[str, Any]:
  31. headers = {
  32. 'Accept': '*/*',
  33. 'Accept-Language': 'zh-CN,zh;q=0.9',
  34. 'Cache-Control': 'no-cache',
  35. 'Pragma': 'no-cache',
  36. 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) '
  37. 'Chrome/117.0.0.0 Safari/537.36',
  38. }
  39. if referer:
  40. headers.update({'Referer': referer})
  41. response = requests.request(url=src_url, method='GET', headers=headers)
  42. file_content = response.content
  43. content_type = response.headers.get('Content-Type', 'application/octet-stream')
  44. oss_object_key = f'single_video/{video_id}'
  45. auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
  46. bucket = oss2.Bucket(auth, OSS_BUCKET_ENDPOINT,"art-pubbucket")
  47. response = bucket.put_object(oss_object_key, file_content, headers={'Content-Type': content_type})
  48. if 'Content-Length' in response.headers:
  49. return {
  50. 'status': response.status,
  51. 'oss_object_key': oss_object_key}
  52. raise AssertionError(f'OSS上传失败,请求ID: \n{response.headers["x-oss-request-id"]}')