download_video.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. """
  2. @author: luojunhui
  3. """
  4. import os
  5. import re
  6. import html
  7. import cffi
  8. import requests
  9. from uuid import uuid4
  10. from fake_useragent import FakeUserAgent
  11. from applications.utils.common import str_to_md5
  12. from config import decrypt_key_path
  13. headers = {
  14. 'Content-Type': 'application/json',
  15. 'User-Agent': FakeUserAgent().chrome
  16. }
  17. def extract_video_url_from_article(article_url):
  18. """
  19. :param article_url:
  20. :return:
  21. """
  22. response = requests.get(
  23. url=article_url,
  24. headers={'User-Agent': FakeUserAgent().random},
  25. )
  26. html_text = response.text
  27. w = re.search(
  28. r"mp_video_trans_info.*url:\s*\(\'(.*?)\'\)\.replace", html_text, re.S | re.M
  29. ).group(1)
  30. url = html.unescape(
  31. re.sub(
  32. r"\\x\d+", lambda x: bytes.fromhex(x.group().replace("\\x", "")).decode(), w
  33. )
  34. )
  35. return url
  36. def download_gzh_video(article_url):
  37. """
  38. 下载公众号视频
  39. :param article_url:
  40. :return:
  41. """
  42. try:
  43. video_url = extract_video_url_from_article(article_url)
  44. except Exception as e:
  45. return
  46. save_path = "static/{}.mp4".format(str_to_md5(video_url))
  47. headers = {
  48. 'Accept': '*/*',
  49. 'Accept-Language': 'zh,zh-CN;q=0.9',
  50. 'Connection': 'keep-alive',
  51. 'Origin': 'https://mp.weixin.qq.com',
  52. 'Referer': 'https://mp.weixin.qq.com/',
  53. 'Sec-Fetch-Dest': 'video',
  54. 'Sec-Fetch-Mode': 'cors',
  55. 'Sec-Fetch-Site': 'cross-site',
  56. 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
  57. 'sec-ch-ua': '"Chromium";v="130", "Google Chrome";v="130", "Not?A_Brand";v="99"',
  58. 'sec-ch-ua-mobile': '?0',
  59. 'sec-ch-ua-platform': '"macOS"'
  60. }
  61. res = requests.get(video_url, headers=headers)
  62. with open(save_path, "wb") as f:
  63. f.write(res.content)
  64. TEN_KB = 1024 * 10
  65. if os.path.getsize(save_path) > TEN_KB:
  66. return save_path
  67. else:
  68. return None
  69. def download_sph_video(download_url, key):
  70. """
  71. download video, decrypt video and save to local
  72. """
  73. print("downloading video from {}".format(download_url))
  74. print("key is {}".format(key))
  75. file_id = uuid4().hex
  76. encrypted_path = f"static/encrypted_{file_id}.mp4"
  77. decrypted_path = f"static/decrypted_{file_id}.mp4"
  78. try:
  79. with requests.get(download_url, headers=headers, stream=True) as response:
  80. response.raise_for_status()
  81. with open(encrypted_path, 'wb') as f:
  82. for chunk in response.iter_content(chunk_size=8192):
  83. if chunk: # filter out keep-alive chunks
  84. f.write(chunk)
  85. decrypt_sph_video(encrypted_path, key, decrypted_path)
  86. os.remove(encrypted_path)
  87. return decrypted_path
  88. except Exception as e:
  89. for path in [encrypted_path, decrypted_path]:
  90. if os.path.exists(path):
  91. try:
  92. os.remove(path)
  93. except OSError:
  94. pass
  95. raise RuntimeError(f"Video processing failed: {str(e)}") from e
  96. def decrypt_sph_video(video_path: str, key: int, save_path: str) -> None:
  97. """
  98. Decrypt video file using C library.
  99. Args:
  100. video_path: Path to encrypted video file
  101. key: 32-bit unsigned integer decryption key
  102. save_path: Path to save decrypted video
  103. Raises:
  104. RuntimeError: If decryption fails
  105. """
  106. print("key is {}".format(key))
  107. ffi = cffi.FFI()
  108. ffi.cdef('void decrypt(unsigned char *data, const size_t data_length, const uint32_t key);')
  109. try:
  110. lib = ffi.dlopen(decrypt_key_path)
  111. with open(video_path, 'rb') as f:
  112. encrypted_data = f.read()
  113. c_data = ffi.new('unsigned char[]', list(encrypted_data))
  114. lib.decrypt(c_data, 2 ** 17, key)
  115. decrypted_data = bytes(ffi.buffer(c_data, len(encrypted_data))[:])
  116. with open(save_path, 'wb') as f:
  117. f.write(decrypted_data)
  118. except Exception as e:
  119. raise RuntimeError(f"Decryption failed: {str(e)}") from e