123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 多线程定时任务调度器 - 结构化处理版本
- 开启5个线程,每2分钟调用一次processor.process_single_record()处理一条数据
- """
- import threading
- import time
- import signal
- import sys
- import os
- import atexit
- from datetime import datetime
- from evaluate import EvaluateProcessor
- from utils.logging_config import get_logger
class MultiThreadScheduler:
    """Run a pool of worker threads that each process one record per interval.

    Every worker repeatedly calls ``self.processor.process_single_record`` with
    the configured query filters and then waits until ``interval_seconds`` have
    passed since the start of that call. The scheduler writes a PID file on
    construction, removes it on shutdown, and installs SIGINT/SIGTERM handlers
    that stop the workers and exit the process.

    Shutdown is coordinated through a ``threading.Event`` so that idle workers
    wake up immediately instead of finishing a sleep slice first. The public
    ``running`` attribute is kept as a read/write property over that event.
    """

    # Grace period (seconds) granted to each worker when joining on shutdown.
    _JOIN_TIMEOUT_SECONDS = 5
    # Pause (seconds) between launching consecutive workers.
    _STARTUP_STAGGER_SECONDS = 5
    # Pause (seconds) a worker takes after an unexpected processing error.
    _ERROR_BACKOFF_SECONDS = 10
    # Becomes True once every worker has been confirmed stopped, so that a
    # repeated stop_all_threads() call (signal handler + run()'s finally) is a
    # no-op instead of re-joining and re-logging every thread.
    _shutdown_complete = False

    def __init__(self, thread_count=5, interval_minutes=2,
                 query_word=None, source_type=None, source_channel=None):
        """Configure the scheduler, install signal handlers and write the PID file.

        Args:
            thread_count: Number of worker threads to launch.
            interval_minutes: Minimum time between two processing calls of the
                same worker, in minutes.
            query_word: Passed through to ``process_single_record``.
            source_type: Passed through to ``process_single_record``.
            source_channel: Passed through to ``process_single_record``.
        """
        self.thread_count = thread_count
        self.interval_seconds = interval_minutes * 60
        # Cleared event == running; set event == stop requested.
        self._stop_event = threading.Event()
        self.threads = []
        self.processor = EvaluateProcessor()
        self.query_word = query_word
        self.source_type = source_type
        self.source_channel = source_channel
        self.pid_file = "evaluate_scheduler.pid"

        # Logging.
        self.logger = get_logger('EvaluateMultiThreadScheduler')

        # Install signal handlers for graceful shutdown.
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

        # Make sure the PID file is removed on interpreter exit as well.
        atexit.register(self.cleanup)

        # Record our PID.
        self.create_pid_file()

    @property
    def running(self):
        """True while no stop has been requested."""
        return not self._stop_event.is_set()

    @running.setter
    def running(self, value):
        # Assigning False wakes every worker that is currently waiting.
        if value:
            self._stop_event.clear()
        else:
            self._stop_event.set()

    def create_pid_file(self):
        """Write the current process ID to ``self.pid_file`` (best effort)."""
        try:
            with open(self.pid_file, 'w', encoding='utf-8') as f:
                f.write(str(os.getpid()))
            self.logger.info(f"PID文件已创建: {self.pid_file}")
        except OSError as e:
            self.logger.error(f"创建PID文件失败: {e}")

    def cleanup(self):
        """Remove the PID file if it exists; safe to call more than once."""
        try:
            os.remove(self.pid_file)
        except FileNotFoundError:
            # Already removed by an earlier cleanup call (signal handler,
            # run()'s finally block, or the atexit hook).
            return
        except OSError as e:
            self.logger.error(f"清理PID文件失败: {e}")
        else:
            self.logger.info("PID文件已清理")

    def signal_handler(self, signum, frame):
        """Handle SIGINT/SIGTERM: stop the workers, clean up and exit with 0."""
        try:
            signal_name = signal.Signals(signum).name
        except ValueError:
            # Not a known signal number; log the raw value instead.
            signal_name = str(signum)
        self.logger.info(f"收到信号 {signal_name},正在优雅退出...")
        self.running = False
        self.stop_all_threads()
        self.cleanup()
        sys.exit(0)

    def worker_thread(self, thread_id):
        """Worker loop: process one record, then wait out the rest of the interval.

        Args:
            thread_id: 1-based identifier used for the logger name and messages.
        """
        thread_logger = get_logger(f'EvaluateWorkerThread-{thread_id}')
        thread_logger.info(f"线程 {thread_id} 启动,每 {self.interval_seconds//60} 分钟处理一条数据")

        while self.running:
            try:
                # Monotonic clock: unaffected by wall-clock adjustments.
                started = time.monotonic()

                thread_logger.info("开始处理数据...")
                success = self.processor.process_single_record(
                    self.query_word, self.source_type, self.source_channel
                )

                if success:
                    thread_logger.info("数据处理成功")
                else:
                    thread_logger.info("没有数据需要处理或处理失败")

                # Wait for whatever is left of the interval; returns early as
                # soon as a stop is requested.
                wait_time = max(0, self.interval_seconds - (time.monotonic() - started))
                if wait_time > 0:
                    thread_logger.info(f"等待 {wait_time:.1f} 秒后继续...")
                    self._stop_event.wait(wait_time)

            except Exception as e:
                # Top-level boundary of the worker: log and keep the thread alive.
                thread_logger.error(f"处理过程中发生错误: {e}", exc_info=True)
                self._stop_event.wait(self._ERROR_BACKOFF_SECONDS)

        thread_logger.info(f"线程 {thread_id} 已停止")

    def start_all_threads(self):
        """Launch ``thread_count`` daemon workers, pausing between launches.

        Returns early, without launching the remaining workers, if a stop is
        requested during one of the pauses.
        """
        self.logger.info(f"启动 {self.thread_count} 个工作线程...")
        self.logger.info(f"查询条件: query_word={self.query_word}, source_type={self.source_type}, source_channel={self.source_channel}")

        for thread_id in range(1, self.thread_count + 1):
            thread = threading.Thread(
                target=self.worker_thread,
                args=(thread_id,),
                daemon=True,
                name=f"EvaluateWorkerThread-{thread_id}"
            )
            thread.start()
            self.threads.append(thread)
            self.logger.info(f"线程 {thread_id} 已启动")

            # Pause before launching the next worker (not after the last one).
            if thread_id < self.thread_count:
                self.logger.info(f"等待{self._STARTUP_STAGGER_SECONDS}秒后启动下一个线程...")
                if self._stop_event.wait(self._STARTUP_STAGGER_SECONDS):
                    self.logger.info("启动过程中收到停止请求,不再启动剩余线程")
                    return

        self.logger.info(f"所有 {self.thread_count} 个线程已启动")
        self.logger.info(f"每个线程每 {self.interval_seconds//60} 分钟处理一条数据")
        self.logger.info("使用以下命令停止: ./start_evaluate.sh stop")

    def stop_all_threads(self):
        """Request a stop and join every worker, each with a bounded timeout.

        Calling this again after all workers were confirmed stopped does
        nothing. If some workers are still busy, a later call retries them.
        """
        if self._shutdown_complete:
            return

        self.logger.info("正在停止所有线程...")
        self.running = False

        still_alive = 0
        for thread_id, thread in enumerate(self.threads, start=1):
            if thread.is_alive():
                thread.join(timeout=self._JOIN_TIMEOUT_SECONDS)
            if thread.is_alive():
                still_alive += 1
                self.logger.warning(f"线程 {thread_id} 未能正常停止")
            else:
                self.logger.info(f"线程 {thread_id} 已停止")

        if still_alive:
            # Do not claim success while workers are still in flight.
            self.logger.warning(f"仍有 {still_alive} 个线程未停止")
        else:
            self.logger.info("所有线程已停止")
            self._shutdown_complete = True

    def run(self):
        """Start the workers and block until a stop is requested."""
        try:
            self.start_all_threads()

            # Keep the main thread alive; the 1 s timeout keeps it responsive
            # to signals.
            while self.running:
                self._stop_event.wait(1)

        except KeyboardInterrupt:
            self.logger.info("收到键盘中断信号")
        finally:
            self.stop_all_threads()
            self.cleanup()
def main(argv=None):
    """Parse command-line options, print a start-up banner and run the scheduler.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default)
            argparse reads ``sys.argv[1:]``.

    ``--thread_count`` and ``--interval_minutes`` must be positive integers.
    Invalid values make argparse print a usage error and exit with status 2.
    A count of 0 would start no workers, and an interval of 0 would make the
    workers loop without waiting.
    """
    import argparse

    def positive_int(text):
        """argparse ``type`` callable accepting only integers >= 1."""
        try:
            number = int(text)
        except ValueError:
            raise argparse.ArgumentTypeError(f"必须为整数: {text!r}") from None
        if number < 1:
            raise argparse.ArgumentTypeError(f"必须为正整数: {text!r}")
        return number

    parser = argparse.ArgumentParser(description='多线程结构化处理调度器')
    parser.add_argument('--query_word', default=None, help='query词')
    parser.add_argument('--source_type', default=None, help='数据源类型')
    parser.add_argument('--source_channel', default=None, help='数据源渠道')
    parser.add_argument('--thread_count', type=positive_int, default=5, help='线程数量')
    parser.add_argument('--interval_minutes', type=positive_int, default=2, help='处理间隔(分钟)')

    args = parser.parse_args(argv)

    separator = "=" * 60
    print(separator)
    print("多线程结构化处理调度器")
    print(separator)
    print(f"线程数量: {args.thread_count}")
    print(f"处理间隔: {args.interval_minutes}分钟")
    print(f"查询条件: query_word={args.query_word}, source_type={args.source_type}, source_channel={args.source_channel}")
    print(f"启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(separator)

    # Build the scheduler from the parsed options and block until it stops.
    scheduler = MultiThreadScheduler(
        thread_count=args.thread_count,
        interval_minutes=args.interval_minutes,
        query_word=args.query_word,
        source_type=args.source_type,
        source_channel=args.source_channel
    )
    scheduler.run()
# Script entry point: start the scheduler only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
|