# download_video.py (3.6 KB)
import os
import subprocess
import time
import uuid
from urllib.parse import urljoin

import requests
  5. class DownLoad:
  6. @classmethod
  7. def download_video(cls, video_url, video_path_url, tag_transport_channel, video_id):
  8. video = video_path_url + 'video.mp4'
  9. if tag_transport_channel == "抖音":
  10. headers = {
  11. 'accept': '*/*',
  12. 'accept-encoding': 'identity;q=1, *;q=0',
  13. 'accept-language': 'zh-CN,zh;q=0.9',
  14. 'cache-control': 'no-cache',
  15. 'connection': 'keep-alive',
  16. 'pragma': 'no-cache',
  17. 'range': 'bytes=0-',
  18. 'referer': f'https://www.douyin.com/video/{video_id}',
  19. 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
  20. }
  21. payload = {}
  22. for i in range(3):
  23. try:
  24. response = requests.request("GET", video_url, headers=headers, data=payload, timeout=120)
  25. if response.status_code == 206:
  26. # 以二进制写入模式打开文件
  27. with open(f"{video}", "wb") as file:
  28. # 将响应内容写入文件
  29. file.write(response.content)
  30. return video
  31. except Exception:
  32. return video
  33. else:
  34. try:
  35. for i in range(3):
  36. payload = {}
  37. headers = {}
  38. response = requests.request("GET", video_url, headers=headers, data=payload, timeout=240)
  39. if response.status_code == 200:
  40. # 以二进制写入模式打开文件
  41. with open(f"{video}", "wb") as file:
  42. # 将响应内容写入文件
  43. file.write(response.content)
  44. return video
  45. return video
  46. except Exception:
  47. return video
  48. @classmethod
  49. def download_m3u8_video(cls ,url, file_path):
  50. r = requests.get(url)
  51. if r.status_code != 200:
  52. return False
  53. m3u8_list = r.text.split('\n')
  54. m3u8_list = [i for i in m3u8_list if i and i[0] != '#']
  55. ts_list = []
  56. for ts_url in m3u8_list:
  57. ts_url = url.rsplit('/', 1)[0] + '/' + ts_url
  58. ts_list.append(ts_url)
  59. with open(file_path, 'wb') as f:
  60. for ts_url in ts_list:
  61. r = requests.get(ts_url)
  62. if r.status_code == 200:
  63. f.write(r.content)
  64. return True
  65. @classmethod
  66. def convert_ts_to_mp4(cls, ts_file_path, mp4_file_path):
  67. os.system(f'ffmpeg -i {ts_file_path} -c copy {mp4_file_path}')
  68. @classmethod
  69. def download_pq_video(cls,video_path_url , video_url_list):
  70. video_list = []
  71. for video_url in video_url_list:
  72. video = f'{video_path_url}{str(uuid.uuid4())}.mp4'
  73. try:
  74. payload = {}
  75. headers = {}
  76. response = requests.request("GET", video_url, headers=headers, data=payload, timeout=60)
  77. if response.status_code == 200:
  78. # 以二进制写入模式打开文件
  79. with open(f"{video}", "wb") as file:
  80. # 将响应内容写入文件
  81. file.write(response.content)
  82. video_list.append(video)
  83. time.sleep(1)
  84. except Exception:
  85. continue
  86. return video_list