
Bug fix

zhangliang 1 day ago
Parent
Current commit
857ec54700

+ 1 - 0
config/spiders_config.yaml

@@ -1,6 +1,7 @@
 default:
   base_url: http://8.217.192.46:8889
   request_timeout: 30
+  max_retries: 3
   headers:
     {"Content-Type": "application/json"}
 

+ 8 - 18
core/base/async_mysql_client.py

@@ -1,12 +1,11 @@
 """
 AsyncMySQLClient (基于 asyncmy) + 项目统一日志正式上线版
 
-- 使用 LoggerManager 管理本地及阿里云日志,便于定位问题
 - 自动管理连接池,避免重复初始化和内存泄漏
 - 支持 async with 上下文管理
 - 高并发协程安全
 """
-
+import traceback
 from typing import List, Dict, Any, Optional, Tuple
 import asyncio
 import asyncmy
@@ -43,7 +42,9 @@ class AsyncMySQLClient:
                  db: str,
                  charset: str,
                  minsize: int = 1,
-                 maxsize: int = 5):
+                 maxsize: int = 5,
+                 logger: Optional[Any] = None,
+                 aliyun_logger: Optional[Any] = None):  # 由调用方注入 LoggerManager 生成的本地/阿里云日志实例
         """
         初始化配置,延迟创建连接池
         """
@@ -62,9 +63,8 @@ class AsyncMySQLClient:
         self._pool: Optional[asyncmy.Pool] = None
         self._lock = asyncio.Lock()  # 防止并发初始化
 
-        # 引入统一日志体系
-        self.logger = LoggerManager.get_logger(platform=db, mode="mysql")
-        self.aliyun_logger = LoggerManager.get_aliyun_logger(platform=db, mode="mysql")
+        self.logger = logger
+        self.aliyun_logger = aliyun_logger
 
     async def __aenter__(self):
         """支持 async with 自动初始化连接池"""
@@ -90,13 +90,10 @@ class AsyncMySQLClient:
                     minsize=self._minsize,
                     maxsize=self._maxsize,
                 )
-                msg = f"[AsyncMySQLClient] 连接池初始化成功: {self._db_settings['host']}:{self._db_settings['db']}"
-                self.logger.info(msg)
-                self.aliyun_logger.logging(code="2000", message=msg)
             except Exception as e:
-                msg = f"[AsyncMySQLClient] 连接池初始化失败: {e}"
+                msg = f"[AsyncMySQLClient] 连接池初始化失败: {e} \n {traceback.format_exc()}"
                 self.logger.error(msg)
-                self.aliyun_logger.logging(code="9001", message=msg, data={"error": str(e)})
+                self.aliyun_logger.logging(code="9001", message=msg, data={"error": f"{e}\n{traceback.format_exc()}"})
                 raise
 
     async def close(self):
@@ -106,9 +103,6 @@ class AsyncMySQLClient:
         if self._pool:
             self._pool.close()
             await self._pool.wait_closed()
-            msg = "[AsyncMySQLClient] 连接池已关闭"
-            self.logger.info(msg)
-            self.aliyun_logger.logging(code="2001", message=msg)
             self._pool = None
 
     async def fetch_all(self, sql: str, params: Optional[List[Any]] = None) -> List[Dict[str, Any]]:
@@ -123,7 +117,6 @@ class AsyncMySQLClient:
                     rows = await cur.fetchall()
                     columns = [desc[0] for desc in cur.description]
                     result = [dict(zip(columns, row)) for row in rows]
-                    self.logger.info(f"[AsyncMySQLClient] fetch_all 执行成功: {sql}")
                     return result
         except Exception as e:
             msg = f"[AsyncMySQLClient] fetch_all 执行失败: {e} | SQL: {sql}"
@@ -145,7 +138,6 @@ class AsyncMySQLClient:
                         return None
                     columns = [desc[0] for desc in cur.description]
                     result = dict(zip(columns, row))
-                    self.logger.info(f"[AsyncMySQLClient] fetch_one 执行成功: {sql}")
                     return result
         except Exception as e:
             msg = f"[AsyncMySQLClient] fetch_one 执行失败: {e} | SQL: {sql}"
@@ -162,7 +154,6 @@ class AsyncMySQLClient:
             async with self._pool.acquire() as conn:
                 async with conn.cursor() as cur:
                     await cur.execute(sql, params or [])
-                    self.logger.info(f"[AsyncMySQLClient] execute 执行成功: {sql}")
                     return cur.rowcount
         except Exception as e:
             msg = f"[AsyncMySQLClient] execute 执行失败: {e} | SQL: {sql}"
@@ -179,7 +170,6 @@ class AsyncMySQLClient:
             async with self._pool.acquire() as conn:
                 async with conn.cursor() as cur:
                     await cur.executemany(sql, params_list)
-                    self.logger.info(f"[AsyncMySQLClient] executemany 执行成功: {sql}")
                     return cur.rowcount
         except Exception as e:
             msg = f"[AsyncMySQLClient] executemany 执行失败: {e} | SQL: {sql}"
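
A minimal usage sketch for the reworked constructor (illustrative, not part of this commit): the client no longer builds its own loggers, the caller injects the LoggerManager instances. Connection parameter names other than db/charset/minsize/maxsize/logger are assumed from context.

# 假设性示例:由调用方注入 LoggerManager 的日志实例(host/port/user/password 形参名为推测)
import asyncio
from core.base.async_mysql_client import AsyncMySQLClient
from core.utils.log.logger_manager import LoggerManager

async def main():
    client = AsyncMySQLClient(
        host="127.0.0.1", port=3306, user="crawler", password="******",
        db="piaoquan_crawler", charset="utf8mb4",
        logger=LoggerManager.get_logger(platform="system", mode="mysql"),
        aliyun_logger=LoggerManager.get_aliyun_logger(platform="system", mode="mysql"),
    )
    async with client:  # async with 自动初始化连接池(见 __aenter__)
        rows = await client.fetch_all("SELECT 1 AS ok")
        print(rows)

asyncio.run(main())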

+ 26 - 7
core/base/async_request_client.py

@@ -4,12 +4,19 @@ from pprint import pformat
 
 import aiohttp
 
+from core.utils.log.logger_manager import LoggerManager
+
 
 class AsyncRequestClient:
     """
-    可独立复用的异步请求客户端,支持重试、日志结构化和限流预留
+    请求失败重试,本地日志记录
+    请求返回code!=0重试,本地日志记录
+    重试达到最大次数后上报阿里云日志
     """
-    def __init__(self, logger=None, aliyun_log=None,max_retries=3, timeout=30):
+    def __init__(self, logger=None, aliyun_log=None,  # LoggerManager.get_logger()/get_aliyun_logger() 返回的实例
+                 max_retries=3, timeout=30):
         self.logger = logger
         self.aliyun_log = aliyun_log
         self.max_retries = max_retries
@@ -22,16 +29,24 @@ class AsyncRequestClient:
+            resp = None  # 预置响应,避免首次请求即异常时 except 分支引用未定义的 resp
             try:
                 if self.logger:
                     self.logger.info(f"请求 {method} {url}, 请求参数{kwargs}")
+                if self.aliyun_log:
+                    self.aliyun_log.logging(
+                        code="1012",
+                        message="初始化请求",
+                        data={"url": url,
+                              "method": method,
+                              "requestBody": kwargs}
+                    )
                 async with session.request(method, url, **kwargs) as response:
                     response.raise_for_status()
                     resp = await response.json()
-                    # self.logger.info(f"{url}响应:\n {pformat(resp)}")
                     if resp.get('code') != 0:
                         retries += 1
                         if self.logger:
-                            self.logger.warning(f"请求失败code不等于0 {resp}, 重试 {retries}/{self.max_retries}")
-                            await asyncio.sleep(5)
-                            continue
+                            self.logger.info(f"请求返回code非0: {resp}, 重试 {retries}/{self.max_retries}")
+                        await asyncio.sleep(5)
+                        continue
                     return resp
             except Exception as e:
                 retries += 1
@@ -42,7 +57,11 @@ class AsyncRequestClient:
                         self.aliyun_log.logging(
                             code="9006",
                             message=f"请求异常达到最大重试次数",
-                            data=f"{url}"
+                            data={"url": url,
+                                  "method": method,
+                                  "requestBody": kwargs,
+                                  "response": resp}
                         )
                     return
                 if self.logger:
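
A call-site sketch (illustrative: the retry method is assumed to be named request and to take (session, method, url, **kwargs), as suggested by the loop body above and by the call in base_spider; the URL is hypothetical):

# 假设性示例:复用 AsyncRequestClient 的重试请求
import aiohttp
from core.base.async_request_client import AsyncRequestClient
from core.utils.log.logger_manager import LoggerManager

async def fetch_page():
    client = AsyncRequestClient(
        logger=LoggerManager.get_logger(platform="system", mode="crawler"),
        aliyun_log=LoggerManager.get_aliyun_logger(platform="system", mode="crawler"),
        max_retries=3,
        timeout=30,
    )
    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session:
        # 异常或 code != 0 时在客户端内部重试,超过 max_retries 返回 None
        return await client.request(session, "POST", "http://example.com/api/videos", json={"cursor": ""})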

+ 1 - 0
core/models/spiders_config_models.py

@@ -3,6 +3,7 @@ from pydantic import BaseModel, AnyUrl
 class BaseConfig(BaseModel):
     base_url: AnyUrl = None
     request_timeout: int = 30
+    max_retries: int = 3
     headers: dict = {}
 
 class PlatformConfig(BaseConfig):
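
A small sketch of how the new max_retries field flows from the YAML default section shown earlier into the validated model (field names as in this diff):

# 假设性示例:default 段经 BaseConfig 校验后带有 max_retries
from core.models.spiders_config_models import BaseConfig

default_section = {
    "base_url": "http://8.217.192.46:8889",
    "request_timeout": 30,
    "max_retries": 3,
    "headers": {"Content-Type": "application/json"},
}
cfg = BaseConfig(**default_section)
print(cfg.max_retries)  # 3;YAML 未配置该键时回退到模型默认值 3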

+ 6 - 5
core/utils/extractors.py

@@ -41,10 +41,11 @@ def extract_fields(video: Dict, field_map: Dict, logger=None, trace_id=None,aliy
             continue
         value = safe_extract(video, path)
         if value is None and logger:
-            logger.warning(f"{trace_id} 字段提取失败: {field} 路径: {path}")
-            # aliyun_log.logging(
-            #     code=""
-            #     trace_id=trace_id,
-            # )
+            logger.warning(f"字段提取失败: {field} 路径: {path}")
+            if aliyun_log:
+                aliyun_log.logging(
+                    code="9024",
+                    message=f"字段提取失败: {field} 路径: {path}"
+                )
         result[field] = value
     return result

+ 8 - 6
core/utils/log/aliyun_log.py

@@ -7,9 +7,9 @@ import time
 from aliyun.log import LogClient, PutLogsRequest, LogItem
 
 from config import settings
-from core.utils.log.log_codes import CODES
 
 proxies = {"http": None, "https": None}
+from core.utils.trace_utils import get_current_trace_id  # 导入工具函数
 
 
 class AliyunLogger(object):
@@ -32,10 +32,12 @@ class AliyunLogger(object):
         正式库: https://sls.console.aliyun.com/lognext/project/crawler-log-prod/logsearch/crawler-log-prod
         """
         # 设置阿里云日志服务的访问信息
-        if message is None:
-            message = LOG_CODES.get(code, "未知错误")
-        if data is None:
-            data = {}
+        # if message is None:
+        #     message = LOG_CODES.get(code, "未知错误")
+        # if data is None:
+        #     data = {}
+        # 优先使用传入的 trace_id,否则从上下文获取
+        current_trace_id = trace_id or get_current_trace_id() or ""
         accessKeyId = settings.ALIYUN_ACCESS_KEY_ID
         accessKey = settings.ALIYUN_ACCESS_KEY_SECRET
         if self.env == "dev":
@@ -61,7 +63,7 @@ class AliyunLogger(object):
         """
         message = message.replace("\r", " ").replace("\n", " ")
         contents = [
-            (f"TraceId", str(trace_id)),
+            (f"TraceId", str(current_trace_id)),
             (f"code", str(code)),
             (f"platform", str(self.platform)),
             (f"mode", str(self.mode)),

+ 2 - 1
core/utils/log/local_log.py

@@ -6,7 +6,7 @@ from loguru._logger import Logger as LoguruLogger
 from loguru import logger as global_logger
 
 from config import settings
-
+from core.utils.trace_utils import get_current_trace_id
 
 class Local:
     """
@@ -55,6 +55,7 @@ class Local:
                        "<level>{level}</level> | "
                        "<cyan>{extra[platform]}</cyan> | "
                        "<cyan>{extra[mode]}</cyan> | "
+                        f"<magenta>trace_id={get_current_trace_id()}</magenta> | "
                        "{message}"
             )
 

+ 3 - 0
core/utils/log/log_codes.py

@@ -25,6 +25,7 @@ CODES = {
     "1009": "成功发送至ETL",
     "1010": "任务执行完成",
     "1011": "视频数量达到当日最大值",
+    "1012": "初始化请求",
 
     # 调度监控 (15xx)
     "1500": "主进程启动成功",
@@ -70,6 +71,8 @@ CODES = {
     "9020": "过滤条件不匹配",
     "9021": "异步请求失败",
     "9022": "子进程内部异常",
+    "9023": "请求返回code非0",
+    "9024": "字段提取失败",
 
     # 系统致命错误 (99xx)
     "9900": "数据库连接失败",

+ 2 - 20
core/utils/log/logger_manager.py

@@ -12,6 +12,7 @@ class LoggerManager:
     _local_loggers: dict[str, LoguruLogger] = {}
     _aliyun_loggers: dict[str, AliyunLogger] = {}
 
+
     @staticmethod
     def get_logger(
         platform: str="system",
@@ -44,23 +45,4 @@ class LoggerManager:
                 mode=mode,
                 env=env
             )
-        return LoggerManager._aliyun_loggers[key]
-
-    def log_event(
-        self,
-        code: str,
-        message: str,
-        data: dict[str, Any],
-        trace_id: str = "",
-        platform: str = "system",
-        mode: str = "crawler",
-        level: str = "info"
-    ):
-        logger = self.get_logger(platform=platform, mode=mode)
-        aliyun_logger = self.get_aliyun_logger(platform=platform, mode=mode)
-
-        # 本地记录
-        getattr(logger, level.lower(), logger.info)(f"{code} | {message} | {data}")
-
-        # 阿里云记录
-        aliyun_logger.logging(code=code, message=message, data=data, trace_id=trace_id)
+        return LoggerManager._aliyun_loggers[key]
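
With log_event removed, callers fetch the two cached loggers directly; a minimal sketch of the replacement pattern:

# 假设性示例:log_event 移除后的等价写法
from core.utils.log.logger_manager import LoggerManager

logger = LoggerManager.get_logger(platform="system", mode="crawler")
aliyun_logger = LoggerManager.get_aliyun_logger(platform="system", mode="crawler")

logger.info("1000 | 任务接收成功")
aliyun_logger.logging(code="1000", message="任务接收成功", data={"task_id": 1})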

+ 7 - 2
core/utils/spider_config.py

@@ -3,7 +3,7 @@ import os
 from urllib.parse import urljoin
 import yaml
 from core.utils.path_utils import spiders_config_path
-from core.models.spiders_config_models import BaseConfig, PlatformConfig
+from core.models.spiders_config_models import PlatformConfig
 
 class SpiderConfig:
     _config = None
@@ -11,6 +11,10 @@ class SpiderConfig:
 
     @classmethod
     def _load_yaml(cls):
+        """
+        加载spiders_config.yaml
+        :return:
+        """
         if not os.path.exists(cls._config_path):
             raise FileNotFoundError(f"[配置错误] 找不到配置文件: {cls._config_path}")
         with open(cls._config_path, "r", encoding="utf-8") as f:
@@ -44,7 +48,8 @@ class SpiderConfig:
         except ValueError as e:
             raise ValueError(f"[配置错误] 平台 {classname} 的配置验证失败: {e}")
 
+
 # 示例使用
 if __name__ == '__main__':
-    config = SpiderConfig.get_platform_config("benshanzhufurecommend")
+    config = SpiderConfig.get_platform_config("yuannifuqimanmanrecommend")
     print(config)

+ 34 - 1
core/utils/trace_utils.py

@@ -2,5 +2,38 @@ import time
 import uuid
 
 
-def generate_trace_id():
+import contextvars
+
+# 定义全局上下文变量,用于存储当前 trace_id
+trace_id = contextvars.ContextVar("trace_id", default=None)
+
+def generate_trace_id() -> str:
+    """生成唯一 trace_id"""
     return f"{uuid.uuid4().hex[:8]}-{int(time.time() * 1000)}"
+
+def get_current_trace_id() -> str:
+    """获取当前上下文的 trace_id"""
+    return trace_id.get()
+
+def set_current_trace_id(value: str) -> None:
+    """设置当前上下文的 trace_id"""
+    # 形参不能与模块级 ContextVar 同名,否则会遮蔽变量导致 AttributeError
+    trace_id.set(value)
+
+class TraceContext:
+    """上下文管理器:自动创建和清理 trace_id"""
+    def __init__(self, trace_id: str = None):
+        self.trace_id = trace_id or generate_trace_id()
+        self.token = None  # 用于退出时恢复上下文
+
+    def __enter__(self) -> str:
+        # 保存当前上下文状态(token),并设置新的 trace_id
+        self.token = trace_id.set(self.trace_id)
+        return self.trace_id
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        # 退出时恢复之前的上下文(避免影响其他协程/线程)
+        if self.token:
+            trace_id.reset(self.token)
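
A usage sketch of the context-variable based trace id (illustrative caller): inside a TraceContext block, get_current_trace_id() returns the same id from any function running on the same task, which is what aliyun_log.py now relies on:

# 假设性示例:TraceContext 与 get_current_trace_id 协同使用
from core.utils.trace_utils import TraceContext, get_current_trace_id

def do_work():
    # 任意调用深度都能取到当前 trace_id,无需层层传参
    print("当前 trace_id:", get_current_trace_id())

with TraceContext() as tid:
    do_work()                      # 输出与 tid 相同的值
print(get_current_trace_id())      # 退出上下文后恢复为默认值 None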

+ 52 - 55
scheduler/async_consumer.py

@@ -5,7 +5,7 @@ from typing import List
 import signal
 
 from core.utils.log.logger_manager import LoggerManager
-from core.utils.trace_utils import generate_trace_id
+from core.utils.trace_utils import generate_trace_id, TraceContext
 from services.async_mysql_service import AsyncMysqlService
 from spiders.spider_registry import get_spider_class
 
@@ -27,60 +27,57 @@ async def async_handle_topic(topic: str, stop_event: asyncio.Event):
 
     async def handle_single_message(message):
         """处理单条消息的业务逻辑(不含拉取和循环)"""
-        trace_id = generate_trace_id()
-        try:
-            payload = json.loads(message.message_body)
-            task_id = payload["id"]
-
-            logger.info(f"{trace_id} - 接收到任务消息: {task_id}")
-            aliyun_logger.logging(
-                code="1000",
-                message="任务接收成功",
-                data=payload,
-                trace_id=trace_id,
-                account=topic
-            )
-
-            # 从数据库查询配置
-            async with AsyncMysqlService() as mysql:
-                user_list = await mysql.get_user_list(task_id)
-                rule_dict = await mysql.get_rule_dict(task_id)
-
-            # 执行爬虫任务
-            CrawlerClass = get_spider_class(topic)
-            crawler = CrawlerClass(
-                rule_dict=rule_dict,
-                user_list=user_list,
-                trace_id=trace_id
-            )
-            await crawler.run()  # 爬虫成功执行后再确认消息
-
-            # 确认消息(单条消息处理成功后才 Ack)
-            await consumer.ack_message(message.receipt_handle)
-
-            logger.info(f"{trace_id} - 任务 {task_id} 执行成功并已 Ack")
-            aliyun_logger.logging(
-                code="1010",
-                message="任务执行成功",
-                trace_id=trace_id,
-                data={"task_id": task_id, "topic": topic},
-                account=topic
-            )
-
-        except Exception as e:
-            logger.error(f"{trace_id} - 任务处理失败: {e} \n {traceback.format_exc()}")
-            aliyun_logger.logging(
-                code="9001",
-                message=f"处理消息失败: {str(e)} \n {traceback.format_exc()}",
-                trace_id=trace_id,
-                data={
-                    "error_type": type(e).__name__,
-                    "stack_trace": traceback.format_exc(),
-                    "message_body": message.message_body
-                },
-                account=topic
-            )
-            # 处理失败不 Ack,消息会被 MQ 重新投递(依赖 MQ 的重试机制)
+
+        with TraceContext() as trace_id:  # 生成 trace_id 并绑定到上下文
+            try:
+                payload = json.loads(message.message_body)
+                task_id = payload["id"]
+
+                logger.info(f"[{topic}]接收到任务消息: {task_id}")
+                aliyun_logger.logging(
+                    code="1000",
+                    message="任务接收成功",
+                    data=payload,
+                    account=topic
+                )
+
+                # 从数据库查询配置
+                async with AsyncMysqlService() as mysql:
+                    user_list = await mysql.get_user_list(task_id)
+                    rule_dict = await mysql.get_rule_dict(task_id)
+
+                # 执行爬虫任务
+                CrawlerClass = get_spider_class(topic)
+                crawler = CrawlerClass(
+                    rule_dict=rule_dict,
+                    user_list=user_list,
+                )
+                await crawler.run()  # 爬虫成功执行后再确认消息
+
+                # 确认消息(单条消息处理成功后才 Ack)
+                await consumer.ack_message(message.receipt_handle)
+
+                logger.info(f"[{topic}]任务 {task_id} 执行成功并已 Ack")
+                aliyun_logger.logging(
+                    code="1010",
+                    message="任务执行成功",
+                    data={"task_id": task_id, "topic": topic},
+                    account=topic
+                )
+
+            except Exception as e:
+                logger.error(f"[{topic}]任务处理失败: {e} \n {traceback.format_exc()}")
+                aliyun_logger.logging(
+                    code="9001",
+                    message=f"处理消息失败: {str(e)} \n {traceback.format_exc()}",
+                    data={
+                        "error_type": type(e).__name__,
+                        "stack_trace": traceback.format_exc(),
+                        "message_body": message.message_body
+                    },
+                    account=topic
+                )
+                # 处理失败不 Ack,消息会被 MQ 重新投递(依赖 MQ 的重试机制)
 
     # 独立的消费循环:拉取消息并调用处理函数
     async def consume_loop():

+ 13 - 13
services/async_mq_consumer.py

@@ -44,33 +44,33 @@ class AsyncRocketMQConsumer:
     async def receive_message(self) -> Optional[Message]:
         """异步拉取单条消息"""
         try:
-            self.logger.debug(f"开始拉取单条消息,等待时间: {self.wait_seconds}秒")
+            self.logger.debug(f"[{self.topic_name}]开始拉取单条消息,等待时间: {self.wait_seconds}秒")
             messages = await asyncio.to_thread(
                 self.consumer.consume_message,
                 settings.ROCKETMQ_BATCH,
                 self.wait_seconds,
             )
             if messages:
-                self.logger.debug(f"成功拉取到1条消息")
+                self.logger.debug(f"[{self.topic_name}]成功拉取到1条消息")
                 return messages[0]
             return None
         except MQExceptionBase as e:
             if getattr(e, "type", "") == "MessageNotExist":
                 # 更友好的日志输出,使用INFO级别而非ERROR
-                self.logger.info("当前没有可消费的消息,继续等待...")
+                self.logger.info(f"[{self.topic_name}]当前没有可消费的消息,继续等待...")
                 return None
             # 其他类型的异常仍按错误处理
-            self.logger.error(f"拉取消息失败: {e}")
+            self.logger.error(f"[{self.topic_name}]拉取消息失败: {e}")
             raise e
 
     async def ack_message(self, receipt_handle: str) -> None:
         """确认消费成功"""
         try:
             await asyncio.to_thread(self.consumer.ack_message, [receipt_handle])
-            self.logger.debug(f"消息确认成功")
+            self.logger.debug(f"[{self.topic_name}]消息确认成功")
         except Exception as e:
-            self.logger.error(f"确认消息失败: {e}")
-            raise RuntimeError(f"确认消息失败: {e}")
+            self.logger.error(f"[{self.topic_name}]确认消息失败: {e}")
+            raise RuntimeError(f"[{self.topic_name}]确认消息失败: {e}")
 
     async def process_single_message(self, handler: Callable[[Message], Any]) -> bool:
         """
@@ -83,15 +83,15 @@ class AsyncRocketMQConsumer:
             return False
 
         try:
-            self.logger.info(f"收到消息 ID: {message.message_id}")
+            self.logger.info(f"[{self.topic_name}]收到消息 ID: {message.message_id}")
             # 执行消息处理任务
             await handler(message)
             # 任务成功后确认消息
             await self.ack_message(message.receipt_handle)
-            self.logger.info(f"消息 ID: {message.message_id} 处理并确认成功")
+            self.logger.info(f"[{self.topic_name}]消息 ID: {message.message_id} 处理并确认成功")
             return True
         except Exception as e:
-            self.logger.error(f"处理消息失败: {e}", exc_info=True)
+            self.logger.error(f"[{self.topic_name}]处理消息失败: {e}", exc_info=True)
             # 消息处理失败,不会确认消息,RocketMQ会在可见时间后重新投递
             return False
 
@@ -119,10 +119,10 @@ class AsyncRocketMQConsumer:
 
 async def process_message(message: Message) -> None:
     """示例消息处理函数"""
-    self.logger.info(f"开始处理消息: {message.message_body}")
+    # self.logger.info(f"开始处理消息: {message.message_body}")
     # 模拟处理耗时操作
     await asyncio.sleep(2)
-    self.logger.info(f"消息处理完成")
+    # self.logger.info(f"消息处理完成")
 
 
 if __name__ == '__main__':
@@ -131,7 +131,7 @@ if __name__ == '__main__':
         consumer = AsyncRocketMQConsumer(
             topic_name="ynfqmm_recommend_prod",
             group_id="ynfqmm_recommend_prod",
-            wait_seconds=10,  # 长轮询等待时间
+            wait_seconds=30,  # 长轮询等待时间
         )
         await consumer.run_single_threaded(process_message)
 

+ 11 - 23
services/async_mysql_service.py

@@ -1,15 +1,12 @@
 import asyncio
 import json
-import logging
+
 from typing import List, Optional, Dict, Any, Tuple
 
 from config import settings
 from core.base.async_mysql_client import AsyncMySQLClient
 from core.utils.log.logger_manager import LoggerManager
 
-logger = logging.getLogger(__name__)
-
-
 class AsyncMysqlService:
     """
     异步业务数据库访问类(支持单例和async with)
@@ -21,7 +18,7 @@ class AsyncMysqlService:
     - 完善的错误处理和日志记录
     """
 
-    # 存储不同配置的单例实例,键为(classname, mode)元组
+    # 存储不同配置的单例实例,键为(platform, mode)元组
     _instances: Dict[Tuple[str, str], "AsyncMysqlService"] = {}
 
     def __new__(cls, platform: Optional[str] = None, mode: Optional[str] = None):
@@ -40,17 +37,15 @@ class AsyncMysqlService:
             cls._instances[key] = instance
         return cls._instances[key]
 
-    def __init__(self, platform: Optional[str] = None, mode: Optional[str] = None):
+    def __init__(self, platform: Optional[str] = None, mode: Optional[str] = None, trace_id: Optional[str] = None):
         """初始化数据库配置(仅在创建新实例时执行)"""
         # 避免重复初始化
         if self._client is not None:
             return
 
-        # 处理None值,设置默认值为"system"
-        platform = platform or "system"
-        mode = mode or "crawler"
-        self._platform = platform
-        self._mode = mode
+        # platform/mode 保持公开属性,get_today_videos 等方法仍直接引用 self.platform/self.mode
+        self.platform = platform or "system"
+        self.mode = mode or "crawler"
+        self.trace_id = trace_id
 
         # 加载环境变量配置
         db_config = {
@@ -74,9 +69,12 @@ class AsyncMysqlService:
             db=db_config["db"],
             charset=db_config["charset"],
             minsize=1,
-            maxsize=10
+            maxsize=10,
+            logger=self.logger,
+            aliyun_logger=self.aliyun_logr
         )
-        self.logger.info(f"创建数据库服务实例: classname={platform}, mode={mode}")
+
 
     async def __aenter__(self):
         """支持async with上下文,初始化连接池"""
@@ -100,15 +98,6 @@ class AsyncMysqlService:
             except Exception as e:
                 self.logger.warning(f"连接池关闭失败: {str(e)}")
 
-    @property
-    def platform(self) -> str:
-        """获取服务关联的平台"""
-        return self._platform
-
-    @property
-    def mode(self) -> str:
-        """获取服务运行模式"""
-        return self._mode
 
     async def fetch_all(self, sql: str, params: Optional[List[Any]] = None) -> List[Dict[str, Any]]:
         """执行查询并返回多行结果"""
@@ -168,7 +157,6 @@ class AsyncMysqlService:
                 AND platform = %s
                 AND strategy = %s \
               """
-        self.logger.info(f"查询今日视频数量: platform={self.platform}, strategy={self.mode}")
         result = await self.fetch_one(sql, [self.platform, self.mode])
         return result["cnt"] if result else 0
 

+ 29 - 33
spiders/base_spider.py

@@ -25,24 +25,25 @@ class BaseSpider:
     子类只需根据业务重写少量方法,如 process_video/process_item。
     """
 
-    def __init__(self, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
-        self.trace_id = trace_id
+    def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod"):
         self.env = env
         self.user_list = user_list
         self.rule_dict = rule_dict
         self.class_name = self.__class__.__name__.lower()
 
-        # 读取配置
+        # 通过小写子类名读取配置
         self.platform_config = SpiderConfig.get_platform_config(classname=self.class_name)
+
         if not self.platform_config:
             raise ValueError(f"找不到对应配置: {self.class_name}")
 
         self.platform = self.platform_config.platform
         self.mode = self.platform_config.mode
         self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
-        self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
-        self.mq_producer = AsyncMQProducer(topic_name="topic_crawler_etl_prod_v2", platform=self.platform, mode=self.mode)
 
+        self.aliyun_log = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
+
+        self.mq_producer = AsyncMQProducer(topic_name="topic_crawler_etl_prod_v2", platform=self.platform, mode=self.mode)
         self.method = self.platform_config.method.upper()
         self.url = self.platform_config.url
         self.headers = self.platform_config.headers or {}
@@ -55,13 +56,14 @@ class BaseSpider:
 
         self.loop_times = self.platform_config.loop_times or 100
         self.loop_interval = self.platform_config.loop_interval or 5
+        self.logger.info(f"开始{self.platform}爬取,最大循环次数{self.loop_times},循环间隔{self.loop_interval}s")
         self.feishu_sheetid = self.platform_config.feishu_sheetid
 
         self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
-        self.request_client = AsyncRequestClient(logger=self.logger, aliyun_log=self.aliyun_logr)
+        self.request_client = AsyncRequestClient(logger=self.logger, aliyun_log=self.aliyun_log)
 
-        self.timeout = 30
-        self.max_retries = 3
+        self.timeout = self.platform_config.request_timeout or 30
+        self.max_retries = self.platform_config.max_retries or 3
 
         # 当前分页游标,默认空字符串,支持动态替换request_body中任何字段(如cursor)
         self.dynamic_params = {key: "" for key in self.request_body_template.keys()}
@@ -75,26 +77,18 @@ class BaseSpider:
     async def run(self):
         """ 爬虫主流程 """
         await self.before_run()
-
         total_success, total_fail = 0, 0
         async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
             for loop_index in range(self.loop_times):
-                if self.limit_flag:
-                    self.logger.info(f"{self.trace_id} 已达到抓取限制,停止爬虫")
-                    break
-
                 if not await self.is_video_count_sufficient():
-                    self.logger.info(f"{self.trace_id} 视频抓取数量已达上限,提前结束")
-                    break
-
+                    self.logger.info(f"视频抓取数量已达上限,提前结束")
+                    return
                 succ, fail = await self.run_single_loop(session)
                 total_success += succ
                 total_fail += fail
-
                 await self._wait_for_next_loop(loop_index + 1)
 
-        await self.after_run()
-        self.logger.info(f"{self.trace_id} 爬虫完成 成功:{total_success} 失败:{total_fail}")
+        self.logger.info(f"爬虫完成 成功:{total_success} 失败:{total_fail}")
 
     async def run_single_loop(self, session) -> (int, int):
         """
@@ -102,9 +96,10 @@ class BaseSpider:
         """
         success_count, fail_count = 0, 0
         try:
+            # 爬取数据
             videos = await self.crawl_data(session)
             if not videos:
-                self.logger.info(f"{self.trace_id} 无数据返回,停止本轮")
+                self.logger.info(f"无数据返回,停止本轮")
                 return success_count, fail_count
 
             for video in videos:
@@ -117,9 +112,10 @@ class BaseSpider:
                     fail_count += 1
                 if self.limit_flag:
                     break
+            await self.after_run()
 
         except Exception as e:
-            self.logger.exception(f"{self.trace_id} 运行异常: {e}")
+            self.logger.exception(f"运行异常: {e}")
 
         return success_count, fail_count
 
@@ -148,7 +144,7 @@ class BaseSpider:
         )
 
         if not response:
-            self.logger.error(f"{self.trace_id} 响应为空")
+            self.logger.error(f"响应为空")
             return []
 
         # 更新游标(支持动态参数更新)
@@ -159,7 +155,7 @@ class BaseSpider:
         # 解析数据列表
         data_list = safe_extract(response, self.data_path)
         if not data_list:
-            self.logger.info(f"{self.trace_id} 未获取到有效数据")
+            self.logger.info(f"未获取到有效数据")
             return []
 
         return data_list
@@ -212,7 +208,7 @@ class BaseSpider:
 
             return pushed
         except Exception as e:
-            self.logger.exception(f"{self.trace_id} 视频处理异常: {e}")
+            self.logger.exception(f"视频处理异常: {e}")
             return False
 
     async def process_video(self, video: Dict) -> Optional[Dict]:
@@ -220,7 +216,7 @@ class BaseSpider:
         统一字段抽取及 VideoItem 初始化
         子类可重写或扩展以定制字段映射、过滤等
         """
-        self.logger.debug(f"{self.trace_id} 处理视频数据: {video.get('title', '无标题')}")
+        self.logger.debug(f"处理视频数据: {video.get('title', '无标题')}")
         publish_user = None
         if self.user_list:
             import random
@@ -228,7 +224,7 @@ class BaseSpider:
         else:
             publish_user = {"uid": "default", "nick_name": "default_user"}
 
-        item_kwargs = extract_fields(video, self.field_map, logger=self.logger, trace_id=self.trace_id, aliyun_log=self.aliyun_logr)
+        item_kwargs = extract_fields(video, self.field_map, logger=self.logger, aliyun_log=self.aliyun_log)
         item_kwargs.update({
             "user_id": publish_user.get("uid"),
             "user_name": publish_user.get("nick_name"),
@@ -240,11 +236,11 @@ class BaseSpider:
             item = VideoItem(**item_kwargs)
             video_dict = await item.produce_item()
             if not video_dict:
-                self.logger.warning(f"{self.trace_id} VideoItem 校验失败")
+                self.logger.warning(f"VideoItem 校验失败")
                 return None
             return video_dict
         except Exception as e:
-            self.logger.error(f"{self.trace_id} VideoItem 初始化失败: {e}")
+            self.logger.error(f"VideoItem 初始化失败: {e}")
             return None
 
     async def filter_data(self, video: Dict) -> bool:
@@ -274,10 +270,10 @@ class BaseSpider:
         """
         try:
             await self.mq_producer.send_msg(video)
-            self.logger.info(f"{self.trace_id} 成功推送视频至ETL")
+            self.logger.info(f"成功推送视频至ETL")
             return True
         except Exception as e:
-            self.logger.exception(f"{self.trace_id} 推送ETL失败: {e}")
+            self.logger.exception(f"推送ETL失败: {e}")
             return False
 
     async def is_video_count_sufficient(self) -> bool:
@@ -290,15 +286,15 @@ class BaseSpider:
         async with AsyncMysqlService(self.platform, self.mode) as mysql:
             current_count = await mysql.get_today_videos()
         if current_count >= max_count:
-            self.logger.info(f"{self.trace_id} 今日视频已达上限: {current_count}")
-            self.aliyun_logr.logging(code="1011", message="视频数量达到当日最大值", data=f"<今日视频数量>{current_count}")
+            self.logger.info(f"{self.platform} 今日视频数量已达上限: {current_count}")
+            self.aliyun_log.logging(code="1011", message="视频数量达到当日最大值", data=f"<今日视频数量>{current_count}")
             return False
         return True
 
     async def _wait_for_next_loop(self, current_loop: int) -> None:
         """等待下次循环"""
         if current_loop < self.loop_times and self.loop_interval > 0:
-            self.logger.info(f"{self.trace_id} 等待 {self.loop_interval} 秒后进行下一次请求")
+            self.logger.info(f"等待 {self.loop_interval} 秒后进行下一次请求")
             await asyncio.sleep(self.loop_interval)
 
     async def before_run(self):

+ 3 - 3
spiders/universal_crawler.py

@@ -113,10 +113,10 @@ class UniversalCrawler(BaseSpider):
         if pipeline.process_item():
             self.download_cnt += 1
             self.mq.send_msg(item)
-            self.aliyun_logr.logging(code="1002", message="成功发送至ETL", data=item, trace_id=self.trace_id)
+            self.aliyun_log.logging(code="1002", message="成功发送至ETL", data=item, trace_id=self.trace_id)
             if self.download_cnt >= self.download_min_limit:
                 self.has_enough_videos = True
-                self.aliyun_logr.logging(code="2000", message=f"达到下载限制: {self.download_min_limit}",
-                                         trace_id=self.trace_id)
+                self.aliyun_log.logging(code="2000", message=f"达到下载限制: {self.download_min_limit}",
+                                        trace_id=self.trace_id)
             return True
         return False