123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
import logging
import os
import subprocess
import time
import uuid
from urllib.parse import urljoin

import requests
class DownLoad:
    """Best-effort helpers that fetch remote videos onto the local disk.

    Every public method is a classmethod and the class keeps no state.
    Failures are logged through the module logger instead of being
    silently discarded.
    """

    _logger = logging.getLogger(__name__)

    # Number of attempts ``download_video`` makes before giving up.
    _VIDEO_ATTEMPTS = 3
    # Per-request timeout (seconds) for playlist and segment downloads.
    _SEGMENT_TIMEOUT = 60

    @classmethod
    def _fetch_to_file(cls, url, destination, timeout):
        """GET *url* and store the response body at *destination*.

        Returns:
            True when the server answered 200 and the file was written,
            False for any other status code (the file is not touched).

        Raises:
            requests.RequestException: on network-level failures.
            OSError: when *destination* cannot be written.
        """
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            return False
        # The body is fully received before the file is opened, so a failed
        # transfer never leaves a truncated file behind.
        with open(destination, "wb") as file:
            file.write(response.content)
        return True

    @staticmethod
    def _resolve_segment_urls(playlist_text, playlist_url):
        """Return the absolute URL of every media entry in an m3u8 playlist.

        Blank lines and ``#`` tag/comment lines are skipped. Entries are
        resolved against *playlist_url* with ``urljoin`` so relative,
        root-relative and already-absolute entries are all handled.
        """
        stripped_lines = (line.strip() for line in playlist_text.splitlines())
        return [
            urljoin(playlist_url, line)
            for line in stripped_lines
            if line and not line.startswith('#')
        ]

    @classmethod
    def download_video(cls, video_url, file_path):
        """Download *video_url* to ``file_path + 'video.mp4'``.

        The request is tried up to three times; a non-200 answer or a
        network error triggers the next attempt.

        Args:
            video_url: URL of the video to fetch.
            file_path: Prefix the literal ``'video.mp4'`` is appended to
                (plain string concatenation, no separator is inserted).

        Returns:
            The target path, whether or not the download succeeded. This
            method never raises for download failures, so callers have to
            check that the file exists.
        """
        video = file_path + 'video.mp4'
        for attempt in range(1, cls._VIDEO_ATTEMPTS + 1):
            try:
                saved = cls._fetch_to_file(video_url, video, timeout=240)
            except requests.RequestException:
                cls._logger.warning(
                    "Attempt %d/%d to download %s failed",
                    attempt, cls._VIDEO_ATTEMPTS, video_url, exc_info=True,
                )
                continue
            except Exception:
                # Not a transient network problem (for example an unwritable
                # target path), so repeating the download would not help.
                cls._logger.exception("Could not save %s to %s", video_url, video)
                break
            if saved:
                return video
        return video

    @classmethod
    def download_m3u8_video(cls, url, file_path):
        """Download every segment of an m3u8 playlist into one file.

        Args:
            url: URL of the m3u8 playlist.
            file_path: Path of the file the segments are concatenated into.

        Returns:
            False when the playlist could not be fetched or at least one
            segment did not answer 200, True otherwise. Segments that were
            fetched successfully are written even when others fail.

        Raises:
            requests.RequestException: on network-level failures.
            OSError: when *file_path* cannot be written.
        """
        playlist = requests.get(url, timeout=cls._SEGMENT_TIMEOUT)
        if playlist.status_code != 200:
            return False

        segment_urls = cls._resolve_segment_urls(playlist.text, url)
        complete = True
        with open(file_path, 'wb') as output:
            for segment_url in segment_urls:
                segment = requests.get(segment_url, timeout=cls._SEGMENT_TIMEOUT)
                if segment.status_code == 200:
                    output.write(segment.content)
                else:
                    # A missing segment leaves a gap in the output; report
                    # it instead of claiming success.
                    complete = False
                    cls._logger.warning(
                        "Segment %s answered HTTP %s",
                        segment_url, segment.status_code,
                    )
        return complete

    @classmethod
    def convert_ts_to_mp4(cls, ts_file_path, mp4_file_path):
        """Remux a TS file into an MP4 container with ffmpeg (stream copy).

        The command is passed as an argument list, so paths containing
        spaces or shell metacharacters are handled safely. A missing ffmpeg
        binary or a non-zero exit status is logged and not raised.

        Args:
            ts_file_path: Path of the input ``.ts`` file.
            mp4_file_path: Path of the ``.mp4`` file to create.
        """
        command = [
            'ffmpeg', '-i', str(ts_file_path), '-c', 'copy', str(mp4_file_path),
        ]
        try:
            completed = subprocess.run(command, check=False)
        except OSError:
            cls._logger.exception("Could not launch ffmpeg")
            return
        if completed.returncode != 0:
            cls._logger.warning(
                "ffmpeg exited with status %d while converting %s",
                completed.returncode, ts_file_path,
            )

    @classmethod
    def download_pq_video(cls, video_path_url, video_url_list):
        """Download each URL to a uniquely named ``.mp4`` file.

        Args:
            video_path_url: Prefix for the generated file names (plain
                string concatenation, no separator is inserted).
            video_url_list: Iterable of video URLs.

        Returns:
            The paths of the files that were downloaded successfully, in
            input order. URLs that fail are logged and skipped.
        """
        video_list = []
        for video_url in video_url_list:
            video = f'{video_path_url}{uuid.uuid4()}.mp4'
            try:
                saved = cls._fetch_to_file(video_url, video, timeout=60)
            except Exception:
                # Best effort: one bad URL must not abort the whole batch.
                cls._logger.exception("Skipping %s: download failed", video_url)
                continue
            if saved:
                video_list.append(video)
            # Short pause between consecutive requests.
            time.sleep(1)
        return video_list
|