multi_thread_scheduler.py 10 KB


  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 多线程定时任务调度器
  5. 开启5个线程,每2分钟调用一次identifier.process_single_record()处理一条数据
  6. """
  7. import threading
  8. import time
  9. import signal
  10. import sys
  11. import os
  12. import atexit
  13. import gc
  14. import psutil
  15. import traceback
  16. from datetime import datetime
  17. from indentify import ContentIdentifier
  18. from utils.logging_config import get_logger
  19. class MultiThreadScheduler:
  20. def __init__(self, thread_count=5, interval_minutes=2):
  21. self.thread_count = thread_count
  22. self.interval_seconds = interval_minutes * 60
  23. self.running = True
  24. self.threads = []
  25. self.identifier = None # 延迟初始化
  26. self.pid_file = "scheduler.pid"
  27. self.max_memory_mb = 2048 # 最大内存使用量
  28. self.last_gc_time = time.time()
  29. self.gc_interval = 300 # 每5分钟强制垃圾回收
  30. # 设置日志
  31. self.logger = get_logger('MultiThreadScheduler')
  32. # 设置信号处理,优雅退出
  33. signal.signal(signal.SIGINT, self.signal_handler)
  34. signal.signal(signal.SIGTERM, self.signal_handler)
  35. # 注册退出时的清理函数
  36. atexit.register(self.cleanup)
  37. # 创建PID文件
  38. self.create_pid_file()
  39. # 初始化ContentIdentifier(延迟初始化)
  40. self.init_identifier()
  41. def init_identifier(self):
  42. """延迟初始化ContentIdentifier"""
  43. try:
  44. if self.identifier is None:
  45. self.logger.info("初始化ContentIdentifier...")
  46. self.identifier = ContentIdentifier()
  47. self.logger.info("ContentIdentifier初始化成功")
  48. except Exception as e:
  49. self.logger.error(f"ContentIdentifier初始化失败: {e}")
  50. self.identifier = None
  51. def create_pid_file(self):
  52. """创建PID文件"""
  53. try:
  54. with open(self.pid_file, 'w') as f:
  55. f.write(str(os.getpid()))
  56. self.logger.info(f"PID文件已创建: {self.pid_file}")
  57. except Exception as e:
  58. self.logger.error(f"创建PID文件失败: {e}")
  59. def cleanup(self):
  60. """清理资源"""
  61. try:
  62. if os.path.exists(self.pid_file):
  63. os.remove(self.pid_file)
  64. self.logger.info("PID文件已清理")
  65. except Exception as e:
  66. self.logger.error(f"清理PID文件失败: {e}")
  67. # 清理ContentIdentifier
  68. if self.identifier:
  69. try:
  70. del self.identifier
  71. self.identifier = None
  72. except:
  73. pass
  74. # 强制垃圾回收
  75. gc.collect()
  76. def signal_handler(self, signum, frame):
  77. """信号处理函数,优雅退出"""
  78. signal_name = "SIGTERM" if signum == signal.SIGTERM else "SIGINT"
  79. self.logger.info(f"收到信号 {signal_name},正在优雅退出...")
  80. self.running = False
  81. self.stop_all_threads()
  82. self.cleanup()
  83. sys.exit(0)
  84. def check_memory_usage(self):
  85. """检查内存使用情况"""
  86. try:
  87. process = psutil.Process(os.getpid())
  88. memory_info = process.memory_info()
  89. memory_mb = memory_info.rss / 1024 / 1024
  90. # 记录内存使用
  91. if memory_mb > self.max_memory_mb * 0.8:
  92. self.logger.warning(f"内存使用较高: {memory_mb:.1f}MB / {self.max_memory_mb}MB")
  93. return memory_mb
  94. except Exception as e:
  95. self.logger.error(f"检查内存使用失败: {e}")
  96. return 0
  97. def force_garbage_collection(self):
  98. """强制垃圾回收"""
  99. try:
  100. current_time = time.time()
  101. if current_time - self.last_gc_time > self.gc_interval:
  102. self.logger.info("执行强制垃圾回收...")
  103. collected = gc.collect()
  104. self.logger.info(f"垃圾回收完成,清理了 {collected} 个对象")
  105. self.last_gc_time = current_time
  106. except Exception as e:
  107. self.logger.error(f"垃圾回收失败: {e}")
  108. def worker_thread(self, thread_id):
  109. """工作线程函数"""
  110. thread_logger = get_logger(f'WorkerThread-{thread_id}')
  111. thread_logger.info(f"线程 {thread_id} 启动,每 {self.interval_seconds//60} 分钟处理一条数据")
  112. # 线程本地变量
  113. local_identifier = None
  114. while self.running:
  115. try:
  116. start_time = time.time()
  117. # 检查内存使用
  118. memory_mb = self.check_memory_usage()
  119. if memory_mb > self.max_memory_mb:
  120. thread_logger.error(f"内存使用过高 ({memory_mb:.1f}MB),跳过本次处理")
  121. time.sleep(60) # 等待1分钟
  122. continue
  123. # 强制垃圾回收
  124. self.force_garbage_collection()
  125. # 确保identifier可用
  126. if self.identifier is None:
  127. self.init_identifier()
  128. if self.identifier is None:
  129. thread_logger.error("ContentIdentifier不可用,等待下次重试")
  130. time.sleep(60)
  131. continue
  132. # 处理一条数据
  133. thread_logger.info(f"开始处理数据...")
  134. success = self.identifier.process_single_record()
  135. if success:
  136. thread_logger.info("数据处理成功")
  137. else:
  138. thread_logger.info("没有数据需要处理或处理失败")
  139. # 计算剩余等待时间
  140. elapsed_time = time.time() - start_time
  141. wait_time = max(0, self.interval_seconds - elapsed_time)
  142. if wait_time > 0:
  143. thread_logger.info(f"等待 {wait_time:.1f} 秒后继续...")
  144. # 分段等待,每10秒检查一次running状态
  145. for _ in range(int(wait_time / 10) + 1):
  146. if not self.running:
  147. break
  148. time.sleep(min(10, wait_time))
  149. wait_time -= 10
  150. if wait_time <= 0:
  151. break
  152. except MemoryError as e:
  153. thread_logger.error(f"内存不足错误: {e}")
  154. # 强制垃圾回收
  155. gc.collect()
  156. time.sleep(120) # 等待2分钟
  157. except Exception as e:
  158. thread_logger.error(f"处理过程中发生错误: {e}")
  159. thread_logger.error(f"错误详情: {traceback.format_exc()}")
  160. # 检查是否是严重错误
  161. if "double free" in str(e).lower() or "corruption" in str(e).lower():
  162. thread_logger.critical("检测到严重的内存错误,线程将退出")
  163. break
  164. # 发生错误时等待一段时间再继续
  165. for _ in range(10):
  166. if not self.running:
  167. break
  168. time.sleep(1)
  169. thread_logger.info(f"线程 {thread_id} 已停止")
  170. def start_all_threads(self):
  171. """启动所有工作线程"""
  172. self.logger.info(f"启动 {self.thread_count} 个工作线程...")
  173. for i in range(self.thread_count):
  174. thread = threading.Thread(
  175. target=self.worker_thread,
  176. args=(i+1,),
  177. daemon=True,
  178. name=f"WorkerThread-{i+1}"
  179. )
  180. thread.start()
  181. self.threads.append(thread)
  182. self.logger.info(f"线程 {i+1} 已启动")
  183. # 如果不是最后一个线程,等待5秒再启动下一个
  184. if i < self.thread_count - 1:
  185. self.logger.info("等待30秒后启动下一个线程...")
  186. time.sleep(30)
  187. self.logger.info(f"所有 {self.thread_count} 个线程已启动")
  188. self.logger.info(f"每个线程每 {self.interval_seconds//60} 分钟处理一条数据")
  189. self.logger.info("使用以下命令停止: ./start_scheduler.sh stop")
  190. def stop_all_threads(self):
  191. """停止所有线程"""
  192. self.logger.info("正在停止所有线程...")
  193. self.running = False
  194. # 等待所有线程结束
  195. for i, thread in enumerate(self.threads):
  196. if thread.is_alive():
  197. thread.join(timeout=5)
  198. if thread.is_alive():
  199. self.logger.warning(f"线程 {i+1} 未能正常停止")
  200. else:
  201. self.logger.info(f"线程 {i+1} 已停止")
  202. self.logger.info("所有线程已停止")
  203. def run(self):
  204. """运行调度器"""
  205. try:
  206. # 设置Python内存管理
  207. gc.set_threshold(700, 10, 10) # 更积极的垃圾回收
  208. self.start_all_threads()
  209. # 主线程保持运行,等待信号
  210. while self.running:
  211. time.sleep(1)
  212. # 定期检查内存和垃圾回收
  213. self.force_garbage_collection()
  214. except KeyboardInterrupt:
  215. self.logger.info("收到键盘中断信号")
  216. except MemoryError as e:
  217. self.logger.critical(f"主线程内存不足: {e}")
  218. except Exception as e:
  219. self.logger.critical(f"主线程发生错误: {e}")
  220. self.logger.critical(f"错误详情: {traceback.format_exc()}")
  221. finally:
  222. self.stop_all_threads()
  223. self.cleanup()
  224. def main():
  225. """主函数"""
  226. print("=" * 60)
  227. print("多线程定时任务调度器")
  228. print("=" * 60)
  229. print(f"线程数量: 5")
  230. print(f"处理间隔: 2分钟")
  231. print(f"启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
  232. print("=" * 60)
  233. try:
  234. # 创建并运行调度器
  235. scheduler = MultiThreadScheduler(thread_count=3, interval_minutes=2)
  236. scheduler.run()
  237. except Exception as e:
  238. print(f"调度器启动失败: {e}")
  239. print(f"错误详情: {traceback.format_exc()}")
  240. sys.exit(1)
  241. if __name__ == "__main__":
  242. main()