aliyun_oss.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. # -*- coding: utf-8 -*-
  2. # @Time: 2023/12/26
  3. import time
  4. from datetime import datetime
  5. from typing import Dict, Any, Optional
  6. import oss2
  7. import requests
  8. # OSS_BUCKET_PATH = "douyin"
  9. OSS_ACCESS_KEY_ID = "LTAIP6x1l3DXfSxm"
  10. OSS_ACCESS_KEY_SECRET = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
  11. OSS_BUCKET_ENDPOINT = "oss-cn-hangzhou-internal.aliyuncs.com"# 内网地址
  12. # OSS_BUCKET_ENDPOINT = "oss-cn-hangzhou.aliyuncs.com" # 外网地址
  13. OSS_BUCKET_NAME = "art-crawler"
  14. class Oss():
  15. @classmethod
  16. def channel_upload_oss(cls, src_url: str,
  17. video_id: str,
  18. referer: Optional[str] = None) -> Dict[str, Any]:
  19. headers = {
  20. 'Accept': '*/*',
  21. 'Accept-Language': 'zh-CN,zh;q=0.9',
  22. 'Cache-Control': 'no-cache',
  23. 'Pragma': 'no-cache',
  24. 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) '
  25. 'Chrome/117.0.0.0 Safari/537.36',
  26. }
  27. if referer:
  28. headers.update({'Referer': referer})
  29. response = requests.request(url=src_url, method='GET', headers=headers, timeout=60)
  30. file_content = response.content
  31. content_type = response.headers.get('Content-Type', 'application/octet-stream')
  32. oss_object_key = f'channel/video/{video_id}'
  33. auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
  34. bucket = oss2.Bucket(auth, OSS_BUCKET_ENDPOINT, OSS_BUCKET_NAME)
  35. response = bucket.put_object(oss_object_key, file_content, headers={'Content-Type': content_type})
  36. if 'Content-Length' in response.headers:
  37. return {
  38. 'status': response.status,
  39. 'oss_object_key': oss_object_key}
  40. raise AssertionError(f'OSS上传失败,请求ID: \n{response.headers["x-oss-request-id"]}')
  41. """
  42. 视频发送到art-pubbucket
  43. """
  44. @classmethod
  45. def stitching_sync_upload_oss(cls, src_url: str,
  46. video_id: str) -> Dict[str, Any]:
  47. oss_object_key = f'jq_oss/video/{video_id}'
  48. auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
  49. bucket = oss2.Bucket(auth, OSS_BUCKET_ENDPOINT, "art-pubbucket")
  50. response = bucket.put_object_from_file(oss_object_key, src_url)
  51. if 'Content-Length' in response.headers:
  52. return {
  53. 'status': response.status,
  54. 'oss_object_key': oss_object_key,
  55. 'save_oss_timestamp': int(datetime.now().timestamp() * 1000),
  56. }
  57. raise AssertionError(f'OSS上传失败,请求ID: \n{response.headers["x-oss-request-id"]}')
  58. """
  59. 封面发送到art-pubbucket
  60. """
  61. @classmethod
  62. def stitching_fm_upload_oss(cls, src_url: str,
  63. video_id: str) -> Dict[str, Any]:
  64. oss_object_key = f'jq_oss/jpg/{video_id}.jpg'
  65. auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
  66. bucket = oss2.Bucket(auth, OSS_BUCKET_ENDPOINT, "art-pubbucket")
  67. response = bucket.put_object_from_file(oss_object_key, src_url)
  68. if 'Content-Length' in response.headers:
  69. return {
  70. 'status': response.status,
  71. 'oss_object_key': oss_object_key,
  72. 'save_oss_timestamp': int(datetime.now().timestamp() * 1000),
  73. }
  74. raise AssertionError(f'OSS上传失败,请求ID: \n{response.headers["x-oss-request-id"]}')
  75. """
  76. 封面发送到art-pubbucket
  77. """
  78. @classmethod
  79. def mp3_upload_oss(cls, src_url: str,
  80. video_id: str) -> Dict[str, Any]:
  81. oss_object_key = f'jq_audio/audio/{video_id}.mp3'
  82. auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
  83. bucket = oss2.Bucket(auth, OSS_BUCKET_ENDPOINT, "art-crawler")
  84. response = bucket.put_object_from_file(oss_object_key, src_url)
  85. if 'Content-Length' in response.headers:
  86. return {
  87. 'status': response.status,
  88. 'oss_object_key': oss_object_key,
  89. 'save_oss_timestamp': int(datetime.now().timestamp() * 1000),
  90. }
  91. raise AssertionError(f'OSS上传失败,请求ID: \n{response.headers["x-oss-request-id"]}')
  92. @classmethod
  93. def download_video_oss(cls, video_url, video_path_url, v_id):
  94. video_path = video_path_url + str(v_id) + '.mp4'
  95. oss_object_key = cls.channel_upload_oss(video_url, v_id)
  96. time.sleep(2)
  97. oss_object = oss_object_key.get("oss_object_key")
  98. if oss_object:
  99. auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
  100. bucket = oss2.Bucket(auth, OSS_BUCKET_ENDPOINT, OSS_BUCKET_NAME)
  101. # 获取指定路径下的对象列表
  102. bucket.get_object_to_file(oss_object, video_path)
  103. time.sleep(5)
  104. return video_path
  105. else:
  106. return video_path
  107. @classmethod
  108. def download_sph_ls(cls, video_url, video_path_url, v_id):
  109. if "jpg" in video_url:
  110. video_path = video_path_url + str(v_id) + '.jpg'
  111. else:
  112. video_path = video_path_url + str(v_id) + '.mp4'
  113. auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
  114. bucket = oss2.Bucket(auth, OSS_BUCKET_ENDPOINT, OSS_BUCKET_NAME)
  115. # 获取指定路径下的对象列表
  116. bucket.get_object_to_file(video_url, video_path)
  117. time.sleep(5)
  118. return video_path
  119. if __name__ == '__main__':
  120. Oss.download_sph_ls('channel/video/sph/14374775553517295881.jpg','asa','1')