#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Multi-threaded periodic task scheduler.

Starts a pool of worker threads (5 by default). Each thread calls
``ContentIdentifier.process_single_record()`` once per interval
(2 minutes by default) to process one record.
"""
import threading
import time
import signal
import sys
import os
import atexit
from datetime import datetime
from indentify import ContentIdentifier
from utils.logging_config import get_logger


class MultiThreadScheduler:
    """Run ``process_single_record()`` periodically on several worker threads.

    Shutdown is driven by a single ``threading.Event``. Setting it (via
    ``running = False``, ``stop_all_threads()`` or SIGINT/SIGTERM) wakes any
    worker that is waiting between runs immediately, instead of letting it
    finish a fixed-length sleep.
    """

    def __init__(self, thread_count=5, interval_minutes=2):
        """Create the scheduler and install process-level hooks.

        Args:
            thread_count: number of worker threads to start.
            interval_minutes: target period between two runs of the same
                worker, in minutes.

        Side effects: installs SIGINT/SIGTERM handlers (``signal.signal`` only
        works from the main thread), registers ``cleanup`` with ``atexit`` and
        writes a PID file in the current working directory.
        """
        # Must exist before ``self.running`` is assigned (the property uses it).
        self._stop_event = threading.Event()
        # Guards stop_all_threads() so the signal handler and run()'s
        # ``finally`` do not stop/join the workers twice.
        self._threads_stopped = False

        self.thread_count = thread_count
        self.interval_seconds = interval_minutes * 60
        self.running = True
        self.threads = []
        # NOTE(review): one ContentIdentifier instance is shared by every
        # worker thread; confirm process_single_record() is thread-safe
        # (e.g. two threads cannot claim the same record).
        self.identifier = ContentIdentifier()
        self.pid_file = "scheduler.pid"

        # Logger for scheduler-level events.
        self.logger = get_logger('MultiThreadScheduler')

        # Signal handlers for a graceful shutdown.
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

        # Make sure the PID file is removed on interpreter exit.
        atexit.register(self.cleanup)

        self.create_pid_file()

    @property
    def running(self):
        """True until a stop has been requested (backed by the stop event)."""
        return not self._stop_event.is_set()

    @running.setter
    def running(self, value):
        # Kept assignable for backward compatibility: ``running = False``
        # requests a stop and wakes every waiting worker at once.
        if value:
            self._stop_event.clear()
        else:
            self._stop_event.set()

    def create_pid_file(self):
        """Write the current process id to ``self.pid_file`` (errors are logged)."""
        try:
            with open(self.pid_file, 'w') as f:
                f.write(str(os.getpid()))
            self.logger.info(f"PID文件已创建: {self.pid_file}")
        except Exception as e:
            self.logger.error(f"创建PID文件失败: {e}")

    def cleanup(self):
        """Remove the PID file if present; safe to call more than once."""
        try:
            os.remove(self.pid_file)
        except FileNotFoundError:
            # Already removed (cleanup runs from several exit paths).
            return
        except Exception as e:
            self.logger.error(f"清理PID文件失败: {e}")
            return
        self.logger.info("PID文件已清理")

    def signal_handler(self, signum, frame):
        """Handle SIGINT/SIGTERM: stop workers, clean up, exit with status 0."""
        signal_name = "SIGTERM" if signum == signal.SIGTERM else "SIGINT"
        self.logger.info(f"收到信号 {signal_name},正在优雅退出...")
        self.running = False
        self.stop_all_threads()
        self.cleanup()
        # Raises SystemExit in the main thread; run()'s ``finally`` then calls
        # stop_all_threads()/cleanup() again, which are both idempotent.
        sys.exit(0)

    def worker_thread(self, thread_id):
        """Worker loop: process one record per interval until stopped.

        Args:
            thread_id: 1-based id used in the logger name and log messages.
        """
        thread_logger = get_logger(f'WorkerThread-{thread_id}')
        thread_logger.info(f"线程 {thread_id} 启动,每 {self.interval_seconds//60} 分钟处理一条数据")

        while self.running:
            try:
                # monotonic() is unaffected by wall-clock adjustments.
                start_time = time.monotonic()

                thread_logger.info("开始处理数据...")
                success = self.identifier.process_single_record()

                if success:
                    thread_logger.info("数据处理成功")
                else:
                    thread_logger.info("没有数据需要处理或处理失败")

                # Sleep only for what is left of the interval.
                elapsed_time = time.monotonic() - start_time
                wait_time = max(0, self.interval_seconds - elapsed_time)

                if wait_time > 0:
                    thread_logger.info(f"等待 {wait_time:.1f} 秒后继续...")
                    # Returns early as soon as a stop is requested.
                    self._stop_event.wait(wait_time)

            except Exception as e:
                thread_logger.error(f"处理过程中发生错误: {e}", exc_info=True)
                # Back off for 10 s after an error, unless stopping.
                self._stop_event.wait(10)

        thread_logger.info(f"线程 {thread_id} 已停止")

    def start_all_threads(self):
        """Start the daemon worker threads, staggered 5 s apart.

        Stops launching further threads if a stop is requested meanwhile.
        """
        self.logger.info(f"启动 {self.thread_count} 个工作线程...")

        for i in range(self.thread_count):
            if not self.running:
                break

            thread = threading.Thread(
                target=self.worker_thread,
                args=(i+1,),
                daemon=True,
                name=f"WorkerThread-{i+1}"
            )
            thread.start()
            self.threads.append(thread)
            self.logger.info(f"线程 {i+1} 已启动")

            # Stagger start-up so workers do not all hit the data source
            # at the same moment.
            if i < self.thread_count - 1:
                self.logger.info("等待5秒后启动下一个线程...")
                if self._stop_event.wait(5):
                    break

        if not self.running:
            return

        self.logger.info(f"所有 {self.thread_count} 个线程已启动")
        self.logger.info(f"每个线程每 {self.interval_seconds//60} 分钟处理一条数据")
        self.logger.info("使用以下命令停止: ./start_scheduler.sh stop")

    def stop_all_threads(self):
        """Request a stop and join each worker (5 s timeout per thread).

        Idempotent: only the first call performs the joins and logging.
        """
        # Wake waiting workers first so the joins below finish quickly.
        self.running = False
        if self._threads_stopped:
            return
        self._threads_stopped = True

        self.logger.info("正在停止所有线程...")

        for i, thread in enumerate(self.threads):
            if thread.is_alive():
                thread.join(timeout=5)
                if thread.is_alive():
                    self.logger.warning(f"线程 {i+1} 未能正常停止")
                else:
                    self.logger.info(f"线程 {i+1} 已停止")

        self.logger.info("所有线程已停止")

    def run(self):
        """Start the workers and block until a stop is requested."""
        try:
            self.start_all_threads()

            # Short timeout keeps the main thread responsive to signals.
            while self.running:
                self._stop_event.wait(1)

        except KeyboardInterrupt:
            self.logger.info("收到键盘中断信号")
        finally:
            self.stop_all_threads()
            self.cleanup()


def main():
    """Print a startup banner and run the scheduler until it is stopped."""
    # Single source of truth for both the banner and the scheduler.
    thread_count = 5
    interval_minutes = 2

    print("=" * 60)
    print("多线程定时任务调度器")
    print("=" * 60)
    print(f"线程数量: {thread_count}")
    print(f"处理间隔: {interval_minutes}分钟")
    print(f"启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("=" * 60)

    scheduler = MultiThreadScheduler(thread_count=thread_count, interval_minutes=interval_minutes)
    scheduler.run()


if __name__ == "__main__":
    main()