#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Cache management tool.

Manages the temporary cache files produced during video recognition:
reports what is currently in the cache directory and removes stale files.
"""

import os
import time
import argparse
from pathlib import Path

# Cache directory configuration
CACHE_DIR = os.path.join(os.path.dirname(__file__), 'cache')
CACHE_MAX_AGE = 3600  # 1 hour, in seconds


def _scan_cache_files():
    """Return one record per regular file directly inside CACHE_DIR.

    Each record is a dict with the keys ``name``, ``path``, ``size``
    (bytes), ``mtime`` (epoch seconds) and ``age`` (seconds since the last
    modification, measured against a single timestamp taken at scan start).

    Records are returned in ``os.listdir`` order. Entries that cannot be
    listed or stat'ed (for example because another process removed them
    mid-scan) are skipped instead of raising.
    """
    now = time.time()
    try:
        names = os.listdir(CACHE_DIR)
    except OSError:
        # Directory vanished or is unreadable: nothing to report.
        return []

    records = []
    for name in names:
        path = os.path.join(CACHE_DIR, name)
        if not os.path.isfile(path):
            continue
        try:
            stat = os.stat(path)
        except OSError:
            # The file disappeared between the listing and the stat call.
            continue
        records.append({
            'name': name,
            'path': path,
            'size': stat.st_size,
            'mtime': stat.st_mtime,
            'age': now - stat.st_mtime,
        })
    return records


def _summarize(files):
    """Build the summary dict returned by get_cache_info from scan records."""
    if not files:
        return {
            'exists': True,
            'total_files': 0,
            'total_size_mb': 0,
            'oldest_file': None,
            'newest_file': None,
        }

    # Sort by modification time: oldest first, newest last.
    by_mtime = sorted(files, key=lambda record: record['mtime'])
    oldest, newest = by_mtime[0], by_mtime[-1]
    total_size = sum(record['size'] for record in files)

    return {
        'exists': True,
        'total_files': len(files),
        'total_size_mb': total_size / (1024 * 1024),
        'oldest_file': {
            'name': oldest['name'],
            'age_hours': oldest['age'] / 3600,
        },
        'newest_file': {
            'name': newest['name'],
            'age_hours': newest['age'] / 3600,
        },
    }


def get_cache_info():
    """Return summary information about the cache directory.

    Returns:
        A dict with the keys ``exists``, ``total_files``, ``total_size_mb``,
        ``oldest_file`` and ``newest_file``. The last two are ``None`` when
        there are no files, otherwise dicts with ``name`` and ``age_hours``.
        ``exists`` is False when CACHE_DIR is missing or is not a directory.
    """
    if not os.path.isdir(CACHE_DIR):
        return {
            'exists': False,
            'total_files': 0,
            'total_size_mb': 0,
            'oldest_file': None,
            'newest_file': None,
        }
    return _summarize(_scan_cache_files())


def cleanup_cache(force=False, dry_run=False):
    """Remove cache files older than CACHE_MAX_AGE.

    Args:
        force: When true, every file is selected regardless of its age.
        dry_run: When true, only print what would be removed; nothing is
            deleted.

    Returns:
        The number of files actually deleted (always 0 for a dry run or
        when the cache directory does not exist).
    """
    if not os.path.isdir(CACHE_DIR):
        print("缓存目录不存在")
        return 0

    cleaned_count = 0
    cleaned_size = 0

    for record in _scan_cache_files():
        if not (force or record['age'] > CACHE_MAX_AGE):
            continue

        filename = record['name']
        file_size = record['size']
        file_age = record['age']

        if dry_run:
            print(f"将要清理: {filename} (大小: {file_size/1024:.1f}KB, 年龄: {file_age/3600:.1f}小时)")
            continue

        try:
            os.remove(record['path'])
        except OSError as e:
            # Best effort: report the failure and continue with the rest.
            print(f"清理失败: {filename}, 错误: {e}")
        else:
            cleaned_count += 1
            cleaned_size += file_size
            print(f"已清理: {filename}")

    if not dry_run:
        print(f"清理完成: {cleaned_count} 个文件, 释放空间: {cleaned_size/(1024*1024):.2f}MB")

    return cleaned_count


def show_cache_status():
    """Print a summary of the cache directory followed by a file listing.

    The directory is scanned once, so the summary and the listing always
    describe the same snapshot.
    """
    print("=" * 50)
    print("缓存状态信息")
    print("=" * 50)

    if not os.path.isdir(CACHE_DIR):
        print("缓存目录不存在")
        return

    files = _scan_cache_files()
    info = _summarize(files)

    print(f"缓存目录: {CACHE_DIR}")
    print(f"文件数量: {info['total_files']}")
    print(f"总大小: {info['total_size_mb']:.2f} MB")

    if info['oldest_file']:
        print(f"最旧文件: {info['oldest_file']['name']} ({info['oldest_file']['age_hours']:.1f}小时前)")

    if info['newest_file']:
        print(f"最新文件: {info['newest_file']['name']} ({info['newest_file']['age_hours']:.1f}小时前)")

    # Per-file listing, ordered by file name.
    if files:
        print("\n文件列表:")
        print("-" * 50)
        for record in sorted(files, key=lambda item: item['name']):
            filename = record['name']
            size = record['size']
            age = record['age']
            print(f"{filename:<30} {size/1024:>8.1f}KB {age/3600:>6.1f}小时前")


def main():
    """Command-line entry point: parse arguments and run the chosen action."""
    parser = argparse.ArgumentParser(description='缓存管理工具')
    parser.add_argument('action', choices=['status', 'clean', 'cleanup'],
                        help='操作类型: status(查看状态), clean(清理), cleanup(清理)')
    parser.add_argument('--force', '-f', action='store_true',
                        help='强制清理所有文件(不检查年龄)')
    parser.add_argument('--dry-run', '-n', action='store_true',
                        help='试运行模式,不实际删除文件')

    args = parser.parse_args()

    if args.action == 'status':
        show_cache_status()
    elif args.action in ('clean', 'cleanup'):
        if args.dry_run:
            print("试运行模式 - 不会实际删除文件")
            cleanup_cache(force=args.force, dry_run=True)
        else:
            print("开始清理缓存...")
            cleaned = cleanup_cache(force=args.force, dry_run=False)
            if cleaned > 0:
                print(f"清理完成,共清理 {cleaned} 个文件")
            else:
                print("没有需要清理的文件")


if __name__ == '__main__':
    main()