|
@@ -0,0 +1,138 @@
|
|
|
+"""
|
|
|
+@author: luojunhui
|
|
|
+"""
|
|
|
+import os
|
|
|
+import re
|
|
|
+import html
|
|
|
+import cffi
|
|
|
+
|
|
|
+import requests
|
|
|
+from uuid import uuid4
|
|
|
+from fake_useragent import FakeUserAgent
|
|
|
+
|
|
|
+from applications.utils.common import str_to_md5
|
|
|
+from config import decrypt_key_path
|
|
|
+
|
|
|
+headers = {
|
|
|
+ 'Content-Type': 'application/json',
|
|
|
+ 'User-Agent': FakeUserAgent().chrome
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+def extract_video_url_from_article(article_url):
|
|
|
+ """
|
|
|
+ :param article_url:
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+ response = requests.get(
|
|
|
+ url=article_url,
|
|
|
+ headers={'User-Agent': FakeUserAgent().random},
|
|
|
+ )
|
|
|
+ html_text = response.text
|
|
|
+ w = re.search(
|
|
|
+ r"mp_video_trans_info.*url:\s*\(\'(.*?)\'\)\.replace", html_text, re.S | re.M
|
|
|
+ ).group(1)
|
|
|
+ url = html.unescape(
|
|
|
+ re.sub(
|
|
|
+ r"\\x\d+", lambda x: bytes.fromhex(x.group().replace("\\x", "")).decode(), w
|
|
|
+ )
|
|
|
+ )
|
|
|
+ return url
|
|
|
+
|
|
|
+
|
|
|
+def download_gzh_video(article_url):
|
|
|
+ """
|
|
|
+ 下载公众号视频
|
|
|
+ :param article_url:
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ video_url = extract_video_url_from_article(article_url)
|
|
|
+ except Exception as e:
|
|
|
+ return
|
|
|
+ save_path = "static/{}.mp4".format(str_to_md5(video_url))
|
|
|
+ headers = {
|
|
|
+ 'Accept': '*/*',
|
|
|
+ 'Accept-Language': 'zh,zh-CN;q=0.9',
|
|
|
+ 'Connection': 'keep-alive',
|
|
|
+ 'Origin': 'https://mp.weixin.qq.com',
|
|
|
+ 'Referer': 'https://mp.weixin.qq.com/',
|
|
|
+ 'Sec-Fetch-Dest': 'video',
|
|
|
+ 'Sec-Fetch-Mode': 'cors',
|
|
|
+ 'Sec-Fetch-Site': 'cross-site',
|
|
|
+ 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
|
|
|
+ 'sec-ch-ua': '"Chromium";v="130", "Google Chrome";v="130", "Not?A_Brand";v="99"',
|
|
|
+ 'sec-ch-ua-mobile': '?0',
|
|
|
+ 'sec-ch-ua-platform': '"macOS"'
|
|
|
+ }
|
|
|
+ res = requests.get(video_url, headers=headers)
|
|
|
+ with open(save_path, "wb") as f:
|
|
|
+ f.write(res.content)
|
|
|
+
|
|
|
+ TEN_KB = 1024 * 10
|
|
|
+ if os.path.getsize(save_path) > TEN_KB:
|
|
|
+ return save_path
|
|
|
+ else:
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+def download_sph_video(download_url, key):
|
|
|
+ """
|
|
|
+ download video, decrypt video and save to local
|
|
|
+ """
|
|
|
+ file_id = uuid4().hex
|
|
|
+ encrypted_path = f"static/encrypted_{file_id}.mp4"
|
|
|
+ decrypted_path = f"static/decrypted_{file_id}.mp4"
|
|
|
+
|
|
|
+ try:
|
|
|
+ with requests.get(download_url, headers=headers, stream=True) as response:
|
|
|
+ response.raise_for_status()
|
|
|
+
|
|
|
+ with open(encrypted_path, 'wb') as f:
|
|
|
+ for chunk in response.iter_content(chunk_size=8192):
|
|
|
+ if chunk: # filter out keep-alive chunks
|
|
|
+ f.write(chunk)
|
|
|
+
|
|
|
+ decrypt_sph_video(encrypted_path, key, decrypted_path)
|
|
|
+ os.remove(encrypted_path)
|
|
|
+ return decrypted_path
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ for path in [encrypted_path, decrypted_path]:
|
|
|
+ if os.path.exists(path):
|
|
|
+ try:
|
|
|
+ os.remove(path)
|
|
|
+ except OSError:
|
|
|
+ pass
|
|
|
+ raise RuntimeError(f"Video processing failed: {str(e)}") from e
|
|
|
+
|
|
|
+
|
|
|
+def decrypt_sph_video(video_path: str, key: int, save_path: str) -> None:
|
|
|
+ """
|
|
|
+ Decrypt video file using C library.
|
|
|
+ Args:
|
|
|
+ video_path: Path to encrypted video file
|
|
|
+ key: 32-bit unsigned integer decryption key
|
|
|
+ save_path: Path to save decrypted video
|
|
|
+ Raises:
|
|
|
+ RuntimeError: If decryption fails
|
|
|
+ """
|
|
|
+ ffi = cffi.FFI()
|
|
|
+ ffi.cdef('void decrypt(unsigned char *data, const size_t data_length, const uint32_t key);')
|
|
|
+
|
|
|
+ try:
|
|
|
+ lib = ffi.dlopen(decrypt_key_path)
|
|
|
+
|
|
|
+ with open(video_path, 'rb') as f:
|
|
|
+ encrypted_data = f.read()
|
|
|
+
|
|
|
+ c_data = ffi.new('unsigned char[]', list(encrypted_data))
|
|
|
+ lib.decrypt(c_data, 2 ** 17, key)
|
|
|
+ decrypted_data = bytes(ffi.buffer(c_data, len(encrypted_data))[:])
|
|
|
+
|
|
|
+ with open(save_path, 'wb') as f:
|
|
|
+ f.write(decrypted_data)
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ raise RuntimeError(f"Decryption failed: {str(e)}") from e
|
|
|
+
|