123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 多线程定时任务调度器
- 开启5个线程,每2分钟调用一次identifier.process_single_record()处理一条数据
- """
- import threading
- import time
- import signal
- import sys
- import os
- import atexit
- import gc
- import psutil
- import traceback
- from datetime import datetime
- from indentify import ContentIdentifier
- from utils.logging_config import get_logger
class MultiThreadScheduler:
    """Run several worker threads that each process one record per interval.

    Every worker calls ``process_single_record()`` on one shared, lazily
    created ``ContentIdentifier`` roughly once every ``interval_minutes``.
    The wait after each call is shortened by the time the call took.

    Shutdown is cooperative. SIGINT/SIGTERM only clear ``self.running``.
    Every wait in this class re-checks that flag at least once per second,
    so workers and the main loop notice the request promptly. ``run()``
    then joins the threads and performs cleanup in its ``finally`` block.
    """

    def __init__(self, thread_count=5, interval_minutes=2, start_delay_seconds=30):
        """Configure the scheduler, install signal handlers and write the PID file.

        Args:
            thread_count: number of worker threads to start.
            interval_minutes: target period between two records in one worker.
            start_delay_seconds: pause between starting consecutive workers.
        """
        self.thread_count = thread_count
        self.interval_seconds = interval_minutes * 60
        self.start_delay_seconds = start_delay_seconds
        self.running = True
        self.threads = []
        self.identifier = None  # created lazily by init_identifier()
        self.pid_file = "scheduler.pid"
        self.max_memory_mb = 2048  # RSS (MB) above which workers skip a round
        self.last_gc_time = time.time()
        self.gc_interval = 300  # seconds between forced gc.collect() runs

        # Workers may race to (re)create the shared identifier.
        self._identifier_lock = threading.Lock()
        # Lets exactly one caller perform each periodic forced collection.
        self._gc_lock = threading.Lock()
        # cleanup() is reachable from run()'s finally and from atexit;
        # only the first call does any work.
        self._cleanup_lock = threading.Lock()
        self._cleaned_up = False

        self.logger = get_logger('MultiThreadScheduler')

        # Graceful shutdown on Ctrl+C / kill. signal.signal() only works
        # from the main thread.
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

        atexit.register(self.cleanup)

        self.create_pid_file()

        self.init_identifier()

    def init_identifier(self):
        """Create the shared ContentIdentifier if it does not exist yet.

        A failure is logged and leaves ``self.identifier`` as ``None`` so a
        later call can retry. The lock ensures concurrent callers build at
        most one instance.
        """
        with self._identifier_lock:
            if self.identifier is not None:
                return
            try:
                self.logger.info("初始化ContentIdentifier...")
                self.identifier = ContentIdentifier()
                self.logger.info("ContentIdentifier初始化成功")
            except Exception as e:
                self.logger.error(f"ContentIdentifier初始化失败: {e}")
                self.identifier = None

    def create_pid_file(self):
        """Write this process's PID to ``self.pid_file``. Errors are only logged."""
        try:
            with open(self.pid_file, 'w') as f:
                f.write(str(os.getpid()))
            self.logger.info(f"PID文件已创建: {self.pid_file}")
        except Exception as e:
            self.logger.error(f"创建PID文件失败: {e}")

    def _remove_own_pid_file(self):
        """Delete the PID file, but only if it still records this process's PID."""
        try:
            with open(self.pid_file) as f:
                recorded = f.read().strip()
        except FileNotFoundError:
            return
        except Exception as e:
            self.logger.error(f"清理PID文件失败: {e}")
            return

        if recorded != str(os.getpid()):
            # Another instance has overwritten the file since we wrote it.
            self.logger.warning(f"PID文件属于其他进程 ({recorded}),跳过清理")
            return

        try:
            os.remove(self.pid_file)
            self.logger.info("PID文件已清理")
        except FileNotFoundError:
            pass
        except Exception as e:
            self.logger.error(f"清理PID文件失败: {e}")

    def cleanup(self):
        """Remove our PID file, drop the shared identifier and run gc.collect().

        Idempotent: only the first call does any work. Later calls (for
        example the atexit hook after run() already cleaned up) return
        immediately.
        """
        with self._cleanup_lock:
            if self._cleaned_up:
                return
            self._cleaned_up = True

        self._remove_own_pid_file()

        # Drop our reference so the identifier can be garbage collected.
        self.identifier = None

        gc.collect()

    def signal_handler(self, signum, frame):
        """Handle SIGINT/SIGTERM by requesting a graceful shutdown.

        This only clears ``self.running``. Joining threads and cleanup happen
        on the main thread in run()'s finally block. Doing that work here,
        and then calling sys.exit(), ran it twice and could block inside the
        handler for up to 5 seconds per thread.
        """
        signal_name = "SIGTERM" if signum == signal.SIGTERM else "SIGINT"
        self.logger.info(f"收到信号 {signal_name},正在优雅退出...")
        self.running = False

    def check_memory_usage(self):
        """Return this process's resident set size in MB (0 if it cannot be read).

        Logs a warning once usage exceeds 80% of ``max_memory_mb``.
        """
        try:
            process = psutil.Process(os.getpid())
            memory_info = process.memory_info()
            memory_mb = memory_info.rss / 1024 / 1024

            if memory_mb > self.max_memory_mb * 0.8:
                self.logger.warning(f"内存使用较高: {memory_mb:.1f}MB / {self.max_memory_mb}MB")

            return memory_mb
        except Exception as e:
            self.logger.error(f"检查内存使用失败: {e}")
            return 0

    def force_garbage_collection(self):
        """Run gc.collect() if more than ``gc_interval`` seconds passed since the last run.

        This is called by every worker and by the main loop. A non-blocking
        lock means a caller that finds a collection already in progress simply
        returns.
        """
        if not self._gc_lock.acquire(blocking=False):
            return
        try:
            current_time = time.time()
            if current_time - self.last_gc_time > self.gc_interval:
                self.logger.info("执行强制垃圾回收...")
                collected = gc.collect()
                self.logger.info(f"垃圾回收完成,清理了 {collected} 个对象")
                self.last_gc_time = current_time
        except Exception as e:
            self.logger.error(f"垃圾回收失败: {e}")
        finally:
            self._gc_lock.release()

    def _interruptible_sleep(self, seconds, poll_interval=1.0):
        """Sleep for up to ``seconds``, returning early once ``self.running`` is False.

        Returns:
            True if the full duration elapsed, False if a stop was requested.
        """
        deadline = time.monotonic() + seconds
        while self.running:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return True
            time.sleep(min(poll_interval, remaining))
        return False

    def worker_thread(self, thread_id):
        """Worker loop: process one record per ``interval_seconds`` until stopped."""
        thread_logger = get_logger(f'WorkerThread-{thread_id}')
        thread_logger.info(f"线程 {thread_id} 启动,每 {self.interval_seconds//60} 分钟处理一条数据")

        while self.running:
            try:
                start_time = time.time()

                # Back off instead of processing while RSS exceeds the hard limit.
                memory_mb = self.check_memory_usage()
                if memory_mb > self.max_memory_mb:
                    thread_logger.error(f"内存使用过高 ({memory_mb:.1f}MB),跳过本次处理")
                    self._interruptible_sleep(60)
                    continue

                self.force_garbage_collection()

                # Use a local reference: cleanup() may reset self.identifier
                # to None concurrently during shutdown.
                identifier = self.identifier
                if identifier is None:
                    if not self.running:
                        break  # do not rebuild the identifier while shutting down
                    self.init_identifier()
                    identifier = self.identifier
                    if identifier is None:
                        thread_logger.error("ContentIdentifier不可用,等待下次重试")
                        self._interruptible_sleep(60)
                        continue

                thread_logger.info("开始处理数据...")
                success = identifier.process_single_record()

                if success:
                    thread_logger.info("数据处理成功")
                else:
                    thread_logger.info("没有数据需要处理或处理失败")

                # Keep a steady cadence by subtracting the processing time.
                elapsed_time = time.time() - start_time
                wait_time = max(0, self.interval_seconds - elapsed_time)
                if wait_time > 0:
                    thread_logger.info(f"等待 {wait_time:.1f} 秒后继续...")
                    self._interruptible_sleep(wait_time)

            except MemoryError as e:
                thread_logger.error(f"内存不足错误: {e}")
                gc.collect()
                self._interruptible_sleep(120)

            except Exception as e:
                thread_logger.error(f"处理过程中发生错误: {e}")
                thread_logger.error(f"错误详情: {traceback.format_exc()}")

                # Heuristic: messages that look like heap corruption are
                # treated as fatal for this thread.
                message = str(e).lower()
                if "double free" in message or "corruption" in message:
                    thread_logger.critical("检测到严重的内存错误,线程将退出")
                    break

                # Brief pause before retrying after an ordinary error.
                self._interruptible_sleep(10)

        thread_logger.info(f"线程 {thread_id} 已停止")

    def start_all_threads(self):
        """Start ``thread_count`` daemon workers, pausing ``start_delay_seconds`` between them.

        The pause keeps the workers from all calling process_single_record()
        at the same moment. No further workers are started once a stop has
        been requested.
        """
        self.logger.info(f"启动 {self.thread_count} 个工作线程...")

        for i in range(self.thread_count):
            if not self.running:
                break

            thread = threading.Thread(
                target=self.worker_thread,
                args=(i + 1,),
                daemon=True,
                name=f"WorkerThread-{i+1}",
            )
            thread.start()
            self.threads.append(thread)
            self.logger.info(f"线程 {i+1} 已启动")

            # Pause between launches, except after the last thread.
            if i < self.thread_count - 1:
                self.logger.info(f"等待{self.start_delay_seconds}秒后启动下一个线程...")
                if not self._interruptible_sleep(self.start_delay_seconds):
                    break

        if not self.running:
            self.logger.info(f"启动过程中收到停止请求,已启动 {len(self.threads)} 个线程")
            return

        self.logger.info(f"所有 {self.thread_count} 个线程已启动")
        self.logger.info(f"每个线程每 {self.interval_seconds//60} 分钟处理一条数据")
        self.logger.info("使用以下命令停止: ./start_scheduler.sh stop")

    def stop_all_threads(self):
        """Request a stop and join each worker for up to 5 seconds."""
        self.logger.info("正在停止所有线程...")
        self.running = False

        for i, thread in enumerate(self.threads):
            if thread.is_alive():
                thread.join(timeout=5)
                if thread.is_alive():
                    self.logger.warning(f"线程 {i+1} 未能正常停止")
                else:
                    self.logger.info(f"线程 {i+1} 已停止")

        self.logger.info("所有线程已停止")

    def run(self):
        """Start the workers and block until a stop is requested or all workers exit.

        Threads are always joined and cleanup() is always called on the way out.
        """
        try:
            # NOTE(review): (700, 10, 10) matches CPython's historical default
            # thresholds, so on those versions this does not make collection
            # more aggressive. Confirm the intended values.
            gc.set_threshold(700, 10, 10)

            self.start_all_threads()

            # The main thread stays here so signal handlers can run.
            while self.running:
                time.sleep(1)

                self.force_garbage_collection()

                # Workers exit for good on suspected memory corruption. Once
                # none are left, stop instead of idling forever.
                if self.threads and not any(t.is_alive() for t in self.threads):
                    self.logger.critical("所有工作线程均已退出,调度器停止")
                    break

        except KeyboardInterrupt:
            self.logger.info("收到键盘中断信号")
        except MemoryError as e:
            self.logger.critical(f"主线程内存不足: {e}")
        except Exception as e:
            self.logger.critical(f"主线程发生错误: {e}")
            self.logger.critical(f"错误详情: {traceback.format_exc()}")
        finally:
            self.stop_all_threads()
            self.cleanup()
def main(thread_count=3, interval_minutes=2):
    """Print a startup banner, then run the scheduler until it stops.

    The banner is built from the same values passed to the scheduler. It
    previously printed a hard-coded thread count of 5 while 3 threads were
    actually started.

    Args:
        thread_count: number of worker threads to start.
        interval_minutes: processing interval per worker, in minutes.
    """
    print("=" * 60)
    print("多线程定时任务调度器")
    print("=" * 60)
    print(f"线程数量: {thread_count}")
    print(f"处理间隔: {interval_minutes}分钟")
    print(f"启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("=" * 60)

    try:
        scheduler = MultiThreadScheduler(
            thread_count=thread_count,
            interval_minutes=interval_minutes,
        )
        scheduler.run()
    except Exception as e:
        print(f"调度器启动失败: {e}")
        print(f"错误详情: {traceback.format_exc()}")
        sys.exit(1)
if __name__ == '__main__':
    # Start the scheduler only when this file is executed directly, not on import.
    main()
|