123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 多线程定时任务调度器 - 结构化处理版本
- 开启5个线程,每2分钟调用一次processor.process_single_record()处理一条数据
- """
- import threading
- import time
- import signal
- import sys
- import os
- import atexit
- from datetime import datetime
- from structure_processor import StructureProcessor
- from utils.logging_config import get_logger
class MultiThreadScheduler:
    """Run a pool of worker threads that each process one record per interval.

    Every worker repeatedly calls ``self.processor.process_single_record()``
    and then waits until ``interval_seconds`` have passed since the start of
    that call. Constructing an instance installs SIGINT/SIGTERM handlers,
    registers :meth:`cleanup` with ``atexit`` and writes a PID file named
    ``structure_scheduler.pid`` (a relative path).

    The stop flag is backed by a ``threading.Event`` so that waiting workers
    wake up as soon as a stop is requested instead of finishing their sleep.
    """

    def __init__(self, thread_count=5, interval_minutes=2):
        """Set up the scheduler state, signal handlers and PID file.

        Args:
            thread_count: Number of worker threads to start.
            interval_minutes: Minutes between the start of two consecutive
                processing attempts of the same worker.

        Note:
            ``signal.signal`` may only be called from the main thread, so the
            scheduler has to be created there.
        """
        self.thread_count = thread_count
        self.interval_seconds = interval_minutes * 60
        # Set once a stop is requested; ``running`` is derived from it.
        self._stop_event = threading.Event()
        self.threads = []
        self.processor = StructureProcessor()
        self.pid_file = "structure_scheduler.pid"

        # Logger for scheduler-level messages.
        self.logger = get_logger('StructureMultiThreadScheduler')

        # Install signal handlers so the process exits gracefully.
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

        # Make sure the PID file is removed on interpreter exit.
        atexit.register(self.cleanup)

        # Record the current process id.
        self.create_pid_file()

    @property
    def running(self):
        """``True`` until a stop has been requested."""
        return not self._stop_event.is_set()

    @running.setter
    def running(self, value):
        # Keep ``scheduler.running = False`` working for existing callers
        # while also waking every thread blocked in ``_wait_or_stop``.
        if value:
            self._stop_event.clear()
        else:
            self._stop_event.set()

    def _wait_or_stop(self, seconds):
        """Wait up to ``seconds``; return ``True`` early if a stop is requested."""
        return self._stop_event.wait(max(0, seconds))

    def create_pid_file(self):
        """Write the current process id to ``self.pid_file``.

        Failures are logged and otherwise ignored.
        """
        try:
            with open(self.pid_file, 'w', encoding="utf-8") as f:
                f.write(str(os.getpid()))
            self.logger.info(f"PID文件已创建: {self.pid_file}")
        except Exception as e:
            self.logger.error(f"创建PID文件失败: {e}")

    def cleanup(self):
        """Remove the PID file if it exists.

        Safe to call repeatedly: it is invoked from the signal handler, from
        :meth:`run` and from ``atexit``. Failures are logged, never raised.
        """
        try:
            # Remove directly instead of checking for existence first, so a
            # concurrent removal cannot turn into an error.
            os.remove(self.pid_file)
        except FileNotFoundError:
            return
        except Exception as e:
            self.logger.error(f"清理PID文件失败: {e}")
        else:
            self.logger.info("PID文件已清理")

    def signal_handler(self, signum, frame):
        """Handle SIGINT/SIGTERM: stop the workers, clean up and exit with 0.

        Args:
            signum: Number of the received signal.
            frame: Current stack frame (unused).

        Raises:
            SystemExit: Always, with exit status 0.
        """
        signal_name = "SIGTERM" if signum == signal.SIGTERM else "SIGINT"
        self.logger.info(f"收到信号 {signal_name},正在优雅退出...")
        self.running = False
        self.stop_all_threads()
        self.cleanup()
        sys.exit(0)

    def worker_thread(self, thread_id):
        """Worker loop: process one record, then wait out the rest of the interval.

        Args:
            thread_id: 1-based worker number, used in the logger name and in
                log messages.
        """
        thread_logger = get_logger(f'StructureWorkerThread-{thread_id}')
        thread_logger.info(f"线程 {thread_id} 启动,每 {self.interval_seconds//60} 分钟处理一条数据")

        while self.running:
            try:
                # Monotonic clock: unaffected by system clock adjustments.
                start_time = time.monotonic()

                # Process a single record.
                thread_logger.info("开始处理数据...")
                success = self.processor.process_single_record()

                if success:
                    thread_logger.info("数据处理成功")
                else:
                    thread_logger.info("没有数据需要处理或处理失败")

                # Time left until the next run is due.
                elapsed_time = time.monotonic() - start_time
                wait_time = max(0, self.interval_seconds - elapsed_time)

                if wait_time > 0:
                    thread_logger.info(f"等待 {wait_time:.1f} 秒后继续...")
                    # Returns immediately once a stop is requested.
                    self._wait_or_stop(wait_time)

            except Exception as e:
                thread_logger.error(f"处理过程中发生错误: {e}", exc_info=True)
                # Back off for up to 10 seconds before retrying.
                self._wait_or_stop(10)

        thread_logger.info(f"线程 {thread_id} 已停止")

    def start_all_threads(self):
        """Start ``thread_count`` daemon workers, 5 seconds apart."""
        self.logger.info(f"启动 {self.thread_count} 个工作线程...")
        self.logger.info("查询条件: multimodal_recognition is not null AND structured_data is null AND query_word IN (category_id = 0 的 query_word)")

        for i in range(self.thread_count):
            thread = threading.Thread(
                target=self.worker_thread,
                args=(i+1,),
                daemon=True,
                name=f"StructureWorkerThread-{i+1}"
            )
            thread.start()
            self.threads.append(thread)
            self.logger.info(f"线程 {i+1} 已启动")

            # Stagger the start-up: pause before every thread but the last.
            if i < self.thread_count - 1:
                self.logger.info("等待5秒后启动下一个线程...")
                time.sleep(5)

        self.logger.info(f"所有 {self.thread_count} 个线程已启动")
        self.logger.info(f"每个线程每 {self.interval_seconds//60} 分钟处理一条数据")
        self.logger.info("使用以下命令停止: ./start_structure.sh stop")

    def stop_all_threads(self):
        """Request a stop and join every live worker for up to 5 seconds each.

        A worker that is still busy after its timeout is reported with a
        warning; waiting workers are woken by the stop event and exit promptly.
        """
        self.logger.info("正在停止所有线程...")
        self.running = False

        # Wait for the workers to finish.
        for i, thread in enumerate(self.threads):
            if thread.is_alive():
                thread.join(timeout=5)
                if thread.is_alive():
                    self.logger.warning(f"线程 {i+1} 未能正常停止")
                else:
                    self.logger.info(f"线程 {i+1} 已停止")

        self.logger.info("所有线程已停止")

    def run(self):
        """Start the workers and block until a stop is requested.

        The workers are always stopped and the PID file removed on the way
        out, including when the loop is left through an exception.
        """
        try:
            self.start_all_threads()

            # Keep the main thread alive; poll in 1-second steps.
            while self.running:
                self._wait_or_stop(1)

        except KeyboardInterrupt:
            self.logger.info("收到键盘中断信号")
        finally:
            self.stop_all_threads()
            self.cleanup()
def main():
    """Parse the command line, print a startup banner and run the scheduler.

    Options:
        --thread_count: Number of worker threads (default 5).
        --interval_minutes: Minutes between processing attempts (default 2).

    Both options must be positive integers; any other value makes argparse
    report the error and exit with status 2 before the scheduler is created.
    """
    import argparse

    def positive_int(text):
        """argparse ``type`` callable that accepts only integers >= 1."""
        try:
            number = int(text)
        except ValueError:
            raise argparse.ArgumentTypeError(f"invalid int value: {text!r}") from None
        if number < 1:
            # Zero threads would leave the scheduler idle forever, and a
            # non-positive interval would make the workers loop without delay.
            raise argparse.ArgumentTypeError(f"must be a positive integer, got {number}")
        return number

    parser = argparse.ArgumentParser(description='多线程结构化处理调度器')
    parser.add_argument('--thread_count', type=positive_int, default=5, help='线程数量')
    parser.add_argument('--interval_minutes', type=positive_int, default=2, help='处理间隔(分钟)')

    args = parser.parse_args()

    print("=" * 60)
    print("多线程结构化处理调度器")
    print("=" * 60)
    print(f"线程数量: {args.thread_count}")
    print(f"处理间隔: {args.interval_minutes}分钟")
    print("查询条件: multimodal_recognition is not null AND structured_data is null AND query_word IN (category_id = 0 的 query_word)")
    print(f"启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("=" * 60)

    # Build the scheduler and block until it is stopped.
    scheduler = MultiThreadScheduler(
        thread_count=args.thread_count,
        interval_minutes=args.interval_minutes
    )
    scheduler.run()


if __name__ == "__main__":
    main()
|