#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Single-threaded content-recognition worker.

Concurrency is achieved by running several Docker replicas of this process;
the in-application multi-threaded scheduler is no longer used.
"""

import os
import sys
import time
import signal
from typing import Optional

# Make sure the shared modules in the parent directory can be imported.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from utils.logging_config import get_logger
from content_indentify.indentify import ContentIdentifier


class ContentWorker:
    """Loop that processes one record at a time until asked to stop.

    Each iteration calls ``ContentIdentifier.process_single_record()``.
    When the call returns a truthy value the worker throttles itself so that
    iterations start at most once every ``interval_seconds``; when it returns
    a falsy value, or raises, the worker waits ``idle_sleep_seconds`` before
    retrying. SIGINT and SIGTERM clear ``running`` so the loop can exit.
    """

    # Upper bound (seconds) of a single sleep slice. ``time.sleep`` resumes
    # after a signal handler returns (PEP 475), so long waits are split into
    # short slices to notice a shutdown request promptly.
    _SLEEP_SLICE_SECONDS = 1.0

    def __init__(self, interval_seconds: int = 120, idle_sleep_seconds: int = 10):
        """Create the worker and install the SIGINT/SIGTERM handlers.

        Args:
            interval_seconds: Minimum number of seconds between the start of
                two iterations that processed a record.
            idle_sleep_seconds: Seconds to wait after an iteration that found
                nothing to process or that raised an exception.
        """
        self.interval_seconds = interval_seconds
        self.idle_sleep_seconds = idle_sleep_seconds
        self.running = True
        self.logger = get_logger('ContentWorker')
        self.identifier = ContentIdentifier()

        signal.signal(signal.SIGINT, self._handle_signal)
        signal.signal(signal.SIGTERM, self._handle_signal)

    def _handle_signal(self, signum, frame) -> None:
        """Signal handler: log the signal and request loop termination."""
        self.logger.info(f"收到信号 {signum},准备优雅退出…")
        self.running = False

    def _sleep_interruptibly(self, seconds: float) -> None:
        """Sleep up to ``seconds``, returning early once ``running`` is False.

        Non-positive durations return immediately instead of letting
        ``time.sleep`` raise ``ValueError`` on a negative value.
        """
        deadline = time.monotonic() + seconds
        while self.running:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return
            time.sleep(min(remaining, self._SLEEP_SLICE_SECONDS))

    def run_forever(self) -> None:
        """Run the processing loop until ``running`` becomes False."""
        self.logger.info(
            f"启动 ContentWorker,间隔: {self.interval_seconds}s,空闲等待: {self.idle_sleep_seconds}s"
        )
        while self.running:
            # Monotonic clock: the throttle must not be skewed by wall-clock
            # adjustments while a record is being processed.
            start_time = time.monotonic()
            try:
                processed = self.identifier.process_single_record()
                if not processed:
                    # Nothing to process: wait briefly, then try again.
                    self._sleep_interruptibly(self.idle_sleep_seconds)
                else:
                    # A record was processed: throttle to the configured
                    # interval, counting the processing time itself.
                    elapsed = time.monotonic() - start_time
                    self._sleep_interruptibly(self.interval_seconds - elapsed)
            except Exception as exc:
                # Top-level boundary of the worker: log with traceback and
                # keep the loop alive after a short back-off.
                self.logger.error(f"处理异常: {exc}", exc_info=True)
                self._sleep_interruptibly(self.idle_sleep_seconds)


def main() -> None:
    """Build a ``ContentWorker`` from the environment and run it.

    Reads ``INTERVAL_SECONDS`` (default ``120``) and ``IDLE_SLEEP_SECONDS``
    (default ``10``); both are parsed with ``int`` and a non-integer value
    raises ``ValueError`` at startup.
    """
    interval_seconds = int(os.getenv('INTERVAL_SECONDS', '120'))
    idle_sleep_seconds = int(os.getenv('IDLE_SLEEP_SECONDS', '10'))
    worker = ContentWorker(
        interval_seconds=interval_seconds,
        idle_sleep_seconds=idle_sleep_seconds,
    )
    worker.run_forever()


if __name__ == '__main__':
    main()