"""
Video processing utilities.

Provides video download (with an on-disk cache), Base64 data-URL encoding and
cache cleanup helpers used to support video evaluation.
"""

import asyncio
import base64
import hashlib
import os
import time
from pathlib import Path
from typing import Optional

import requests

# Configuration
VIDEO_CACHE_DIR = Path(".video_cache")
VIDEO_MAX_SIZE_MB = 50  # maximum accepted video size (MB)
VIDEO_DOWNLOAD_TIMEOUT = 120  # download timeout (seconds)
MAX_RETRIES = 2  # maximum number of retries after the first attempt

_BYTES_PER_MB = 1024 * 1024


class _VideoTooLargeError(Exception):
    """Raised when a video is larger than ``VIDEO_MAX_SIZE_MB``."""

    def __init__(self, size_mb: float) -> None:
        super().__init__(f"{size_mb:.2f}MB > {VIDEO_MAX_SIZE_MB}MB")
        self.size_mb = size_mb


def _remove_if_exists(path: Path) -> None:
    """Delete *path*, ignoring the case where it is already gone."""
    try:
        path.unlink()
    except FileNotFoundError:
        pass


def _fetch_to_file(video_url: str, dest: Path) -> int:
    """
    Synchronously stream *video_url* into *dest* and return the bytes written.

    Intended to run in an executor thread. The size limit is enforced both
    from the ``Content-Length`` header (when present and parseable) and while
    streaming, so an oversized body is abandoned as soon as the limit is
    crossed instead of being written to disk in full.

    Raises:
        _VideoTooLargeError: the declared or received size exceeds the limit.
        requests.RequestException: on network / HTTP errors.
        OSError: if *dest* cannot be written.
    """
    max_bytes = VIDEO_MAX_SIZE_MB * _BYTES_PER_MB
    response = requests.get(
        video_url,
        timeout=VIDEO_DOWNLOAD_TIMEOUT,
        stream=True,
        headers={"User-Agent": "Mozilla/5.0"},
    )
    try:
        response.raise_for_status()

        # Cheap early rejection based on the declared size.
        content_length = response.headers.get('content-length')
        if content_length:
            try:
                declared = int(content_length)
            except ValueError:
                # Malformed header: fall back to the streaming check below.
                declared = None
            if declared is not None and declared > max_bytes:
                raise _VideoTooLargeError(declared / _BYTES_PER_MB)

        written = 0
        with open(dest, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if not chunk:
                    continue
                written += len(chunk)
                if written > max_bytes:
                    # The reported size is what had been received when the
                    # limit was crossed, not the full size of the video.
                    raise _VideoTooLargeError(written / _BYTES_PER_MB)
                f.write(chunk)
        return written
    finally:
        # With stream=True the connection stays open until the body is fully
        # consumed or the response is closed, so always release it.
        response.close()


async def download_video(
    video_url: str,
    cache_dir: Path = VIDEO_CACHE_DIR
) -> Optional[Path]:
    """
    Download a video asynchronously, reusing a cached copy when available.

    The cache file name is the MD5 hex digest of the URL with an ``.mp4``
    suffix. Data is first written to a sibling ``.tmp`` file and only moved
    into place after a complete, size-checked download.

    Args:
        video_url: URL of the video.
        cache_dir: Directory used as the download cache (created if missing,
            including parent directories).

    Returns:
        Path of the cached video file, or None if the video is too large or
        every download attempt failed.
    """
    cache_dir.mkdir(parents=True, exist_ok=True)

    # Cache file name derived from the URL hash.
    url_hash = hashlib.md5(video_url.encode()).hexdigest()
    cache_path = cache_dir / f"{url_hash}.mp4"

    # Cache hit: no network access needed.
    if cache_path.exists():
        file_size_mb = cache_path.stat().st_size / _BYTES_PER_MB
        print(f" ♻️ 使用缓存视频: {file_size_mb:.2f}MB")
        return cache_path

    temp_path = cache_path.with_suffix('.tmp')
    loop = asyncio.get_running_loop()

    for attempt in range(MAX_RETRIES + 1):
        try:
            print(f" 📥 下载视频... (尝试 {attempt + 1}/{MAX_RETRIES + 1})")

            # The blocking request and file write run in the default executor.
            written = await loop.run_in_executor(
                None, _fetch_to_file, video_url, temp_path
            )
            actual_size_mb = written / _BYTES_PER_MB

            # replace() overwrites an existing target on every platform.
            temp_path.replace(cache_path)

            print(f" ✅ 视频下载成功: {actual_size_mb:.2f}MB")
            return cache_path

        except _VideoTooLargeError as e:
            # Retrying cannot help: the video itself is over the limit.
            print(f" ⚠️ 视频过大: {e.size_mb:.2f}MB > {VIDEO_MAX_SIZE_MB}MB")
            _remove_if_exists(temp_path)
            return None

        except Exception as e:
            # Never leave a partial download behind, whatever the attempt.
            _remove_if_exists(temp_path)
            if attempt < MAX_RETRIES:
                wait_time = 2 * (attempt + 1)
                print(f" ⚠️ 下载失败,{wait_time}秒后重试: {str(e)[:50]}")
                await asyncio.sleep(wait_time)
            else:
                print(f" ❌ 视频下载失败: {str(e)[:100]}")
                return None

    return None


async def encode_video_to_base64(video_path: Path) -> Optional[str]:
    """
    Encode a video file as a Base64 ``data:video/mp4`` URL asynchronously.

    Args:
        video_path: Path of the video file to encode.

    Returns:
        The Base64 data URL, or None if reading or encoding failed.
    """
    def read_and_encode() -> str:
        # Runs in an executor thread: file read and encoding are blocking.
        with open(video_path, 'rb') as f:
            video_bytes = f.read()
        base64_str = base64.b64encode(video_bytes).decode('utf-8')
        return f"data:video/mp4;base64,{base64_str}"

    try:
        loop = asyncio.get_running_loop()

        print(" 🔄 编码视频为 Base64...")
        data_url = await loop.run_in_executor(None, read_and_encode)

        encoded_size_mb = len(data_url) / _BYTES_PER_MB
        print(f" ✅ Base64 编码完成: {encoded_size_mb:.2f}MB")

        return data_url

    except Exception as e:
        print(f" ❌ Base64 编码失败: {str(e)[:100]}")
        return None


def cleanup_video_cache(cache_dir: Path = VIDEO_CACHE_DIR, days: int = 7) -> None:
    """
    Remove cached ``.mp4`` files whose modification time is older than *days*.

    Args:
        cache_dir: Cache directory to sweep; nothing happens if it is missing.
        days: Number of days a cached video is kept.
    """
    if not cache_dir.exists():
        return

    cutoff = time.time() - (days * 24 * 60 * 60)

    removed_count = 0
    for file_path in cache_dir.glob("*.mp4"):
        try:
            if file_path.stat().st_mtime < cutoff:
                file_path.unlink()
                removed_count += 1
        except FileNotFoundError:
            # Removed by someone else between glob() and stat()/unlink().
            continue

    if removed_count > 0:
        print(f"🗑️ 清理了 {removed_count} 个过期视频缓存")