""" @author: luojunhui """ import os import re import html import cffi import traceback import requests from uuid import uuid4 from fake_useragent import FakeUserAgent from applications.utils.common import str_to_md5 from config import decrypt_key_path headers = { 'Content-Type': 'application/json', 'User-Agent': FakeUserAgent().chrome } def extract_video_url_from_article(article_url): """ :param article_url: :return: """ response = requests.get( url=article_url, headers={'User-Agent': FakeUserAgent().random}, ) html_text = response.text w = re.search( r"mp_video_trans_info.*url:\s*\(\'(.*?)\'\)\.replace", html_text, re.S | re.M ).group(1) url = html.unescape( re.sub( r"\\x\d+", lambda x: bytes.fromhex(x.group().replace("\\x", "")).decode(), w ) ) return url def download_gzh_video(article_url): """ 下载公众号视频 :param article_url: :return: """ try: video_url = extract_video_url_from_article(article_url) except Exception as e: return save_path = "static/{}.mp4".format(str_to_md5(video_url)) headers = { 'Accept': '*/*', 'Accept-Language': 'zh,zh-CN;q=0.9', 'Connection': 'keep-alive', 'Origin': 'https://mp.weixin.qq.com', 'Referer': 'https://mp.weixin.qq.com/', 'Sec-Fetch-Dest': 'video', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'cross-site', 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36', 'sec-ch-ua': '"Chromium";v="130", "Google Chrome";v="130", "Not?A_Brand";v="99"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': '"macOS"' } res = requests.get(video_url, headers=headers) with open(save_path, "wb") as f: f.write(res.content) TEN_KB = 1024 * 10 if os.path.getsize(save_path) > TEN_KB: return save_path else: return None def download_sph_video(download_url, key): """ download video, decrypt video and save to local """ print("downloading video from {}".format(download_url)) print("key is {}".format(key)) file_id = uuid4().hex encrypted_path = f"static/encrypted_{file_id}.mp4" decrypted_path = f"static/decrypted_{file_id}.mp4" try: with requests.get(download_url, headers=headers, stream=True) as response: response.raise_for_status() with open(encrypted_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): if chunk: # filter out keep-alive chunks f.write(chunk) decrypt_sph_video(encrypted_path, key, decrypted_path) os.remove(encrypted_path) return decrypted_path except Exception as e: print(traceback.format_exc()) for path in [encrypted_path, decrypted_path]: if os.path.exists(path): try: os.remove(path) except OSError: pass raise RuntimeError(f"Video processing failed: {str(e)}") from e def decrypt_sph_video(video_path: str, key: int, save_path: str) -> None: """ Decrypt video file using C library. Args: video_path: Path to encrypted video file key: 32-bit unsigned integer decryption key save_path: Path to save decrypted video Raises: RuntimeError: If decryption fails """ print("key is {}".format(key)) ffi = cffi.FFI() try: lib = ffi.dlopen(decrypt_key_path) ffi.cdef('void decrypt(unsigned char *data, const size_t data_length, const uint32_t key);') with open(video_path, 'rb') as f: encrypted_data = f.read() c_data = ffi.new('unsigned char[]', list(encrypted_data)) lib.decrypt(c_data, 2 ** 17, int(key)) decrypted_data = bytes(ffi.buffer(c_data, len(encrypted_data))[:]) with open(save_path, 'wb') as f: f.write(decrypted_data) except Exception as e: print(traceback.format_exc()) raise RuntimeError(f"Decryption failed: {str(e)}") from e