""" @author: luojunhui """ import os import re import html import cffi import traceback import requests from uuid import uuid4 from fake_useragent import FakeUserAgent from applications.utils.common import str_to_md5 from config import decrypt_key_path headers = {"Content-Type": "application/json", "User-Agent": FakeUserAgent().chrome} def extract_video_url_from_article(article_url): """ :param article_url: :return: """ response = requests.get( url=article_url, headers={"User-Agent": FakeUserAgent().random}, ) html_text = response.text w = re.search( r"mp_video_trans_info.*url:\s*\(\'(.*?)\'\)\.replace", html_text, re.S | re.M ).group(1) url = html.unescape( re.sub( r"\\x\d+", lambda x: bytes.fromhex(x.group().replace("\\x", "")).decode(), w ) ) return url def download_gzh_video(article_url): """ 下载公众号视频 :param article_url: :return: """ try: video_url = extract_video_url_from_article(article_url) except Exception as e: return save_path = "static/{}.mp4".format(str_to_md5(video_url)) headers = { "Accept": "*/*", "Accept-Language": "zh,zh-CN;q=0.9", "Connection": "keep-alive", "Origin": "https://mp.weixin.qq.com", "Referer": "https://mp.weixin.qq.com/", "Sec-Fetch-Dest": "video", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "cross-site", "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36", "sec-ch-ua": '"Chromium";v="130", "Google Chrome";v="130", "Not?A_Brand";v="99"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"macOS"', } res = requests.get(video_url, headers=headers) with open(save_path, "wb") as f: f.write(res.content) TEN_KB = 1024 * 10 if os.path.getsize(save_path) > TEN_KB: return save_path else: return None def download_sph_video(download_url, key): """ download video, decrypt video and save to local """ file_id = uuid4().hex encrypted_path = f"static/encrypted_{file_id}.mp4" decrypted_path = f"static/decrypted_{file_id}.mp4" try: with requests.get(download_url, headers=headers, stream=True) as response: response.raise_for_status() with open(encrypted_path, "wb") as f: for chunk in response.iter_content(chunk_size=8192): if chunk: # filter out keep-alive chunks f.write(chunk) decrypt_sph_video(encrypted_path, key, decrypted_path) os.remove(encrypted_path) return decrypted_path except Exception as e: print(traceback.format_exc()) for path in [encrypted_path, decrypted_path]: if os.path.exists(path): try: os.remove(path) except OSError: pass raise RuntimeError(f"Video processing failed: {str(e)}") from e def decrypt_sph_video(video_path: str, key: int, save_path: str) -> None: """ Decrypt video file using C library. Args: video_path: Path to encrypted video file key: 32-bit unsigned integer decryption key save_path: Path to save decrypted video Raises: RuntimeError: If decryption fails """ print("key is {}".format(key)) ffi = cffi.FFI() try: lib = ffi.dlopen(decrypt_key_path) ffi.cdef( "void decrypt(unsigned char *data, const size_t data_length, const uint32_t key);" ) with open(video_path, "rb") as f: encrypted_data = f.read() c_data = ffi.new("unsigned char[]", list(encrypted_data)) lib.decrypt(c_data, 2**17, int(key)) decrypted_data = bytes(ffi.buffer(c_data, len(encrypted_data))[:]) with open(save_path, "wb") as f: f.write(decrypted_data) except Exception as e: print(traceback.format_exc()) raise RuntimeError(f"Decryption failed: {str(e)}") from e def download_toutiao_video(video_url: str) -> str: """ download toutiao video """ save_path = "static/{}.mp4".format(str_to_md5(video_url)) response = requests.get(video_url, headers=headers, stream=True) with open(save_path, "wb") as f: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) return save_path