# aliyun_oss_uploading.py
# -*- coding: utf-8 -*-
import logging
import os
from datetime import datetime
from typing import Dict, Any, Optional

import oss2
import requests
  6. OSS_ACCESS_KEY_ID = "LTAIP6x1l3DXfSxm"
  7. OSS_ACCESS_KEY_SECRET = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
  8. OSS_BUCKET_ENDPOINT = "oss-cn-hangzhou-internal.aliyuncs.com"# 内网地址
  9. # OSS_BUCKET_ENDPOINT = "oss-cn-hangzhou.aliyuncs.com" # 外网地址
  10. OSS_BUCKET_NAME = "art-crawler"
  11. class Oss():
  12. # 抓取视频上传到art-crawler
  13. @classmethod
  14. def video_sync_upload_oss(cls, src_url: str,
  15. video_id: str,
  16. account_id: str,
  17. OSS_BUCKET_PATH: str,
  18. referer: Optional[str] = None) -> Dict[str, Any]:
  19. headers = {
  20. 'Accept': '*/*',
  21. 'Accept-Language': 'zh-CN,zh;q=0.9',
  22. 'Cache-Control': 'no-cache',
  23. 'Pragma': 'no-cache',
  24. 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) '
  25. 'Chrome/117.0.0.0 Safari/537.36',
  26. }
  27. if referer:
  28. headers.update({'Referer': referer})
  29. response = requests.request(url=src_url, method='GET', headers=headers)
  30. file_content = response.content
  31. content_type = response.headers.get('Content-Type', 'application/octet-stream')
  32. oss_object_key = f'{OSS_BUCKET_PATH}/{account_id}/{video_id}'
  33. auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
  34. bucket = oss2.Bucket(auth, OSS_BUCKET_ENDPOINT, OSS_BUCKET_NAME)
  35. response = bucket.put_object(oss_object_key, file_content, headers={'Content-Type': content_type})
  36. if 'Content-Length' in response.headers:
  37. return {
  38. 'status': response.status,
  39. 'oss_object_key': oss_object_key}
  40. raise AssertionError(f'OSS上传失败,请求ID: \n{response.headers["x-oss-request-id"]}')
  41. # 视频发送到art-pubbucket
  42. @classmethod
  43. def stitching_sync_upload_oss(cls, src_url: str,
  44. video_id: str) -> Dict[str, Any]:
  45. oss_object_key = f'agc_oss/agc_video/{video_id}'
  46. auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
  47. bucket = oss2.Bucket(auth, OSS_BUCKET_ENDPOINT, "art-pubbucket")
  48. response = bucket.put_object_from_file(oss_object_key, src_url)
  49. if 'Content-Length' in response.headers:
  50. return {
  51. 'status': response.status,
  52. 'oss_object_key': oss_object_key,
  53. 'save_oss_timestamp': int(datetime.now().timestamp() * 1000),
  54. }
  55. raise AssertionError(f'OSS上传失败,请求ID: \n{response.headers["x-oss-request-id"]}')
  56. # 获取视频链接 将视频链接有效时间设置为1天
  57. @classmethod
  58. def get_oss_url(cls, videos, video_path):
  59. auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
  60. bucket = oss2.Bucket(auth, OSS_BUCKET_ENDPOINT, OSS_BUCKET_NAME)
  61. list = []
  62. for i in videos:
  63. try:
  64. # 获取指定路径下的对象列表
  65. filename = i[2].split("/")[-1]
  66. bucket.get_object_to_file(i[2], f'{video_path}{filename}.mp4')
  67. list.append([i[0], i[1], i[2], f'{video_path}{filename}.mp4'])
  68. except Exception:
  69. continue
  70. return list
  71. @classmethod
  72. def download_url(cls, videos, video_path, video):
  73. for i in range(3):
  74. payload = {}
  75. headers = {}
  76. response = requests.request("GET", videos, headers=headers, data=payload)
  77. if response.status_code == 200:
  78. video_url = []
  79. # 以二进制写入模式打开文件
  80. video = video_path+video+'.mp4'
  81. with open(f"{video}", "wb") as file:
  82. # 将响应内容写入文件
  83. file.write(response.content)
  84. video_url.append(video)
  85. return video_url
  86. return ''