import html
import logging
import os
import subprocess
from urllib.parse import urljoin, urlparse

import requests


class DownLoad:
    """Helpers for downloading videos over HTTP and remuxing them with ffmpeg."""

    # Seconds allowed for each HTTP request before `requests` gives up.
    REQUEST_TIMEOUT = 240
    # How many times a single-file download is attempted before giving up.
    MAX_ATTEMPTS = 3

    @classmethod
    def download_video(cls, video_url, video_path_url, tag_transport_channel):
        """Download *video_url* to ``video_path_url + 'video.mp4'``.

        The download is best-effort: network, HTTP and file errors are logged
        and never raised, and the target path is returned whether or not the
        file was actually written. Callers that need to know must check that
        the file exists.

        Args:
            video_url: URL of the video to fetch.
            video_path_url: Prefix the literal ``'video.mp4'`` is appended to
                (plain string concatenation, so include a trailing separator).
            tag_transport_channel: Source channel name. For ``"抖音"`` (Douyin)
                a browser-like header set is sent; any other value sends no
                extra headers.

        Returns:
            The target file path as a string.
        """
        video = video_path_url + 'video.mp4'
        headers = None
        if tag_transport_channel == "抖音":
            headers = {
                'accept': '*/*',
                'accept-encoding': 'identity;q=1, *;q=0',
                'accept-language': 'zh-CN,zh;q=0.9',
                'cache-control': 'no-cache',
                'connection': 'keep-alive',
                'host': urlparse(video_url).netloc,
                'pragma': 'no-cache',
                # NOTE(review): a server that honors this Range header may
                # answer 206 instead of 200, which the status check in
                # _fetch_to_file treats as a failed attempt -- confirm whether
                # 206 should also be accepted.
                'range': 'bytes=0-',
                'referer': video_url,
                'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
            }
        cls._fetch_to_file(video_url, video, headers)
        return video

    @classmethod
    def _fetch_to_file(cls, url, destination, headers=None):
        """Fetch *url* into *destination*, retrying; return True on success."""
        log = logging.getLogger(__name__)
        for attempt in range(1, cls.MAX_ATTEMPTS + 1):
            try:
                response = requests.request(
                    "GET", url, headers=headers, timeout=cls.REQUEST_TIMEOUT
                )
                if response.status_code == 200:
                    # Open in binary write mode and store the response body.
                    with open(destination, "wb") as file:
                        file.write(response.content)
                    return True
                log.warning(
                    "Download attempt %d/%d for %s returned HTTP %s",
                    attempt, cls.MAX_ATTEMPTS, url, response.status_code,
                )
            except Exception:
                # Deliberately broad: download_video promises never to raise.
                # The failure is logged and the next attempt proceeds instead
                # of aborting the whole retry loop on the first error.
                log.warning(
                    "Download attempt %d/%d for %s failed",
                    attempt, cls.MAX_ATTEMPTS, url, exc_info=True,
                )
        return False

    @classmethod
    def download_m3u8_video(cls, url, file_path):
        """Download every segment listed in an m3u8 playlist into one file.

        Non-empty playlist lines that do not start with ``#`` are treated as
        segment URIs, resolved against *url*, fetched in order and appended
        to *file_path*.

        Args:
            url: URL of the m3u8 playlist.
            file_path: Path of the output file receiving the concatenated
                segment bytes.

        Returns:
            True when the playlist and every segment were fetched with HTTP
            200. False when the playlist request is not 200, the playlist
            lists no segments, or at least one segment request is not 200 (the
            remaining segments are still written, so a partial file may be
            left behind).

        Raises:
            requests.RequestException: Propagated from ``requests.get``,
                including timeouts after ``REQUEST_TIMEOUT`` seconds.
        """
        r = requests.get(url, timeout=cls.REQUEST_TIMEOUT)
        if r.status_code != 200:
            return False

        # strip() drops the '\r' that CRLF playlists would otherwise leave on
        # every URI; urljoin handles relative, root-relative and absolute URIs.
        entries = (raw_line.strip() for raw_line in r.text.splitlines())
        segment_urls = [
            urljoin(url, entry)
            for entry in entries
            if entry and not entry.startswith('#')
        ]
        if not segment_urls:
            # Nothing to download: do not report an empty file as a success.
            return False

        log = logging.getLogger(__name__)
        complete = True
        with open(file_path, 'wb') as f:
            for segment_url in segment_urls:
                segment = requests.get(segment_url, timeout=cls.REQUEST_TIMEOUT)
                if segment.status_code == 200:
                    f.write(segment.content)
                else:
                    complete = False
                    log.warning(
                        "Segment %s returned HTTP %s; output will have a gap",
                        segment_url, segment.status_code,
                    )
        return complete

    @classmethod
    def convert_ts_to_mp4(cls, ts_file_path, mp4_file_path):
        """Run ``ffmpeg -i <ts> -c copy <mp4>`` to produce an mp4 file.

        The command is executed without a shell, so the paths are handed to
        ffmpeg verbatim: spaces and shell metacharacters are safe, and no
        shell expansion (``~``, globs) takes place. Failures are logged, not
        raised.

        Args:
            ts_file_path: Path of the input file passed to ``-i``.
            mp4_file_path: Path of the output file.

        Returns:
            None.
        """
        log = logging.getLogger(__name__)
        command = [
            'ffmpeg', '-i', str(ts_file_path), '-c', 'copy', str(mp4_file_path),
        ]
        try:
            result = subprocess.run(command, check=False)
        except OSError:
            # E.g. ffmpeg is not installed; keep the "never raises" behavior.
            log.warning("Could not start ffmpeg", exc_info=True)
            return
        if result.returncode != 0:
            log.warning("ffmpeg exited with status %s", result.returncode)