# download_video.py

import logging
import os
import subprocess
import time
import uuid
from urllib.parse import urljoin, urlparse

import requests
  6. class DownLoad:
  7. @classmethod
  8. def download_video(cls, video_url, video_path_url, tag_transport_channel, video_id):
  9. video = video_path_url + 'video.mp4'
  10. if tag_transport_channel == "抖音":
  11. headers = {
  12. 'accept': '*/*',
  13. 'accept-encoding': 'identity;q=1, *;q=0',
  14. 'accept-language': 'zh-CN,zh;q=0.9',
  15. 'cache-control': 'no-cache',
  16. 'connection': 'keep-alive',
  17. 'pragma': 'no-cache',
  18. 'range': 'bytes=0-',
  19. 'referer': f'https://www.douyin.com/video/{video_id}',
  20. 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
  21. }
  22. payload = {}
  23. for i in range(3):
  24. try:
  25. response = requests.request("GET", video_url, headers=headers, data=payload, timeout=120)
  26. if response.status_code == 206:
  27. # 以二进制写入模式打开文件
  28. with open(f"{video}", "wb") as file:
  29. # 将响应内容写入文件
  30. file.write(response.content)
  31. return video
  32. except Exception:
  33. return video
  34. else:
  35. try:
  36. for i in range(3):
  37. payload = {}
  38. headers = {}
  39. response = requests.request("GET", video_url, headers=headers, data=payload, timeout=240)
  40. if response.status_code == 200:
  41. # 以二进制写入模式打开文件
  42. with open(f"{video}", "wb") as file:
  43. # 将响应内容写入文件
  44. file.write(response.content)
  45. return video
  46. return video
  47. except Exception:
  48. return video
  49. @classmethod
  50. def download_m3u8_video(cls ,url, file_path):
  51. r = requests.get(url)
  52. if r.status_code != 200:
  53. return False
  54. m3u8_list = r.text.split('\n')
  55. m3u8_list = [i for i in m3u8_list if i and i[0] != '#']
  56. ts_list = []
  57. for ts_url in m3u8_list:
  58. ts_url = url.rsplit('/', 1)[0] + '/' + ts_url
  59. ts_list.append(ts_url)
  60. with open(file_path, 'wb') as f:
  61. for ts_url in ts_list:
  62. r = requests.get(ts_url)
  63. if r.status_code == 200:
  64. f.write(r.content)
  65. return True
  66. @classmethod
  67. def convert_ts_to_mp4(cls, ts_file_path, mp4_file_path):
  68. os.system(f'ffmpeg -i {ts_file_path} -c copy {mp4_file_path}')
  69. @classmethod
  70. def download_pq_video(cls,video_path_url , video_url_list):
  71. video_list = []
  72. for video_url in video_url_list:
  73. video = f'{video_path_url}{str(uuid.uuid4())}.mp4'
  74. try:
  75. payload = {}
  76. headers = {}
  77. response = requests.request("GET", video_url, headers=headers, data=payload, timeout=60)
  78. if response.status_code == 200:
  79. # 以二进制写入模式打开文件
  80. with open(f"{video}", "wb") as file:
  81. # 将响应内容写入文件
  82. file.write(response.content)
  83. video_list.append(video)
  84. time.sleep(1)
  85. except Exception:
  86. continue
  87. return video_list