download_video.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. """
  2. @author: luojunhui
  3. """
  4. import os
  5. import re
  6. import html
  7. import cffi
  8. import traceback
  9. import requests
  10. from uuid import uuid4
  11. from fake_useragent import FakeUserAgent
  12. from applications.utils.common import str_to_md5
  13. from config import decrypt_key_path
  14. headers = {
  15. 'Content-Type': 'application/json',
  16. 'User-Agent': FakeUserAgent().chrome
  17. }
  18. def extract_video_url_from_article(article_url):
  19. """
  20. :param article_url:
  21. :return:
  22. """
  23. response = requests.get(
  24. url=article_url,
  25. headers={'User-Agent': FakeUserAgent().random},
  26. )
  27. html_text = response.text
  28. w = re.search(
  29. r"mp_video_trans_info.*url:\s*\(\'(.*?)\'\)\.replace", html_text, re.S | re.M
  30. ).group(1)
  31. url = html.unescape(
  32. re.sub(
  33. r"\\x\d+", lambda x: bytes.fromhex(x.group().replace("\\x", "")).decode(), w
  34. )
  35. )
  36. return url
  37. def download_gzh_video(article_url):
  38. """
  39. 下载公众号视频
  40. :param article_url:
  41. :return:
  42. """
  43. try:
  44. video_url = extract_video_url_from_article(article_url)
  45. except Exception as e:
  46. return
  47. save_path = "static/{}.mp4".format(str_to_md5(video_url))
  48. headers = {
  49. 'Accept': '*/*',
  50. 'Accept-Language': 'zh,zh-CN;q=0.9',
  51. 'Connection': 'keep-alive',
  52. 'Origin': 'https://mp.weixin.qq.com',
  53. 'Referer': 'https://mp.weixin.qq.com/',
  54. 'Sec-Fetch-Dest': 'video',
  55. 'Sec-Fetch-Mode': 'cors',
  56. 'Sec-Fetch-Site': 'cross-site',
  57. 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
  58. 'sec-ch-ua': '"Chromium";v="130", "Google Chrome";v="130", "Not?A_Brand";v="99"',
  59. 'sec-ch-ua-mobile': '?0',
  60. 'sec-ch-ua-platform': '"macOS"'
  61. }
  62. res = requests.get(video_url, headers=headers)
  63. with open(save_path, "wb") as f:
  64. f.write(res.content)
  65. TEN_KB = 1024 * 10
  66. if os.path.getsize(save_path) > TEN_KB:
  67. return save_path
  68. else:
  69. return None
  70. def download_sph_video(download_url, key):
  71. """
  72. download video, decrypt video and save to local
  73. """
  74. print("downloading video from {}".format(download_url))
  75. print("key is {}".format(key))
  76. file_id = uuid4().hex
  77. encrypted_path = f"static/encrypted_{file_id}.mp4"
  78. decrypted_path = f"static/decrypted_{file_id}.mp4"
  79. try:
  80. with requests.get(download_url, headers=headers, stream=True) as response:
  81. response.raise_for_status()
  82. with open(encrypted_path, 'wb') as f:
  83. for chunk in response.iter_content(chunk_size=8192):
  84. if chunk: # filter out keep-alive chunks
  85. f.write(chunk)
  86. decrypt_sph_video(encrypted_path, key, decrypted_path)
  87. os.remove(encrypted_path)
  88. return decrypted_path
  89. except Exception as e:
  90. print(traceback.format_exc())
  91. for path in [encrypted_path, decrypted_path]:
  92. if os.path.exists(path):
  93. try:
  94. os.remove(path)
  95. except OSError:
  96. pass
  97. raise RuntimeError(f"Video processing failed: {str(e)}") from e
  98. def decrypt_sph_video(video_path: str, key: int, save_path: str) -> None:
  99. """
  100. Decrypt video file using C library.
  101. Args:
  102. video_path: Path to encrypted video file
  103. key: 32-bit unsigned integer decryption key
  104. save_path: Path to save decrypted video
  105. Raises:
  106. RuntimeError: If decryption fails
  107. """
  108. print("key is {}".format(key))
  109. ffi = cffi.FFI()
  110. try:
  111. lib = ffi.dlopen(decrypt_key_path)
  112. ffi.cdef('void decrypt(unsigned char *data, const size_t data_length, const uint32_t key);')
  113. with open(video_path, 'rb') as f:
  114. encrypted_data = f.read()
  115. c_data = ffi.new('unsigned char[]', list(encrypted_data))
  116. lib.decrypt(c_data, 2 ** 17, int(key))
  117. decrypted_data = bytes(ffi.buffer(c_data, len(encrypted_data))[:])
  118. with open(save_path, 'wb') as f:
  119. f.write(decrypted_data)
  120. except Exception as e:
  121. print(traceback.format_exc())
  122. raise RuntimeError(f"Decryption failed: {str(e)}") from e