123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- """
- @author: luojunhui
- """
- import os
- import re
- import html
- import cffi
- import traceback
- import requests
- from uuid import uuid4
- from fake_useragent import FakeUserAgent
- from applications.utils.common import str_to_md5
- from config import decrypt_key_path
# Default request headers used by the downloaders in this module.
# The User-Agent value is picked once, when the module is imported.
headers = {
    "Content-Type": "application/json",
    "User-Agent": FakeUserAgent().chrome,
}
def extract_video_url_from_article(article_url):
    """
    Fetch an article page and extract the video URL embedded in its script.

    The page source is searched for an ``mp_video_trans_info`` section that
    contains an entry shaped like ``url: ('...').replace``. The quoted value
    is taken, its JavaScript hex escapes (a backslash, ``x`` and two hex
    digits) are decoded, and HTML entities are unescaped.

    :param article_url: URL of the article page to fetch
    :return: the decoded video URL
    :raises ValueError: if no video URL can be found in the page
    :raises requests.RequestException: if the page cannot be fetched
    """
    response = requests.get(
        url=article_url,
        headers={"User-Agent": FakeUserAgent().random},
        timeout=30,  # never hang forever on an unresponsive server
    )
    matched = re.search(
        r"mp_video_trans_info.*url:\s*\(\'(.*?)\'\)\.replace",
        response.text,
        re.S | re.M,
    )
    if matched is None:
        raise ValueError(
            "no video url found in article: {}".format(article_url)
        )
    escaped_url = matched.group(1)

    def _decode_hex_run(hex_run):
        # A run such as \x26\x3d becomes the bytes 0x26 0x3d. Decoding the
        # whole run at once keeps multi-byte UTF-8 sequences intact.
        raw = bytes.fromhex(hex_run.group().replace("\\x", ""))
        return raw.decode("utf-8", errors="replace")

    # Each escape is exactly two *hex* digits. Matching decimal digits
    # greedily would break on \x3d and swallow digits following \x26.
    unescaped_js = re.sub(
        r"(?:\\x[0-9a-fA-F]{2})+", _decode_hex_run, escaped_url
    )
    return html.unescape(unescaped_js)
def download_gzh_video(article_url):
    """
    Download the video embedded in an official-account (gzh) article.

    The video URL is extracted from the article page, then streamed to
    ``static/<md5 of the video url>.mp4``.

    :param article_url: URL of the article that embeds the video
    :return: the saved file path, or ``None`` when the video URL cannot be
        extracted, the server answers with an error status, or the
        downloaded file is not larger than 10 KB
    :raises requests.RequestException: if the video request itself fails
    """
    try:
        video_url = extract_video_url_from_article(article_url)
    except Exception:
        # Best effort: an article without a usable video is not fatal, but
        # leave a trace so the failure is not completely invisible.
        print(traceback.format_exc())
        return None

    save_path = "static/{}.mp4".format(str_to_md5(video_url))
    # Browser-like headers for the video request; kept local so the
    # module-level ``headers`` is not shadowed.
    request_headers = {
        "Accept": "*/*",
        "Accept-Language": "zh,zh-CN;q=0.9",
        "Connection": "keep-alive",
        "Origin": "https://mp.weixin.qq.com",
        "Referer": "https://mp.weixin.qq.com/",
        "Sec-Fetch-Dest": "video",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "cross-site",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36",
        "sec-ch-ua": '"Chromium";v="130", "Google Chrome";v="130", "Not?A_Brand";v="99"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"macOS"',
    }
    ten_kb = 1024 * 10

    # Stream to disk instead of buffering the whole video in memory.
    with requests.get(
        video_url, headers=request_headers, stream=True, timeout=60
    ) as res:
        if not res.ok:
            # An HTTP error body must never be stored as a video.
            return None
        with open(save_path, "wb") as f:
            for chunk in res.iter_content(chunk_size=8192):
                if chunk:  # filter out keep-alive chunks
                    f.write(chunk)

    if os.path.getsize(save_path) > ten_kb:
        return save_path

    # Anything this small is not accepted as a video; do not leave the
    # rejected file behind on disk.
    try:
        os.remove(save_path)
    except OSError:
        pass
    return None
def download_sph_video(download_url, key):
    """
    Fetch an encrypted video, decrypt it and return the decrypted file path.

    The download is streamed to ``static/encrypted_<id>.mp4`` and decrypted
    into ``static/decrypted_<id>.mp4`` via ``decrypt_sph_video``; the
    encrypted copy is deleted once decryption has succeeded.

    :param download_url: URL of the encrypted video
    :param key: decryption key forwarded to ``decrypt_sph_video``
    :return: path of the decrypted video file
    :raises RuntimeError: if any step fails; both temporary files are
        removed (best effort) before the error is raised
    """
    token = uuid4().hex
    encrypted_path = f"static/encrypted_{token}.mp4"
    decrypted_path = f"static/decrypted_{token}.mp4"
    try:
        with requests.get(download_url, headers=headers, stream=True) as resp:
            resp.raise_for_status()
            with open(encrypted_path, "wb") as sink:
                # filter(None, ...) drops the empty keep-alive chunks
                for piece in filter(None, resp.iter_content(chunk_size=8192)):
                    sink.write(piece)
        decrypt_sph_video(encrypted_path, key, decrypted_path)
        os.remove(encrypted_path)
    except Exception as exc:
        print(traceback.format_exc())
        # Remove whatever was written so far; a failed removal is ignored.
        for leftover in (encrypted_path, decrypted_path):
            if not os.path.exists(leftover):
                continue
            try:
                os.remove(leftover)
            except OSError:
                pass
        raise RuntimeError(f"Video processing failed: {str(exc)}") from exc
    return decrypted_path
def decrypt_sph_video(video_path: str, key: int, save_path: str) -> None:
    """
    Decrypt a video file in memory using the C library at ``decrypt_key_path``.

    The whole file is read, the library's ``decrypt`` routine is applied to
    at most the first 2**17 bytes, and the full buffer is written to
    ``save_path``.

    Args:
        video_path: Path to encrypted video file
        key: 32-bit unsigned integer decryption key
        save_path: Path to save decrypted video

    Raises:
        RuntimeError: If decryption fails
    """
    # The decryption key is deliberately not printed: it is a secret.
    max_encrypted_length = 2**17
    ffi = cffi.FFI()
    try:
        lib = ffi.dlopen(decrypt_key_path)
        ffi.cdef(
            "void decrypt(unsigned char *data, const size_t data_length, const uint32_t key);"
        )
        with open(video_path, "rb") as f:
            encrypted_data = f.read()
        data_length = len(encrypted_data)

        # Initialise the C array straight from the bytes object; converting
        # to a Python list first costs several times the video's size in
        # memory.
        c_data = ffi.new("unsigned char[]", encrypted_data)

        # Never tell the C routine to touch more bytes than the buffer
        # holds: files shorter than 2**17 bytes would otherwise be overrun.
        lib.decrypt(c_data, min(data_length, max_encrypted_length), int(key))

        with open(save_path, "wb") as f:
            # ffi.buffer exposes the C memory without an extra copy; the
            # explicit size excludes the trailing NUL cffi appends.
            f.write(ffi.buffer(c_data, data_length))
    except Exception as e:
        print(traceback.format_exc())
        raise RuntimeError(f"Decryption failed: {str(e)}") from e
def download_toutiao_video(video_url: str) -> str:
    """
    Download a toutiao video to ``static/<md5 of the url>.mp4``.

    Args:
        video_url: Direct URL of the video file

    Returns:
        Path of the saved video file

    Raises:
        requests.RequestException: If the request fails or the server
            answers with an HTTP error status; a partially written file is
            removed before the error propagates
    """
    save_path = "static/{}.mp4".format(str_to_md5(video_url))
    try:
        # The context manager releases the streamed connection even when
        # writing fails half-way through.
        with requests.get(
            video_url, headers=headers, stream=True, timeout=60
        ) as response:
            # Do not store an HTTP error body as if it were a video.
            response.raise_for_status()
            with open(save_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # filter out keep-alive chunks
                        f.write(chunk)
    except Exception:
        # Do not leave a truncated file behind under the final name.
        if os.path.exists(save_path):
            try:
                os.remove(save_path)
            except OSError:
                pass
        raise
    return save_path
|