# download_videos_task.py
import datetime
import multiprocessing
import os
import posixpath
import traceback
from threading import Timer

import oss2
import requests

from utils import data_check, get_feature_data
from config import set_config
from log import Log
import ODPSQueryUtil
from ReadXlsxFile import getVideoInfoInXlxs
# Project configuration object; timer_check() reads its DAILY_VIDEO entry.
config_ = set_config()
# Log instance from the project's log module; not referenced in the visible code.
log_ = Log()
# Columns requested from the feature table by download_videos().
features = ['videoid', 'title', 'video_path']
# Base URL prepended to each video_path to form its download URL.
cdn_domain = 'http://rescdn.yishihui.com/'
  17. def download_file(url, local_path):
  18. response = requests.get(url)
  19. if response.status_code == 200:
  20. with open(local_path, 'wb') as f:
  21. f.write(response.content)
  22. else:
  23. print(f"Failed to download {url}")
  24. def download_video_from_oss(video_id, video_path, download_folder):
  25. """从oss下载视频"""
  26. try:
  27. pid = int(os.getpid() % 2)
  28. print(f"{video_id} download start ...")
  29. download_folder = f"{download_folder}_{pid}"
  30. if not os.path.exists(download_folder):
  31. os.makedirs(download_folder)
  32. video_local_dir = os.path.join(download_folder, str(video_id))
  33. os.makedirs(video_local_dir)
  34. video_filename = video_path.split('/')[-1]
  35. video_local_path = os.path.join(video_local_dir, video_filename)
  36. # 下载视频文件到本地
  37. video_url = cdn_domain + video_path
  38. download_file(video_url, video_local_path)
  39. # m3u8文件,需下载所有ts文件
  40. if video_filename.split('.')[-1] == 'm3u8':
  41. root_path = '/'.join(video_path.split('/')[:-1])
  42. with open(video_local_path, 'r') as rf:
  43. lines = rf.readlines()
  44. for line in lines:
  45. line = line.strip()
  46. print(line)
  47. if line[-3:] == '.ts':
  48. ts_url = cdn_domain + os.path.join(root_path, line)
  49. ts_local_path = os.path.join(video_local_dir, line)
  50. download_file(ts_url, ts_local_path)
  51. print(f"{video_id} download end!")
  52. except:
  53. print(f"{video_id} download fail!")
  54. print(traceback.format_exc())
  55. def download_videos(project, table, dt):
  56. # 获取特征数据
  57. feature_df = get_feature_data(
  58. project=project, table=table, dt=dt, features=features)
  59. download_folder = 'videos'
  60. video_id_list = feature_df['videoid'].to_list()
  61. pool = multiprocessing.Pool(processes=6)
  62. for video_id in video_id_list:
  63. video_path = feature_df[feature_df['videoid']
  64. == video_id]['video_path'].values[0].strip()
  65. video_path = video_path.replace(' ', '')
  66. print(video_id, video_path)
  67. pool.apply_async(
  68. func=download_video_from_oss,
  69. args=(video_id, video_path, download_folder)
  70. )
  71. pool.close()
  72. pool.join()
  73. def timer_check():
  74. try:
  75. project = config_.DAILY_VIDEO['project']
  76. table = config_.DAILY_VIDEO['table']
  77. # 昨天
  78. now_date = datetime.datetime.today()
  79. print(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")
  80. dt = datetime.datetime.strftime(
  81. now_date-datetime.timedelta(days=1), '%Y%m%d')
  82. # 查看数据是否已准备好
  83. data_count = data_check(project=project, table=table, dt=dt)
  84. if data_count > 0:
  85. print(f'videos count = {data_count}')
  86. # 数据准备好,进行视频下载
  87. download_videos(project=project, table=table, dt=dt)
  88. print(f"videos download end!")
  89. else:
  90. # 数据没准备好,1分钟后重新检查
  91. Timer(60, timer_check).start()
  92. except Exception as e:
  93. print(f"视频下载失败, exception: {e}, traceback: {traceback.format_exc()}")
  94. if __name__ == '__main__':
  95. # timer_check()
  96. feature_df = getVideoInfoInXlxs('past_videos.xlsx')
  97. download_folder = 'videos'
  98. video_id_list = feature_df['videoid'].to_list()
  99. pool = multiprocessing.Pool(processes=6)
  100. for video_id in video_id_list:
  101. video_path = feature_df[feature_df['videoid']
  102. == video_id]['video_path'].values[0].strip()
  103. video_path = video_path.replace(' ', '')
  104. print(video_id, video_path)
  105. pool.apply_async(
  106. func=download_video_from_oss,
  107. args=(video_id, video_path, download_folder)
  108. )
  109. pool.close()
  110. pool.join()