# download_video.py (2.8 KB)

import html
import logging
import os
import subprocess
from urllib.parse import urljoin, urlparse

import requests
  5. class DownLoad:
  6. @classmethod
  7. def download_video(cls, video_url, video_path_url, tag_transport_channel):
  8. video = video_path_url + 'video.mp4'
  9. if tag_transport_channel == "抖音":
  10. headers = {
  11. 'accept': '*/*',
  12. 'accept-encoding': 'identity;q=1, *;q=0',
  13. 'accept-language': 'zh-CN,zh;q=0.9',
  14. 'cache-control': 'no-cache',
  15. 'connection': 'keep-alive',
  16. 'host': urlparse(video_url).netloc,
  17. 'pragma': 'no-cache',
  18. 'range': 'bytes=0-',
  19. 'referer': video_url,
  20. 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
  21. }
  22. payload = {}
  23. for i in range(3):
  24. try:
  25. response = requests.request("GET", video_url, headers=headers, data=payload, timeout=240)
  26. if response.status_code == 200:
  27. # 以二进制写入模式打开文件
  28. with open(f"{video}", "wb") as file:
  29. # 将响应内容写入文件
  30. file.write(response.content)
  31. return video
  32. except Exception:
  33. return video
  34. else:
  35. try:
  36. for i in range(3):
  37. payload = {}
  38. headers = {}
  39. response = requests.request("GET", video_url, headers=headers, data=payload, timeout=240)
  40. if response.status_code == 200:
  41. # 以二进制写入模式打开文件
  42. with open(f"{video}", "wb") as file:
  43. # 将响应内容写入文件
  44. file.write(response.content)
  45. return video
  46. return video
  47. except Exception:
  48. return video
  49. @classmethod
  50. def download_m3u8_video(cls ,url, file_path):
  51. r = requests.get(url)
  52. if r.status_code != 200:
  53. return False
  54. m3u8_list = r.text.split('\n')
  55. m3u8_list = [i for i in m3u8_list if i and i[0] != '#']
  56. ts_list = []
  57. for ts_url in m3u8_list:
  58. ts_url = url.rsplit('/', 1)[0] + '/' + ts_url
  59. ts_list.append(ts_url)
  60. with open(file_path, 'wb') as f:
  61. for ts_url in ts_list:
  62. r = requests.get(ts_url)
  63. if r.status_code == 200:
  64. f.write(r.content)
  65. return True
  66. @classmethod
  67. def convert_ts_to_mp4(cls, ts_file_path, mp4_file_path):
  68. os.system(f'ffmpeg -i {ts_file_path} -c copy {mp4_file_path}')