import datetime
import multiprocessing
import os
import traceback
from threading import Timer

import oss2

from config import set_config
from log import Log
from utils import data_check, get_feature_data

config_ = set_config()
log_ = Log()

# Columns pulled from the daily feature table.
features = ['videoid', 'title', 'video_path']


def download_video_from_oss(video_id, video_path, download_folder):
    """Download one video (plus its .ts segments for m3u8 playlists) from OSS.

    Best-effort: any failure is logged and swallowed so a single bad video
    does not abort the other downloads in the pool.

    :param video_id: id of the video; used as the local sub-directory name
    :param video_path: object key of the video inside the OSS bucket
    :param download_folder: base local folder; a worker-specific suffix
                            (pid parity) is appended
    """
    try:
        # Spread downloads across two local folders keyed by worker pid parity.
        pid = int(os.getpid() % 2)
        print(f"{video_id} download start ...")
        download_folder = f"{download_folder}_{pid}"
        video_local_dir = os.path.join(download_folder, video_id)
        # exist_ok: safe to re-run for a video seen before, and race-free
        # when several pool workers create the shared parent concurrently.
        os.makedirs(video_local_dir, exist_ok=True)
        video_filename = video_path.split('/')[-1]
        video_local_path = os.path.join(video_local_dir, video_filename)
        # NOTE(review): a root-account AccessKey grants full API access —
        # prefer a RAM user's key, as the original comment advised.
        auth = oss2.Auth(access_key_id=config_.OSS_CONFIG['accessKeyId'],
                         access_key_secret=config_.OSS_CONFIG['accessKeySecret'])
        bucket = oss2.Bucket(auth, endpoint=config_.OSS_CONFIG['endpoint'],
                             bucket_name='art-pubbucket')
        # Object key (no bucket prefix, e.g. abc/efg/123.jpg) -> local file.
        bucket.get_object_to_file(video_path, video_local_path)
        # An m3u8 playlist only references its segments; fetch every .ts too.
        if video_filename.split('.')[-1] == 'm3u8':
            root_path = '/'.join(video_path.split('/')[:-1])
            with open(video_local_path, 'r') as rf:
                lines = rf.readlines()
            for line in lines:
                line = line.strip()
                print(line)
                if line.endswith('.ts'):
                    ts_path = os.path.join(root_path, line)
                    ts_local_path = os.path.join(video_local_dir, line)
                    bucket.get_object_to_file(ts_path, ts_local_path)
        print(f"{video_id} download end!")
    except Exception:
        # Deliberate best-effort: log the failure and let the pool continue.
        print(f"{video_id} download fail!")
        print(traceback.format_exc())


def download_videos(project, table, dt):
    """Download every video listed in partition ``dt`` of the feature table.

    Fans the per-video downloads out over a small process pool and blocks
    until all of them have finished (or failed and been logged).
    """
    feature_df = get_feature_data(project=project, table=table, dt=dt,
                                  features=features)
    download_folder = 'videos'
    pool = multiprocessing.Pool(processes=6)
    # Iterate rows directly instead of re-filtering the frame for each id
    # (the original per-id boolean filter was O(n) per video, O(n^2) total).
    for video_id, video_path in zip(feature_df['videoid'],
                                    feature_df['video_path']):
        # Object keys must not contain stray whitespace.
        video_path = video_path.strip().replace(' ', '')
        print(video_id, video_path)
        pool.apply_async(
            func=download_video_from_oss,
            args=(video_id, video_path, download_folder)
        )
    pool.close()
    pool.join()


def timer_check():
    """Poll until yesterday's partition is ready, then run the downloads.

    If the data is not ready yet, re-schedules itself in 60 seconds.
    Any exception is caught at this top-level boundary and logged.
    """
    try:
        project = config_.DAILY_VIDEO['project']
        table = config_.DAILY_VIDEO['table']
        now_date = datetime.datetime.today()
        print(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")
        dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1),
                                        '%Y%m%d')
        # Has yesterday's partition been materialized yet?
        data_count = data_check(project=project, table=table, dt=dt)
        if data_count > 0:
            print(f'videos count = {data_count}')
            # Data is ready: kick off the downloads.
            download_videos(project=project, table=table, dt=dt)
            print(f"videos download end!")
        else:
            # Data not ready: check again in one minute.
            Timer(60, timer_check).start()
    except Exception as e:
        print(f"视频下载失败, exception: {e}, traceback: {traceback.format_exc()}")


if __name__ == '__main__':
    timer_check()