1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 单线程内容识别 Worker
- 通过 Docker 的副本数实现并发,不再使用应用内多线程调度器。
- """
- import os
- import sys
- import time
- import signal
- from typing import Optional
# Make modules in the parent of this script's directory importable; the
# project-local imports that follow (utils, content_indentify) presumably
# live there — verify against the repository layout.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- from utils.logging_config import get_logger
- from content_indentify.indentify import ContentIdentifier
class ContentWorker:
    """Single-threaded content-identification worker.

    Repeatedly asks a ``ContentIdentifier`` to process one record.
    Concurrency comes from running several container replicas, not from
    threads inside this process. After a successfully processed record the
    worker waits until ``interval_seconds`` have elapsed since the attempt
    started. When nothing was processed, or processing raised, it waits
    ``idle_sleep_seconds``. SIGINT/SIGTERM request a graceful stop.
    """

    # Upper bound on a single sleep slice. Waits are split into slices no
    # longer than this so a stop request is noticed within about a second.
    # time.sleep() is transparently resumed after a signal handler returns
    # (PEP 475), so one long sleep would ignore SIGTERM for the whole
    # interval and could outlast a supervisor's stop grace period (Docker
    # defaults to 10s before SIGKILL).
    _SLEEP_SLICE_SECONDS = 1.0

    def __init__(self,
                 interval_seconds: int = 120,
                 idle_sleep_seconds: int = 10):
        """Create the worker and install SIGINT/SIGTERM handlers.

        Args:
            interval_seconds: Minimum time between the starts of two
                consecutive successful processing attempts.
            idle_sleep_seconds: Wait after an attempt that processed nothing
                or raised an exception.

        Note:
            ``signal.signal`` may only be called from the main thread, so
            the worker must be constructed there (it raises ``ValueError``
            otherwise).
        """
        self.interval_seconds = interval_seconds
        self.idle_sleep_seconds = idle_sleep_seconds
        self.running = True
        self.logger = get_logger('ContentWorker')
        self.identifier = ContentIdentifier()
        signal.signal(signal.SIGINT, self._handle_signal)
        signal.signal(signal.SIGTERM, self._handle_signal)

    def _handle_signal(self, signum, frame):
        """Signal handler: request a graceful stop after the current step."""
        self.logger.info(f"收到信号 {signum},准备优雅退出…")
        self.running = False

    def _sleep_while_running(self, seconds: float) -> None:
        """Sleep for up to *seconds*, returning early once a stop is requested.

        Non-positive durations return immediately. Plain ``time.sleep``
        raises ``ValueError`` on negative values, e.g. from a misconfigured
        environment variable.
        """
        deadline = time.monotonic() + seconds
        while self.running:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(remaining, self._SLEEP_SLICE_SECONDS))

    def run_forever(self):
        """Process records until a stop is requested via SIGINT/SIGTERM.

        Exceptions from ``process_single_record`` are logged with a traceback
        and followed by an idle wait, so one failing record does not stop
        the worker.
        """
        self.logger.info(f"启动 ContentWorker,间隔: {self.interval_seconds}s,空闲等待: {self.idle_sleep_seconds}s")
        while self.running:
            # Monotonic clock: elapsed time must not jump with wall-clock
            # adjustments (NTP, manual changes).
            start_time = time.monotonic()
            try:
                processed = self.identifier.process_single_record()
            except Exception as exc:
                # Top-level boundary: log and keep the worker alive.
                self.logger.error(f"处理异常: {exc}", exc_info=True)
                self._sleep_while_running(self.idle_sleep_seconds)
                continue

            if not processed:
                # Nothing to do right now; back off briefly before retrying.
                self._sleep_while_running(self.idle_sleep_seconds)
            else:
                # Throttle: the interval counts from the start of this
                # attempt, so processing time is included in it.
                elapsed = time.monotonic() - start_time
                self._sleep_while_running(self.interval_seconds - elapsed)
        self.logger.info("ContentWorker 已停止")
def _read_int_env(name: str, default: int, minimum: int = 0) -> int:
    """Read integer environment variable *name*, or *default* when unset.

    Raises:
        ValueError: if the value is not an integer or is below *minimum*.
            The message names the variable so misconfiguration is obvious.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        value = int(raw)
    except ValueError:
        raise ValueError(f"{name} must be an integer, got {raw!r}") from None
    if value < minimum:
        raise ValueError(f"{name} must be >= {minimum}, got {value}")
    return value


def main():
    """Entry point: build a ContentWorker from environment config and run it.

    Environment:
        INTERVAL_SECONDS: throttle interval after a processed record (default 120).
        IDLE_SLEEP_SECONDS: wait when nothing was processed or on error (default 10).
    Both must be non-negative integers; invalid values fail fast at startup
    instead of crashing the worker later.
    """
    interval_seconds = _read_int_env('INTERVAL_SECONDS', 120)
    idle_sleep_seconds = _read_int_env('IDLE_SLEEP_SECONDS', 10)
    worker = ContentWorker(interval_seconds=interval_seconds,
                           idle_sleep_seconds=idle_sleep_seconds)
    worker.run_forever()
# Run the worker only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|