multi_thread_scheduler.py 7.6 KB


  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
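  4. Multi-threaded periodic task scheduler (structured-processing version).
  5. Starts a pool of worker threads (5 by default); each worker calls processor.process_single_record() once per interval (2 minutes by default) to process one record.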
  6. """
  7. import threading
  8. import time
  9. import signal
  10. import sys
  11. import os
  12. import atexit
  13. from datetime import datetime
  14. from evaluate import EvaluateProcessor
  15. from utils.logging_config import get_logger
  16. class MultiThreadScheduler:
  17. def __init__(self, thread_count=5, interval_minutes=2,
  18. query_word=None, source_type=None, source_channel=None):
  19. self.thread_count = thread_count
  20. self.interval_seconds = interval_minutes * 60
  21. self.running = True
  22. self.threads = []
  23. self.processor = EvaluateProcessor()
  24. self.query_word = query_word
  25. self.source_type = source_type
  26. self.source_channel = source_channel
  27. self.pid_file = "evaluate_scheduler.pid"
  28. # 设置日志
  29. self.logger = get_logger('EvaluateMultiThreadScheduler')
  30. # 设置信号处理,优雅退出
  31. signal.signal(signal.SIGINT, self.signal_handler)
  32. signal.signal(signal.SIGTERM, self.signal_handler)
  33. # 注册退出时的清理函数
  34. atexit.register(self.cleanup)
  35. # 创建PID文件
  36. self.create_pid_file()
  37. def create_pid_file(self):
  38. """创建PID文件"""
  39. try:
  40. with open(self.pid_file, 'w') as f:
  41. f.write(str(os.getpid()))
  42. self.logger.info(f"PID文件已创建: {self.pid_file}")
  43. except Exception as e:
  44. self.logger.error(f"创建PID文件失败: {e}")
  45. def cleanup(self):
  46. """清理资源"""
  47. try:
  48. if os.path.exists(self.pid_file):
  49. os.remove(self.pid_file)
  50. self.logger.info("PID文件已清理")
  51. except Exception as e:
  52. self.logger.error(f"清理PID文件失败: {e}")
  53. def signal_handler(self, signum, frame):
  54. """信号处理函数,优雅退出"""
  55. signal_name = "SIGTERM" if signum == signal.SIGTERM else "SIGINT"
  56. self.logger.info(f"收到信号 {signal_name},正在优雅退出...")
  57. self.running = False
  58. self.stop_all_threads()
  59. self.cleanup()
  60. sys.exit(0)
  61. def worker_thread(self, thread_id):
  62. """工作线程函数"""
  63. thread_logger = get_logger(f'EvaluateWorkerThread-{thread_id}')
  64. thread_logger.info(f"线程 {thread_id} 启动,每 {self.interval_seconds//60} 分钟处理一条数据")
  65. while self.running:
  66. try:
  67. start_time = time.time()
  68. # 处理一条数据
  69. thread_logger.info(f"开始处理数据...")
  70. success = self.processor.process_single_record(
  71. self.query_word, self.source_type, self.source_channel
  72. )
  73. if success:
  74. thread_logger.info("数据处理成功")
  75. else:
  76. thread_logger.info("没有数据需要处理或处理失败")
  77. # 计算剩余等待时间
  78. elapsed_time = time.time() - start_time
  79. wait_time = max(0, self.interval_seconds - elapsed_time)
  80. if wait_time > 0:
  81. thread_logger.info(f"等待 {wait_time:.1f} 秒后继续...")
  82. # 分段等待,每10秒检查一次running状态
  83. for _ in range(int(wait_time / 10) + 1):
  84. if not self.running:
  85. break
  86. time.sleep(min(10, wait_time))
  87. wait_time -= 10
  88. if wait_time <= 0:
  89. break
  90. except Exception as e:
  91. thread_logger.error(f"处理过程中发生错误: {e}", exc_info=True)
  92. # 发生错误时等待一段时间再继续
  93. for _ in range(10):
  94. if not self.running:
  95. break
  96. time.sleep(1)
  97. thread_logger.info(f"线程 {thread_id} 已停止")
  98. def start_all_threads(self):
  99. """启动所有工作线程"""
  100. self.logger.info(f"启动 {self.thread_count} 个工作线程...")
  101. self.logger.info(f"查询条件: query_word={self.query_word}, source_type={self.source_type}, source_channel={self.source_channel}")
  102. for i in range(self.thread_count):
  103. thread = threading.Thread(
  104. target=self.worker_thread,
  105. args=(i+1,),
  106. daemon=True,
  107. name=f"EvaluateWorkerThread-{i+1}"
  108. )
  109. thread.start()
  110. self.threads.append(thread)
  111. self.logger.info(f"线程 {i+1} 已启动")
  112. # 如果不是最后一个线程,等待5秒再启动下一个
  113. if i < self.thread_count - 1:
  114. self.logger.info("等待5秒后启动下一个线程...")
  115. time.sleep(5)
  116. self.logger.info(f"所有 {self.thread_count} 个线程已启动")
  117. self.logger.info(f"每个线程每 {self.interval_seconds//60} 分钟处理一条数据")
  118. self.logger.info("使用以下命令停止: ./start_evaluate.sh stop")
  119. def stop_all_threads(self):
  120. """停止所有线程"""
  121. self.logger.info("正在停止所有线程...")
  122. self.running = False
  123. # 等待所有线程结束
  124. for i, thread in enumerate(self.threads):
  125. if thread.is_alive():
  126. thread.join(timeout=5)
  127. if thread.is_alive():
  128. self.logger.warning(f"线程 {i+1} 未能正常停止")
  129. else:
  130. self.logger.info(f"线程 {i+1} 已停止")
  131. self.logger.info("所有线程已停止")
  132. def run(self):
  133. """运行调度器"""
  134. try:
  135. self.start_all_threads()
  136. # 主线程保持运行,等待信号
  137. while self.running:
  138. time.sleep(1)
  139. except KeyboardInterrupt:
  140. self.logger.info("收到键盘中断信号")
  141. finally:
  142. self.stop_all_threads()
  143. self.cleanup()
  144. def main():
  145. """主函数"""
  146. import argparse
  147. parser = argparse.ArgumentParser(description='多线程结构化处理调度器')
  148. parser.add_argument('--query_word', default=None, help='query词')
  149. parser.add_argument('--source_type', default=None, help='数据源类型')
  150. parser.add_argument('--source_channel', default=None, help='数据源渠道')
  151. parser.add_argument('--thread_count', type=int, default=5, help='线程数量')
  152. parser.add_argument('--interval_minutes', type=int, default=2, help='处理间隔(分钟)')
  153. args = parser.parse_args()
  154. print("=" * 60)
  155. print("多线程结构化处理调度器")
  156. print("=" * 60)
  157. print(f"线程数量: {args.thread_count}")
  158. print(f"处理间隔: {args.interval_minutes}分钟")
  159. print(f"查询条件: query_word={args.query_word}, source_type={args.source_type}, source_channel={args.source_channel}")
  160. print(f"启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
  161. print("=" * 60)
  162. # 创建并运行调度器
  163. scheduler = MultiThreadScheduler(
  164. thread_count=args.thread_count,
  165. interval_minutes=args.interval_minutes,
  166. query_word=args.query_word,
  167. source_type=args.source_type,
  168. source_channel=args.source_channel
  169. )
  170. scheduler.run()
  171. if __name__ == "__main__":
  172. main()