1234567891011121314151617181920212223242526272829303132333435363738394041 |
- import html
- import subprocess
- import time
- import requests
- from loguru import logger
class DownLoad:
    """Helpers that save a remote video to a local ``video.mp4`` file."""

    # Number of attempts made for a plain HTTP download before giving up.
    _HTTP_ATTEMPTS = 3
    # Timeout in seconds handed to ``requests`` for every attempt.
    _HTTP_TIMEOUT = 240
    # Size of the chunks streamed from the HTTP response to disk.
    _CHUNK_SIZE = 1024 * 1024

    @classmethod
    def download_video(cls, video_url, video_path_url):
        """Download ``video_url`` to ``video_path_url + 'video.mp4'``.

        URLs whose path part (everything before ``?``) contains ``.m3u8``
        are handed to ``ffmpeg``; every other URL is fetched with an HTTP
        GET, retried up to ``_HTTP_ATTEMPTS`` times.

        Args:
            video_url: URL of the video or of an ``.m3u8`` playlist.
            video_path_url: Prefix the output file name is appended to by
                plain string concatenation, so it must already end with a
                path separator if it names a directory.

        Returns:
            The output path. It is returned whether or not the download
            succeeded; HTTP failures are logged rather than raised, so
            callers must check that the file exists before using it.

        Raises:
            OSError: If the ``ffmpeg`` executable cannot be started
                (``.m3u8`` branch only).
        """
        video = video_path_url + 'video.mp4'
        # HTML entities are decoded only to classify the URL; the URL that
        # is actually fetched is passed through exactly as received.
        url_type = html.unescape(video_url).split('?')[0]
        if ".m3u8" in url_type:
            cls._download_m3u8(video_url, video)
        else:
            cls._download_http(video_url, video)
        return video

    @classmethod
    def _download_m3u8(cls, video_url, video):
        """Remux the stream at ``video_url`` into ``video`` using ffmpeg."""
        logger.info(f"[+] {video_url}开始下载m3u8格式的视频")
        ffmpeg_cmd_oss = [
            "ffmpeg",
            "-y",  # overwrite the output file if it exists
            "-i", video_url,  # input URL
            "-c", "copy",  # copy the streams without re-encoding
            "-bsf:a", "aac_adtstoasc",  # bitstream filter for AAC audio
            video,  # output file
        ]
        result = subprocess.run(ffmpeg_cmd_oss)
        if result.returncode != 0:
            # Previously a failed ffmpeg run went completely unnoticed.
            logger.warning(
                f"[-] ffmpeg exited with code {result.returncode} for {video_url}"
            )

    @classmethod
    def _download_http(cls, video_url, video):
        """Stream ``video_url`` into ``video``; return True on success."""
        # Imported here so the block does not depend on a file-level import.
        import os

        # Download to a temporary name first so that an interrupted
        # transfer never leaves a truncated file at the final path.
        partial = video + ".part"
        for attempt in range(1, cls._HTTP_ATTEMPTS + 1):
            # The try lives inside the loop: a timeout or connection error
            # on one attempt must not cancel the remaining attempts.
            try:
                with requests.get(
                    video_url,
                    timeout=cls._HTTP_TIMEOUT,
                    stream=True,
                ) as response:
                    if response.status_code != 200:
                        logger.warning(
                            f"[-] {video_url} returned HTTP "
                            f"{response.status_code} "
                            f"(attempt {attempt}/{cls._HTTP_ATTEMPTS})"
                        )
                        continue
                    # Stream in chunks instead of holding the whole video
                    # in memory.
                    with open(partial, "wb") as file:
                        for chunk in response.iter_content(
                            chunk_size=cls._CHUNK_SIZE
                        ):
                            file.write(chunk)
                os.replace(partial, video)
                return True
            except Exception as exc:
                # Best effort by design: callers always get the path back,
                # but the failure is logged instead of silently dropped.
                logger.warning(
                    f"[-] {video_url} download failed "
                    f"(attempt {attempt}/{cls._HTTP_ATTEMPTS}): {exc!r}"
                )

        # Every attempt failed; drop any leftover partial download.
        try:
            os.remove(partial)
        except OSError:
            # Nothing to clean up (or it cannot be removed); not fatal.
            pass
        return False
|