jihuaqiang 15 hours ago
parent
commit
3ed8a07048
5 changed files with 228 additions and 0 deletions
  1. 18 0
      .dockerignore
  2. 30 0
      Dockerfile
  3. 68 0
      content_indentify/worker.py
  4. 48 0
      docker-compose.yml
  5. 64 0
      structure/worker.py

+ 18 - 0
.dockerignore

@@ -0,0 +1,18 @@
+# Version-control metadata
+.git
+.gitignore
+# Python bytecode, log files and tool caches, at any depth
+**/__pycache__/
+**/*.pyc
+**/*.pyo
+**/*.pyd
+**/*.log
+**/.pytest_cache/
+**/.mypy_cache/
+**/.ruff_cache/
+# OS residue, virtual environments and the local .env file
+.DS_Store
+venv/
+.venv/
+env/
+.env
+# Per-service log directories
+content_indentify/logs/
+structure/logs/
+

+ 30 - 0
Dockerfile

@@ -0,0 +1,30 @@
+FROM python:3.11-slim
+
+# Prevent Python from writing .pyc files, keep stdout/stderr unbuffered,
+# and stop pip from keeping a download cache inside the image
+ENV PYTHONDONTWRITEBYTECODE=1 \
+    PYTHONUNBUFFERED=1 \
+    PIP_NO_CACHE_DIR=1
+
+# Install system dependencies (ffmpeg for media handling, curl for health/debug)
+# and drop the apt package lists afterwards to keep the layer small
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    ffmpeg \
+    curl \
+    ca-certificates \
+ && rm -rf /var/lib/apt/lists/*
+
+WORKDIR /app
+
+# Install Python dependencies first for better layer caching:
+# this layer is only rebuilt when requirements.txt changes
+COPY requirements.txt /app/requirements.txt
+RUN pip install --upgrade pip && pip install -r /app/requirements.txt
+
+# Copy the rest of the source code (filtered by .dockerignore)
+COPY . /app
+
+# Default envs (override via compose/.env)
+ENV TZ=Asia/Shanghai \
+    PYTHONPATH=/app
+
+# Default command only prints the Python version and exits;
+# each service overrides it with its own command
+CMD ["python", "-V"]
+

+ 68 - 0
content_indentify/worker.py

@@ -0,0 +1,68 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+单线程内容识别 Worker
+通过 Docker 的副本数实现并发,不再使用应用内多线程调度器。
+"""
+
+import os
+import sys
+import time
+import signal
+from typing import Optional
+
+# 确保可以导入到上级目录的公共模块
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from utils.logging_config import get_logger
+from content_indentify.indentify import ContentIdentifier
+
+
+class ContentWorker:
+    def __init__(self,
+                 interval_seconds: int = 120,
+                 idle_sleep_seconds: int = 10):
+        self.interval_seconds = interval_seconds
+        self.idle_sleep_seconds = idle_sleep_seconds
+        self.running = True
+        self.logger = get_logger('ContentWorker')
+        self.identifier = ContentIdentifier()
+
+        signal.signal(signal.SIGINT, self._handle_signal)
+        signal.signal(signal.SIGTERM, self._handle_signal)
+
+    def _handle_signal(self, signum, frame):
+        self.logger.info(f"收到信号 {signum},准备优雅退出…")
+        self.running = False
+
+    def run_forever(self):
+        self.logger.info(f"启动 ContentWorker,间隔: {self.interval_seconds}s,空闲等待: {self.idle_sleep_seconds}s")
+        while self.running:
+            start_time = time.time()
+            try:
+                processed = self.identifier.process_single_record()
+                if not processed:
+                    # 没有可处理的数据,短暂休眠后继续尝试
+                    time.sleep(self.idle_sleep_seconds)
+                else:
+                    # 成功处理后,按设定间隔节流
+                    elapsed = time.time() - start_time
+                    wait_time = max(0, self.interval_seconds - elapsed)
+                    if wait_time > 0:
+                        time.sleep(wait_time)
+            except Exception as exc:
+                self.logger.error(f"处理异常: {exc}", exc_info=True)
+                time.sleep(self.idle_sleep_seconds)
+
+
+def main():
+    interval_seconds = int(os.getenv('INTERVAL_SECONDS', '120'))
+    idle_sleep_seconds = int(os.getenv('IDLE_SLEEP_SECONDS', '10'))
+    worker = ContentWorker(interval_seconds=interval_seconds,
+                           idle_sleep_seconds=idle_sleep_seconds)
+    worker.run_forever()
+
+
+if __name__ == '__main__':
+    main()
+

+ 48 - 0
docker-compose.yml

@@ -0,0 +1,48 @@
+services:
+  # Content-identification worker; runs content_indentify/worker.py
+  content_scheduler:
+    build: .
+    image: knowledge-scheduler:latest
+    working_dir: /app/content_indentify
+    command: python3 worker.py
+    environment:
+      - TZ=${TZ:-Asia/Shanghai}
+      - PYTHONPATH=/app
+      - GEMINI_API_KEY=${GEMINI_API_KEY}
+      - FEISHU_APP_ID=${FEISHU_APP_ID}
+      - FEISHU_APP_SECRET=${FEISHU_APP_SECRET}
+      - FEISHU_FILE_TOKEN=${FEISHU_FILE_TOKEN}
+      - HTTP_PROXY=${HTTP_PROXY}
+      - HTTPS_PROXY=${HTTPS_PROXY}
+      - NO_PROXY=${NO_PROXY}
+    volumes:
+      - ./:/app
+    restart: always
+    # Original note: only takes effect in Docker Swarm mode; with local docker compose use --scale instead.
+    # NOTE(review): Compose V2 (`docker compose up`) appears to honor deploy.replicas as well - confirm for the version in use.
+    deploy:
+      replicas: 6
+
+  # Structure worker service, currently disabled.
+  # NOTE(review): it still launches multi_thread_scheduler.py, not structure/worker.py - confirm before re-enabling.
+  # structure_scheduler:
+  #   image: knowledge-scheduler:latest
+  #   container_name: structure-scheduler
+  #   working_dir: /app/structure
+  #   command: python3 multi_thread_scheduler.py
+  #   environment:
+  #     - TZ=${TZ:-Asia/Shanghai}
+  #     - PYTHONPATH=/app
+  #     - GEMINI_API_KEY=${GEMINI_API_KEY}
+  #     - FEISHU_APP_ID=${FEISHU_APP_ID}
+  #     - FEISHU_APP_SECRET=${FEISHU_APP_SECRET}
+  #     - FEISHU_FILE_TOKEN=${FEISHU_FILE_TOKEN}
+  #     - HTTP_PROXY=${HTTP_PROXY}
+  #     - HTTPS_PROXY=${HTTPS_PROXY}
+  #     - NO_PROXY=${NO_PROXY}
+  #   volumes:
+  #     - ./:/app
+  #   restart: always
+  #   depends_on:
+  #     - content_scheduler
+
+# Name the default network explicitly instead of using the generated name
+networks:
+  default:
+    name: knowledge-net
+

+ 64 - 0
structure/worker.py

@@ -0,0 +1,64 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+单线程结构化处理 Worker
+通过 Docker 的副本数实现并发,不再使用应用内多线程调度器。
+"""
+
+import os
+import sys
+import time
+import signal
+
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from utils.logging_config import get_logger
+from structure.structure_processor import StructureProcessor
+
+
+class StructureWorker:
+    def __init__(self,
+                 interval_seconds: int = 120,
+                 idle_sleep_seconds: int = 10):
+        self.interval_seconds = interval_seconds
+        self.idle_sleep_seconds = idle_sleep_seconds
+        self.running = True
+        self.logger = get_logger('StructureWorker')
+        self.processor = StructureProcessor()
+
+        signal.signal(signal.SIGINT, self._handle_signal)
+        signal.signal(signal.SIGTERM, self._handle_signal)
+
+    def _handle_signal(self, signum, frame):
+        self.logger.info(f"收到信号 {signum},准备优雅退出…")
+        self.running = False
+
+    def run_forever(self):
+        self.logger.info(f"启动 StructureWorker,间隔: {self.interval_seconds}s,空闲等待: {self.idle_sleep_seconds}s")
+        while self.running:
+            start_time = time.time()
+            try:
+                processed = self.processor.process_single_record()
+                if not processed:
+                    time.sleep(self.idle_sleep_seconds)
+                else:
+                    elapsed = time.time() - start_time
+                    wait_time = max(0, self.interval_seconds - elapsed)
+                    if wait_time > 0:
+                        time.sleep(wait_time)
+            except Exception as exc:
+                self.logger.error(f"处理异常: {exc}", exc_info=True)
+                time.sleep(self.idle_sleep_seconds)
+
+
+def main():
+    interval_seconds = int(os.getenv('INTERVAL_SECONDS', '120'))
+    idle_sleep_seconds = int(os.getenv('IDLE_SLEEP_SECONDS', '10'))
+    worker = StructureWorker(interval_seconds=interval_seconds,
+                             idle_sleep_seconds=idle_sleep_seconds)
+    worker.run_forever()
+
+
+if __name__ == '__main__':
+    main()
+