worker.py 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 单线程内容识别 Worker
  5. 通过 Docker 的副本数实现并发,不再使用应用内多线程调度器。
  6. """
  7. import os
  8. import sys
  9. import time
  10. import signal
  11. from typing import Optional
  12. # 确保可以导入到上级目录的公共模块
  13. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  14. from utils.logging_config import get_logger
  15. from content_indentify.indentify import ContentIdentifier
  16. class ContentWorker:
  17. def __init__(self,
  18. interval_seconds: int = 120,
  19. idle_sleep_seconds: int = 10):
  20. self.interval_seconds = interval_seconds
  21. self.idle_sleep_seconds = idle_sleep_seconds
  22. self.running = True
  23. self.logger = get_logger('ContentWorker')
  24. self.identifier = ContentIdentifier()
  25. signal.signal(signal.SIGINT, self._handle_signal)
  26. signal.signal(signal.SIGTERM, self._handle_signal)
  27. def _handle_signal(self, signum, frame):
  28. self.logger.info(f"收到信号 {signum},准备优雅退出…")
  29. self.running = False
  30. def run_forever(self):
  31. self.logger.info(f"启动 ContentWorker,间隔: {self.interval_seconds}s,空闲等待: {self.idle_sleep_seconds}s")
  32. while self.running:
  33. start_time = time.time()
  34. try:
  35. processed = self.identifier.process_single_record()
  36. if not processed:
  37. # 没有可处理的数据,短暂休眠后继续尝试
  38. time.sleep(self.idle_sleep_seconds)
  39. else:
  40. # 成功处理后,按设定间隔节流
  41. elapsed = time.time() - start_time
  42. wait_time = max(0, self.interval_seconds - elapsed)
  43. if wait_time > 0:
  44. time.sleep(wait_time)
  45. except Exception as exc:
  46. self.logger.error(f"处理异常: {exc}", exc_info=True)
  47. time.sleep(self.idle_sleep_seconds)
  48. def main():
  49. interval_seconds = int(os.getenv('INTERVAL_SECONDS', '120'))
  50. idle_sleep_seconds = int(os.getenv('IDLE_SLEEP_SECONDS', '10'))
  51. worker = ContentWorker(interval_seconds=interval_seconds,
  52. idle_sleep_seconds=idle_sleep_seconds)
  53. worker.run_forever()
# Script entry point: start the worker only when this file is executed
# directly, not when it is imported as a module.
if __name__ == '__main__':
    main()