zhangliang 7 hours ago
parent
commit 85590ce087

+ 8 - 1
.env

@@ -28,4 +28,11 @@ ALIYUN_ACCESS_KEY_SECRET="RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
 
 # 飞书配置
 FEISHU_APPID="cli_a13ad2afa438d00b"
-FEISHU_APPSECRET="4tK9LY9VbiQlY5umhE42dclBFo6t4p5O"
+FEISHU_APPSECRET="4tK9LY9VbiQlY5umhE42dclBFo6t4p5O"
+
+# Redis配置
+REDIS_HOST="r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com"
+REDIS_PORT=6379
+REDIS_PASSWORD="Wqsd@2019"
+REDIS_DB=0
+REDIS_MAX_CONNECTIONS=50

+ 1 - 0
README.md

@@ -144,3 +144,4 @@ benshanzhufu:
 - 每条消息创建一个 UniversalCrawler 实例,执行 `.run()`,完成后再 ACK
 - 失败或超时不会阻塞其他任务
 
+更新依赖后执行 `pip freeze > requirements.txt` 重新生成依赖清单

+ 11 - 0
config/base.py

@@ -48,6 +48,17 @@ class Settings(BaseSettings):
     ALIYUN_ACCESS_KEY_ID: str = Field(..., env="ALIYUN_ACCESS_KEY_ID")
     ALIYUN_ACCESS_KEY_SECRET: str = Field(..., env="ALIYUN_ACCESS_KEY_SECRET")
 
+    # redis
+    REDIS_HOST: str = Field(..., env="REDIS_HOST")
+    REDIS_PORT: int = Field(..., env="REDIS_PORT")
+    REDIS_PASSWORD: str = Field(..., env="REDIS_PASSWORD")
+    REDIS_DB: int = Field(0, env="REDIS_DB")
+    REDIS_MAX_CONNECTIONS: int = Field(20, env="REDIS_MAX_CONNECTIONS")
+    @property
+    def redis_url(self) -> str:
+        """生成"""
+        return f"redis://:{self.REDIS_PASSWORD}@{self.REDIS_HOST}:{self.REDIS_PORT}/{self.REDIS_DB}"
+
     class Config:
         env_file = ".env"
         env_file_encoding = 'utf-8'
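
For quick reference, a minimal sketch of what the new redis_url property yields (the values below are placeholders, not the real credentials):

# 示意:redis_url 将各 REDIS_* 字段拼装为标准 DSN
host, port, password, db = "localhost", 6379, "secret", 0
redis_url = f"redis://:{password}@{host}:{port}/{db}"
print(redis_url)  # redis://:secret@localhost:6379/0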

+ 47 - 0
core/base/async_redis_client.py

@@ -0,0 +1,47 @@
+import redis.asyncio as aioredis
+from core.utils.log.logger_manager import LoggerManager
+
+class RedisManager:
+    _pool = None
+    logger = LoggerManager.get_logger()
+
+    @classmethod
+    async def init(
+        cls,
+        redis_url: str = "",
+        max_connections: int = 20,
+        encoding: str = "utf-8",
+        decode_responses: bool = True
+    ):
+        """
+        初始化 Redis 异步连接池,保证进程级单例。
+        """
+        if cls._pool is None:
+            try:
+                cls._pool = aioredis.from_url(
+                    redis_url,
+                    max_connections=max_connections,
+                    encoding=encoding,
+                    decode_responses=decode_responses,
+                    retry_on_timeout=True,
+                )
+                cls.logger.debug(f"[RedisManager] Redis 连接池初始化成功: {redis_url.rsplit('@', 1)[-1]}")  # 日志中隐去密码
+            except Exception as e:
+                cls.logger.error(f"[RedisManager] Redis 连接池初始化失败: {e}")
+                raise
+
+    @classmethod
+    def get_pool(cls):
+        if cls._pool is None:
+            raise RuntimeError("[RedisManager] 未初始化,请先执行 RedisManager.init()")
+        return cls._pool
+
+    @classmethod
+    async def close(cls):
+        """
+        关闭连接池(在优雅退出时调用)
+        """
+        if cls._pool:
+            await cls._pool.close()
+            cls._pool = None  # 置空,允许后续重新 init
+            cls.logger.debug("[RedisManager] Redis 连接池已关闭")
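
A minimal lifecycle sketch for the manager above, assuming the module path added in this commit and a reachable Redis (the URL is a placeholder):

import asyncio

from core.base.async_redis_client import RedisManager

async def main():
    await RedisManager.init(redis_url="redis://:secret@localhost:6379/0")
    pool = RedisManager.get_pool()
    await pool.set("demo:key", "1", ex=60)  # 写入并设置 60 秒过期
    print(await pool.get("demo:key"))       # decode_responses=True,输出 "1"
    await RedisManager.close()

asyncio.run(main())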

+ 5 - 6
core/base/async_request_client.py

@@ -1,7 +1,5 @@
 import asyncio
 from typing import Optional, Dict
-from pprint import pformat
-
 import aiohttp
 
 from core.utils.log.logger_manager import LoggerManager
@@ -47,6 +45,7 @@ class AsyncRequestClient:
                             self.logger.info(f"响应 {resp}, 重试 {retries}/{self.max_retries}")
                         await asyncio.sleep(5)
                         continue
+                    self.logger.info(f"响应: {resp}")
                     return resp
             except Exception as e:
                 retries += 1
@@ -57,10 +58,10 @@ class AsyncRequestClient:
                         self.aliyun_log.logging(
                             code="9006",
                             message=f"请求异常达到最大重试次数",
-                            data={"utl": url,
-                                      "method": method,
-                                      "requestBody": kwargs,
-                                      "response": resp
+                            data={"url": url,
+                                  "method": method,
+                                  "requestBody": kwargs,
+                                  "response": resp
                                 }
                         )
                     return

+ 4 - 1
core/utils/codes.py

@@ -24,6 +24,8 @@ CODES = {
     "1008": "规则匹配成功",
     "1009": "成功发送至ETL",
     "1010": "任务执行完成",
+    "1011": "视频数量达到当日最大值",
+    "1012": "实时入库量",
 
     # 配置错误类 (4xxx)
     "4000": "爬虫配置缺失",
@@ -37,7 +39,7 @@ CODES = {
     "9003": "视频处理失败",
     "9004": "推送ETL失败",
     "9005": "爬虫致命错误退出",
-    "9006": "请求异常重试",
+    "9006": "请求异常达到最大重试",
     "9007": "字段缺失校验失败",
     "9008": "标题不符合规则",
     "9009": "发布时间不符合规则",
@@ -52,6 +54,7 @@ CODES = {
     "9018": "规则解析失败",
     "9019": "任务参数缺失",
     "9020": "过滤条件不匹配",
+    "9021": "接口返回数据为空",
 
     # 系统致命错误类 (99xx)
     "9900": "数据库连接失败",

+ 2 - 1
core/utils/extractors.py

@@ -44,7 +44,8 @@ def extract_fields(video: Dict, field_map: Dict, logger=None, trace_id=None,aliy
             logger.warning(f"字段提取失败: {field} 路径: {path}")
             aliyun_log.logging(
                 code="9024",
-                message=f"字段提取失败: {field} 路径: {path}"
+                message=f"字段提取失败: {field} 路径: {path}",
+                data={"video": video}
 
             )
         result[field] = value
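
For context on the added data={"video": video} payload: extract_fields walks a JSONPath field_map over a single video dict, so the failing record now travels with the 9024 log entry. A hedged usage sketch (field names and paths invented):

from core.utils.extractors import extract_fields

video = {"data": {"vid": "v123", "title": "demo"}}
field_map = {"video_id": "$.data.vid", "title": "$.data.title"}

item = extract_fields(video, field_map)
print(item)  # {'video_id': 'v123', 'title': 'demo'}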

+ 73 - 53
core/utils/log/local_log.py

@@ -6,12 +6,39 @@ from loguru import logger as global_logger
 from config import settings
 from core.utils.trace_utils import get_current_trace_id
 
+
 class Local:
     """
-    本地日志记录器(支持每天自动生成新文件 trace_id 注入)
+    本地日志记录器
+    - 支持文件和控制台共享配置
+    - 自动管理日志处理器生命周期
+    - 确保日志格式一致性
     """
 
-    _initialized = set()  # 用于防止同一个 platform+mode 重复初始化
+    _initialized_loggers = {}  # 存储已初始化的日志器配置
+
+    @staticmethod
+    def _get_base_config(platform: str, mode: str) -> dict:
+        """获取基础日志配置(文件和控制台共享)"""
+        return {
+            "format": (
+                "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
+                "<level>{level}</level> | "
+                "<cyan>{extra[platform]}</cyan> | "
+                "<cyan>{extra[mode]}</cyan> | "
+                "<magenta>trace_id={extra[trace_id]}</magenta> | "
+                "<cyan>{name}:{function}:{line}</cyan> | "
+                "{message}"
+            ),
+            "backtrace": True, # 发生异常时显示完整堆栈回溯
+            "diagnose": True, # 发生异常时显示变量值
+            "enqueue": True, # 使用队列异步处理日志,提高性能
+            "level": "INFO",  # 默认级别,将被实际配置覆盖
+
+            # 日志过滤设置
+            "filter": lambda record: record["extra"].get("platform") == platform
+                                     and record["extra"].get("mode") == mode
+        }
 
     @staticmethod
     def init_logger(
@@ -19,68 +46,61 @@ class Local:
             mode: str,
             log_level: str = "INFO",
             log_to_console: bool = False,
-            retention: str = "10 days"
+            retention: str = "10 days",
+            rotation: str = "00:00",
+            compression: str = "gz",
     ):
-        """
-        初始化并返回带上下文的 loguru 日志实例
-
-        :param platform: 平台名(用于日志区分和文件夹名)
-        :param mode: 模式名(用于日志区分)
-        :param log_level: 日志级别(默认 INFO)
-        :param log_to_console: 是否同时输出到控制台
-        :param retention: 日志保留时间(默认 10 天自动清理)
-        :return: 带上下文的 loguru logger
-        """
-
+        """初始化并返回带上下文的日志实例"""
         key = f"{platform}_{mode}"
-        if key in Local._initialized:
-            # 已初始化相同 platform+mode 时直接返回全局 logger(防止重复初始化导致重复写日志)
-            return global_logger
+
+        # 如果已初始化,直接返回
+        if key in Local._initialized_loggers:
+            return global_logger.bind(
+                platform=platform,
+                mode=mode,
+                trace_id=get_current_trace_id()
+            )
 
         # 创建日志存放路径
         log_dir = Path(f"{settings.LOG_DIR}/{platform}")
         log_dir.mkdir(parents=True, exist_ok=True)
 
-        # 按日期命名的日志文件模式,loguru 会每天自动生成新的文件:
-        # 如 yuannifuqimanman-recommend-2025-07-04.log
+        # 日志文件模式
         log_pattern = log_dir / f"{platform}-{mode}-{{time:YYYY-MM-DD}}.log"
 
-        # 添加文件日志写入
-        global_logger.add(
-            str(log_pattern),
-            level=log_level.upper(),
-            retention=retention,
-            enqueue=True,         # 启用队列异步写入,提高性能
-            encoding="utf-8",     # 避免中文乱码
-            backtrace=True,       # 异常时输出完整堆栈
-            diagnose=True         # 异常时显示变量值
-        )
+        # 获取基础配置
+        base_config = Local._get_base_config(platform, mode)
+        base_config["level"] = log_level.upper()
 
-        if log_to_console:
-            # 添加控制台输出(可选)
-            global_logger.add(
-                sys.stdout,
-                level=log_level.upper(),
-                format=(
-                    "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
-                    "<level>{level}</level> | "
-                    "<cyan>{extra[platform]}</cyan> | "
-                    "<cyan>{extra[mode]}</cyan> | "
-                    "<magenta>trace_id={extra[trace_id]}</magenta> | "
-                    "{message}"
-                )
-            )
+        # 清理现有处理器(避免重复;注意 remove() 会移除所有已注册 sink,包括其他 platform/mode 的)
+        global_logger.remove()
 
-        # 获取当前 trace_id
-        current_trace_id = get_current_trace_id()
+        # 添加文件日志处理器(继承基础配置)
+        file_config = {
+            **base_config,
+            "sink": str(log_pattern),
+            "rotation": rotation,
+            "retention": retention,
+            "compression": compression,
+            "encoding": "utf-8"
+        }
+        global_logger.add(**file_config)
 
-        # 绑定上下文,便于日志区分来源:
-        # trace_id: 当前请求/任务链路唯一标识
-        logger_with_context = global_logger.bind(
-            trace_id=current_trace_id
-        )
+        # 添加控制台日志处理器(如果需要,继承基础配置并添加颜色)
+        if log_to_console:
+            console_config = {
+                **base_config,
+                "sink": sys.stdout,
+                "colorize": True
+            }
+            global_logger.add(**console_config)
 
-        # 防止重复初始化标记
-        Local._initialized.add(key)
+        # 标记为已初始化
+        Local._initialized_loggers[key] = True
 
-        return logger_with_context
+        # 返回绑定上下文的日志器
+        return global_logger.bind(
+            platform=platform,
+            mode=mode,
+            trace_id=get_current_trace_id()
+        )
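
A short usage sketch of the refactored initializer (platform/mode values are illustrative):

from core.utils.log.local_log import Local

logger = Local.init_logger(platform="demo", mode="recommend", log_to_console=True)
logger.info("spider started")  # 带 platform/mode/trace_id 上下文,按 filter 路由到对应 sink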

+ 1 - 1
core/utils/log/logger_manager.py

@@ -17,7 +17,7 @@ class LoggerManager:
     def get_logger(
         platform: str="system",
         mode: str="crawler",
-        log_to_console: bool = False
+        log_to_console: bool = True
     ) -> LoguruLogger:
         key = f"{platform}_{mode}"
         if key not in LoggerManager._local_loggers:

+ 76 - 0
core/utils/request_preparer.py

@@ -0,0 +1,76 @@
+import loguru
+
+from core.utils.extractors import safe_extract
+from typing import Dict, Any
+
+
+class RequestPreparer:
+    """
+    动态准备请求体:
+    - 支持 {{var_name}} 动态占位符
+    - 自动从上次响应根据 response_parse 配置提取对应值
+    - 无值时使用空字符串
+    - 可选日志记录提取失败情况
+    """
+
+    def __init__(self, response_parse_config: Dict[str, str], logger=None, aliyun_log=None):
+        """
+        :param response_parse_config: 如 {"next_cursor": "$.data.next_cursor"}
+        :param logger: 可选 logger
+        :param aliyun_log: 可选阿里云日志实例
+        """
+        self.response_parse_config = response_parse_config
+        self.logger = logger or loguru.logger
+        self.aliyun_log = aliyun_log
+
+    def prepare(self, request_body_config: Dict[str, Any], response_data: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        根据 request_body_config 和上次响应 response_data,返回可直接请求接口的 request_body
+        """
+        prepared_body = {}
+        for key, value in request_body_config.items():
+            if isinstance(value, str) and "{{" in value and "}}" in value:
+                var_name = value.strip("{}").split("|")[0]  # 仅支持整串占位符,如 "{{cursor}}";| 后默认值留作扩展
+                jsonpath_expr = self.response_parse_config.get(var_name)
+                if jsonpath_expr:
+                    extracted_value = safe_extract(response_data, jsonpath_expr, default="")
+                    prepared_body[key] = extracted_value
+                else:
+                    # response_parse_config 中未配置路径,默认空字符串
+                    prepared_body[key] = ""
+            else:
+                prepared_body[key] = value
+        return prepared_body
+
+
+if __name__ == "__main__":
+    # ===== 示例运行观察效果 =====
+    request_body_config = {
+        "cursor": "{{next_cursor}}",
+        "category": "recommend",
+        "flag": "{{flag}}"
+    }
+
+    response_parse_config = {
+        "next_cursor": "$.data.next_cursor",
+        "flag": "$.data.flag"
+    }
+
+    # 模拟响应
+    response_data = {
+        "data": {
+            "next_cursor": "abc123",
+            "flag": "on"
+        }
+    }
+
+    preparer = RequestPreparer(response_parse_config)
+    prepared_body = preparer.prepare(request_body_config, response_data)
+
+    print("准备好的请求体:", prepared_body)
+    # 输出: {'cursor': 'abc123', 'category': 'recommend', 'flag': 'on'}
+
+    # 测试首次请求情况
+    prepared_body_first = preparer.prepare(request_body_config, {})
+    print("首次请求时请求体:", prepared_body_first)
+    # 输出: {'cursor': '', 'category': 'recommend', 'flag': ''}

+ 0 - 62
core/utils/template_resolver.py

@@ -1,62 +0,0 @@
-# core/utils/template_resolver.py
-
-import re
-from typing import Any, Dict
-
-PLACEHOLDER_PATTERN = re.compile(r"\{\{\s*([a-zA-Z0-9_.]+)(\|[^}]+)?\s*\}\}")
-
-def resolve_request_body_template(
-    data: Any,
-    variables: Dict[str, Any] = {},
-) -> Any:
-    """
-    仅解析请求参数中 {{var}} 模板
-
-    Args:
-        data: dict/list/str
-        variables: {"next_cursor": "123", ...}
-
-    Returns:
-        替换后的结构
-        :rtype: Any
-    """
-    if isinstance(data, dict):
-        return {k: resolve_request_body_template(v, variables) for k, v in data.items()}
-    elif isinstance(data, list):
-        return [resolve_request_body_template(item, variables) for item in data]
-    elif isinstance(data, str):
-        def replacer(match):
-            var_name = match.group(1)
-            default_value = match.group(2)[1:] if match.group(2) else None
-            value = variables.get(var_name)
-            if value is not None:
-                return str(value)
-            elif default_value is not None:
-                return default_value
-            else:
-                return ""
-        return PLACEHOLDER_PATTERN.sub(replacer, data)
-    else:
-        return data
-
-if __name__ == '__main__':
-    data = {
-        "cursor": "{{next_cursor}}",
-        "page_size": 20,
-        "filters": {
-            "start_date": "{{start_date}}",
-            "end_date": "{{end_date|2025-01-01}}"
-        },
-        "tags": ["{{tag1}}", "{{tag2|default_tag}}"]
-    }
-
-    variables = {
-
-        # "dada":{"next_cursor":"1"},
-        # "start_date": "2025-06-30",
-        # "tag1": "news",
-        # "tag3": "news",
-        # "default_tag55": "1111",
-    }
-    result = resolve_request_body_template(data, variables)
-    print(result)

+ 1 - 1
deploy.sh

@@ -4,7 +4,7 @@ set -e  # 出错时终止脚本
 # 配置信息
 APP_DIR="/root/AutoScraperX"
 LOG_FILE="/var/log/autoscraperx_deploy.log"
-VENV_DIR="${APP_DIR}/venv"
+VENV_DIR="${APP_DIR}/.venv"
 PYTHON="python"
 REQUIREMENTS="${APP_DIR}/requirements.txt"
 

+ 11 - 0
requirements.txt

@@ -4,11 +4,14 @@ aiosignal==1.3.2
 aliyun-log-python-sdk==0.9.24
 aliyun-python-sdk-core==2.13.36
 annotated-types==0.7.0
+async-timeout==5.0.1
 asyncmy==0.2.10
 attrs==25.3.0
+build==1.2.2.post1
 certifi==2025.6.15
 cffi==1.17.1
 charset-normalizer==3.4.2
+click==8.2.1
 coverage==7.9.1
 cryptography==45.0.4
 dateparser==1.2.1
@@ -21,6 +24,7 @@ grpcio==1.73.1
 grpcio-tools==1.71.2
 idna==3.10
 importlib_metadata==8.7.0
+iniconfig==2.1.0
 jmespath==0.10.0
 jsonpath-ng==1.7.0
 loguru==0.7.3
@@ -35,6 +39,9 @@ opentelemetry-exporter-otlp-proto-http==1.34.1
 opentelemetry-proto==1.34.1
 opentelemetry-sdk==1.34.1
 opentelemetry-semantic-conventions==0.55b1
+packaging==25.0
+pip-tools==7.4.1
+pluggy==1.6.0
 ply==3.11
 propcache==0.3.2
 protobuf==5.27.0
@@ -42,7 +49,11 @@ pycparser==2.22
 pydantic==1.10.22
 pydantic-env-settings==0.1.1
 pydantic_core==2.33.2
+Pygments==2.19.2
 PyMySQL==1.1.1
+pyproject_hooks==1.2.0
+pytest==8.4.1
+pytest-asyncio==1.0.0
 python-dateutil==2.9.0.post0
 python-dotenv==1.1.0
 pytz==2025.2

+ 65 - 56
scheduler/async_consumer.py

@@ -4,6 +4,8 @@ import traceback
 from typing import List
 import signal
 
+from config import settings
+from core.base.async_redis_client import RedisManager
 from core.utils.log.logger_manager import LoggerManager
 from core.utils.trace_utils import generate_trace_id, TraceContext
 from services.async_mysql_service import AsyncMysqlService
@@ -22,65 +24,69 @@ async def async_handle_topic(topic: str, stop_event: asyncio.Event):
     - 记录日志、确认消息。
     """
     # 每个 topic 创建独立的 consumer 实例(使用优化后的 AsyncRocketMQConsumer)
+
     from services.async_mq_consumer import AsyncRocketMQConsumer
+    from services.async_redis_service import AsyncRedisService
+    redis_controller = AsyncRedisService()
     consumer = AsyncRocketMQConsumer(topic_name=topic, group_id=topic)
 
     async def handle_single_message(message):
         """处理单条消息的业务逻辑(不含拉取和循环)"""
-
-        with TraceContext() as trace_id:  # 生成 trace_id 并绑定到上下文
-            try:
-                payload = json.loads(message.message_body)
-                task_id = payload["id"]
-
-                logger.info(f"[{topic}]接收到任务消息: {task_id}")
-                # 确认消息(单条消息处理成功后才 Ack)
-                await consumer.ack_message(message.receipt_handle)
-                logger.info(f"[{topic}]任务 {task_id} 已 Ack")
-
-                aliyun_logger.logging(
-                    code="1000",
-                    message="任务接收成功",
-                    data=payload,
-                    account=topic
-                )
-
-                # 从数据库查询配置
-                async with AsyncMysqlService() as mysql:
-                    user_list = await mysql.get_user_list(task_id)
-                    rule_dict = await mysql.get_rule_dict(task_id)
-
-                # 执行爬虫任务
-                CrawlerClass = get_spider_class(topic)
-                crawler = CrawlerClass(
-                    rule_dict=rule_dict,
-                    user_list=user_list,
-                )
-                await crawler.run()  # 爬虫成功执行后再确认消息
-
-
-
-                logger.info(f"[{topic}]任务 {task_id} 执行成功")
-                aliyun_logger.logging(
-                    code="1010",
-                    message="任务执行成功",
-                    data={"task_id": task_id, "topic": topic},
-                    account=topic
-                )
-
-            except Exception as e:
-                logger.error(f"[{topic}]任务处理失败: {e} \n {traceback.format_exc()}")
-                aliyun_logger.logging(
-                    code="9001",
-                    message=f"处理消息失败: {str(e)} \n {traceback.format_exc()}",
-                    data={
-                        "error_type": type(e).__name__,
-                        "stack_trace": traceback.format_exc(),
-                        "message_body": message.message_body
-                    },
-                    account=topic
-                )
-                # 处理失败不 Ack,消息会被 MQ 重新投递(依赖 MQ 的重试机制)
+        message_id = message.message_id
+        status = await redis_controller.get_status(message_id)
+        if status == "1":
+            logger.info(f"[{topic}] MessageID: {message_id} 已执行完成,直接 Ack 跳过")
+            await consumer.ack_message(message.receipt_handle)
+            return
+        if not status:
+            logger.info(f"[{topic}] MessageID: {message_id} 未执行,标记为处理中")
+            await redis_controller.mark_processing(message_id)
+
+        try:
+            payload = json.loads(message.message_body)
+            task_id = payload["task_id"]
+            logger.info(f"[{topic}]接收到任务消息: {task_id}")
+
+            aliyun_logger.logging(
+                code="1000",
+                message="任务接收成功",
+                data=payload,
+                account=topic
+            )
+            # 从数据库查询配置
+            async with AsyncMysqlService() as mysql:
+                user_list = await mysql.get_user_list(task_id)
+                rule_dict = await mysql.get_rule_dict(task_id)
+
+            # 执行爬虫任务
+            CrawlerClass = get_spider_class(topic)
+            crawler = CrawlerClass(
+                rule_dict=rule_dict,
+                user_list=user_list,
+            )
+            await crawler.run()  # 爬虫成功执行后再确认消息
+            logger.info(f"[{topic}]任务 {task_id} 执行成功")
+            aliyun_logger.logging(
+                code="1010",
+                message="任务执行成功",
+                data={"task_id": task_id, "topic": topic},
+                account=topic
+            )
+            await redis_controller.mark_done(message_id)
+
+        except Exception as e:
+            logger.error(f"[{topic}]任务处理失败 MessageID: {message_id}: {e} \n {traceback.format_exc()}")
+            aliyun_logger.logging(
+                code="9001",
+                message=f"处理消息失败: {str(e)} \n {traceback.format_exc()}",
+                data={
+                    "error_type": type(e).__name__,
+                    "stack_trace": traceback.format_exc(),
+                    "message_body": message.message_body
+                },
+                account=topic
+            )
+            # 处理失败不 Ack,消息会被 MQ 重新投递(依赖 MQ 的重试机制)
 
     # 独立的消费循环:拉取消息并调用处理函数
     async def consume_loop():
@@ -90,8 +96,9 @@ async def async_handle_topic(topic: str, stop_event: asyncio.Event):
                 # 拉取单条消息(依赖优化后的 receive_message,无消息时返回 None 不报错)
                 message = await consumer.receive_message()
                 if message:
-                    # 有消息则处理,处理完成后再进入下一次循环
-                    await handle_single_message(message)
+                    with TraceContext() as trace_id:  # 生成 trace_id 并绑定到上下文
+                        # 有消息则处理,处理完成后再进入下一次循环
+                        await handle_single_message(message)
                 else:
                     # 无消息时短暂休眠,避免频繁空轮询
                     await asyncio.sleep(1)
@@ -113,6 +120,7 @@ async def async_handle_topic(topic: str, stop_event: asyncio.Event):
 
 
 async def run_all_topics(topics: List[str]):
+    await RedisManager.init(redis_url=settings.redis_url, max_connections=settings.REDIS_MAX_CONNECTIONS)
     stop_event = asyncio.Event()
     loop = asyncio.get_running_loop()
 
@@ -124,6 +132,7 @@ async def run_all_topics(topics: List[str]):
             message="[系统] 收到停止信号,准备优雅退出...",
         )
         stop_event.set()
+        asyncio.create_task(RedisManager.close())  # 关闭连接池
 
     # 注册信号处理(支持 Ctrl+C 和 kill 命令)
     for sig in [signal.SIGINT, signal.SIGTERM]:
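
For completeness, a sketch of how this entry point is typically driven (topic names are placeholders):

import asyncio

from scheduler.async_consumer import run_all_topics

if __name__ == "__main__":
    # Redis 连接池在 run_all_topics 内部初始化;SIGINT/SIGTERM 触发优雅退出
    asyncio.run(run_all_topics(["topic_a", "topic_b"]))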

+ 49 - 0
services/async_redis_service.py

@@ -0,0 +1,49 @@
+
+from core.base.async_redis_client import RedisManager
+
+class AsyncRedisService:
+    """
+    RocketMQ控制组件:
+    - 拉取到消息后先检测是否执行过,未执行过则标记为处理中。
+    - 执行完成后标记已完成。
+    - 避免同一 message_id 重复执行长任务。
+    """
+
+    def __init__(self, prefix: str = "crawler:task", ttl: int = 1 * 24 * 3600):
+        """
+        :param prefix: Redis key 前缀
+        :param ttl: Key 存活时间(秒),默认 1 天防堆积
+        """
+        self.prefix = prefix
+        self.ttl = ttl
+
+
+    def _build_key(self, message_id: str) -> str:
+        return f"{self.prefix}:{message_id}"
+
+    async def get_status(self, message_id: str) -> str:
+        """
+        获取当前执行状态:
+        - "0":执行中
+        - "1":已完成
+        - None:未执行
+        """
+        key = self._build_key(message_id)
+        pool = RedisManager.get_pool()
+        return await pool.get(key)
+
+    async def mark_processing(self, message_id: str):
+        """
+        标记当前 message_id 为执行中(值为 "0")
+        """
+        key = self._build_key(message_id)
+        pool = RedisManager.get_pool()
+        await pool.set(key, "0", ex=self.ttl)
+
+    async def mark_done(self, message_id: str):
+        """
+        标记当前 message_id 已执行完成(值为 "1")
+        """
+        key = self._build_key(message_id)
+        pool = RedisManager.get_pool()
+        await pool.set(key, "1", ex=self.ttl)

+ 113 - 0
services/rocketmq_consumer_wrapper.py

@@ -0,0 +1,113 @@
+# 文件: services/rocketmq_consumer_wrapper.py (新文件)
+
+import asyncio
+import logging
+from typing import Callable, Awaitable
+from rocketmq.client import ClientConfiguration, Credentials, SimpleConsumer
+from rocketmq.model.message import Message
+
+from config import settings
+from core.utils.log.logger_manager import LoggerManager
+
+
+class RocketMQSimpleConsumerWrapper:
+    """
+    rocketmq-client-python 的异步封装。
+    - 负责连接、订阅和优雅关闭。
+    - 提供一个异步运行的消费循环,该循环:
+      1. 以异步方式拉取消息。
+      2. 异步处理函数。
+      3. 在处理函数成功完成后,自动异步确认消息。
+      4. 如果处理函数失败,则不确认,等待消息重试。
+    """
+
+    def __init__(self, group_id: str, topic: str):
+        self.logger = LoggerManager.get_logger()
+        self.group_id = group_id
+        self.topic = topic
+
+        # 从配置中读取连接信息
+        credentials = Credentials(settings.ROCKETMQ_ACCESS_KEY_ID, settings.ROCKETMQ_ACCESS_KEY_SECRET)
+        config = ClientConfiguration(
+            settings.ROCKETMQ_ENDPOINT,
+            credentials,
+            settings.ROCKETMQ_INSTANCE_ID  # 官方SDK的namespace参数用于实例ID
+        )
+        # 阿里云商业版 topic 需要带上实例ID前缀
+        self.topic_with_instance = f"{settings.ROCKETMQ_INSTANCE_ID}%%{self.topic}"
+
+        # wait_seconds 对应 receive 的 long-polling 时间
+        self.wait_seconds = settings.ROCKETMQ_WAIT_SECONDS
+
+        # 消息不可见时长,先设置30分钟
+        self.invisible_duration_seconds = 1800
+
+        self._consumer = SimpleConsumer(config, self.group_id)
+        self._is_running = False
+
+    async def startup(self):
+        """启动消费者"""
+        if self._is_running:
+            return
+        self.logger.info(f"[{self.topic}] 消费者正在启动...")
+        await asyncio.to_thread(self._consumer.startup)
+        self._is_running = True
+        self.logger.info(f"[{self.topic}] 消费者启动成功")
+
+    async def shutdown(self):
+        """关闭消费者"""
+        if not self._is_running:
+            return
+        self._is_running = False
+        self.logger.info(f"[{self.topic}] 消费者正在关闭...")
+        await asyncio.to_thread(self._consumer.shutdown)
+        self.logger.info(f"[{self.topic}] 消费者已关闭")
+
+    async def subscribe(self):
+        """订阅主题"""
+        self.logger.info(f"[{self.topic}] 正在订阅主题: {self.topic_with_instance}")
+        await asyncio.to_thread(self._consumer.subscribe, self.topic_with_instance)
+        self.logger.info(f"[{self.topic}] 订阅成功")
+
+    async def run(self, async_message_handler: Callable[[Message], Awaitable[None]], stop_event: asyncio.Event):
+        """
+        运行主消费循环。
+        :param async_message_handler: 用户定义的异步消息处理函数。
+        :param stop_event: 用于优雅停止的 asyncio 事件。
+        """
+        self.logger.info(f"[{self.topic}] 进入消费循环,消息不可见时间设置为 {self.invisible_duration_seconds} 秒")
+        while not stop_event.is_set():
+            try:
+                # 使用 to_thread 在异步环境中运行阻塞的 receive 方法
+                messages = await asyncio.to_thread(self._consumer.receive, 1, self.invisible_duration_seconds)
+
+                if not messages:
+                    self.logger.debug(f"[{self.topic}] 长轮询超时,未拉取到消息,继续...")
+                    continue
+
+                message = messages[0]
+                self.logger.info(f"[{self.topic}] 拉取到消息 ID: {message.message_id}")
+
+                try:
+                    # 调用业务逻辑处理器
+                    await async_message_handler(message)
+
+                    # 业务逻辑成功后,异步确认消息 <--- 正确的ACK位置
+                    self.logger.info(f"[{self.topic}] 消息 ID: {message.message_id} 处理成功,正在ACK...")
+                    await asyncio.to_thread(self._consumer.ack, message)
+                    self.logger.info(f"[{self.topic}] 消息 ID: {message.message_id} ACK成功")
+
+                except Exception as e:
+                    # 业务逻辑失败,打印日志,不ACK,消息将超时后重投
+                    self.logger.error(
+                        f"[{self.topic}] 处理消息 ID: {message.message_id} 失败,将不进行ACK,等待重投。错误: {e}",
+                        exc_info=True
+                    )
+
+            except Exception as e:
+                # 拉取消息本身失败(如网络中断)
+                self.logger.error(f"[{self.topic}] 消费循环发生严重错误: {e}", exc_info=True)
+                # 等待一段时间避免频繁失败
+                await asyncio.sleep(5)
+
+        self.logger.info(f"[{self.topic}] 消费循环已停止 (收到退出信号)")
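
A wiring sketch for the wrapper (group/topic names and the handler body are illustrative):

import asyncio

from services.rocketmq_consumer_wrapper import RocketMQSimpleConsumerWrapper

async def main():
    consumer = RocketMQSimpleConsumerWrapper(group_id="demo_group", topic="demo_topic")
    await consumer.startup()
    await consumer.subscribe()

    stop_event = asyncio.Event()  # 可由信号处理器 set() 以优雅退出

    async def handler(message):
        # 业务处理;抛异常则不 ACK,消息超时后重投
        print(message.message_id)

    await consumer.run(handler, stop_event)
    await consumer.shutdown()

asyncio.run(main())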

+ 71 - 82
spiders/base_spider.py

@@ -6,10 +6,10 @@ import aiohttp
 
 from core.models.video_item import VideoItem
 from core.utils.helpers import generate_titles
+from core.utils.request_preparer import RequestPreparer
 from core.utils.spider_config import SpiderConfig
 from core.utils.extractors import safe_extract, extract_fields
 from core.utils.log.logger_manager import LoggerManager
-from core.utils.template_resolver import resolve_request_body_template
 from services.async_mysql_service import AsyncMysqlService
 from services.pipeline import PiaoQuanPipeline
 from core.base.async_request_client import AsyncRequestClient
@@ -26,52 +26,67 @@ class BaseSpider:
     """
 
     def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod"):
-        self.env = env
-        self.user_list = user_list
         self.rule_dict = rule_dict
+        self.user_list = user_list
+        self.env = env
         self.class_name = self.__class__.__name__.lower()
 
-        # 通过小写子类名读取配置
-        self.platform_config = SpiderConfig.get_platform_config(classname=self.class_name)
+        # --- 1. 初始化核心组件 ---
+        self._setup_configuration()
+        self._setup_logging()
+        self._setup_services()
+        self._setup_state()
 
+    #  初始化辅助方法
+    def _setup_configuration(self):
+        """加载并设置爬虫的核心配置。"""
+        self.platform_config = SpiderConfig.get_platform_config(classname=self.class_name)
         if not self.platform_config:
-            raise ValueError(f"找不到对应配置: {self.class_name}")
+            raise ValueError(f"找不到爬虫配置: {self.class_name}")
 
         self.platform = self.platform_config.platform
         self.mode = self.platform_config.mode
-        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
-
-        self.aliyun_log = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
-
-        self.mq_producer = AsyncMQProducer(topic_name="topic_crawler_etl_prod_v2", platform=self.platform, mode=self.mode)
-        self.method = self.platform_config.method.upper()
         self.url = self.platform_config.url
+        self.method = self.platform_config.method.upper()
         self.headers = self.platform_config.headers or {}
-        self.request_body_template = self.platform_config.request_body or {}
 
-        self.response_parse = self.platform_config.response_parse or {}
-        self.next_cursor_path = self.response_parse.get("next_cursor")
-        self.data_path = self.response_parse.get("data_path")
-        self.field_map = self.response_parse.get("fields", {})
+        # 请求和解析相关的配置
+        self.request_body_template = self.platform_config.request_body or {}
+        self.response_parse_config = self.platform_config.response_parse or {}
+        self.data_path = self.response_parse_config.get("data_path")
+        # self.next_cursor_path = self.response_parse_config.get("next_cursor")
+        self.field_map = self.response_parse_config.get("fields", {})
 
+        # 爬取行为相关的配置
         self.loop_times = self.platform_config.loop_times or 100
         self.loop_interval = self.platform_config.loop_interval or 5
-        self.logger.info(f"开始{self.platform}爬取,最大循环次数{self.loop_times},循环间隔{self.loop_interval}s")
-        self.feishu_sheetid = self.platform_config.feishu_sheetid
-
-        self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
-        self.request_client = AsyncRequestClient(logger=self.logger, aliyun_log=self.aliyun_log)
-
         self.timeout = self.platform_config.request_timeout or 30
         self.max_retries = self.platform_config.max_retries or 3
+        self.feishu_sheetid = self.platform_config.feishu_sheetid
 
-        # 当前分页游标,默认空字符串,支持动态替换request_body中任何字段(如cursor)
-        self.dynamic_params = {key: "" for key in self.request_body_template.keys()}
-        # 允许子类重写,支持多游标等复杂情况
-        self.current_cursor = ""
+    def _setup_logging(self):
+        """初始化日志记录器。"""
+        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
+        self.aliyun_log = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
+        self.logger.info(f"爬虫 '{self.platform}/{self.mode}' 初始化...")
+        self.logger.info(f"最大循环次数: {self.loop_times}, 循环间隔: {self.loop_interval}s")
+
+    def _setup_services(self):
+        """初始化外部服务客户端。"""
+        self.request_client = AsyncRequestClient(logger=self.logger, aliyun_log=self.aliyun_log)
+        self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
+        self.mq_producer = AsyncMQProducer(topic_name="topic_crawler_etl_prod_v2", platform=self.platform,
+                                           mode=self.mode)
+
+    def _setup_state(self):
+        """初始化爬虫的内部状态。"""
+        self.last_response_data = {}
+        self.request_preparer = RequestPreparer(
+            response_parse_config=self.response_parse_config,
+            logger=self.logger,
+            aliyun_log=self.aliyun_log
+        )
 
-        self.download_cnt = 0
-        self.limit_flag = False
 
 
     async def run(self):
@@ -81,18 +96,19 @@ class BaseSpider:
         async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
             for loop_index in range(self.loop_times):
                 if not await self.is_video_count_sufficient():
-                    self.logger.info(f"视频抓取数量已达上限,提前结束")
+                    self.logger.info(f"视频抓取数量已达上限,停止爬取")
                     return
                 succ, fail = await self.run_single_loop(session)
                 total_success += succ
                 total_fail += fail
                 await self._wait_for_next_loop(loop_index + 1)
 
-        self.logger.info(f"爬虫完成 成功:{total_success} 失败:{total_fail}")
+            self.logger.info(f"爬虫完成 成功:{total_success} 失败:{total_fail}")
 
     async def run_single_loop(self, session) -> (int, int):
         """
-        单轮请求与处理
+        执行单轮的请求、解析和处理。
+        返回: (本轮成功处理的数量, 本轮失败处理的数量)
         """
         success_count, fail_count = 0, 0
         try:
@@ -110,8 +126,7 @@ class BaseSpider:
                     success_count += 1
                 else:
                     fail_count += 1
-                if self.limit_flag:
-                    break
+            self.logger.info(f"接口返回<{len(videos)}>条视频,处理成功<{success_count}>条,处理失败:<{fail_count}>")
             await self.after_run()
 
         except Exception as e:
@@ -131,63 +146,42 @@ class BaseSpider:
         请求接口,自动渲染动态参数,自动更新游标
         支持单请求和多请求(分页)逻辑。
         """
-        # 动态渲染请求体
-        # resolved_body = self._render_request_body()
-
+        request_body = self.request_preparer.prepare(self.request_body_template,
+                                                     self.last_response_data)
         # 发送请求
         response = await self.request_client.request(
             session=session,
             method=self.method,
             url=self.url,
             headers=self.headers,
-            json= self.dynamic_params
+            json=request_body
         )
 
         if not response:
             self.logger.error(f"响应为空")
-            return []
-
-        # 更新游标(支持动态参数更新)
-        if self.next_cursor_path:
-            next_cursor = safe_extract(response, self.next_cursor_path) or ""
-            self._update_cursor(next_cursor)
+            return []
 
+        self.last_response_data = response
         # 解析数据列表
         data_list = safe_extract(response, self.data_path)
         if not data_list:
-            self.logger.info(f"未获取到有效数据")
-            return []
+            self.logger.info(f"接口返回视频列表为空{response}")
+            self.aliyun_log.logging(
+                code="9021",
+                message="接口返回视频列表为空",
+                data= response
+            )
+            return
 
         return data_list
 
-    def _render_request_body(self) -> Dict:
-        """
-        用当前动态参数渲染请求体模板,支持多参数动态替换
-        """
-        body = {}
-        for k, v in self.request_body_template.items():
-            if isinstance(v, str) and v.startswith("{{") and v.endswith("}}"):
-                key = v.strip("{} ")
-                body[k] = self.dynamic_params.get(key, "")
-            else:
-                body[k] = v
-        return body
-
-    def _update_cursor(self, cursor_value: str):
-        """
-        更新分页游标并动态参数,方便下一次请求使用
-        """
-        self.current_cursor = cursor_value
-        # 如果配置的游标字段在请求体中,更新动态参数
-        if "cursor" in self.dynamic_params:
-            self.dynamic_params["cursor"] = cursor_value
-
     async def process_and_push_video(self, video: Dict[str, Any]) -> bool:
         """
         数据处理完整流程(字段映射 -> 校验 -> 推送)
         子类可重写 process_video 或 filter_data 来定制处理和校验逻辑
         """
         try:
+            # 字段映射
             video_obj = await self.process_video(video)
             if not video_obj:
                 return False
@@ -197,15 +191,6 @@ class BaseSpider:
 
             await self.integrated_video_handling(video_obj)
             pushed = await self.push_to_etl(video_obj)
-
-            # 达到下载上限,停止继续抓取
-            if self.rule_dict.get("videos_cnt", {}).get("min") and \
-                    self.download_cnt >= self.rule_dict["videos_cnt"]["min"]:
-                self.limit_flag = True
-
-            if pushed:
-                self.download_cnt += 1
-
             return pushed
         except Exception as e:
             self.logger.exception(f"视频处理异常: {e}")
@@ -217,19 +202,19 @@ class BaseSpider:
         子类可重写或扩展以定制字段映射、过滤等
         """
         self.logger.debug(f"处理视频数据: {video.get('title', '无标题')}")
-        publish_user = None
         if self.user_list:
             import random
             publish_user = random.choice(self.user_list)
         else:
-            publish_user = {"uid": "default", "nick_name": "default_user"}
+            self.logger.error(f"未获取到用户列表数据: {self.user_list}")
+            return
 
         item_kwargs = extract_fields(video, self.field_map, logger=self.logger,aliyun_log=self.aliyun_log)
         item_kwargs.update({
             "user_id": publish_user.get("uid"),
             "user_name": publish_user.get("nick_name"),
             "platform": self.platform,
-            "strategy": self.mode
+            "strategy": self.mode,
         })
 
         try:
@@ -286,10 +271,14 @@ class BaseSpider:
         async with AsyncMysqlService(self.platform, self.mode) as mysql:
             current_count = await mysql.get_today_videos()
             if current_count >= max_count:
-                self.logger.info(f"{self.platform} 今日视频数量已达上限: {current_count}")
                 self.aliyun_log.logging(code="1011", message="视频数量达到当日最大值", data=f"<今日视频数量>{current_count}")
                 return False
-        return True
+            self.logger.info(f"{self.platform} 今日入库视频数: {current_count}/{max_count}")
+            self.aliyun_log.logging(code="1012",
+                                    message=f"目前入库量{current_count}",
+                                    data=f"{current_count}/{max_count}"
+                                    )
+            return True
 
     async def _wait_for_next_loop(self, current_loop: int) -> None:
         """等待下次循环"""

+ 8 - 4
tests/test1.py

@@ -1,4 +1,8 @@
-topics = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
-num_groups = 4
-
-print([topics[i::num_groups] for i in range(num_groups)])
+# topics = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
+# num_groups = 4
+#
+# print([topics[i::num_groups] for i in range(num_groups)])
+v = "{{next_cursor}}"
+if isinstance(v, str) and v.startswith("{{") and v.endswith("}}"):
+    key = v.strip("{}")
+    print(key)

+ 33 - 0
tests/test_async_redis_service.py

@@ -0,0 +1,33 @@
+import pytest
+import asyncio
+
+from config import settings
+from services.async_redis_service import AsyncRedisService
+from core.base.async_redis_client import RedisManager
+
+@pytest.mark.asyncio
+async def test_async_redis_service_mark_and_get_status():
+    await RedisManager.init(redis_url=settings.redis_url)
+    service = AsyncRedisService(prefix="crawler:task", ttl=20)
+    test_message_id = "test_123456"
+
+    # 确保干净
+    pool = RedisManager.get_pool()
+    await pool.delete(service._build_key(test_message_id))
+
+    # 初始应获取 None
+    status = await service.get_status(test_message_id)
+    assert status is None
+
+    # 标记为执行中
+    await service.mark_processing(test_message_id)
+    status = await service.get_status(test_message_id)
+    assert status == "0"
+
+    # 标记为完成
+    await service.mark_done(test_message_id)
+    status = await service.get_status(test_message_id)
+    assert status == "1"
+
+    # 清理测试数据
+    await pool.delete(service._build_key(test_message_id))

+ 1 - 1
tests/test_config.py

@@ -1,4 +1,4 @@
 from config import settings
 
 print("=== 配置验证 ===")
-print("DB连接:", settings.database_url)
+print("DB连接:", settings.REDIS_HOST)