multi_thread_scheduler.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 多线程定时任务调度器 - 结构化处理版本
  5. 开启5个线程,每2分钟调用一次processor.process_single_record()处理一条数据
  6. """
  7. import threading
  8. import time
  9. import signal
  10. import sys
  11. import os
  12. import atexit
  13. from datetime import datetime
  14. from structure_processor import StructureProcessor
  15. from utils.logging_config import get_logger
  16. class MultiThreadScheduler:
  17. def __init__(self, thread_count=5, interval_minutes=2):
  18. self.thread_count = thread_count
  19. self.interval_seconds = interval_minutes * 60
  20. self.running = True
  21. self.threads = []
  22. self.processor = StructureProcessor()
  23. self.pid_file = "structure_scheduler.pid"
  24. # 设置日志
  25. self.logger = get_logger('StructureMultiThreadScheduler')
  26. # 设置信号处理,优雅退出
  27. signal.signal(signal.SIGINT, self.signal_handler)
  28. signal.signal(signal.SIGTERM, self.signal_handler)
  29. # 注册退出时的清理函数
  30. atexit.register(self.cleanup)
  31. # 创建PID文件
  32. self.create_pid_file()
  33. def create_pid_file(self):
  34. """创建PID文件"""
  35. try:
  36. with open(self.pid_file, 'w') as f:
  37. f.write(str(os.getpid()))
  38. self.logger.info(f"PID文件已创建: {self.pid_file}")
  39. except Exception as e:
  40. self.logger.error(f"创建PID文件失败: {e}")
  41. def cleanup(self):
  42. """清理资源"""
  43. try:
  44. if os.path.exists(self.pid_file):
  45. os.remove(self.pid_file)
  46. self.logger.info("PID文件已清理")
  47. except Exception as e:
  48. self.logger.error(f"清理PID文件失败: {e}")
  49. def signal_handler(self, signum, frame):
  50. """信号处理函数,优雅退出"""
  51. signal_name = "SIGTERM" if signum == signal.SIGTERM else "SIGINT"
  52. self.logger.info(f"收到信号 {signal_name},正在优雅退出...")
  53. self.running = False
  54. self.stop_all_threads()
  55. self.cleanup()
  56. sys.exit(0)
  57. def worker_thread(self, thread_id):
  58. """工作线程函数"""
  59. thread_logger = get_logger(f'StructureWorkerThread-{thread_id}')
  60. thread_logger.info(f"线程 {thread_id} 启动,每 {self.interval_seconds//60} 分钟处理一条数据")
  61. while self.running:
  62. try:
  63. start_time = time.time()
  64. # 处理一条数据
  65. thread_logger.info(f"开始处理数据...")
  66. success = self.processor.process_single_record()
  67. if success:
  68. thread_logger.info("数据处理成功")
  69. else:
  70. thread_logger.info("没有数据需要处理或处理失败")
  71. # 计算剩余等待时间
  72. elapsed_time = time.time() - start_time
  73. wait_time = max(0, self.interval_seconds - elapsed_time)
  74. if wait_time > 0:
  75. thread_logger.info(f"等待 {wait_time:.1f} 秒后继续...")
  76. # 分段等待,每10秒检查一次running状态
  77. for _ in range(int(wait_time / 10) + 1):
  78. if not self.running:
  79. break
  80. time.sleep(min(10, wait_time))
  81. wait_time -= 10
  82. if wait_time <= 0:
  83. break
  84. except Exception as e:
  85. thread_logger.error(f"处理过程中发生错误: {e}", exc_info=True)
  86. # 发生错误时等待一段时间再继续
  87. for _ in range(10):
  88. if not self.running:
  89. break
  90. time.sleep(1)
  91. thread_logger.info(f"线程 {thread_id} 已停止")
  92. def start_all_threads(self):
  93. """启动所有工作线程"""
  94. self.logger.info(f"启动 {self.thread_count} 个工作线程...")
  95. self.logger.info("查询条件: multimodal_recognition is not null AND structured_data is null AND query_word IN (category_id = 0 的 query_word)")
  96. for i in range(self.thread_count):
  97. thread = threading.Thread(
  98. target=self.worker_thread,
  99. args=(i+1,),
  100. daemon=True,
  101. name=f"StructureWorkerThread-{i+1}"
  102. )
  103. thread.start()
  104. self.threads.append(thread)
  105. self.logger.info(f"线程 {i+1} 已启动")
  106. # 如果不是最后一个线程,等待5秒再启动下一个
  107. if i < self.thread_count - 1:
  108. self.logger.info("等待5秒后启动下一个线程...")
  109. time.sleep(5)
  110. self.logger.info(f"所有 {self.thread_count} 个线程已启动")
  111. self.logger.info(f"每个线程每 {self.interval_seconds//60} 分钟处理一条数据")
  112. self.logger.info("使用以下命令停止: ./start_structure.sh stop")
  113. def stop_all_threads(self):
  114. """停止所有线程"""
  115. self.logger.info("正在停止所有线程...")
  116. self.running = False
  117. # 等待所有线程结束
  118. for i, thread in enumerate(self.threads):
  119. if thread.is_alive():
  120. thread.join(timeout=5)
  121. if thread.is_alive():
  122. self.logger.warning(f"线程 {i+1} 未能正常停止")
  123. else:
  124. self.logger.info(f"线程 {i+1} 已停止")
  125. self.logger.info("所有线程已停止")
  126. def run(self):
  127. """运行调度器"""
  128. try:
  129. self.start_all_threads()
  130. # 主线程保持运行,等待信号
  131. while self.running:
  132. time.sleep(1)
  133. except KeyboardInterrupt:
  134. self.logger.info("收到键盘中断信号")
  135. finally:
  136. self.stop_all_threads()
  137. self.cleanup()
  138. def main():
  139. """主函数"""
  140. import argparse
  141. parser = argparse.ArgumentParser(description='多线程结构化处理调度器')
  142. parser.add_argument('--thread_count', type=int, default=5, help='线程数量')
  143. parser.add_argument('--interval_minutes', type=int, default=2, help='处理间隔(分钟)')
  144. args = parser.parse_args()
  145. print("=" * 60)
  146. print("多线程结构化处理调度器")
  147. print("=" * 60)
  148. print(f"线程数量: {args.thread_count}")
  149. print(f"处理间隔: {args.interval_minutes}分钟")
  150. print("查询条件: multimodal_recognition is not null AND structured_data is null AND query_word IN (category_id = 0 的 query_word)")
  151. print(f"启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
  152. print("=" * 60)
  153. # 创建并运行调度器
  154. scheduler = MultiThreadScheduler(
  155. thread_count=args.thread_count,
  156. interval_minutes=args.interval_minutes
  157. )
  158. scheduler.run()
  159. if __name__ == "__main__":
  160. main()