# -*- coding: utf-8 -*- # @Time: 2023/12/26 import time import uuid from datetime import datetime from typing import Dict, Any, Optional import oss2 from oss2 import Auth, Bucket, exceptions from loguru import logger from utils.config import OssConfig import requests OSS_ACCESS_KEY_ID = "LTAIP6x1l3DXfSxm" OSS_ACCESS_KEY_SECRET = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon" # OSS_BUCKET_ENDPOINT = "oss-cn-hangzhou-internal.aliyuncs.com"# 内网地址 OSS_BUCKET_ENDPOINT = "oss-cn-hangzhou.aliyuncs.com" # 外网地址 OSS_BUCKET_NAME = "art-crawler" class Oss(): @classmethod def channel_upload_oss(cls, src_url: str, video_id: str, referer: Optional[str] = None) -> Dict[str, Any]: headers = { 'Accept': '*/*', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Cache-Control': 'no-cache', 'Pragma': 'no-cache', 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) ' 'Chrome/117.0.0.0 Safari/537.36', } if referer: headers.update({'Referer': referer}) response = requests.request(url=src_url, method='GET', headers=headers, timeout=30) file_content = response.content content_type = response.headers.get('Content-Type', 'application/octet-stream') oss_object_key = f'carry/video/{video_id}' auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET) bucket = oss2.Bucket(auth, OSS_BUCKET_ENDPOINT, OSS_BUCKET_NAME) response = bucket.put_object(oss_object_key, file_content, headers={'Content-Type': content_type}) if 'Content-Length' in response.headers: return { 'status': response.status, 'oss_object_key': oss_object_key} raise AssertionError(f'OSS上传失败,请求ID: \n{response.headers["x-oss-request-id"]}') """ 视频发送到art-pubbucket """ @classmethod def stitching_sync_upload_oss(cls, src_url: str, video_id: str) -> Dict[str, Any]: oss_object_key = f'carry/video/{video_id}' auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET) bucket = oss2.Bucket(auth, OSS_BUCKET_ENDPOINT, "art-pubbucket") response = bucket.put_object_from_file(oss_object_key, src_url) if 'Content-Length' in response.headers: return { 'status': response.status, 'oss_object_key': oss_object_key, 'save_oss_timestamp': int(datetime.now().timestamp() * 1000), } raise AssertionError(f'OSS上传失败,请求ID: \n{response.headers["x-oss-request-id"]}') """ 封面发送到art-pubbucket """ @classmethod def stitching_fm_upload_oss(cls, src_url: str, video_id: str) -> Dict[str, Any]: oss_object_key = f'jq_oss/jpg/{video_id}.jpg' auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET) bucket = oss2.Bucket(auth, OSS_BUCKET_ENDPOINT, "art-pubbucket") response = bucket.put_object_from_file(oss_object_key, src_url) if 'Content-Length' in response.headers: return { 'status': response.status, 'oss_object_key': oss_object_key, 'save_oss_timestamp': int(datetime.now().timestamp() * 1000), } raise AssertionError(f'OSS上传失败,请求ID: \n{response.headers["x-oss-request-id"]}') """ 封面发送到art-pubbucket """ @classmethod def mp3_upload_oss(cls, src_url: str, video_id: str) -> Dict[str, Any]: oss_object_key = f'jq_audio/audio/{video_id}.mp3' auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET) bucket = oss2.Bucket(auth, OSS_BUCKET_ENDPOINT, "art-crawler") response = bucket.put_object_from_file(oss_object_key, src_url) if 'Content-Length' in response.headers: return { 'status': response.status, 'oss_object_key': oss_object_key, 'save_oss_timestamp': int(datetime.now().timestamp() * 1000), } raise AssertionError(f'OSS上传失败,请求ID: \n{response.headers["x-oss-request-id"]}') @classmethod def download_video_oss(cls, url, file_path): video_path = file_path + 'video.mp4' oss_object_key = cls.channel_upload_oss(url, str(uuid.uuid4())) time.sleep(2) oss_object = oss_object_key.get("oss_object_key") if oss_object: auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET) bucket = oss2.Bucket(auth, OSS_BUCKET_ENDPOINT, OSS_BUCKET_NAME) # 获取指定路径下的对象列表 bucket.get_object_to_file(oss_object, video_path) time.sleep(5) return video_path else: return video_path @classmethod def download_sph_ls(cls, video_url, video_path_url, v_id): if "jpg" in video_url: video_path = video_path_url + str(v_id) + '.jpg' else: video_path = video_path_url + str(v_id) + '.mp4' auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET) bucket = oss2.Bucket(auth, OSS_BUCKET_ENDPOINT, OSS_BUCKET_NAME) # 获取指定路径下的对象列表 bucket.get_object_to_file(video_url, video_path) time.sleep(5) return video_path @classmethod def upload_to_aliyun(cls,local_path): """ 上传文件到阿里云 OSS 参数: local_path (str): 本地文件路径 oss_path (str): OSS 目标路径 返回: bool: 上传成功返回 True,失败返回 False """ try: oss_path = generate_oss_path() logger.info(f"开始上传到阿里云 OSS: {oss_path}") # 初始化认证和存储空间 auth = Auth(OssConfig["OSS_ACCESS_KEY_ID"], OssConfig["OSS_ACCESS_KEY_SECRET"]) bucket = Bucket(auth, OssConfig["ENDPOINT"], OssConfig["BUCKETNAME"]) # 上传文件 result = bucket.put_object_from_file(oss_path, local_path) # 检查上传结果 if result.status == 200: logger.info(f"上传成功,OSS 路径: {oss_path}") return OssConfig["BUCKETNAME_HOST"]+oss_path else: logger.error(f"上传失败,状态码: {result.status}") return False except exceptions.OssError as e: logger.error(f"阿里云 OSS 错误: {e}") return False except Exception as e: logger.error(f"上传异常: {e}") return def generate_oss_path(): """生成唯一OSS路径(日期+UUID)""" # 生成日期目录(格式:YYYYMMDD) date_dir = datetime.now().strftime("%Y%m%d") # 生成随机文件名(使用 UUID) random_filename = f"{uuid.uuid4().hex}.mp3" # 构建完整的 OSS 路径 oss_base_dir = OssConfig["OSS_BASE_DIR"] oss_path = f"{oss_base_dir}/{date_dir}/{random_filename}" return f"{oss_path}" if __name__ == '__main__': Oss.download_sph_ls('channel/video/sph/14374775553517295881.jpg','asa','1')