download_videos_task.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. import traceback
  2. import datetime
  3. import os
  4. import oss2
  5. import multiprocessing
  6. from threading import Timer
  7. from utils import data_check, get_feature_data
  8. from config import set_config
  9. from log import Log
  10. import ODPSQueryUtil
# Shared configuration object; the code below reads its OSS_CONFIG and
# DAILY_VIDEO entries.
config_ = set_config()
# Project logger instance (not referenced by the code visible in this file).
log_ = Log()
# Column names requested from the feature table when listing videos.
features = ['videoid', 'title', 'video_path']
  14. def download_video_from_oss(video_id, video_path, download_folder):
  15. """从oss下载视频"""
  16. try:
  17. pid = int(os.getpid() % 2)
  18. print(f"{video_id} download start ...")
  19. download_folder = f"{download_folder}_{pid}"
  20. if not os.path.exists(download_folder):
  21. os.makedirs(download_folder)
  22. video_local_dir = os.path.join(download_folder, video_id)
  23. os.makedirs(video_local_dir)
  24. video_filename = video_path.split('/')[-1]
  25. video_local_path = os.path.join(video_local_dir, video_filename)
  26. # 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
  27. # auth = oss2.Auth(access_key_id=config_.ODPS_CONFIG['ACCESSID'], access_key_secret=config_.ODPS_CONFIG['ACCESSKEY'])
  28. auth = oss2.Auth(access_key_id=config_.OSS_CONFIG['accessKeyId'],
  29. access_key_secret=config_.OSS_CONFIG['accessKeySecret'])
  30. # Endpoint以杭州为例,其它Region请按实际情况填写。
  31. bucket = oss2.Bucket(
  32. auth, endpoint=config_.OSS_CONFIG['endpoint'], bucket_name='art-pubbucket')
  33. # 下载OSS文件到本地文件。
  34. # <yourObjectName>由包含文件后缀,不包含Bucket名称组成的Object完整路径,例如abc/efg/123.jpg。
  35. # <yourLocalFile>由本地文件路径加文件名包括后缀组成,例如/users/local/myfile.txt。
  36. bucket.get_object_to_file(video_path, video_local_path)
  37. # m3u8文件,需下载所有ts文件
  38. if video_filename.split('.')[-1] == 'm3u8':
  39. root_path = '/'.join(video_path.split('/')[:-1])
  40. with open(video_local_path, 'r') as rf:
  41. lines = rf.readlines()
  42. for line in lines:
  43. line = line.strip()
  44. print(line)
  45. if line[-3:] == '.ts':
  46. ts_path = os.path.join(root_path, line)
  47. ts_local_path = os.path.join(video_local_dir, line)
  48. bucket.get_object_to_file(ts_path, ts_local_path)
  49. print(f"{video_id} download end!")
  50. except:
  51. print(f"{video_id} download fail!")
  52. print(traceback.format_exc())
  53. def download_videos(project, table, dt):
  54. # 获取特征数据
  55. feature_df = get_feature_data(
  56. project=project, table=table, dt=dt, features=features)
  57. download_folder = 'videos'
  58. video_id_list = feature_df['videoid'].to_list()
  59. pool = multiprocessing.Pool(processes=6)
  60. for video_id in video_id_list:
  61. video_path = feature_df[feature_df['videoid']
  62. == video_id]['video_path'].values[0].strip()
  63. video_path = video_path.replace(' ', '')
  64. print(video_id, video_path)
  65. pool.apply_async(
  66. func=download_video_from_oss,
  67. args=(video_id, video_path, download_folder)
  68. )
  69. pool.close()
  70. pool.join()
  71. def timer_check():
  72. try:
  73. project = config_.DAILY_VIDEO['project']
  74. table = config_.DAILY_VIDEO['table']
  75. now_date = datetime.datetime.today()
  76. print(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")
  77. dt = datetime.datetime.strftime(
  78. now_date-datetime.timedelta(days=1), '%Y%m%d')
  79. # 查看数据是否已准备好
  80. data_count = data_check(project=project, table=table, dt=dt)
  81. if data_count > 0:
  82. print(f'videos count = {data_count}')
  83. # 数据准备好,进行视频下载
  84. download_videos(project=project, table=table, dt=dt)
  85. print(f"videos download end!")
  86. else:
  87. # 数据没准备好,1分钟后重新检查
  88. Timer(60, timer_check).start()
  89. except Exception as e:
  90. print(f"视频下载失败, exception: {e}, traceback: {traceback.format_exc()}")
  91. if __name__ == '__main__':
  92. # timer_check()
  93. size = 2000
  94. for i in range(0, 1000, size):
  95. print(f"query_videos start i = {i} ...")
  96. records = ODPSQueryUtil.query_videos(i, size)
  97. if records is None or len(records) == 0:
  98. continue
  99. print(f"Got {len(records)} records")
  100. pool = multiprocessing.Pool(processes=6)
  101. for record in records:
  102. try:
  103. videoid = record.videoid
  104. video_path = record.video_path
  105. pool.apply_async(
  106. func=download_video_from_oss,
  107. args=(videoid, video_path, 'videos')
  108. )
  109. except Exception as e:
  110. print(
  111. f"download video fail, exception: {e}, traceback: {traceback.format_exc()}")
  112. print(f"query_videos end i = {i} ...")
  113. pool.close()
  114. pool.join()