123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 缓存管理工具
- 用于管理视频识别过程中的临时缓存文件
- """
- import os
- import time
- import argparse
- from pathlib import Path
- # 缓存目录配置
- CACHE_DIR = os.path.join(os.path.dirname(__file__), 'cache')
- CACHE_MAX_AGE = 3600 # 1小时
- def get_cache_info():
- """获取缓存信息"""
- if not os.path.exists(CACHE_DIR):
- return {
- 'exists': False,
- 'total_files': 0,
- 'total_size_mb': 0,
- 'oldest_file': None,
- 'newest_file': None
- }
-
- files = []
- total_size = 0
-
- for filename in os.listdir(CACHE_DIR):
- file_path = os.path.join(CACHE_DIR, filename)
- if os.path.isfile(file_path):
- stat = os.stat(file_path)
- files.append({
- 'name': filename,
- 'size': stat.st_size,
- 'mtime': stat.st_mtime,
- 'age': time.time() - stat.st_mtime
- })
- total_size += stat.st_size
-
- if not files:
- return {
- 'exists': True,
- 'total_files': 0,
- 'total_size_mb': 0,
- 'oldest_file': None,
- 'newest_file': None
- }
-
- # 按修改时间排序
- files.sort(key=lambda x: x['mtime'])
-
- return {
- 'exists': True,
- 'total_files': len(files),
- 'total_size_mb': total_size / (1024 * 1024),
- 'oldest_file': {
- 'name': files[0]['name'],
- 'age_hours': files[0]['age'] / 3600
- },
- 'newest_file': {
- 'name': files[-1]['name'],
- 'age_hours': files[-1]['age'] / 3600
- }
- }
- def cleanup_cache(force=False, dry_run=False):
- """清理缓存文件"""
- if not os.path.exists(CACHE_DIR):
- print("缓存目录不存在")
- return 0
-
- current_time = time.time()
- cleaned_count = 0
- cleaned_size = 0
-
- for filename in os.listdir(CACHE_DIR):
- file_path = os.path.join(CACHE_DIR, filename)
- if os.path.isfile(file_path):
- file_age = current_time - os.path.getmtime(file_path)
- should_clean = force or file_age > CACHE_MAX_AGE
-
- if should_clean:
- file_size = os.path.getsize(file_path)
- if dry_run:
- print(f"将要清理: {filename} (大小: {file_size/1024:.1f}KB, 年龄: {file_age/3600:.1f}小时)")
- else:
- try:
- os.remove(file_path)
- cleaned_count += 1
- cleaned_size += file_size
- print(f"已清理: {filename}")
- except Exception as e:
- print(f"清理失败: {filename}, 错误: {e}")
-
- if not dry_run:
- print(f"清理完成: {cleaned_count} 个文件, 释放空间: {cleaned_size/(1024*1024):.2f}MB")
-
- return cleaned_count
- def show_cache_status():
- """显示缓存状态"""
- info = get_cache_info()
-
- print("=" * 50)
- print("缓存状态信息")
- print("=" * 50)
-
- if not info['exists']:
- print("缓存目录不存在")
- return
-
- print(f"缓存目录: {CACHE_DIR}")
- print(f"文件数量: {info['total_files']}")
- print(f"总大小: {info['total_size_mb']:.2f} MB")
-
- if info['oldest_file']:
- print(f"最旧文件: {info['oldest_file']['name']} ({info['oldest_file']['age_hours']:.1f}小时前)")
-
- if info['newest_file']:
- print(f"最新文件: {info['newest_file']['name']} ({info['newest_file']['age_hours']:.1f}小时前)")
-
- # 显示文件列表
- if info['total_files'] > 0:
- print("\n文件列表:")
- print("-" * 50)
- if os.path.exists(CACHE_DIR):
- for filename in sorted(os.listdir(CACHE_DIR)):
- file_path = os.path.join(CACHE_DIR, filename)
- if os.path.isfile(file_path):
- stat = os.stat(file_path)
- age = time.time() - stat.st_mtime
- size = stat.st_size
- print(f"{filename:<30} {size/1024:>8.1f}KB {age/3600:>6.1f}小时前")
- def main():
- """主函数"""
- parser = argparse.ArgumentParser(description='缓存管理工具')
- parser.add_argument('action', choices=['status', 'clean', 'cleanup'],
- help='操作类型: status(查看状态), clean(清理), cleanup(清理)')
- parser.add_argument('--force', '-f', action='store_true',
- help='强制清理所有文件(不检查年龄)')
- parser.add_argument('--dry-run', '-n', action='store_true',
- help='试运行模式,不实际删除文件')
-
- args = parser.parse_args()
-
- if args.action == 'status':
- show_cache_status()
- elif args.action in ['clean', 'cleanup']:
- if args.dry_run:
- print("试运行模式 - 不会实际删除文件")
- cleanup_cache(force=args.force, dry_run=True)
- else:
- print("开始清理缓存...")
- cleaned = cleanup_cache(force=args.force, dry_run=False)
- if cleaned > 0:
- print(f"清理完成,共清理 {cleaned} 个文件")
- else:
- print("没有需要清理的文件")
- if __name__ == '__main__':
- main()
|