download_video.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. """
  2. @author: luojunhui
  3. """
  4. import os
  5. import re
  6. import html
  7. import cffi
  8. import traceback
  9. import requests
  10. from uuid import uuid4
  11. from fake_useragent import FakeUserAgent
  12. from applications.utils.common import str_to_md5
  13. from config import decrypt_key_path
  14. headers = {"Content-Type": "application/json", "User-Agent": FakeUserAgent().chrome}
  15. def extract_video_url_from_article(article_url):
  16. """
  17. :param article_url:
  18. :return:
  19. """
  20. response = requests.get(
  21. url=article_url,
  22. headers={"User-Agent": FakeUserAgent().random},
  23. )
  24. html_text = response.text
  25. w = re.search(
  26. r"mp_video_trans_info.*url:\s*\(\'(.*?)\'\)\.replace", html_text, re.S | re.M
  27. ).group(1)
  28. url = html.unescape(
  29. re.sub(
  30. r"\\x\d+", lambda x: bytes.fromhex(x.group().replace("\\x", "")).decode(), w
  31. )
  32. )
  33. return url
  34. def download_gzh_video(article_url):
  35. """
  36. 下载公众号视频
  37. :param article_url:
  38. :return:
  39. """
  40. try:
  41. video_url = extract_video_url_from_article(article_url)
  42. except Exception as e:
  43. return
  44. save_path = "static/{}.mp4".format(str_to_md5(video_url))
  45. headers = {
  46. "Accept": "*/*",
  47. "Accept-Language": "zh,zh-CN;q=0.9",
  48. "Connection": "keep-alive",
  49. "Origin": "https://mp.weixin.qq.com",
  50. "Referer": "https://mp.weixin.qq.com/",
  51. "Sec-Fetch-Dest": "video",
  52. "Sec-Fetch-Mode": "cors",
  53. "Sec-Fetch-Site": "cross-site",
  54. "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36",
  55. "sec-ch-ua": '"Chromium";v="130", "Google Chrome";v="130", "Not?A_Brand";v="99"',
  56. "sec-ch-ua-mobile": "?0",
  57. "sec-ch-ua-platform": '"macOS"',
  58. }
  59. res = requests.get(video_url, headers=headers)
  60. with open(save_path, "wb") as f:
  61. f.write(res.content)
  62. TEN_KB = 1024 * 10
  63. if os.path.getsize(save_path) > TEN_KB:
  64. return save_path
  65. else:
  66. return None
  67. def download_sph_video(download_url, key):
  68. """
  69. download video, decrypt video and save to local
  70. """
  71. file_id = uuid4().hex
  72. encrypted_path = f"static/encrypted_{file_id}.mp4"
  73. decrypted_path = f"static/decrypted_{file_id}.mp4"
  74. try:
  75. with requests.get(download_url, headers=headers, stream=True) as response:
  76. response.raise_for_status()
  77. with open(encrypted_path, "wb") as f:
  78. for chunk in response.iter_content(chunk_size=8192):
  79. if chunk: # filter out keep-alive chunks
  80. f.write(chunk)
  81. decrypt_sph_video(encrypted_path, key, decrypted_path)
  82. os.remove(encrypted_path)
  83. return decrypted_path
  84. except Exception as e:
  85. print(traceback.format_exc())
  86. for path in [encrypted_path, decrypted_path]:
  87. if os.path.exists(path):
  88. try:
  89. os.remove(path)
  90. except OSError:
  91. pass
  92. raise RuntimeError(f"Video processing failed: {str(e)}") from e
  93. def decrypt_sph_video(video_path: str, key: int, save_path: str) -> None:
  94. """
  95. Decrypt video file using C library.
  96. Args:
  97. video_path: Path to encrypted video file
  98. key: 32-bit unsigned integer decryption key
  99. save_path: Path to save decrypted video
  100. Raises:
  101. RuntimeError: If decryption fails
  102. """
  103. print("key is {}".format(key))
  104. ffi = cffi.FFI()
  105. try:
  106. lib = ffi.dlopen(decrypt_key_path)
  107. ffi.cdef(
  108. "void decrypt(unsigned char *data, const size_t data_length, const uint32_t key);"
  109. )
  110. with open(video_path, "rb") as f:
  111. encrypted_data = f.read()
  112. c_data = ffi.new("unsigned char[]", list(encrypted_data))
  113. lib.decrypt(c_data, 2**17, int(key))
  114. decrypted_data = bytes(ffi.buffer(c_data, len(encrypted_data))[:])
  115. with open(save_path, "wb") as f:
  116. f.write(decrypted_data)
  117. except Exception as e:
  118. print(traceback.format_exc())
  119. raise RuntimeError(f"Decryption failed: {str(e)}") from e
  120. def download_toutiao_video(video_url: str) -> str:
  121. """
  122. download toutiao video
  123. """
  124. save_path = "static/{}.mp4".format(str_to_md5(video_url))
  125. response = requests.get(video_url, headers=headers, stream=True)
  126. with open(save_path, "wb") as f:
  127. for chunk in response.iter_content(chunk_size=8192):
  128. if chunk:
  129. f.write(chunk)
  130. return save_path