123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- """
- @author: luojunhui
- """
- import os
- import re
- import html
- import cffi
- import traceback
- import requests
- from uuid import uuid4
- from fake_useragent import FakeUserAgent
- from applications.utils.common import str_to_md5
- from config import decrypt_key_path
# Module-level default request headers. The User-Agent is a Chrome string
# produced by fake_useragent once, when this module is imported.
headers = {
    "Content-Type": "application/json",
    "User-Agent": FakeUserAgent().chrome,
}
def extract_video_url_from_article(article_url, timeout=30):
    """
    Fetch an article page and extract the video URL embedded in its script.

    :param article_url: URL of the article page to download.
    :param timeout: seconds to wait for the HTTP response; defaults to 30 so
        that a stalled connection cannot block the caller forever.
    :return: the video URL with JavaScript hex escapes and HTML entities
        decoded.
    :raises ValueError: if the page contains no ``mp_video_trans_info`` URL.
    """
    response = requests.get(
        url=article_url,
        headers={'User-Agent': FakeUserAgent().random},
        timeout=timeout,
    )
    html_text = response.text

    matched = re.search(
        r"mp_video_trans_info.*url:\s*\(\'(.*?)\'\)\.replace", html_text, re.S | re.M
    )
    if matched is None:
        # Fail with an explicit message instead of the opaque
        # "'NoneType' object has no attribute 'group'".
        raise ValueError(
            "no mp_video_trans_info video url found in {}".format(article_url)
        )
    escaped_url = matched.group(1)

    # A JavaScript hex escape is a backslash, "x" and exactly two hex digits.
    # Matching exactly two hex digits (rather than any run of decimal digits)
    # decodes escapes such as \x3d and does not swallow digits that merely
    # follow an escape.
    unescaped_url = re.sub(
        r"\\x([0-9a-fA-F]{2})",
        lambda m: chr(int(m.group(1), 16)),
        escaped_url,
    )
    # The page also HTML-escapes the URL (e.g. "&amp;"), so undo that last.
    return html.unescape(unescaped_url)
def download_gzh_video(article_url):
    """
    Download the video embedded in an official-account (gzh) article.

    The video URL is extracted from the article page, streamed to
    ``static/<md5 of video url>.mp4`` and kept only if it is larger than
    10 KB.

    :param article_url: URL of the article page that embeds the video.
    :return: local path of the saved file, or ``None`` when the video URL
        cannot be extracted or the downloaded file is 10 KB or smaller.
    """
    try:
        video_url = extract_video_url_from_article(article_url)
    except Exception:
        # Best-effort: an article without an extractable video yields None,
        # but the reason is printed instead of being silently discarded.
        print(traceback.format_exc())
        return None

    save_path = "static/{}.mp4".format(str_to_md5(video_url))
    # Named request_headers so the module-level ``headers`` is not shadowed.
    request_headers = {
        'Accept': '*/*',
        'Accept-Language': 'zh,zh-CN;q=0.9',
        'Connection': 'keep-alive',
        'Origin': 'https://mp.weixin.qq.com',
        'Referer': 'https://mp.weixin.qq.com/',
        'Sec-Fetch-Dest': 'video',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'cross-site',
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
        'sec-ch-ua': '"Chromium";v="130", "Google Chrome";v="130", "Not?A_Brand";v="99"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"macOS"'
    }
    min_valid_size = 1024 * 10

    # Stream the body in chunks so the whole video is never held in memory,
    # and use a timeout so a stalled server cannot hang the caller.
    with requests.get(
        video_url, headers=request_headers, stream=True, timeout=60
    ) as response:
        try:
            with open(save_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # filter out keep-alive chunks
                        f.write(chunk)
        except Exception:
            # Do not leave a truncated file behind when the transfer breaks.
            if os.path.exists(save_path):
                try:
                    os.remove(save_path)
                except OSError:
                    pass
            raise

    if os.path.getsize(save_path) > min_valid_size:
        return save_path

    # Anything this small is treated as a failed download; remove it so the
    # caller, who only receives None, is not left with an orphaned file.
    try:
        os.remove(save_path)
    except OSError:
        pass
    return None
def download_sph_video(download_url, key):
    """
    Download an encrypted video, decrypt it and save the result locally.

    The encrypted payload is streamed to ``static/encrypted_<id>.mp4``,
    decrypted into ``static/decrypted_<id>.mp4`` via ``decrypt_sph_video``,
    and the encrypted temporary file is then removed.

    :param download_url: URL of the encrypted video.
    :param key: decryption key forwarded to ``decrypt_sph_video``.
    :return: path of the decrypted video file.
    :raises RuntimeError: if downloading or decrypting fails; both temporary
        files are removed before the error is raised.
    """
    print("downloading video from {}".format(download_url))
    print("key is {}".format(key))
    file_id = uuid4().hex
    encrypted_path = f"static/encrypted_{file_id}.mp4"
    decrypted_path = f"static/decrypted_{file_id}.mp4"
    try:
        # The timeout keeps a stalled server from blocking this call forever;
        # with stream=True it applies to the connection and to each read.
        with requests.get(
            download_url, headers=headers, stream=True, timeout=60
        ) as response:
            response.raise_for_status()
            with open(encrypted_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # filter out keep-alive chunks
                        f.write(chunk)
        decrypt_sph_video(encrypted_path, key, decrypted_path)
        os.remove(encrypted_path)
        return decrypted_path
    except Exception as e:
        print(traceback.format_exc())
        # Remove whatever was written so failed runs leave nothing behind.
        for path in [encrypted_path, decrypted_path]:
            if os.path.exists(path):
                try:
                    os.remove(path)
                except OSError:
                    pass
        raise RuntimeError(f"Video processing failed: {str(e)}") from e
def decrypt_sph_video(video_path: str, key: int, save_path: str) -> None:
    """
    Decrypt video file using C library.

    The file is read fully into memory, passed to the ``decrypt`` function of
    the shared library at ``decrypt_key_path`` (which modifies the buffer in
    place), and the resulting bytes are written to ``save_path``.

    Args:
        video_path: Path to encrypted video file
        key: 32-bit unsigned integer decryption key
        save_path: Path to save decrypted video
    Raises:
        RuntimeError: If decryption fails
    """
    print("key is {}".format(key))
    # Upper bound on the number of bytes handed to the C routine.
    # Presumably only the leading 128 KiB of the file is encrypted; confirm
    # against the library.
    max_decrypt_length = 2 ** 17
    ffi = cffi.FFI()
    try:
        ffi.cdef('void decrypt(unsigned char *data, const size_t data_length, const uint32_t key);')
        lib = ffi.dlopen(decrypt_key_path)
        with open(video_path, 'rb') as f:
            encrypted_data = f.read()
        # NOTE(review): list(...) materializes one Python int per byte, which
        # is memory-hungry for large videos; consider a buffer-based
        # initializer if that becomes a problem.
        c_data = ffi.new('unsigned char[]', list(encrypted_data))
        # Never report more bytes than the buffer holds: a file shorter than
        # max_decrypt_length would otherwise let the C code run past the end
        # of c_data.
        data_length = min(len(encrypted_data), max_decrypt_length)
        lib.decrypt(c_data, data_length, int(key))
        decrypted_data = bytes(ffi.buffer(c_data, len(encrypted_data))[:])
        with open(save_path, 'wb') as f:
            f.write(decrypted_data)
    except Exception as e:
        print(traceback.format_exc())
        raise RuntimeError(f"Decryption failed: {str(e)}") from e
|