multi_thread_scheduler.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 多线程定时任务调度器
  5. 开启5个线程,每2分钟调用一次identifier.process_single_record()处理一条数据
  6. """
  7. import threading
  8. import time
  9. import signal
  10. import sys
  11. import os
  12. import atexit
  13. from datetime import datetime
  14. from indentify import ContentIdentifier
  15. from utils.logging_config import get_logger
  16. class MultiThreadScheduler:
  17. def __init__(self, thread_count=5, interval_minutes=2):
  18. self.thread_count = thread_count
  19. self.interval_seconds = interval_minutes * 60
  20. self.running = True
  21. self.threads = []
  22. self.identifier = ContentIdentifier()
  23. self.pid_file = "scheduler.pid"
  24. # 设置日志
  25. self.logger = get_logger('MultiThreadScheduler')
  26. # 设置信号处理,优雅退出
  27. signal.signal(signal.SIGINT, self.signal_handler)
  28. signal.signal(signal.SIGTERM, self.signal_handler)
  29. # 注册退出时的清理函数
  30. atexit.register(self.cleanup)
  31. # 创建PID文件
  32. self.create_pid_file()
  33. def create_pid_file(self):
  34. """创建PID文件"""
  35. try:
  36. with open(self.pid_file, 'w') as f:
  37. f.write(str(os.getpid()))
  38. self.logger.info(f"PID文件已创建: {self.pid_file}")
  39. except Exception as e:
  40. self.logger.error(f"创建PID文件失败: {e}")
  41. def cleanup(self):
  42. """清理资源"""
  43. try:
  44. if os.path.exists(self.pid_file):
  45. os.remove(self.pid_file)
  46. self.logger.info("PID文件已清理")
  47. except Exception as e:
  48. self.logger.error(f"清理PID文件失败: {e}")
  49. def signal_handler(self, signum, frame):
  50. """信号处理函数,优雅退出"""
  51. signal_name = "SIGTERM" if signum == signal.SIGTERM else "SIGINT"
  52. self.logger.info(f"收到信号 {signal_name},正在优雅退出...")
  53. self.running = False
  54. self.stop_all_threads()
  55. self.cleanup()
  56. sys.exit(0)
  57. def worker_thread(self, thread_id):
  58. """工作线程函数"""
  59. thread_logger = get_logger(f'WorkerThread-{thread_id}')
  60. thread_logger.info(f"线程 {thread_id} 启动,每 {self.interval_seconds//60} 分钟处理一条数据")
  61. while self.running:
  62. try:
  63. start_time = time.time()
  64. # 处理一条数据
  65. thread_logger.info(f"开始处理数据...")
  66. success = self.identifier.process_single_record()
  67. if success:
  68. thread_logger.info("数据处理成功")
  69. else:
  70. thread_logger.info("没有数据需要处理或处理失败")
  71. # 计算剩余等待时间
  72. elapsed_time = time.time() - start_time
  73. wait_time = max(0, self.interval_seconds - elapsed_time)
  74. if wait_time > 0:
  75. thread_logger.info(f"等待 {wait_time:.1f} 秒后继续...")
  76. # 分段等待,每10秒检查一次running状态
  77. for _ in range(int(wait_time / 10) + 1):
  78. if not self.running:
  79. break
  80. time.sleep(min(10, wait_time))
  81. wait_time -= 10
  82. if wait_time <= 0:
  83. break
  84. except Exception as e:
  85. thread_logger.error(f"处理过程中发生错误: {e}", exc_info=True)
  86. # 发生错误时等待一段时间再继续
  87. for _ in range(10):
  88. if not self.running:
  89. break
  90. time.sleep(1)
  91. thread_logger.info(f"线程 {thread_id} 已停止")
  92. def start_all_threads(self):
  93. """启动所有工作线程"""
  94. self.logger.info(f"启动 {self.thread_count} 个工作线程...")
  95. for i in range(self.thread_count):
  96. thread = threading.Thread(
  97. target=self.worker_thread,
  98. args=(i+1,),
  99. daemon=True,
  100. name=f"WorkerThread-{i+1}"
  101. )
  102. thread.start()
  103. self.threads.append(thread)
  104. self.logger.info(f"线程 {i+1} 已启动")
  105. # 如果不是最后一个线程,等待5秒再启动下一个
  106. if i < self.thread_count - 1:
  107. self.logger.info("等待5秒后启动下一个线程...")
  108. time.sleep(5)
  109. self.logger.info(f"所有 {self.thread_count} 个线程已启动")
  110. self.logger.info(f"每个线程每 {self.interval_seconds//60} 分钟处理一条数据")
  111. self.logger.info("使用以下命令停止: ./start_scheduler.sh stop")
  112. def stop_all_threads(self):
  113. """停止所有线程"""
  114. self.logger.info("正在停止所有线程...")
  115. self.running = False
  116. # 等待所有线程结束
  117. for i, thread in enumerate(self.threads):
  118. if thread.is_alive():
  119. thread.join(timeout=5)
  120. if thread.is_alive():
  121. self.logger.warning(f"线程 {i+1} 未能正常停止")
  122. else:
  123. self.logger.info(f"线程 {i+1} 已停止")
  124. self.logger.info("所有线程已停止")
  125. def run(self):
  126. """运行调度器"""
  127. try:
  128. self.start_all_threads()
  129. # 主线程保持运行,等待信号
  130. while self.running:
  131. time.sleep(1)
  132. except KeyboardInterrupt:
  133. self.logger.info("收到键盘中断信号")
  134. finally:
  135. self.stop_all_threads()
  136. self.cleanup()
  137. def main():
  138. """主函数"""
  139. print("=" * 60)
  140. print("多线程定时任务调度器")
  141. print("=" * 60)
  142. print(f"线程数量: 5")
  143. print(f"处理间隔: 2分钟")
  144. print(f"启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
  145. print("=" * 60)
  146. # 创建并运行调度器
  147. scheduler = MultiThreadScheduler(thread_count=5, interval_minutes=2)
  148. scheduler.run()
  149. if __name__ == "__main__":
  150. main()