Pārlūkot izejas kodu

异常退出问题解决

jihuaqiang 4 stundas atpakaļ
vecāks
revīzija
e302331ba8

+ 156 - 148
content_indentify/README.md

@@ -1,220 +1,228 @@
-# 多线程内容识别调度器
+# 内容识别调度器
 
-这是一个多线程定时任务调度器,用于自动处理内容识别任务。在服务器上运行时,所有日志都会保存到文件中,方便查看和调试
+这是一个多线程内容识别调度器,用于处理图片和视频内容识别任务
 
-## 功能特
+## 功能特
 
-- 🚀 多线程并发处理,提高处理效率
-- 📝 完整的日志记录,支持文件和控制台输出
-- 🕐 定时任务调度,可配置处理间隔
-- 🛑 优雅退出,支持信号处理
-- 🔧 进程管理,支持启动、停止、重启、状态查看
+- 多线程并发处理(默认5个线程)
+- 定时任务调度(默认每2分钟处理一条数据)
+- 自动错误恢复和进程监控
+- 内存使用监控和垃圾回收
+- 支持图片识别(Coze API)和视频识别(Gemini API)
 
-## 文件说明
+## 安装依赖
 
-- `multi_thread_scheduler.py` - 多线程调度器主程序
-- `indentify.py` - 内容识别处理模块
-- `logging_config.py` - 日志配置模块
-- `start_scheduler.sh` - 启动脚本(支持多种操作)
-- `logs/` - 日志文件目录
+```bash
+# 安装Python依赖
+pip install -r requirements.txt
+
+# 或者使用uv(推荐)
+uv sync
+```
 
 ## 使用方法
 
-### 1. 启动调度器
+### 1. 基本启动
 
 ```bash
-# 启动调度器(后台运行)
+# 启动调度器
 ./start_scheduler.sh start
 
-# 或者直接运行Python脚本(前台运行)
-python3 multi_thread_scheduler.py
-```
-
-### 2. 查看运行状态
-
-```bash
-# 查看调度器状态
+# 查看状态
 ./start_scheduler.sh status
 
-# 查看实时日志
-tail -f logs/scheduler_*.log
-
-# 查看内容识别日志
-tail -f logs/content_identifier_*.log
-```
-
-### 3. 停止调度器
-
-```bash
-# 优雅停止调度器
+# 停止调度器
 ./start_scheduler.sh stop
 
-# 强制停止(如果优雅停止失败)
-kill -KILL $(cat scheduler.pid)
-```
-
-### 4. 重启调度器
-
-```bash
 # 重启调度器
 ./start_scheduler.sh restart
 ```
 
-## 配置说明
+### 2. 监控模式(推荐)
 
-### 线程数量和处理间隔
+监控模式会自动检测进程崩溃、内存异常等问题,并自动重启服务:
 
-在 `multi_thread_scheduler.py` 中修改:
+```bash
+# 启动监控模式
+./start_scheduler.sh monitor
 
-```python
-scheduler = MultiThreadScheduler(thread_count=5, interval_minutes=2)
+# 停止监控模式
+./start_scheduler.sh stop-monitor
 ```
 
-- `thread_count`: 工作线程数量(默认5个)
-- `interval_minutes`: 每个线程处理数据的间隔(默认2分钟)
-
-### 日志配置
-
-日志文件会自动按日期命名:
-- 调度器日志:`logs/scheduler_YYYYMMDD.log`
-- 内容识别日志:`logs/content_identifier_YYYYMMDD.log`
-- 标准输出日志:`logs/scheduler_stdout.log`
+监控模式特性:
+- 自动检测进程崩溃并重启
+- 内存使用监控(超过2GB自动重启)
+- 最大重启次数限制(10次)
+- 日志错误检测(包括double free错误)
 
-## 服务器部署建议
+### 3. 系统监控脚本
 
-### 1. 使用screen或tmux
+对于更高级的监控需求,可以使用独立的系统监控脚本:
 
 ```bash
-# 使用screen
-screen -S scheduler
-./start_scheduler.sh start
-# 按 Ctrl+A 然后按 D 分离会话
-
-# 重新连接会话
-screen -r scheduler
+# 启动系统监控
+python3 system_monitor.py
 ```
 
-### 2. 使用systemd服务(推荐)
+系统监控功能:
+- 实时监控进程状态
+- 系统资源使用监控(CPU、内存、磁盘)
+- 自动错误检测和恢复
+- 详细的监控日志
 
-创建服务文件 `/etc/systemd/system/content-scheduler.service`:
+### 4. 缓存管理
 
-```ini
-[Unit]
-Description=Content Recognition Scheduler
-After=network.target
+管理视频识别过程中的临时缓存文件:
 
-[Service]
-Type=forking
-User=your_username
-WorkingDirectory=/path/to/your/project
-ExecStart=/path/to/your/project/content_indentify/start_scheduler.sh start
-ExecStop=/path/to/your/project/content_indentify/start_scheduler.sh stop
-PIDFile=/path/to/your/project/content_indentify/scheduler.pid
-Restart=always
-RestartSec=10
-
-[Install]
-WantedBy=multi-user.target
-```
-
-启用服务:
 ```bash
-sudo systemctl daemon-reload
-sudo systemctl enable content-scheduler
-sudo systemctl start content-scheduler
-sudo systemctl status content-scheduler
-```
+# 查看缓存状态
+./start_scheduler.sh cache-status
 
-### 3. 使用crontab监控
+# 清理过期缓存文件
+./start_scheduler.sh cache-cleanup
 
-```bash
-# 编辑crontab
-crontab -e
+# 清理所有缓存文件
+./start_scheduler.sh cache-clean
 
-# 添加监控任务(每5分钟检查一次)
-*/5 * * * * /path/to/your/project/content_indentify/start_scheduler.sh status > /dev/null 2>&1 || /path/to/your/project/content_indentify/start_scheduler.sh restart
+# 或者直接使用Python脚本
+python3 cache_manager.py status
+python3 cache_manager.py cleanup
 ```
 
-## 日志查看技巧
+缓存管理功能:
+- 自动清理过期缓存文件(默认1小时)
+- 手动清理缓存
+- 缓存状态监控
+- 防止磁盘空间占用过多
 
-### 1. 实时监控日志
+## 配置说明
 
-```bash
-# 监控所有日志
-tail -f logs/*.log
+### 环境变量
 
-# 监控特定日志
-tail -f logs/scheduler_$(date +%Y%m%d).log
+在项目根目录创建 `.env` 文件:
 
-# 监控错误日志
-tail -f logs/*.log | grep ERROR
-```
+```env
+# 数据库配置
+DB_HOST=localhost
+DB_PORT=3306
+DB_USER=your_username
+DB_PASSWORD=your_password
+DB_NAME=your_database
 
-### 2. 日志搜索
+# API密钥
+GEMINI_API_KEY=your_gemini_api_key
+COZE_API_KEY=your_coze_api_key
 
-```bash
-# 搜索特定关键词
-grep "ERROR" logs/*.log
+# 其他配置
+LOG_LEVEL=INFO
+```
 
-# 搜索特定时间段的日志
-grep "2024-01-15" logs/*.log
+### 调度器配置
 
-# 搜索特定线程的日志
-grep "WorkerThread-1" logs/*.log
-```
+在 `multi_thread_scheduler.py` 中可以调整以下参数:
 
-### 3. 日志分析
+```python
+class MultiThreadScheduler:
+    def __init__(self, thread_count=5, interval_minutes=2):
+        # thread_count: 工作线程数量
+        # interval_minutes: 处理间隔(分钟)
+        
+        self.max_memory_mb = 2048  # 最大内存使用量(MB)
+        self.gc_interval = 300     # 垃圾回收间隔(秒)
+```
 
-```bash
-# 统计错误数量
-grep -c "ERROR" logs/*.log
+### 监控配置
 
-# 查看处理成功的记录
-grep -c "数据处理成功" logs/*.log
+在 `system_monitor.py` 中可以调整监控参数:
 
-# 查看处理失败的记录
-grep -c "处理失败" logs/*.log
+```python
+self.config = {
+    'max_memory_mb': 2048,        # 最大内存使用量
+    'max_cpu_percent': 80,        # 最大CPU使用率
+    'max_disk_percent': 90,       # 最大磁盘使用率
+    'check_interval': 30,         # 检查间隔(秒)
+    'restart_delay': 60,          # 重启延迟(秒)
+    'max_restarts': 5,            # 最大重启次数
+}
 ```
 
 ## 故障排除
 
-### 1. 调度器无法启动
-
-- 检查Python环境和依赖包
-- 检查数据库连接配置
-- 查看错误日志
+### Double Free 错误
 
-### 2. 调度器意外停止
+如果遇到 "double free or corruption (!prev)" 错误:
 
-- 检查系统资源(内存、CPU)
-- 查看错误日志
-- 检查数据库连接状态
-
-### 3. 日志文件过大
+1. **使用监控模式**:
+   ```bash
+   ./start_scheduler.sh monitor
+   ```
 
-- 定期清理旧日志文件
-- 调整日志级别
-- 使用logrotate进行日志轮转
+2. **检查内存使用**:
+   ```bash
+   ./start_scheduler.sh status
+   ```
 
-## 注意事项
+3. **查看详细日志**:
+   ```bash
+   tail -f logs/scheduler_*.log
+   tail -f logs/system_monitor.log
+   ```
 
-1. **权限设置**: 确保脚本有执行权限
+4. **手动重启**:
    ```bash
-   chmod +x start_scheduler.sh
+   ./start_scheduler.sh restart
    ```
 
-2. **路径配置**: 确保所有路径都是绝对路径或正确的相对路径
+### 常见问题
+
+1. **进程启动失败**
+   - 检查Python环境和依赖
+   - 查看日志文件中的错误信息
+   - 确保数据库连接正常
+
+2. **内存使用过高**
+   - 调整 `max_memory_mb` 参数
+   - 检查是否有内存泄漏
+   - 使用监控模式自动重启
+
+3. **API调用失败**
+   - 检查API密钥配置
+   - 确认网络连接正常
+   - 查看API调用日志
 
-3. **环境变量**: 确保数据库连接等环境变量已正确配置
+## 日志文件
 
-4. **资源监控**: 定期监控服务器资源使用情况
+- `logs/scheduler_stdout.log` - 调度器标准输出
+- `logs/scheduler_*.log` - 调度器日志文件
+- `logs/system_monitor.log` - 系统监控日志
+- `logs/monitor.log` - 监控模式日志
 
-5. **备份策略**: 定期备份重要的配置和日志文件
+## 性能优化建议
 
-## 联系支持
+1. **调整线程数量**:根据服务器CPU核心数调整 `thread_count`
+2. **优化内存使用**:定期清理缓存文件,监控内存使用
+3. **数据库优化**:确保数据库索引合理,避免慢查询
+4. **网络优化**:使用CDN加速图片和视频下载
+
+## 安全注意事项
+
+1. **API密钥保护**:不要在代码中硬编码API密钥
+2. **文件权限**:确保脚本文件有执行权限
+3. **日志安全**:定期清理敏感日志信息
+4. **进程监控**:限制监控脚本的权限
+
+## 技术支持
 
 如果遇到问题,请:
+
 1. 查看相关日志文件
 2. 检查系统资源使用情况
-3. 确认配置是否正确
-4. 联系技术支持团队 
+3. 确认配置参数是否正确
+4. 尝试使用监控模式自动恢复
+
+## 更新日志
+
+- v2.0.0: 添加监控模式和自动重启功能
+- v1.1.0: 改进内存管理和错误处理
+- v1.0.0: 基础多线程调度器 

+ 165 - 0
content_indentify/cache_manager.py

@@ -0,0 +1,165 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+缓存管理工具
+用于管理视频识别过程中的临时缓存文件
+"""
+
+import os
+import time
+import argparse
+from pathlib import Path
+
+# 缓存目录配置
+CACHE_DIR = os.path.join(os.path.dirname(__file__), 'cache')
+CACHE_MAX_AGE = 3600  # 1小时
+
+def get_cache_info():
+    """获取缓存信息"""
+    if not os.path.exists(CACHE_DIR):
+        return {
+            'exists': False,
+            'total_files': 0,
+            'total_size_mb': 0,
+            'oldest_file': None,
+            'newest_file': None
+        }
+    
+    files = []
+    total_size = 0
+    
+    for filename in os.listdir(CACHE_DIR):
+        file_path = os.path.join(CACHE_DIR, filename)
+        if os.path.isfile(file_path):
+            stat = os.stat(file_path)
+            files.append({
+                'name': filename,
+                'size': stat.st_size,
+                'mtime': stat.st_mtime,
+                'age': time.time() - stat.st_mtime
+            })
+            total_size += stat.st_size
+    
+    if not files:
+        return {
+            'exists': True,
+            'total_files': 0,
+            'total_size_mb': 0,
+            'oldest_file': None,
+            'newest_file': None
+        }
+    
+    # 按修改时间排序
+    files.sort(key=lambda x: x['mtime'])
+    
+    return {
+        'exists': True,
+        'total_files': len(files),
+        'total_size_mb': total_size / (1024 * 1024),
+        'oldest_file': {
+            'name': files[0]['name'],
+            'age_hours': files[0]['age'] / 3600
+        },
+        'newest_file': {
+            'name': files[-1]['name'],
+            'age_hours': files[-1]['age'] / 3600
+        }
+    }
+
+def cleanup_cache(force=False, dry_run=False):
+    """清理缓存文件"""
+    if not os.path.exists(CACHE_DIR):
+        print("缓存目录不存在")
+        return 0
+    
+    current_time = time.time()
+    cleaned_count = 0
+    cleaned_size = 0
+    
+    for filename in os.listdir(CACHE_DIR):
+        file_path = os.path.join(CACHE_DIR, filename)
+        if os.path.isfile(file_path):
+            file_age = current_time - os.path.getmtime(file_path)
+            should_clean = force or file_age > CACHE_MAX_AGE
+            
+            if should_clean:
+                file_size = os.path.getsize(file_path)
+                if dry_run:
+                    print(f"将要清理: {filename} (大小: {file_size/1024:.1f}KB, 年龄: {file_age/3600:.1f}小时)")
+                else:
+                    try:
+                        os.remove(file_path)
+                        cleaned_count += 1
+                        cleaned_size += file_size
+                        print(f"已清理: {filename}")
+                    except Exception as e:
+                        print(f"清理失败: {filename}, 错误: {e}")
+    
+    if not dry_run:
+        print(f"清理完成: {cleaned_count} 个文件, 释放空间: {cleaned_size/(1024*1024):.2f}MB")
+    
+    return cleaned_count
+
+def show_cache_status():
+    """显示缓存状态"""
+    info = get_cache_info()
+    
+    print("=" * 50)
+    print("缓存状态信息")
+    print("=" * 50)
+    
+    if not info['exists']:
+        print("缓存目录不存在")
+        return
+    
+    print(f"缓存目录: {CACHE_DIR}")
+    print(f"文件数量: {info['total_files']}")
+    print(f"总大小: {info['total_size_mb']:.2f} MB")
+    
+    if info['oldest_file']:
+        print(f"最旧文件: {info['oldest_file']['name']} ({info['oldest_file']['age_hours']:.1f}小时前)")
+    
+    if info['newest_file']:
+        print(f"最新文件: {info['newest_file']['name']} ({info['newest_file']['age_hours']:.1f}小时前)")
+    
+    # 显示文件列表
+    if info['total_files'] > 0:
+        print("\n文件列表:")
+        print("-" * 50)
+        if os.path.exists(CACHE_DIR):
+            for filename in sorted(os.listdir(CACHE_DIR)):
+                file_path = os.path.join(CACHE_DIR, filename)
+                if os.path.isfile(file_path):
+                    stat = os.stat(file_path)
+                    age = time.time() - stat.st_mtime
+                    size = stat.st_size
+                    print(f"{filename:<30} {size/1024:>8.1f}KB {age/3600:>6.1f}小时前")
+
+def main():
+    """主函数"""
+    parser = argparse.ArgumentParser(description='缓存管理工具')
+    parser.add_argument('action', choices=['status', 'clean', 'cleanup'], 
+                       help='操作类型: status(查看状态), clean(清理), cleanup(清理)')
+    parser.add_argument('--force', '-f', action='store_true', 
+                       help='强制清理所有文件(不检查年龄)')
+    parser.add_argument('--dry-run', '-n', action='store_true', 
+                       help='试运行模式,不实际删除文件')
+    
+    args = parser.parse_args()
+    
+    if args.action == 'status':
+        show_cache_status()
+    elif args.action in ['clean', 'cleanup']:
+        if args.dry_run:
+            print("试运行模式 - 不会实际删除文件")
+            cleanup_cache(force=args.force, dry_run=True)
+        else:
+            print("开始清理缓存...")
+            cleaned = cleanup_cache(force=args.force, dry_run=False)
+            if cleaned > 0:
+                print(f"清理完成,共清理 {cleaned} 个文件")
+            else:
+                print("没有需要清理的文件")
+
+if __name__ == '__main__':
+    main() 

+ 116 - 7
content_indentify/multi_thread_scheduler.py

@@ -11,6 +11,9 @@ import signal
 import sys
 import os
 import atexit
+import gc
+import psutil
+import traceback
 from datetime import datetime
 from indentify import ContentIdentifier
 from utils.logging_config import get_logger
@@ -21,8 +24,11 @@ class MultiThreadScheduler:
         self.interval_seconds = interval_minutes * 60
         self.running = True
         self.threads = []
-        self.identifier = ContentIdentifier()
+        self.identifier = None  # 延迟初始化
         self.pid_file = "scheduler.pid"
+        self.max_memory_mb = 2048  # 最大内存使用量
+        self.last_gc_time = time.time()
+        self.gc_interval = 300  # 每5分钟强制垃圾回收
         
         # 设置日志
         self.logger = get_logger('MultiThreadScheduler')
@@ -36,6 +42,20 @@ class MultiThreadScheduler:
         
         # 创建PID文件
         self.create_pid_file()
+        
+        # 初始化ContentIdentifier(延迟初始化)
+        self.init_identifier()
+    
+    def init_identifier(self):
+        """延迟初始化ContentIdentifier"""
+        try:
+            if self.identifier is None:
+                self.logger.info("初始化ContentIdentifier...")
+                self.identifier = ContentIdentifier()
+                self.logger.info("ContentIdentifier初始化成功")
+        except Exception as e:
+            self.logger.error(f"ContentIdentifier初始化失败: {e}")
+            self.identifier = None
     
     def create_pid_file(self):
         """创建PID文件"""
@@ -54,6 +74,17 @@ class MultiThreadScheduler:
                 self.logger.info("PID文件已清理")
         except Exception as e:
             self.logger.error(f"清理PID文件失败: {e}")
+        
+        # 清理ContentIdentifier
+        if self.identifier:
+            try:
+                del self.identifier
+                self.identifier = None
+            except:
+                pass
+        
+        # 强制垃圾回收
+        gc.collect()
     
     def signal_handler(self, signum, frame):
         """信号处理函数,优雅退出"""
@@ -64,15 +95,64 @@ class MultiThreadScheduler:
         self.cleanup()
         sys.exit(0)
     
+    def check_memory_usage(self):
+        """检查内存使用情况"""
+        try:
+            process = psutil.Process(os.getpid())
+            memory_info = process.memory_info()
+            memory_mb = memory_info.rss / 1024 / 1024
+            
+            # 记录内存使用
+            if memory_mb > self.max_memory_mb * 0.8:
+                self.logger.warning(f"内存使用较高: {memory_mb:.1f}MB / {self.max_memory_mb}MB")
+            
+            return memory_mb
+        except Exception as e:
+            self.logger.error(f"检查内存使用失败: {e}")
+            return 0
+    
+    def force_garbage_collection(self):
+        """强制垃圾回收"""
+        try:
+            current_time = time.time()
+            if current_time - self.last_gc_time > self.gc_interval:
+                self.logger.info("执行强制垃圾回收...")
+                collected = gc.collect()
+                self.logger.info(f"垃圾回收完成,清理了 {collected} 个对象")
+                self.last_gc_time = current_time
+        except Exception as e:
+            self.logger.error(f"垃圾回收失败: {e}")
+    
     def worker_thread(self, thread_id):
         """工作线程函数"""
         thread_logger = get_logger(f'WorkerThread-{thread_id}')
         thread_logger.info(f"线程 {thread_id} 启动,每 {self.interval_seconds//60} 分钟处理一条数据")
         
+        # 线程本地变量
+        local_identifier = None
+        
         while self.running:
             try:
                 start_time = time.time()
                 
+                # 检查内存使用
+                memory_mb = self.check_memory_usage()
+                if memory_mb > self.max_memory_mb:
+                    thread_logger.error(f"内存使用过高 ({memory_mb:.1f}MB),跳过本次处理")
+                    time.sleep(60)  # 等待1分钟
+                    continue
+                
+                # 强制垃圾回收
+                self.force_garbage_collection()
+                
+                # 确保identifier可用
+                if self.identifier is None:
+                    self.init_identifier()
+                    if self.identifier is None:
+                        thread_logger.error("ContentIdentifier不可用,等待下次重试")
+                        time.sleep(60)
+                        continue
+                
                 # 处理一条数据
                 thread_logger.info(f"开始处理数据...")
                 success = self.identifier.process_single_record()
@@ -97,8 +177,21 @@ class MultiThreadScheduler:
                         if wait_time <= 0:
                             break
                 
+            except MemoryError as e:
+                thread_logger.error(f"内存不足错误: {e}")
+                # 强制垃圾回收
+                gc.collect()
+                time.sleep(120)  # 等待2分钟
+                
             except Exception as e:
-                thread_logger.error(f"处理过程中发生错误: {e}", exc_info=True)
+                thread_logger.error(f"处理过程中发生错误: {e}")
+                thread_logger.error(f"错误详情: {traceback.format_exc()}")
+                
+                # 检查是否是严重错误
+                if "double free" in str(e).lower() or "corruption" in str(e).lower():
+                    thread_logger.critical("检测到严重的内存错误,线程将退出")
+                    break
+                
                 # 发生错误时等待一段时间再继续
                 for _ in range(10):
                     if not self.running:
@@ -124,8 +217,8 @@ class MultiThreadScheduler:
             
             # 如果不是最后一个线程,等待5秒再启动下一个
             if i < self.thread_count - 1:
-                self.logger.info("等待5秒后启动下一个线程...")
-                time.sleep(5)
+                self.logger.info("等待30秒后启动下一个线程...")
+                time.sleep(30)
         
         self.logger.info(f"所有 {self.thread_count} 个线程已启动")
         self.logger.info(f"每个线程每 {self.interval_seconds//60} 分钟处理一条数据")
@@ -150,14 +243,25 @@ class MultiThreadScheduler:
     def run(self):
         """运行调度器"""
         try:
+            # 设置Python内存管理
+            gc.set_threshold(700, 10, 10)  # 更积极的垃圾回收
+            
             self.start_all_threads()
             
             # 主线程保持运行,等待信号
             while self.running:
                 time.sleep(1)
                 
+                # 定期检查内存和垃圾回收
+                self.force_garbage_collection()
+                
         except KeyboardInterrupt:
             self.logger.info("收到键盘中断信号")
+        except MemoryError as e:
+            self.logger.critical(f"主线程内存不足: {e}")
+        except Exception as e:
+            self.logger.critical(f"主线程发生错误: {e}")
+            self.logger.critical(f"错误详情: {traceback.format_exc()}")
         finally:
             self.stop_all_threads()
             self.cleanup()
@@ -172,9 +276,14 @@ def main():
     print(f"启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
     print("=" * 60)
     
-    # 创建并运行调度器
-    scheduler = MultiThreadScheduler(thread_count=5, interval_minutes=2)
-    scheduler.run()
+    try:
+        # 创建并运行调度器
+        scheduler = MultiThreadScheduler(thread_count=3, interval_minutes=2)
+        scheduler.run()
+    except Exception as e:
+        print(f"调度器启动失败: {e}")
+        print(f"错误详情: {traceback.format_exc()}")
+        sys.exit(1)
 
 if __name__ == "__main__":
     main() 

+ 4 - 1
content_indentify/requirements.txt

@@ -15,4 +15,7 @@ google-genai>=1.29.0
 lark-oapi>=1.4.20
 
 # 日志和工具
-loguru>=0.7.0 
+loguru>=0.7.0
+
+# 系统监控
+psutil>=5.9.0 

+ 0 - 1
content_indentify/scheduler.pid

@@ -1 +0,0 @@
-82953

+ 202 - 9
content_indentify/start_scheduler.sh

@@ -6,10 +6,15 @@
 #   ./start_scheduler.sh stop     # 停止调度器
 #   ./start_scheduler.sh status   # 查看状态
 #   ./start_scheduler.sh restart  # 重启调度器
+#   ./start_scheduler.sh monitor  # 监控模式(自动重启)
 
 SCRIPT_NAME="multi_thread_scheduler.py"
 PID_FILE="scheduler.pid"
 LOG_DIR="logs"
+MONITOR_PID_FILE="monitor.pid"
+MAX_MEMORY_MB=2048  # 最大内存使用量(MB)
+RESTART_COUNT_FILE="restart_count.txt"
+MAX_RESTARTS=10     # 最大重启次数
 
 # 检查Python环境
 if ! command -v python3 &> /dev/null; then
@@ -40,6 +45,15 @@ get_pid() {
     fi
 }
 
+# 获取监控进程ID
+get_monitor_pid() {
+    if [ -f "$MONITOR_PID_FILE" ]; then
+        cat "$MONITOR_PID_FILE"
+    else
+        echo ""
+    fi
+}
+
 # 检查进程是否运行
 is_running() {
     local pid=$(get_pid)
@@ -55,6 +69,38 @@ is_running() {
     return 1
 }
 
+# 检查内存使用情况
+check_memory() {
+    local pid=$(get_pid)
+    if [ -n "$pid" ] && ps -p "$pid" > /dev/null 2>&1; then
+        local memory_kb=$(ps -p "$pid" -o rss= 2>/dev/null | tr -d ' ')
+        if [ -n "$memory_kb" ]; then
+            local memory_mb=$((memory_kb / 1024))
+            echo "$memory_mb"
+        else
+            echo "0"
+        fi
+    else
+        echo "0"
+    fi
+}
+
+# 记录重启次数
+record_restart() {
+    local count=0
+    if [ -f "$RESTART_COUNT_FILE" ]; then
+        count=$(cat "$RESTART_COUNT_FILE")
+    fi
+    count=$((count + 1))
+    echo "$count" > "$RESTART_COUNT_FILE"
+    echo "$count"
+}
+
+# 重置重启计数
+reset_restart_count() {
+    echo "0" > "$RESTART_COUNT_FILE"
+}
+
 # 启动调度器
 start_scheduler() {
     if is_running; then
@@ -64,9 +110,15 @@ start_scheduler() {
     
     echo "正在启动多线程调度器..."
     echo "日志文件将保存在 $LOG_DIR/ 目录中"
+    echo "最大内存使用量: ${MAX_MEMORY_MB}MB"
+    
+    # 设置环境变量,增加Python内存管理稳定性
+    export PYTHONMALLOC=malloc
+    export PYTHONDEVMODE=1
+    export PYTHONUNBUFFERED=1
     
     # 后台运行Python脚本
-    nohup python3 "$SCRIPT_NAME" > "$LOG_DIR/scheduler_stdout.log" 2>&1 &
+    nohup python3 -u "$SCRIPT_NAME" > "$LOG_DIR/scheduler_stdout.log" 2>&1 &
     local pid=$!
     
     # 保存PID到文件
@@ -79,6 +131,14 @@ start_scheduler() {
     echo ""
     echo "使用以下命令停止:"
     echo "  ./start_scheduler.sh stop"
+    
+    # 等待几秒检查进程是否正常启动
+    sleep 3
+    if ! ps -p "$pid" > /dev/null 2>&1; then
+        echo "警告: 进程启动后立即退出,请检查日志文件"
+        rm -f "$PID_FILE"
+        return 1
+    fi
 }
 
 # 停止调度器
@@ -113,13 +173,45 @@ stop_scheduler() {
     echo "调度器已停止"
 }
 
+# 停止监控进程
+stop_monitor() {
+    local monitor_pid=$(get_monitor_pid)
+    if [ -n "$monitor_pid" ]; then
+        echo "正在停止监控进程 (PID: $monitor_pid)..."
+        kill -TERM "$monitor_pid" 2>/dev/null
+        
+        local count=0
+        while [ $count -lt 5 ] && ps -p "$monitor_pid" > /dev/null 2>&1; do
+            sleep 1
+            count=$((count + 1))
+        done
+        
+        if ps -p "$monitor_pid" > /dev/null 2>&1; then
+            kill -KILL "$monitor_pid" 2>/dev/null
+        fi
+        
+        rm -f "$MONITOR_PID_FILE"
+        echo "监控进程已停止"
+    fi
+}
+
 # 查看状态
 show_status() {
     if is_running; then
         local pid=$(get_pid)
+        local memory_mb=$(check_memory)
         echo "调度器正在运行 (PID: $pid)"
+        echo "内存使用: ${memory_mb}MB / ${MAX_MEMORY_MB}MB"
         echo "进程信息:"
         ps -p "$pid" -o pid,ppid,cmd,etime
+        echo ""
+        
+        # 检查重启次数
+        if [ -f "$RESTART_COUNT_FILE" ]; then
+            local restart_count=$(cat "$RESTART_COUNT_FILE")
+            echo "重启次数: $restart_count / $MAX_RESTARTS"
+        fi
+        
         echo ""
         echo "最近的日志:"
         if [ -f "$LOG_DIR/scheduler_$(date +%Y%m%d).log" ]; then
@@ -127,6 +219,15 @@ show_status() {
         else
             echo "未找到今日日志文件"
         fi
+        
+        # 检查监控进程状态
+        if [ -f "$MONITOR_PID_FILE" ]; then
+            local monitor_pid=$(get_monitor_pid)
+            if ps -p "$monitor_pid" > /dev/null 2>&1; then
+                echo ""
+                echo "监控进程正在运行 (PID: $monitor_pid)"
+            fi
+        fi
     else
         echo "调度器未运行"
     fi
@@ -140,6 +241,66 @@ restart_scheduler() {
     start_scheduler
 }
 
+# 监控模式
+start_monitor() {
+    if [ -f "$MONITOR_PID_FILE" ] && ps -p "$(get_monitor_pid)" > /dev/null 2>&1; then
+        echo "监控进程已经在运行中"
+        return 1
+    fi
+    
+    echo "启动监控模式..."
+    echo "监控进程将自动重启崩溃的调度器"
+    
+    # 重置重启计数
+    reset_restart_count
+    
+    # 启动监控进程
+    (
+        while true; do
+            if ! is_running; then
+                local restart_count=$(record_restart)
+                echo "$(date): 调度器已停止,尝试重启 (第${restart_count}次)"
+                
+                if [ "$restart_count" -le "$MAX_RESTARTS" ]; then
+                    start_scheduler
+                    if is_running; then
+                        echo "$(date): 调度器重启成功"
+                        sleep 30  # 等待30秒再检查
+                    else
+                        echo "$(date): 调度器重启失败"
+                        sleep 60  # 等待1分钟再尝试
+                    fi
+                else
+                    echo "$(date): 达到最大重启次数 ($MAX_RESTARTS),停止监控"
+                    break
+                fi
+            else
+                # 检查内存使用情况
+                local memory_mb=$(check_memory)
+                if [ "$memory_mb" -gt "$MAX_MEMORY_MB" ]; then
+                    echo "$(date): 内存使用过高 (${memory_mb}MB > ${MAX_MEMORY_MB}MB),重启调度器"
+                    stop_scheduler
+                    sleep 5
+                    start_scheduler
+                fi
+                
+                sleep 30  # 每30秒检查一次
+            fi
+        done
+    ) > "$LOG_DIR/monitor.log" 2>&1 &
+    
+    local monitor_pid=$!
+    echo "$monitor_pid" > "$MONITOR_PID_FILE"
+    echo "监控进程已启动 (PID: $monitor_pid)"
+    echo "监控日志: tail -f $LOG_DIR/monitor.log"
+}
+
+# 停止监控
+stop_monitor_mode() {
+    stop_monitor
+    echo "监控模式已停止"
+}
+
 # 主逻辑
 case "${1:-start}" in
     start)
@@ -147,6 +308,7 @@ case "${1:-start}" in
         ;;
     stop)
         stop_scheduler
+        stop_monitor
         ;;
     status)
         show_status
@@ -154,19 +316,50 @@ case "${1:-start}" in
     restart)
         restart_scheduler
         ;;
+    monitor)
+        start_monitor
+        ;;
+    stop-monitor)
+        stop_monitor_mode
+        ;;
+    cache-status)
+        echo "查看缓存状态..."
+        python3 cache_manager.py status
+        ;;
+    cache-clean)
+        echo "清理缓存文件..."
+        python3 cache_manager.py clean
+        ;;
+    cache-cleanup)
+        echo "清理过期缓存文件..."
+        python3 cache_manager.py cleanup
+        ;;
     *)
-        echo "用法: $0 {start|stop|status|restart}"
+        echo "用法: $0 {start|stop|status|restart|monitor|stop-monitor}"
         echo ""
         echo "命令说明:"
-        echo "  start   - 启动调度器"
-        echo "  stop    - 停止调度器"
-        echo "  status  - 查看运行状态"
-        echo "  restart - 重启调度器"
+        echo "  start        - 启动调度器"
+        echo "  stop         - 停止调度器和监控"
+        echo "  status       - 查看运行状态"
+        echo "  restart      - 重启调度器"
+        echo "  monitor      - 启动监控模式(自动重启)"
+        echo "  stop-monitor - 停止监控模式"
+        echo "  cache-status - 查看缓存状态"
+        echo "  cache-clean  - 清理所有缓存文件"
+        echo "  cache-cleanup- 清理过期缓存文件"
+        echo ""
+        echo "监控模式特性:"
+        echo "  - 自动检测进程崩溃并重启"
+        echo "  - 内存使用监控(超过${MAX_MEMORY_MB}MB自动重启)"
+        echo "  - 最大重启次数限制: $MAX_RESTARTS"
         echo ""
         echo "示例:"
-        echo "  $0 start    # 启动"
-        echo "  $0 status   # 查看状态"
-        echo "  $0 stop     # 停止"
+        echo "  $0 start        # 启动"
+        echo "  $0 monitor      # 启动监控模式"
+        echo "  $0 status       # 查看状态"
+        echo "  $0 cache-status # 查看缓存状态"
+        echo "  $0 cache-cleanup # 清理过期缓存"
+        echo "  $0 stop         # 停止所有"
         exit 1
         ;;
 esac 

+ 303 - 0
content_indentify/system_monitor.py

@@ -0,0 +1,303 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+系统监控脚本
+监控调度器的运行状态、内存使用、CPU使用等,自动处理异常情况
+"""
+
+import os
+import sys
+import time
+import psutil
+import signal
+import subprocess
+import json
+import logging
+from datetime import datetime, timedelta
+from typing import Dict, Any, Optional
+
+# 配置日志
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.FileHandler('logs/system_monitor.log'),
+        logging.StreamHandler()
+    ]
+)
+logger = logging.getLogger(__name__)
+
+class SystemMonitor:
+    def __init__(self):
+        self.config = {
+            'max_memory_mb': 2048,        # 最大内存使用量
+            'max_cpu_percent': 80,        # 最大CPU使用率
+            'max_disk_percent': 90,       # 最大磁盘使用率
+            'check_interval': 30,         # 检查间隔(秒)
+            'restart_delay': 60,          # 重启延迟(秒)
+            'max_restarts': 5,            # 最大重启次数
+            'pid_file': 'scheduler.pid',  # 调度器PID文件
+            'log_file': 'logs/system_monitor.log'
+        }
+        
+        self.restart_count = 0
+        self.last_restart_time = None
+        self.running = True
+        
+        # 设置信号处理
+        signal.signal(signal.SIGINT, self.signal_handler)
+        signal.signal(signal.SIGTERM, self.signal_handler)
+    
+    def signal_handler(self, signum, frame):
+        """信号处理函数"""
+        signal_name = "SIGTERM" if signum == signal.SIGTERM else "SIGINT"
+        logger.info(f"收到信号 {signal_name},正在停止监控...")
+        self.running = False
+    
+    def get_process_info(self) -> Optional[Dict[str, Any]]:
+        """获取调度器进程信息"""
+        try:
+            if not os.path.exists(self.config['pid_file']):
+                return None
+            
+            with open(self.config['pid_file'], 'r') as f:
+                pid = int(f.read().strip())
+            
+            if not psutil.pid_exists(pid):
+                return None
+            
+            process = psutil.Process(pid)
+            return {
+                'pid': pid,
+                'name': process.name(),
+                'memory_mb': process.memory_info().rss / 1024 / 1024,
+                'cpu_percent': process.cpu_percent(),
+                'status': process.status(),
+                'create_time': process.create_time(),
+                'num_threads': process.num_threads()
+            }
+        except Exception as e:
+            logger.error(f"获取进程信息失败: {e}")
+            return None
+    
+    def check_system_resources(self) -> Dict[str, Any]:
+        """检查系统资源使用情况"""
+        try:
+            # CPU使用率
+            cpu_percent = psutil.cpu_percent(interval=1)
+            
+            # 内存使用率
+            memory = psutil.virtual_memory()
+            memory_percent = memory.percent
+            memory_available_gb = memory.available / 1024 / 1024 / 1024
+            
+            # 磁盘使用率
+            disk = psutil.disk_usage('/')
+            disk_percent = disk.percent
+            
+            # 网络连接数
+            net_connections = len(psutil.net_connections())
+            
+            return {
+                'cpu_percent': cpu_percent,
+                'memory_percent': memory_percent,
+                'memory_available_gb': memory_available_gb,
+                'disk_percent': disk_percent,
+                'net_connections': net_connections,
+                'timestamp': datetime.now().isoformat()
+            }
+        except Exception as e:
+            logger.error(f"检查系统资源失败: {e}")
+            return {}
+    
+    def check_logs_for_errors(self) -> bool:
+        """检查日志文件中的错误"""
+        try:
+            log_files = [
+                'logs/scheduler_stdout.log',
+                'logs/scheduler_*.log'
+            ]
+            
+            error_patterns = [
+                'double free',
+                'corruption',
+                'segmentation fault',
+                'memory error',
+                'out of memory',
+                'killed'
+            ]
+            
+            for log_pattern in log_files:
+                if '*' in log_pattern:
+                    # 处理通配符
+                    import glob
+                    log_files = glob.glob(log_pattern)
+                else:
+                    log_files = [log_pattern]
+                
+                for log_file in log_files:
+                    if os.path.exists(log_file):
+                        try:
+                            with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
+                                content = f.read()
+                                for pattern in error_patterns:
+                                    if pattern.lower() in content.lower():
+                                        logger.warning(f"在日志文件 {log_file} 中发现错误模式: {pattern}")
+                                        return True
+                        except Exception as e:
+                            logger.error(f"读取日志文件 {log_file} 失败: {e}")
+            
+            return False
+        except Exception as e:
+            logger.error(f"检查日志文件失败: {e}")
+            return False
+    
+    def restart_scheduler(self) -> bool:
+        """重启调度器"""
+        try:
+            current_time = datetime.now()
+            
+            # 检查重启频率限制
+            if (self.last_restart_time and 
+                (current_time - self.last_restart_time).seconds < self.config['restart_delay']):
+                logger.warning("重启过于频繁,跳过本次重启")
+                return False
+            
+            if self.restart_count >= self.config['max_restarts']:
+                logger.error(f"达到最大重启次数 ({self.config['max_restarts']}),停止重启")
+                return False
+            
+            logger.info("正在重启调度器...")
+            
+            # 停止现有进程
+            process_info = self.get_process_info()
+            if process_info:
+                try:
+                    os.kill(process_info['pid'], signal.SIGTERM)
+                    time.sleep(5)
+                    if psutil.pid_exists(process_info['pid']):
+                        os.kill(process_info['pid'], signal.SIGILL)
+                except Exception as e:
+                    logger.error(f"停止进程失败: {e}")
+            
+            # 等待进程完全停止
+            time.sleep(10)
+            
+            # 启动新进程
+            try:
+                subprocess.Popen(['python3', 'multi_thread_scheduler.py'], 
+                               stdout=open('logs/scheduler_stdout.log', 'a'),
+                               stderr=subprocess.STDOUT)
+                
+                # 等待进程启动
+                time.sleep(15)
+                
+                # 检查是否启动成功
+                if self.get_process_info():
+                    logger.info("调度器重启成功")
+                    self.restart_count += 1
+                    self.last_restart_time = current_time
+                    return True
+                else:
+                    logger.error("调度器重启失败")
+                    return False
+                    
+            except Exception as e:
+                logger.error(f"启动调度器失败: {e}")
+                return False
+                
+        except Exception as e:
+            logger.error(f"重启调度器过程中发生错误: {e}")
+            return False
+    
+    def should_restart(self, process_info: Dict[str, Any], system_info: Dict[str, Any]) -> bool:
+        """判断是否需要重启"""
+        if not process_info:
+            logger.warning("调度器进程不存在,需要重启")
+            return True
+        
+        # 检查内存使用
+        if process_info['memory_mb'] > self.config['max_memory_mb']:
+            logger.warning(f"内存使用过高: {process_info['memory_mb']:.1f}MB > {self.config['max_memory_mb']}MB")
+            return True
+        
+        # 检查CPU使用率
+        if process_info['cpu_percent'] > self.config['max_cpu_percent']:
+            logger.warning(f"CPU使用率过高: {process_info['cpu_percent']:.1f}% > {self.config['max_cpu_percent']}%")
+            return True
+        
+        # 检查系统资源
+        if system_info.get('memory_percent', 0) > 90:
+            logger.warning(f"系统内存使用率过高: {system_info['memory_percent']:.1f}%")
+            return True
+        
+        if system_info.get('disk_percent', 0) > self.config['max_disk_percent']:
+            logger.warning(f"磁盘使用率过高: {system_info['disk_percent']:.1f}%")
+            return True
+        
+        # 检查日志中的错误
+        if self.check_logs_for_errors():
+            logger.warning("检测到日志错误,需要重启")
+            return True
+        
+        return False
+    
+    def run(self):
+        """运行监控"""
+        logger.info("系统监控启动")
+        logger.info(f"配置: {json.dumps(self.config, indent=2)}")
+        
+        while self.running:
+            try:
+                # 获取进程信息
+                process_info = self.get_process_info()
+                
+                # 获取系统资源信息
+                system_info = self.check_system_resources()
+                
+                # 记录状态
+                if process_info:
+                    logger.info(f"进程状态: PID={process_info['pid']}, "
+                              f"内存={process_info['memory_mb']:.1f}MB, "
+                              f"CPU={process_info['cpu_percent']:.1f}%")
+                
+                if system_info:
+                    logger.info(f"系统状态: CPU={system_info['cpu_percent']:.1f}%, "
+                              f"内存={system_info['memory_percent']:.1f}%, "
+                              f"磁盘={system_info['disk_percent']:.1f}%")
+                
+                # 检查是否需要重启
+                if self.should_restart(process_info, system_info):
+                    if self.restart_scheduler():
+                        logger.info("重启操作完成")
+                    else:
+                        logger.error("重启操作失败")
+                
+                # 等待下次检查
+                time.sleep(self.config['check_interval'])
+                
+            except Exception as e:
+                logger.error(f"监控过程中发生错误: {e}")
+                time.sleep(self.config['check_interval'])
+        
+        logger.info("系统监控已停止")
+
+def main():
+    """主函数"""
+    print("=" * 60)
+    print("系统监控脚本")
+    print("=" * 60)
+    print(f"启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+    print("=" * 60)
+    
+    try:
+        monitor = SystemMonitor()
+        monitor.run()
+    except KeyboardInterrupt:
+        print("\n收到中断信号,正在停止...")
+    except Exception as e:
+        print(f"监控脚本运行失败: {e}")
+        sys.exit(1)
+
+if __name__ == "__main__":
+    main() 

+ 66 - 0
content_indentify/video_identifier.py

@@ -26,6 +26,8 @@ from google.generativeai.types import HarmCategory, HarmBlockThreshold
 
 # 缓存目录配置
 CACHE_DIR = os.path.join(os.path.dirname(__file__), 'cache')
+# 缓存文件最大保留时间(秒)
+CACHE_MAX_AGE = 3600  # 1小时
 
 
 class VideoIdentifier:
@@ -41,6 +43,9 @@ class VideoIdentifier:
         # 配置Gemini
         genai.configure(api_key=self.api_key)
         
+        # 初始化缓存清理时间
+        self.last_cache_cleanup = time.time()
+        
         # 统一的系统提示词 - 三个维度分析
         self.unified_system_prompt = """你是一个专业的视频内容分析专家。请从以下两个维度分析视频内容,并以JSON格式输出结果:
 
@@ -96,9 +101,19 @@ class VideoIdentifier:
                             return file_path
                         except Exception as e:
                             print(f'视频保存失败: {e}')
+                            # 保存失败时清理已创建的文件
+                            if os.path.exists(file_path):
+                                try:
+                                    os.remove(file_path)
+                                    print(f'已清理下载失败的文件: {file_path}')
+                                except:
+                                    pass
                             return None
                     else:
                         print(f'视频下载失败,状态码: {response.status_code}')
+                        if attempt == 2:  # 最后一次尝试失败
+                            print(f'所有下载尝试都失败了')
+                            return None
                 except Exception as e:
                     print(f'下载尝试 {attempt + 1} 失败: {e}')
                     if attempt < 2:  # 不是最后一次尝试
@@ -114,6 +129,37 @@ class VideoIdentifier:
         
         return None
     
+    def cleanup_cache(self):
+        """清理过期的缓存文件"""
+        try:
+            current_time = time.time()
+            # 每小时清理一次缓存
+            if current_time - self.last_cache_cleanup < 3600:
+                return
+            
+            if not os.path.exists(CACHE_DIR):
+                return
+            
+            cleaned_count = 0
+            for filename in os.listdir(CACHE_DIR):
+                file_path = os.path.join(CACHE_DIR, filename)
+                if os.path.isfile(file_path):
+                    file_age = current_time - os.path.getmtime(file_path)
+                    if file_age > CACHE_MAX_AGE:
+                        try:
+                            os.remove(file_path)
+                            cleaned_count += 1
+                        except Exception as e:
+                            print(f'清理缓存文件失败: {file_path}, 错误: {e}')
+            
+            if cleaned_count > 0:
+                print(f'已清理 {cleaned_count} 个过期缓存文件')
+            
+            self.last_cache_cleanup = current_time
+            
+        except Exception as e:
+            print(f'清理缓存失败: {e}')
+    
     def upload_video_to_gemini(self, video_path: str) -> Optional[Any]:
         """上传视频到Gemini进行分析"""
         max_retries = 3
@@ -285,6 +331,7 @@ class VideoIdentifier:
                         return None
                 else:
                     # 非网络错误,直接返回
+                    print(f"    非网络错误,不进行重试")
                     return None
         
         return None
@@ -418,6 +465,14 @@ class VideoIdentifier:
             video_file = self.upload_video_to_gemini(video_path)
             if not video_file:
                 print("  视频上传到Gemini失败")
+                # 上传失败时也要清理缓存文件
+                if video_path and os.path.exists(video_path):
+                    try:
+                        os.remove(video_path)
+                        print(f"  上传失败,缓存文件已清理: {video_path}")
+                    except Exception as e:
+                        print(f"  清理缓存文件失败: {e}")
+                
                 return {
                     'url': video_info['url'],
                     'duration': video_info['duration'],
@@ -447,6 +502,14 @@ class VideoIdentifier:
             
         except Exception as e:
             print(f"  视频处理异常: {e}")
+            # 异常情况下也要清理缓存文件
+            if video_path and os.path.exists(video_path):
+                try:
+                    os.remove(video_path)
+                    print(f"  异常处理,缓存文件已清理: {video_path}")
+                except Exception as e:
+                    print(f"  清理缓存文件失败: {e}")
+            
             return {
                 'url': video_info['url'],
                 'duration': video_info['duration'],
@@ -480,6 +543,9 @@ class VideoIdentifier:
         """处理视频识别的主函数"""
         print("开始视频识别处理...")
         
+        # 定期清理缓存
+        self.cleanup_cache()
+        
         # 提取视频URL
         video_data = self.extract_video_urls(formatted_content)
         print(f"提取到 {len(video_data)} 个视频")