import datetime
import multiprocessing
import os
import traceback
from threading import Timer

import oss2
import requests

import ODPSQueryUtil
from config import set_config
from log import Log
from ReadXlsxFile import getVideoInfoInXlxs
from utils import data_check, get_feature_data

config_ = set_config()
log_ = Log()

# Columns read from the feature table, and the CDN prefix used to build download URLs
features = ['videoid', 'title', 'video_path']
cdn_domain = 'http://rescdn.yishihui.com/'


def download_file(url, local_path):
    """Download a single file over HTTP and write it to local_path."""
    response = requests.get(url)
    if response.status_code == 200:
        with open(local_path, 'wb') as f:
            f.write(response.content)
    else:
        print(f"Failed to download {url}")
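

# The helper below is a sketch of an alternative, not used by the rest of this
# script: for large video/.ts files, streaming the response avoids holding the
# whole body in memory. The name download_file_streaming and the chunk size are
# illustrative assumptions, not part of the original code.
def download_file_streaming(url, local_path, chunk_size=1024 * 1024):
    """Stream a file to disk in chunks instead of buffering it fully in memory."""
    with requests.get(url, stream=True) as response:
        if response.status_code != 200:
            print(f"Failed to download {url}")
            return
        with open(local_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=chunk_size):
                if chunk:
                    f.write(chunk)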


def download_video_from_oss(video_id, video_path, download_folder):
    """Download one video from OSS (via the CDN domain) into a per-video folder."""
    try:
        # Shard downloads into one of two folders based on the worker's pid
        pid = int(os.getpid() % 2)
        print(f"{video_id} download start ...")
        download_folder = f"{download_folder}_{pid}"
        os.makedirs(download_folder, exist_ok=True)
        video_local_dir = os.path.join(download_folder, str(video_id))
        os.makedirs(video_local_dir)
        video_filename = video_path.split('/')[-1]
        video_local_path = os.path.join(video_local_dir, video_filename)
        # Download the video file itself
        video_url = cdn_domain + video_path
        download_file(video_url, video_local_path)
        # For an m3u8 playlist, every .ts segment it references must be downloaded too
        if video_filename.split('.')[-1] == 'm3u8':
            root_path = '/'.join(video_path.split('/')[:-1])
            with open(video_local_path, 'r') as rf:
                lines = rf.readlines()
            for line in lines:
                line = line.strip()
                print(line)
                if line.endswith('.ts'):
                    ts_url = cdn_domain + os.path.join(root_path, line)
                    ts_local_path = os.path.join(video_local_dir, line)
                    download_file(ts_url, ts_local_path)
        print(f"{video_id} download end!")
    except Exception:
        print(f"{video_id} download fail!")
        print(traceback.format_exc())
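
# Illustration of how segment URLs are resolved above (the path is hypothetical):
#   video_path = 'video/dir/index.m3u8'  ->  root_path = 'video/dir'
#   playlist line '00001.ts'             ->  ts_url = cdn_domain + 'video/dir/00001.ts'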


def download_videos(project, table, dt):
    # Fetch the feature data (videoid / title / video_path) for the given partition
    feature_df = get_feature_data(
        project=project, table=table, dt=dt, features=features)
    download_folder = 'videos'
    video_id_list = feature_df['videoid'].to_list()
    pool = multiprocessing.Pool(processes=6)
    for video_id in video_id_list:
        video_path = feature_df[feature_df['videoid']
                                == video_id]['video_path'].values[0].strip()
        video_path = video_path.replace(' ', '')
        print(video_id, video_path)
        pool.apply_async(
            func=download_video_from_oss,
            args=(video_id, video_path, download_folder)
        )
    pool.close()
    pool.join()
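
# Usage sketch for the daily job (the project/table names and the date below are
# placeholder values, not the real configuration):
#   download_videos(project='my_odps_project', table='my_video_table', dt='20240101')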


def timer_check():
    try:
        project = config_.DAILY_VIDEO['project']
        table = config_.DAILY_VIDEO['table']
        # Use yesterday's partition
        now_date = datetime.datetime.today()
        print(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")
        dt = datetime.datetime.strftime(
            now_date - datetime.timedelta(days=1), '%Y%m%d')
        # Check whether the partition data is ready
        data_count = data_check(project=project, table=table, dt=dt)
        if data_count > 0:
            print(f'videos count = {data_count}')
            # Data is ready: download the videos
            download_videos(project=project, table=table, dt=dt)
            print("videos download end!")
        else:
            # Data not ready yet: check again in 1 minute
            Timer(60, timer_check).start()
    except Exception as e:
        print(f"video download failed, exception: {e}, traceback: {traceback.format_exc()}")


if __name__ == '__main__':
    # timer_check()
    # Batch mode: read the video list from past_videos.xlsx (the daily timer_check above is disabled)
    feature_df = getVideoInfoInXlxs('past_videos.xlsx')
    download_folder = 'videos'
    video_id_list = feature_df['videoid'].to_list()
    pool = multiprocessing.Pool(processes=6)
    for video_id in video_id_list:
        video_path = feature_df[feature_df['videoid']
                                == video_id]['video_path'].values[0].strip()
        video_path = video_path.replace(' ', '')
        print(video_id, video_path)
        pool.apply_async(
            func=download_video_from_oss,
            args=(video_id, video_path, download_folder)
        )
    pool.close()
    pool.join()