multi_thread_scheduler.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 多线程定时任务调度器 - 结构化处理版本
  5. 默认开启5个线程(可通过 --thread_count 调整),每个线程每2分钟(可通过 --interval_minutes 调整)调用一次processor.process_single_record()处理一条数据
  6. """
  7. import threading
  8. import time
  9. import signal
  10. import sys
  11. import os
  12. import atexit
  13. from datetime import datetime
  14. from structure_processor import StructureProcessor
  15. from utils.logging_config import get_logger
  16. class MultiThreadScheduler:
  17. def __init__(self, thread_count=5, interval_minutes=2,
  18. query_word=None, source_type=None, source_channel=None):
  19. self.thread_count = thread_count
  20. self.interval_seconds = interval_minutes * 60
  21. self.running = True
  22. self.threads = []
  23. self.processor = StructureProcessor()
  24. self.query_word = query_word
  25. self.source_type = source_type
  26. self.source_channel = source_channel
  27. self.pid_file = "structure_scheduler.pid"
  28. # 设置日志
  29. self.logger = get_logger('StructureMultiThreadScheduler')
  30. # 设置信号处理,优雅退出
  31. signal.signal(signal.SIGINT, self.signal_handler)
  32. signal.signal(signal.SIGTERM, self.signal_handler)
  33. # 注册退出时的清理函数
  34. atexit.register(self.cleanup)
  35. # 创建PID文件
  36. self.create_pid_file()
  37. def create_pid_file(self):
  38. """创建PID文件"""
  39. try:
  40. with open(self.pid_file, 'w') as f:
  41. f.write(str(os.getpid()))
  42. self.logger.info(f"PID文件已创建: {self.pid_file}")
  43. except Exception as e:
  44. self.logger.error(f"创建PID文件失败: {e}")
  45. def cleanup(self):
  46. """清理资源"""
  47. try:
  48. if os.path.exists(self.pid_file):
  49. os.remove(self.pid_file)
  50. self.logger.info("PID文件已清理")
  51. except Exception as e:
  52. self.logger.error(f"清理PID文件失败: {e}")
  53. def signal_handler(self, signum, frame):
  54. """信号处理函数,优雅退出"""
  55. signal_name = "SIGTERM" if signum == signal.SIGTERM else "SIGINT"
  56. self.logger.info(f"收到信号 {signal_name},正在优雅退出...")
  57. self.running = False
  58. self.stop_all_threads()
  59. self.cleanup()
  60. sys.exit(0)
  61. def worker_thread(self, thread_id):
  62. """工作线程函数"""
  63. thread_logger = get_logger(f'StructureWorkerThread-{thread_id}')
  64. thread_logger.info(f"线程 {thread_id} 启动,每 {self.interval_seconds//60} 分钟处理一条数据")
  65. while self.running:
  66. try:
  67. start_time = time.time()
  68. # 处理一条数据
  69. thread_logger.info(f"开始处理数据...")
  70. success = self.processor.process_single_record(
  71. self.query_word, self.source_type, self.source_channel
  72. )
  73. if success:
  74. thread_logger.info("数据处理成功")
  75. else:
  76. thread_logger.info("没有数据需要处理或处理失败")
  77. # 计算剩余等待时间
  78. elapsed_time = time.time() - start_time
  79. wait_time = max(0, self.interval_seconds - elapsed_time)
  80. if wait_time > 0:
  81. thread_logger.info(f"等待 {wait_time:.1f} 秒后继续...")
  82. # 分段等待,每10秒检查一次running状态
  83. for _ in range(int(wait_time / 10) + 1):
  84. if not self.running:
  85. break
  86. time.sleep(min(10, wait_time))
  87. wait_time -= 10
  88. if wait_time <= 0:
  89. break
  90. except Exception as e:
  91. thread_logger.error(f"处理过程中发生错误: {e}", exc_info=True)
  92. # 发生错误时等待一段时间再继续
  93. for _ in range(10):
  94. if not self.running:
  95. break
  96. time.sleep(1)
  97. thread_logger.info(f"线程 {thread_id} 已停止")
  98. def start_all_threads(self):
  99. """启动所有工作线程"""
  100. self.logger.info(f"启动 {self.thread_count} 个工作线程...")
  101. self.logger.info(f"查询条件: query_word={self.query_word}, source_type={self.source_type}, source_channel={self.source_channel}")
  102. for i in range(self.thread_count):
  103. thread = threading.Thread(
  104. target=self.worker_thread,
  105. args=(i+1,),
  106. daemon=True,
  107. name=f"StructureWorkerThread-{i+1}"
  108. )
  109. thread.start()
  110. self.threads.append(thread)
  111. self.logger.info(f"线程 {i+1} 已启动")
  112. # 如果不是最后一个线程,等待5秒再启动下一个
  113. if i < self.thread_count - 1:
  114. self.logger.info("等待5秒后启动下一个线程...")
  115. time.sleep(5)
  116. self.logger.info(f"所有 {self.thread_count} 个线程已启动")
  117. self.logger.info(f"每个线程每 {self.interval_seconds//60} 分钟处理一条数据")
  118. self.logger.info("使用以下命令停止: ./start_structure.sh stop")
  119. def stop_all_threads(self):
  120. """停止所有线程"""
  121. self.logger.info("正在停止所有线程...")
  122. self.running = False
  123. # 等待所有线程结束
  124. for i, thread in enumerate(self.threads):
  125. if thread.is_alive():
  126. thread.join(timeout=5)
  127. if thread.is_alive():
  128. self.logger.warning(f"线程 {i+1} 未能正常停止")
  129. else:
  130. self.logger.info(f"线程 {i+1} 已停止")
  131. self.logger.info("所有线程已停止")
  132. def run(self):
  133. """运行调度器"""
  134. try:
  135. self.start_all_threads()
  136. # 主线程保持运行,等待信号
  137. while self.running:
  138. time.sleep(1)
  139. except KeyboardInterrupt:
  140. self.logger.info("收到键盘中断信号")
  141. finally:
  142. self.stop_all_threads()
  143. self.cleanup()
  144. def main():
  145. """主函数"""
  146. import argparse
  147. parser = argparse.ArgumentParser(description='多线程结构化处理调度器')
  148. parser.add_argument('--query_word', default=None, help='query词')
  149. parser.add_argument('--source_type', default=None, help='数据源类型')
  150. parser.add_argument('--source_channel', default=None, help='数据源渠道')
  151. parser.add_argument('--thread_count', type=int, default=5, help='线程数量')
  152. parser.add_argument('--interval_minutes', type=int, default=2, help='处理间隔(分钟)')
  153. args = parser.parse_args()
  154. print("=" * 60)
  155. print("多线程结构化处理调度器")
  156. print("=" * 60)
  157. print(f"线程数量: {args.thread_count}")
  158. print(f"处理间隔: {args.interval_minutes}分钟")
  159. print(f"查询条件: query_word={args.query_word}, source_type={args.source_type}, source_channel={args.source_channel}")
  160. print(f"启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
  161. print("=" * 60)
  162. # 创建并运行调度器
  163. scheduler = MultiThreadScheduler(
  164. thread_count=args.thread_count,
  165. interval_minutes=args.interval_minutes,
  166. query_word=args.query_word,
  167. source_type=args.source_type,
  168. source_channel=args.source_channel
  169. )
  170. scheduler.run()
  171. if __name__ == "__main__":
  172. main()