# -*- coding: utf-8 -*-
# @Time: 2023/12/26
"""Helpers for moving crawled videos into and out of Aliyun OSS buckets."""
import logging
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional, Sequence

import oss2
import requests

logger = logging.getLogger(__name__)

# OSS_BUCKET_PATH = "douyin"
# NOTE(security): these credentials are committed in plain text. They are left
# unchanged here to preserve behaviour, but they should be rotated and loaded
# from the environment or a secret store instead of living in source control.
OSS_ACCESS_KEY_ID = "LTAIP6x1l3DXfSxm"
OSS_ACCESS_KEY_SECRET = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
OSS_BUCKET_ENDPOINT = "oss-cn-hangzhou-internal.aliyuncs.com"  # internal-network endpoint
# OSS_BUCKET_ENDPOINT = "oss-cn-hangzhou.aliyuncs.com"  # public-network endpoint
OSS_BUCKET_NAME = "art-crawler"
OSS_PUB_BUCKET_NAME = "art-pubbucket"

# (connect, read) timeouts in seconds for fetching the source video. The read
# timeout bounds the gap between received chunks, not the total transfer time.
DOWNLOAD_TIMEOUT = (10, 120)


class Oss:
    """Upload and download helpers bound to the module-level OSS settings."""

    @staticmethod
    def _bucket(bucket_name: str) -> "oss2.Bucket":
        """Return an authenticated handle for ``bucket_name``."""
        auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
        return oss2.Bucket(auth, OSS_BUCKET_ENDPOINT, bucket_name)

    @classmethod
    def video_sync_upload_oss(cls, src_url: str, video_id: str, account_id: str,
                              OSS_BUCKET_PATH: str,
                              referer: Optional[str] = None) -> Dict[str, Any]:
        """Download a video over HTTP and store it in the ``art-crawler`` bucket.

        The object key is ``{OSS_BUCKET_PATH}/{account_id}/{video_id}``.

        Args:
            src_url: HTTP(S) URL of the video to fetch.
            video_id: Last component of the object key.
            account_id: Middle component of the object key.
            OSS_BUCKET_PATH: Key prefix inside the bucket.
            referer: Optional ``Referer`` header sent with the download;
                omitted when falsy.

        Returns:
            A dict with ``status`` (the upload response status) and
            ``oss_object_key``.

        Raises:
            requests.HTTPError: The source URL answered with a 4xx/5xx status.
            requests.RequestException: The download failed or timed out.
            AssertionError: The upload response carried no ``Content-Length``
                header.
        """
        headers = {
            'Accept': '*/*',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Cache-Control': 'no-cache',
            'Pragma': 'no-cache',
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/117.0.0.0 Safari/537.36',
        }
        if referer:
            headers['Referer'] = referer

        download = requests.request(
            url=src_url, method='GET', headers=headers, timeout=DOWNLOAD_TIMEOUT,
        )
        # Without this check an HTML error page would be uploaded as the video.
        download.raise_for_status()
        file_content = download.content
        content_type = download.headers.get('Content-Type', 'application/octet-stream')

        oss_object_key = f'{OSS_BUCKET_PATH}/{account_id}/{video_id}'
        upload = cls._bucket(OSS_BUCKET_NAME).put_object(
            oss_object_key, file_content, headers={'Content-Type': content_type},
        )
        if 'Content-Length' in upload.headers:
            return {
                'status': upload.status,
                'oss_object_key': oss_object_key,
            }
        # .get() so a missing request-id header cannot mask the real failure.
        request_id = upload.headers.get("x-oss-request-id", "")
        raise AssertionError(f'OSS上传失败,请求ID: \n{request_id}')

    @classmethod
    def stitching_sync_upload_oss(cls, src_url: str, video_id: str) -> Dict[str, Any]:
        """Upload a local video file to the ``art-pubbucket`` bucket.

        Args:
            src_url: Despite the name, this is a local file path; it is handed
                to ``put_object_from_file``.
            video_id: Used to build the object key ``guanggao/video/{video_id}``.

        Returns:
            A dict with ``status``, ``oss_object_key`` and
            ``save_oss_timestamp`` (current time in epoch milliseconds).

        Raises:
            AssertionError: The upload response carried no ``Content-Length``
                header.
        """
        oss_object_key = f'guanggao/video/{video_id}'
        upload = cls._bucket(OSS_PUB_BUCKET_NAME).put_object_from_file(oss_object_key, src_url)
        if 'Content-Length' in upload.headers:
            return {
                'status': upload.status,
                'oss_object_key': oss_object_key,
                'save_oss_timestamp': int(datetime.now().timestamp() * 1000),
            }
        # .get() so a missing request-id header cannot mask the real failure.
        request_id = upload.headers.get("x-oss-request-id", "")
        raise AssertionError(f'OSS上传失败,请求ID: \n{request_id}')

    @classmethod
    def get_oss_url(cls, videos: Iterable[Sequence[Any]], video_path: str) -> List[List[Any]]:
        """Download objects from the ``art-crawler`` bucket to local files.

        Despite the name, no URL is generated: each object is fetched with
        ``get_object_to_file``.

        Args:
            videos: Rows whose element at index 2 is the OSS object key;
                elements 0 and 1 are passed through untouched.
            video_path: Prefix prepended verbatim to the local file name, so it
                must already end with a path separator if it names a directory.

        Returns:
            One ``[row[0], row[1], row[2], local_path]`` list per row that was
            downloaded successfully. Rows that fail are logged and skipped.
        """
        bucket = cls._bucket(OSS_BUCKET_NAME)
        downloaded: List[List[Any]] = []
        for row in videos:
            try:
                object_key = row[2]
                filename = object_key.split("/")[-1]
                local_path = f'{video_path}{filename}.mp4'
                bucket.get_object_to_file(object_key, local_path)
            except Exception:
                # Best effort: one bad row must not abort the batch, but the
                # failure is recorded instead of disappearing silently.
                logger.exception("Failed to download OSS object for row %r", row)
                continue
            downloaded.append([row[0], row[1], object_key, local_path])
        return downloaded