
Switch the whole runner to a process + coroutine execution model

zhangliang, 1 week ago
parent
commit 035a7eaab7
52 changed files with 555 additions and 704 deletions
  1. .env (+17, -0)
  2. README.md (+1, -1)
  3. application/config/common/__init__.py (+0, -0)
  4. application/config/common/feishu/__init__.py (+0, -0)
  5. application/config/common/feishu/feishu.py (+1, -1)
  6. application/config/common/feishu/feishu_data.py (+1, -1)
  7. application/config/common/feishu/feishu_insert.py (+0, -0)
  8. application/config/common/feishu/feishu_utils.py (+0, -0)
  9. application/config/common/ffmpeg/__init__.py (+0, -0)
 10. application/config/common/ffmpeg/ffmpeg_utils.py (+0, -0)
 11. application/config/common/gpt/__init__.py (+0, -0)
 12. application/config/common/gpt/gpt4o_mini_help.py (+0, -0)
 13. application/config/common/log/__init__.py (+0, -0)
 14. application/config/common/log/aliyun_log.py (+0, -0)
 15. application/config/common/log/local_log.py (+2, -1)
 16. application/config/common/log/logger_manager.py (+1, -1)
 17. application/config/common/messageQueue/__init__.py (+0, -0)
 18. application/config/common/messageQueue/ack_message.py (+1, -1)
 19. application/config/common/messageQueue/consumer.py (+0, -0)
 20. application/config/common/messageQueue/mq.py (+2, -2)
 21. application/config/common/mysql/__init__.py (+0, -0)
 22. application/config/common/mysql/mysql_helper.py (+1, -2)
 23. application/config/common/mysql/sql.py (+1, -3)
 24. application/config/common/proxies/__init__.py (+0, -0)
 25. application/config/common/proxies/fast_proxy.py (+0, -0)
 26. application/config/common/redis/__init__.py (+0, -0)
 27. application/config/common/redis/pyredis.py (+0, -0)
 28. application/config/common/redis/redis_helper.py (+0, -3)
 29. application/config/common/redis/xng_redis.py (+0, -0)
 30. application/config/topic_group_queue.py (+0, -26)
 31. application/items/item.py (+1, -1)
 32. application/pipeline/pipeline.py (+3, -4)
 33. application/spiders/__init__.py (+0, -0)
 34. application/spiders/base_spider.py (+241, -0)
 35. application/spiders/benshanzhufu_recommend.py (+6, -0)
 36. application/spiders/spider_registry.py (+21, -0)
 37. application/spiders/universal_crawler.py (+118, -0)
 38. crawler_worker/rabbitmq_consumer.py (+0, -90)
 39. crawler_worker/universal_crawler.py (+0, -451)
 40. functions/__init__.py (+0, -0)
 41. functions/clean_title.py (+0, -0)
 42. functions/get_redirect_url.py (+0, -0)
 43. functions/mysql_service.py (+1, -1)
 44. functions/read_mysql_config.py (+1, -1)
 45. functions/zqkd_db_redis.py (+3, -3)
 46. main.py (+73, -72)
 47. scheduler/scheduler_main.py (+2, -6)
 48. test/__init__.py (+0, -0)
 49. test/test1.py (+15, -0)
 50. utils/env_loader.py (+17, -0)
 51. utils/path_utils.py (+25, -2)
 52. utils/project_paths.py (+0, -31)

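The commit message describes moving the runner to one worker process per CPU core, with one coroutine per topic inside each process; the main.py diff below carries the real implementation. As orientation, a minimal self-contained sketch of that pattern (generic topic names and a dummy consume loop, none of the project's own classes):

```python
import asyncio
from multiprocessing import Process, cpu_count
from typing import List


async def handle_topic(topic: str) -> None:
    # Stand-in for the real per-topic consume loop (poll MQ, run spider, ack).
    for _ in range(3):
        await asyncio.sleep(1)
        print(f"processed one message on {topic}")


def worker(topics: List[str]) -> None:
    # One event loop per process, one coroutine per topic in that process.
    async def run_group() -> None:
        await asyncio.gather(*(handle_topic(t) for t in topics))

    asyncio.run(run_group())


if __name__ == "__main__":
    topics = ["topic_a", "topic_b", "topic_c"]                     # placeholder names
    groups = [topics[i::cpu_count()] for i in range(cpu_count())]  # round-robin split
    procs = [Process(target=worker, args=(g,)) for g in groups if g]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```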
+ 17 - 0
.env

@@ -0,0 +1,17 @@
+# 环境配置
+ENV=prod
+LOG_LEVEL=INFO
+ENABLE_ALIYUN_LOG=true
+
+# 数据库配置
+DB_HOST=127.0.0.1
+DB_PORT=3306
+DB_USER=root
+DB_PASSWORD=123456
+DB_NAME=crawler_db
+
+# 消息队列配置(RabbitMQ/其他)
+MQ_HOST=localhost
+MQ_PORT=5672
+MQ_USER=guest
+MQ_PASSWORD=guest

+ 1 - 1
README.md

@@ -10,7 +10,7 @@
 ```bash
 AutoScraperX/
 ├── main.py                           # 项目入口:监听 MQ 消息,调度 UniversalCrawler
-├── crawler_worker/
+├── spiders/
 │   ├── universal_crawler.py         # 通用爬虫主类,读取配置并执行爬虫逻辑
 │   └── rabbitmq_consumer.py         # 多线程消费 MQ 中的消息并执行任务
 ├── configs/

+ 0 - 0
application/common/__init__.py → application/config/common/__init__.py


+ 0 - 0
application/common/feishu/__init__.py → application/config/common/feishu/__init__.py


+ 1 - 1
application/common/feishu/feishu.py → application/config/common/feishu/feishu.py

@@ -12,7 +12,7 @@ import urllib3
 
 sys.path.append(os.getcwd())
 
-from application.common.log import Local
+from application.config.common.log import Local
 proxies = {"http": None, "https": None}
 
 

+ 1 - 1
application/common/feishu/feishu_data.py → application/config/common/feishu/feishu_data.py

@@ -1,4 +1,4 @@
-from application.common.feishu.feishu_utils import FeishuUtils
+from application.config.common.feishu.feishu_utils import FeishuUtils
 
 
 class FsData:

+ 0 - 0
application/common/feishu/feishu_insert.py → application/config/common/feishu/feishu_insert.py


+ 0 - 0
application/common/feishu/feishu_utils.py → application/config/common/feishu/feishu_utils.py


+ 0 - 0
application/common/ffmpeg/__init__.py → application/config/common/ffmpeg/__init__.py


+ 0 - 0
application/common/ffmpeg/ffmpeg_utils.py → application/config/common/ffmpeg/ffmpeg_utils.py


+ 0 - 0
application/common/gpt/__init__.py → application/config/common/gpt/__init__.py


+ 0 - 0
application/common/gpt/gpt4o_mini_help.py → application/config/common/gpt/gpt4o_mini_help.py


+ 0 - 0
application/common/log/__init__.py → application/config/common/log/__init__.py


+ 0 - 0
application/common/log/aliyun_log.py → application/config/common/log/aliyun_log.py


+ 2 - 1
application/common/log/local_log.py → application/config/common/log/local_log.py

@@ -1,3 +1,4 @@
+import os.path
 import sys
 from datetime import date, timedelta, datetime
 from loguru import logger
@@ -30,7 +31,7 @@ class Local:
 
         # 设置日志文件名
         log_filename = f"{platform}-{mode}-{Local.today.strftime('%Y-%m-%d')}.log"
-        log_file_path = log_dir / log_filename
+        log_file_path = os.path.join(log_dir,log_filename)
 
         # 清除默认 handler
         logger.remove()

+ 1 - 1
application/common/log/logger_manager.py → application/config/common/log/logger_manager.py

@@ -1,4 +1,4 @@
-from application.common.log import Local, AliyunLogger
+from application.config.common.log import Local, AliyunLogger
 
 class LoggerManager:
     _local_loggers = {}

+ 0 - 0
application/common/messageQueue/__init__.py → application/config/common/messageQueue/__init__.py


+ 1 - 1
application/common/messageQueue/ack_message.py → application/config/common/messageQueue/ack_message.py

@@ -1,4 +1,4 @@
-from application.common import Local
+from application.config.common import Local
 
 
 def ack_message(mode, platform, recv_msgs, consumer, trace_id=None):

+ 0 - 0
application/common/messageQueue/consumer.py → application/config/common/messageQueue/consumer.py


+ 2 - 2
application/common/messageQueue/mq.py → application/config/common/messageQueue/mq.py

@@ -3,8 +3,8 @@ from mq_http_sdk.mq_exception import MQExceptionBase
 from mq_http_sdk.mq_producer import TopicMessage
 from mq_http_sdk.mq_client import MQClient
 import traceback
-from application.common.log import Local
-from application.common.log import AliyunLogger
+from application.config.common.log import Local
+from application.config.common.log import AliyunLogger
 
 
 class MQ(object):

+ 0 - 0
application/common/mysql/__init__.py → application/config/common/mysql/__init__.py


+ 1 - 2
application/common/mysql/mysql_helper.py → application/config/common/mysql/mysql_helper.py

@@ -1,14 +1,13 @@
 """
 数据库连接及操作
 """
-import redis
 import pymysql
 import os
 import sys
 
 sys.path.append(os.getcwd())
 
-from application.common.log import Local
+from application.config.common.log import Local
 from application.config.mysql_config import env_dict
 
 

+ 1 - 3
application/common/mysql/sql.py → application/config/common/mysql/sql.py

@@ -1,10 +1,8 @@
 
 
 from datetime import datetime
-import os
-import sys
 
-from application.common.mysql import MysqlHelper
+from application.config.common.mysql import MysqlHelper
 
 class Sql:
     """

+ 0 - 0
application/common/proxies/__init__.py → application/config/common/proxies/__init__.py


+ 0 - 0
application/common/proxies/fast_proxy.py → application/config/common/proxies/fast_proxy.py


+ 0 - 0
application/common/redis/__init__.py → application/config/common/redis/__init__.py


+ 0 - 0
application/common/redis/pyredis.py → application/config/common/redis/pyredis.py


+ 0 - 3
application/common/redis/redis_helper.py → application/config/common/redis/redis_helper.py

@@ -1,12 +1,9 @@
 import redis
-import pymysql
 import os
 import sys
 
 sys.path.append(os.getcwd())
 
-from application.common.log import Local
-from application.config.mysql_config import env_dict
 
 class RedisHelper:
     @classmethod

+ 0 - 0
application/common/redis/xng_redis.py → application/config/common/redis/xng_redis.py


+ 0 - 26
application/config/topic_group_queue.py

@@ -1,26 +0,0 @@
-import yaml
-from utils.project_paths import config_dir
-
-class TopicGroup:
-    def __init__(self, config_path=f"{config_dir}/topic_map.yaml"):
-        with open(config_path, "r") as f:
-            data = yaml.safe_load(f)
-            self.topics = data.get("topics", [])  # 直接获取 topic 列表
-
-    def __iter__(self):
-        """支持迭代遍历 topics"""
-        return iter(self.topics)
-
-    def __len__(self):
-        return len(self.topics)
-
-    def __str__(self):
-        return str(self.topics)
-
-
-if __name__ == '__main__':
-    tg = TopicGroup()
-    for i in tg:
-        mmode = i.split("_")[1]
-        print(mmode)
-

+ 1 - 1
application/items/item.py

@@ -1,5 +1,5 @@
 import time
-from application.functions import clean_title
+from functions import clean_title
 
 
 class VideoItem(object):

+ 3 - 4
application/pipeline/pipeline.py

@@ -1,16 +1,15 @@
-import hashlib
 import re
 import sys
 import os
 import time
 
-from application.common.feishu.feishu_utils import FeishuUtils
+from application.config.common.feishu.feishu_utils import FeishuUtils
 
 sys.path.append(os.getcwd())
 from datetime import datetime
 
-from application.common import MysqlHelper, AliyunLogger
-from application.common.redis.pyredis import RedisClient
+from application.config.common import MysqlHelper, AliyunLogger
+from application.config.common import RedisClient
 
 
 class PiaoQuanPipeline(object):

+ 0 - 0
crawler_worker/__init__.py → application/spiders/__init__.py


+ 241 - 0
application/spiders/base_spider.py

@@ -0,0 +1,241 @@
+import asyncio
+import aiohttp
+from abc import ABC
+from typing import List, Dict, Optional
+import time
+from application.config.common import LoggerManager
+from utils.extractors import safe_extract
+from application.config.common import MQ
+from utils.config_loader import ConfigLoader  # 新增导入
+
+
+class BaseSpider(ABC):
+    """
+    通用爬虫基类:支持严格顺序执行流程
+    """
+
+    MAX_RETRIES = 3  # 单个请求最大重试次数
+    TIMEOUT = 30  # 请求超时时间(秒)
+
+    def __init__(self, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
+        self.trace_id = trace_id
+        self.env = env
+        self.user_list = user_list
+        self.rule_dict = rule_dict
+        self.class_name = self.__class__.__name__  # 获取子类类名
+
+        # 根据类名自动获取配置
+        self.platform_config = ConfigLoader.get_config_by_class_name(self.class_name)
+        if not self.platform_config:
+            raise ValueError(f"找不到对应配置: {self.class_name}")
+
+        # 初始化日志和MQ
+        self.platform = self.platform_config.get("platform")
+        self.mode = self.platform_config.get("mode")
+        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
+        self.logger.info(f"{trace_id}--初始化爬虫类: {self.class_name}")
+        self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
+        self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")
+
+        # 请求配置
+        self.method = self.platform_config.get("method", "GET").upper()
+        self.url = self.platform_config.get("url")
+        self.headers = self.platform_config.get("headers", {})
+        self.body = self.platform_config.get("request_body", {})
+        self.field_map = self.platform_config.get("response_parse", {}).get("fields", {})
+        self.data_path = self.platform_config.get("response_parse", {}).get("data_path")
+        self.video_fields_map = self.platform_config.get("video_fields_map", {})
+
+        # 流程控制配置
+        self.loop_times = self.platform_config.get("loop_times", 1)  # 循环次数
+        self.loop_interval = self.platform_config.get("loop_interval", 0)  # 循环间隔(秒)
+
+        self.logger.info(
+            f"{self.trace_id}--配置: 循环{self.loop_times}次,间隔{self.loop_interval}秒")
+
+    async def _send_async_request(self, method: str, url: str, **kwargs) -> aiohttp.ClientResponse:
+        """
+        发送异步HTTP请求,支持重试机制
+        """
+        retries = 0
+        timeout = aiohttp.ClientTimeout(total=self.TIMEOUT)
+
+        while retries < self.MAX_RETRIES:
+            try:
+                async with aiohttp.ClientSession(timeout=timeout) as session:
+                    async with session.request(method, url, **kwargs) as response:
+                        response.raise_for_status()
+                        return response
+            except Exception as e:
+                retries += 1
+                remaining_attempts = self.MAX_RETRIES - retries
+
+                if retries < self.MAX_RETRIES:
+                    self.logger.warning(
+                        f"{self.trace_id}--请求失败 (尝试 {retries}/{self.MAX_RETRIES}): {e}. "
+                        f"剩余尝试次数: {remaining_attempts}"
+                    )
+                    await asyncio.sleep(1)  # 异步等待
+                else:
+                    self.aliyun_logr.logging(
+                        code="5001",
+                        message="请求失败,已达到最大重试次数",
+                        data={
+                            "url": url,
+                            "method": method,
+                            "error": str(e),
+                            "headers": kwargs.get("headers", {}),
+                            "body": kwargs.get("json", {})
+                        },
+                        trace_id=self.trace_id
+                    )
+                    self.logger.error(f"{self.trace_id}--请求失败,已达到最大重试次数: {e}")
+                    raise
+
+    async def crawl_data(self) -> Optional[List[Dict]]:
+        """异步获取视频数据"""
+        self.logger.info(f"{self.trace_id}--开始获取视频数据")
+        try:
+            response = await self._send_async_request(
+                method=self.method,
+                url=self.url,
+                headers=self.headers,
+                json=self.body
+            )
+            result = await response.json()
+            data = safe_extract(result, self.data_path)
+
+            if not data:
+                self.logger.warning(f"{self.trace_id}--未获取到数据,路径: {self.data_path}")
+                return []
+
+            self.logger.info(f"{self.trace_id}--成功获取{len(data)}条视频数据")
+            return data
+        except Exception as e:
+            self.logger.error(f"{self.trace_id}--获取视频数据失败: {e}")
+            return []
+
+    def filter_data(self, video: Dict) -> bool:
+        """校验视频是否符合规则"""
+        if not self.rule_dict:
+            return True
+
+        rule_duration = self.rule_dict.get("duration")
+        if rule_duration:
+            video_url = safe_extract(video, self.video_fields_map.get("video_url"))
+            duration = self.get_video_duration(video_url)
+            if not (rule_duration['min'] <= duration <= rule_duration['max']):
+                return False
+
+        rule_videos_cnt = self.rule_dict.get("videos_cnt")
+        if rule_videos_cnt:
+            video_count = MysqlService(self.platform, self.mode, self.trace_id).get_today_videos()
+            if video_count >= rule_videos_cnt.get("min", 0):
+                return False
+
+        return True
+
+    def process_video(self, video: Dict) -> Optional[Dict]:
+        """处理单条视频数据"""
+        self.logger.debug(f"{self.trace_id}--开始处理视频: {video.get('title', '无标题')}")
+        try:
+            item = {}
+            for field, path in self.field_map.items():
+                value = safe_extract(video, path)
+                if value is None:
+                    self.logger.warning(f"{self.trace_id}--字段提取失败: {field}")
+                    continue
+                item[field] = value
+
+            if not item:
+                self.logger.warning(f"{self.trace_id}--视频处理结果为空")
+                return None
+
+            item.update({
+                "platform": self.platform,
+                "strategy": self.mode,
+                "session": f"{self.platform}-{int(time.time())}"
+            })
+
+            self.logger.debug(f"{self.trace_id}--视频处理成功")
+            return item
+        except Exception as e:
+            self.logger.error(f"{self.trace_id}--视频处理异常: {e}")
+            return None
+
+    def push_to_etl(self, item: Dict) -> bool:
+        """推送数据到ETL(同步)"""
+        self.logger.info(f"{self.trace_id}--开始推送数据到ETL: {item.get('title', '无标题')}")
+        try:
+            self.mq.send_msg(item)
+            self.aliyun_logr.logging(
+                code="1002",
+                message="成功发送至ETL",
+                data=item,
+                trace_id=self.trace_id
+            )
+            self.logger.info(f"{self.trace_id}--数据推送成功")
+            return True
+        except Exception as e:
+            self.logger.error(f"{self.trace_id}--数据推送失败: {e}")
+            return False
+
+    async def run(self):
+        """
+        异步运行爬虫任务,严格按顺序执行
+        1. 爬取
+        2. 过滤
+        3. 处理每条数据
+        4. 推送到ETL
+        """
+        self.logger.info(f"{self.trace_id}--[{self.platform}] 开始执行爬虫任务")
+        for loop_index in range(1, self.loop_times + 1):
+
+            self.logger.info(f"{self.trace_id}--步骤1: 开始第 {loop_index}/{self.loop_times} 次循环请求")
+            loop_start_time = time.time()
+
+            # 步骤1: 获取视频数据(失败则跳过当前循环)
+            video_list = await self.crawl_data()
+            if not video_list:
+                self.logger.warning(f"{self.trace_id}--未获取到视频数据,跳过当前循环")
+                await self._wait_for_next_loop(loop_index)
+                continue
+
+            # 步骤2: 处理每条视频并推送到ETL
+            success_count = 0
+            fail_count = 0
+
+            for video in video_list:
+                # 步骤2.1: 校验视频(失败则跳过)
+                if not self.filter_data(video):
+                    self.logger.debug(f"{self.trace_id}--视频不符合规则,已跳过")
+                    continue
+
+                # 步骤2.2: 处理视频(失败则记录并继续)
+                item = self.process_video(video)
+                if not item:
+                    self.logger.warning(f"{self.trace_id}--视频处理失败,已跳过")
+                    fail_count += 1
+                    continue
+
+                # 步骤2.3: 推送到ETL(失败则记录并继续)
+                if self.push_to_etl(item):
+                    success_count += 1
+                else:
+                    fail_count += 1
+
+            loop_duration = time.time() - loop_start_time
+            self.logger.info(f"{self.trace_id}--第 {loop_index}/{self.loop_times} 次循环完成. "
+                             f"成功: {success_count}, 失败: {fail_count}, 耗时: {loop_duration:.2f}秒")
+
+            # 等待下一次循环
+            await self._wait_for_next_loop(loop_index)
+
+        self.logger.info(f"{self.trace_id}--[{self.platform}] 爬虫任务全部完成")
+        return True
+
+    async def _wait_for_next_loop(self, current_loop: int) -> None:
+        """等待下一次循环请求"""
+        if current_loop < self.loop_times and self.loop_interval > 0:
+            self.logger.info(f"{self.trace_id}--等待 {self.loop_interval} 秒后进行下一次请求")
+            await asyncio.sleep(self.loop_interval)

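For reference alongside `_send_async_request` and `crawl_data` above, a standalone sketch of an aiohttp fetch-with-retry helper that decodes the JSON body while the session and response are still open (illustrative constants mirroring BaseSpider's defaults, not the project's code):

```python
import asyncio
from typing import Any, Dict, Optional

import aiohttp

MAX_RETRIES = 3   # illustrative, same defaults as BaseSpider
TIMEOUT = 30      # seconds


async def fetch_json(method: str, url: str, **kwargs) -> Optional[Dict[str, Any]]:
    """Send an HTTP request and return the decoded JSON, retrying on failure."""
    timeout = aiohttp.ClientTimeout(total=TIMEOUT)
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.request(method, url, **kwargs) as response:
                    response.raise_for_status()
                    # Read the body before the session/response is closed.
                    return await response.json()
        except Exception:
            if attempt == MAX_RETRIES:
                raise
            await asyncio.sleep(1)  # brief back-off before the next attempt


# Example (needs network access):
# data = asyncio.run(fetch_json("GET", "https://httpbin.org/json"))
```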
+ 6 - 0
application/spiders/benshanzhufu_recommend.py

@@ -0,0 +1,6 @@
+from application.spiders.base_spider import BaseSpider
+
+
+class BenshanzhufuRecommend(BaseSpider):
+    def __init__(self, rule_dict, user_list, trace_id):
+        super().__init__(rule_dict, user_list, trace_id)

+ 21 - 0
application/spiders/spider_registry.py

@@ -0,0 +1,21 @@
+from application.spiders.benshanzhufu_recommend import BenshanzhufuRecommend
+
+
+SPIDER_CLASS_MAP = {
+    "bszf_recommend_prod": BenshanzhufuRecommend
+}
+
+
+
+
+
+
+
+
+def get_spider_class(topic: str):
+    """
+    根据 topic 获取对应爬虫类
+    :param topic: MQ 消息的 topic 名称
+    :return: 爬虫类(继承自 BaseSpider)
+    """
+    return SPIDER_CLASS_MAP.get(topic, "未找到对应配置")

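A short usage sketch of the registry, mirroring how main.py dispatches a topic (placeholder rule_dict / user_list / trace_id values; instantiation assumes ConfigLoader can find the platform config for the class):

```python
from application.spiders.spider_registry import get_spider_class

CrawlerClass = get_spider_class("bszf_recommend_prod")
crawler = CrawlerClass(rule_dict={}, user_list=[], trace_id="demo-trace")
# asyncio.run(crawler.run())  # would start the configured crawl loop
```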
+ 118 - 0
application/spiders/universal_crawler.py

@@ -0,0 +1,118 @@
+import random
+import time
+import uuid
+import requests
+from typing import Dict, List, Optional
+from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type, RetryCallState
+
+from application.config.common import MQ
+from configs.config import base_url
+from functions import MysqlService
+from application.items import VideoItem
+from application.pipeline import PiaoQuanPipeline
+from utils.extractors import safe_extract
+
+from application.spiders.base_spider import BaseSpider  # 抽象基类导入
+
+def before_send_log(retry_state: RetryCallState) -> None:
+    attempt = retry_state.attempt_number
+    last_result = retry_state.outcome
+    if last_result.failed:
+        exc = last_result.exception()
+        logger = retry_state.kwargs.get('logger')
+        url = retry_state.args[0] if retry_state.args else "unknown"
+        if logger:
+            logger.warning(f"请求失败,准备重试 ({attempt}/3): {url}, 错误: {str(exc)}")
+
+
+class UniversalCrawler(BaseSpider):
+    def __init__(self, platform_config: Dict, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
+        super().__init__(platform_config, rule_dict, user_list, trace_id, env)
+        self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")
+        self.base_api = base_url
+        self.has_enough_videos = False
+        self.download_cnt = 0
+        self.loop_times = self.platform_config.get('loop_times', 1)
+
+        self.request_method = self.platform_config["method"].upper()
+        self.request_url = self.platform_config["url"]
+        self.request_headers = self.platform_config.get("headers", {})
+        self.request_body = self.platform_config.get("request_body", {})
+        self.response_data_path = self.platform_config["response_parse"]["data_path"]
+        self.video_fields_map = self.platform_config["response_parse"]["fields"]
+
+    @retry(stop=stop_after_attempt(3), wait=wait_fixed(2), retry=retry_if_exception_type((requests.RequestException, ValueError)), before=before_send_log)
+    def _send_request(self, url: str, method: str = None, headers: Dict = None, payload: Dict = None, timeout: int = 30) -> Optional[Dict]:
+        method = method or self.request_method
+        headers = headers or self.request_headers
+        payload = payload or self.request_body
+
+        response = requests.request(method=method, url=url, headers=headers, json=payload, timeout=timeout)
+        response.raise_for_status()
+        resp = response.json()
+        if resp.get("code") == 0:
+            return resp
+        raise ValueError(f"API响应错误: {resp}")
+
+    def fetch_video_data(self) -> Optional[List[Dict]]:
+        self.logger.info(f"{self.trace_id}--请求视频数据: {self.request_url}")
+        try:
+            response = self._send_request(self.request_url)
+            return safe_extract(response, self.response_data_path) or []
+        except Exception as e:
+            self.logger.error(f"{self.trace_id}--请求失败: {e}")
+            return []
+
+    def is_video_qualified(self, video: Dict) -> bool:
+        if not self.rule_dict:
+            return True
+
+        rule_duration = self.rule_dict.get("duration")
+        if rule_duration:
+            video_url = safe_extract(video, self.video_fields_map.get("video_url"))
+            duration = self.get_video_duration(video_url)
+            if not (rule_duration['min'] <= duration <= rule_duration['max']):
+                return False
+
+        rule_videos_cnt = self.rule_dict.get("videos_cnt")
+        if rule_videos_cnt:
+            video_count = MysqlService(self.platform, self.mode, self.trace_id).get_today_videos()
+            if video_count >= rule_videos_cnt.get("min", 0):
+                return False
+
+        return True
+
+    def transform_to_etl_item(self, video: Dict) -> Optional[Dict]:
+        item = VideoItem()
+        for field, path in self.video_fields_map.items():
+            val = safe_extract(video, path) if isinstance(path, str) and path.startswith("$") else path
+            item.add_video_info(field, val)
+
+        item.add_video_info("platform", self.platform)
+        item.add_video_info("strategy", self.mode)
+        item.add_video_info("session", f"{self.platform}-{int(time.time())}")
+        user = random.choice(self.user_list)
+        item.add_video_info("user_id", user["uid"])
+        item.add_video_info("user_name", user["nick_name"])
+
+        return item.produce_item()
+
+    def push_to_etl(self, item: Dict) -> bool:
+        trace_id = f"{self.platform}-{uuid.uuid4()}"
+        pipeline = PiaoQuanPipeline(
+            platform=self.platform,
+            mode=self.mode,
+            rule_dict=self.rule_dict,
+            env=self.env,
+            item=item,
+            trace_id=trace_id,
+        )
+        if pipeline.process_item():
+            self.download_cnt += 1
+            self.mq.send_msg(item)
+            self.aliyun_logr.logging(code="1002", message="成功发送至ETL", data=item, trace_id=self.trace_id)
+            if self.download_cnt >= self.download_min_limit:
+                self.has_enough_videos = True
+                self.aliyun_logr.logging(code="2000", message=f"达到下载限制: {self.download_min_limit}", trace_id=self.trace_id)
+            return True
+        return False

+ 0 - 90
crawler_worker/rabbitmq_consumer.py

@@ -1,90 +0,0 @@
-import pika
-import asyncio
-import json
-from .universal_crawler import AsyncCrawler
-from .utils.log_config import setup_logger
-
-
-class RabbitMQConsumer:
-    def __init__(self, config_path: str):
-        self.config_path = config_path
-        self.aliyun_log = setup_logger("rabbitmq_consumer", "system")
-        self.consumer_tag = None
-
-    def connect(self):
-        """连接到RabbitMQ"""
-        try:
-            with open('config/rabbitmq_config.yaml', 'r', encoding='utf-8') as f:
-                rabbit_config = json.load(f)
-
-            self.connection = pika.BlockingConnection(
-                pika.ConnectionParameters(
-                    host=rabbit_config.get('host', 'localhost'),
-                    port=rabbit_config.get('port', 5672),
-                    credentials=pika.PlainCredentials(
-                        rabbit_config.get('username', 'guest'),
-                        rabbit_config.get('password', 'guest')
-                    )
-                )
-            )
-            self.channel = self.connection.channel()
-            self.aliyun_log.info("成功连接到RabbitMQ")
-            return True
-        except Exception as e:
-            self.aliyun_log.error(f"连接RabbitMQ失败: {str(e)}")
-            return False
-
-    async def process_message(self, ch, method, properties, body):
-        """处理消息"""
-        task = json.loads(body)
-        self.aliyun_log.info(f"收到任务: {task.get('task_id', '未知ID')}")
-
-        platform = task.get('platform', 'unknown_platform')
-        mode = task.get('mode', 'recommend')
-
-        crawler = AsyncCrawler(platform, mode, self.config_path)
-        try:
-            await crawler.run()
-            ch.basic_ack(delivery_tag=method.delivery_tag)
-            self.aliyun_log.info(f"任务完成: {task.get('task_id', '未知ID')}")
-        except Exception as e:
-            self.aliyun_log.error(f"处理任务异常: {str(e)}")
-            # 重新排队
-            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
-
-    def start_consuming(self):
-        """开始消费消息"""
-        if not self.connect():
-            return
-
-        queue_name = self.setup_queue()
-        if not queue_name:
-            return
-
-        try:
-            self.channel.basic_consume(
-                queue=queue_name,
-                on_message_callback=self._sync_process_message,
-                auto_ack=False
-            )
-            self.aliyun_log.info(f"开始消费队列: {queue_name}")
-            self.channel.start_consuming()
-        except KeyboardInterrupt:
-            self.channel.stop_consuming()
-        except Exception as e:
-            self.aliyun_log.error(f"消费消息失败: {str(e)}")
-        finally:
-            self.connection.close()
-
-    def _sync_process_message(self, ch, method, properties, body):
-        """同步包装异步处理函数"""
-        asyncio.run(self.process_message(ch, method, properties, body))
-
-
-def main():
-    consumer = RabbitMQConsumer("config/platform_config.yaml")
-    consumer.start_consuming()
-
-
-if __name__ == "__main__":
-    main()

+ 0 - 451
crawler_worker/universal_crawler.py

@@ -1,451 +0,0 @@
-import os
-import sys
-import json
-import random
-import time
-import uuid
-import yaml
-import requests
-import cv2
-from datetime import datetime
-from typing import Dict, Any, List, Optional, Union
-from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type, RetryCallState
-
-from application.common.log.logger_manager import LoggerManager
-from utils.extractors import safe_extract, extract_multiple
-
-# 添加公共模块路径
-sys.path.append(os.getcwd())
-print(os.getcwd())
-
-from application.items import VideoItem
-from application.pipeline import PiaoQuanPipeline
-from application.common.messageQueue import MQ
-from application.common.log import AliyunLogger
-from application.common.log import Local
-from configs.config import base_url
-from application.functions.mysql_service import MysqlService
-
-
-def before_send_log(retry_state: RetryCallState) -> None:
-    """请求重试前记录日志"""
-    attempt = retry_state.attempt_number
-    last_result = retry_state.outcome
-    if last_result.failed:
-        exc = last_result.exception()
-        logger = retry_state.kwargs.get('logger')
-        url = retry_state.args[0] if retry_state.args else "unknown"
-        if logger:
-            logger.warning(f"请求失败,准备重试 ({attempt}/3): {url}, 错误: {str(exc)}")
-
-
-class UniversalCrawler:
-    """通用爬虫类,通过YAML配置驱动不同平台的爬取逻辑"""
-
-    def __init__(self, platform_config: Dict, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
-        """
-        初始化爬虫
-        :param platform_config: 平台配置字典
-        :param rule_dict: 规则字典
-        :param user_list: 用户列表
-        :param trace_id: 追踪ID
-        :param env: 运行环境
-        """
-        self.platform = platform_config["platform"]
-        self.mode = platform_config["mode"]
-        self.rule_dict = rule_dict
-        self.user_list = user_list
-        self.trace_id = trace_id
-        self.env = env
-        self.config = platform_config
-        # 初始化日志
-        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
-        self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
-        self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")
-
-        self.has_enough_videos = False
-        self.base_api = base_url
-        self.loop_times = platform_config.get('loop_times', 1)
-
-        # 提取常用配置为类属性,提高可读性
-        self.request_method = self.config["method"].upper()
-        self.request_url = self.config["url"]
-        self.request_headers = self.config.get("headers", {})
-        self.request_body = self.config.get("request_body", {})
-        self.response_data_path = self.config["response_parse"]["data_path"]
-        self.video_fields_map = self.config["response_parse"]["fields"]
-
-        # 下载限制配置
-        self.download_min_limit = self.config.get("download_limit", {}).get("min", 200)
-
-    @retry(
-        stop=stop_after_attempt(3),  # 最多重试3次
-        wait=wait_fixed(2),  # 每次重试间隔2秒
-        retry=retry_if_exception_type((requests.RequestException, ValueError)),
-        before=before_send_log,  # 添加重试前日志
-    )
-    def _send_request(self, url: str, method: str = None, headers: Dict = None,
-                      payload: Dict = None, timeout: int = 30) -> Optional[Dict]:
-        """
-        发送API请求,失败自动重试最多3次
-        :param url: 请求URL
-        :param method: 请求方法,默认使用配置中的方法
-        :param headers: 请求头,默认使用配置中的头
-        :param payload: 请求体,默认使用配置中的体
-        :param timeout: 超时时间
-        :return: 响应JSON数据或None
-        """
-        # 使用默认配置(如果未提供参数)
-        method = method or self.request_method
-        headers = headers or self.request_headers
-        payload = payload or self.request_body
-
-        try:
-            self.logger.info(f"{self.trace_id}--正在发送请求: {url}")
-            response = requests.request(
-                method=method,
-                url=url,
-                headers=headers,
-                json=payload,
-                timeout=timeout
-            )
-            response.raise_for_status()
-            resp = response.json()
-            if resp.get("code") == 0:
-                return resp
-            self.logger.warning(f"{self.trace_id}--API响应非零状态码: {resp}")
-            raise ValueError(f"API响应错误: {resp}")
-        except requests.exceptions.Timeout:
-            self.logger.error(f"{self.trace_id}--请求超时: {url}")
-            raise
-        except requests.exceptions.RequestException as e:
-            self.logger.error(f"{self.trace_id}--请求异常: {e}")
-            raise
-        except json.JSONDecodeError as e:
-            self.logger.error(f"{self.trace_id}--解析JSON响应失败: {e}")
-            raise
-        except Exception as e:
-            # 在最后一次失败时记录详细日志
-            self.aliyun_log.logging(
-                code="3000",
-                message=f"请求失败: {url}",
-                data={"error": str(e)},
-                trace_id=self.trace_id
-            )
-            self.logger.error(f"{self.trace_id}--意外错误: {e}")
-            raise
-
-    def get_video_duration(self, video_url: str, timeout: int = 20) -> float:
-        """
-        获取网络视频的时长(秒),增加网络异常处理和超时控制
-        :param video_url: 视频URL
-        :param timeout: 超时时间
-        :return: 视频时长(秒),失败时返回0
-        """
-        # 检查URL是否可访问
-        try:
-            response = requests.head(video_url, timeout=timeout)
-            response.raise_for_status()  # 检查HTTP状态码
-        except requests.exceptions.RequestException as e:
-            self.logger.error(f"{self.trace_id}--网络错误: 无法访问视频URL - {e}")
-            return 0
-
-        cap = None
-        try:
-            # 创建VideoCapture对象
-            cap = cv2.VideoCapture(video_url)
-
-            # 设置缓冲区大小,减少延迟
-            cap.set(cv2.CAP_PROP_BUFFERSIZE, 3)
-
-            # 尝试打开视频流(最多尝试3次)
-            max_attempts = 3
-            for attempt in range(max_attempts):
-                if cap.isOpened():
-                    break
-                self.logger.info(f"{self.trace_id}--尝试打开视频流 ({attempt + 1}/{max_attempts})...")
-                time.sleep(1)
-
-            if not cap.isOpened():
-                self.logger.error(f"{self.trace_id}--错误: 无法打开视频流 {video_url}")
-                return 0
-
-            # 获取视频属性
-            fps = cap.get(cv2.CAP_PROP_FPS)
-            frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
-
-            if fps <= 0 or frame_count <= 0:
-                # 某些网络视频可能无法直接获取总帧数,尝试读取几帧估算
-                self.logger.info(f"{self.trace_id}--无法获取总帧数,尝试估算...")
-                frame_count = 0
-                start_time = time.time()
-
-                # 读取10帧估算帧率和时长
-                for _ in range(10):
-                    ret, frame = cap.read()
-                    if not ret:
-                        break
-                    frame_count += 1
-
-                elapsed_time = time.time() - start_time
-                if elapsed_time > 0:
-                    estimated_fps = frame_count / elapsed_time if elapsed_time > 0 else 0
-                    # 假设视频时长为30秒(可根据实际情况调整)
-                    estimated_duration = 30.0
-                    self.logger.info(f"{self.trace_id}--估算视频时长: {estimated_duration}秒 (基于{frame_count}帧)")
-                    return estimated_duration
-                else:
-                    self.logger.error(f"{self.trace_id}--错误: 无法估算视频时长")
-                    return 0
-
-            duration = frame_count / fps
-            self.logger.info(f"{self.trace_id}--视频时长: {duration}秒")
-            return duration
-
-        except Exception as e:
-            self.logger.error(f"{self.trace_id}--获取视频时长时发生异常: {e}")
-            return 0
-
-        finally:
-            if cap:
-                cap.release()  # 确保资源释放
-
-    def _extract_video_list(self, response: Dict) -> List[Dict]:
-        """从API响应中提取视频列表"""
-        return safe_extract(response, self.response_data_path) or []
-
-    def _is_video_valid(self, video_data: Dict) -> bool:
-        """
-        判断视频是否满足条件
-        :param video_data: 视频数据
-        :return: True if valid, False otherwise
-        """
-        if not self.rule_dict:
-            return True
-
-        # 检查视频时长规则
-        rule_duration = self.rule_dict.get("duration")
-        if rule_duration:
-            extract_video_url_path = self.video_fields_map.get("video_url")
-            if not extract_video_url_path:
-                self.logger.warning(f"{self.trace_id}--缺少视频URL字段映射")
-                return False
-
-            video_url = safe_extract(video_data, extract_video_url_path)
-            if not video_url:
-                self.logger.warning(f"{self.trace_id}--无法提取视频URL")
-                return False
-
-            video_duration = self.get_video_duration(video_url)
-            min_duration = rule_duration.get("min", 0)
-            max_duration = rule_duration.get("max", float('inf'))
-
-            if not (min_duration <= video_duration <= max_duration):
-                self.logger.info(f"{self.trace_id}--视频时长{video_duration}秒超出范围[{min_duration}, {max_duration}]")
-                return False
-
-        # 检查视频数量规则
-        rule_videos_cnt = self.rule_dict.get("videos_cnt")
-        if rule_videos_cnt:
-            # 这里应该查询数据库获取实际视频数量
-            # 示例代码,实际实现需要根据业务逻辑完善
-            video_count = self._get_video_count_from_db()  # 假设这是获取视频数量的方法
-            min_count = rule_videos_cnt.get("min", 0)
-            if video_count >= min_count:
-                self.logger.info(f"{self.trace_id}--视频数量{video_count}达到最小要求{min_count}")
-                return False
-
-        return True
-
-    def _get_video_count_from_db(self) -> int:
-        """从数据库获取视频数量"""
-        mysql = MysqlService(self.platform,self.mode,self.trace_id)
-        video_count = mysql.get_today_videos()
-        return video_count
-
-    def _process_video(self, video_data: Dict) -> bool:
-        """
-        处理单个视频数据
-        :param video_data: 视频数据
-        :return: 处理成功返回True,失败返回False
-        """
-        # 先判断视频是否符合条件
-        if not self._is_video_valid(video_data):
-            self.logger.info(f"{self.trace_id}--视频因验证不通过被跳过")
-            return False
-
-        # 创建视频项
-        item = VideoItem()
-
-        # 从配置中获取字段映射并填充数据
-        for field_name, path in self.video_fields_map.items():
-            if isinstance(path, str) and path.startswith("$."):
-                match = safe_extract(video_data, path)
-                item.add_video_info(field_name, match)
-            else:
-                # 如果是固定值(int、str等),直接使用
-                item.add_video_info(field_name, path)
-
-        # 添加固定字段
-        item.add_video_info("platform", self.platform)
-        item.add_video_info("strategy", self.config["mode"])
-        item.add_video_info("session", f"{self.platform}-{int(time.time())}")
-
-        # 随机选择一个用户
-        our_user = random.choice(self.user_list)
-        item.add_video_info("user_id", our_user["uid"])
-        item.add_video_info("user_name", our_user["nick_name"])
-
-        video_title = item.get("title", "未知标题")
-        self.logger.info(f"{self.trace_id}--正在处理视频: {video_title}")
-
-        # 处理管道
-        trace_id = f"{self.platform}-{uuid.uuid4()}"
-        pipeline = PiaoQuanPipeline(
-            platform=self.platform,
-            mode=self.config["mode"],
-            rule_dict=self.rule_dict,
-            env=self.env,
-            item=item.produce_item(),
-            trace_id=trace_id,
-        )
-
-        if pipeline.process_item():
-            self.download_cnt += 1
-            self.mq.send_msg(item.produce_item())
-            self.aliyun_log.logging(
-                code="1002",
-                message="成功发送至ETL",
-                data=item.produce_item(),
-                trace_id=self.trace_id
-            )
-            self.logger.info(f"{self.trace_id}--视频处理完成并发送至消息队列,已处理总数: {self.download_cnt}")
-
-            # 检查下载限制
-            if self.download_cnt >= self.download_min_limit:
-                self.has_enough_videos = True
-                self.aliyun_log.logging(
-                    code="2000",
-                    message=f"达到下载限制: {self.download_min_limit}",
-                    trace_id=self.trace_id
-                )
-                self.logger.info(f"{self.trace_id}--达到下载限制,停止进一步处理")
-            return True
-
-        self.logger.warning(f"{self.trace_id}--通过管道处理视频失败")
-        return False
-
-    def _fetch_video_list(self) -> Optional[List[Dict]]:
-        """
-        获取并解析视频列表
-        :return: 视频列表或None
-        """
-        self.logger.info(f"{self.trace_id}--从{self.request_url}获取视频列表")
-        response = self._send_request(
-            self.request_url,
-            self.request_method,
-            self.request_headers,
-            self.request_body
-        )
-
-        if not response:
-            self.logger.error(f"{self.trace_id}--获取视频列表失败")
-            return None
-
-        video_list = self._extract_video_list(response)
-        self.logger.info(f"{self.trace_id}--获取到{len(video_list)}个视频")
-        return video_list
-
-    def _execute_post_actions(self):
-        """执行爬取后的额外操作(如曝光上报)"""
-        for action in self.config.get("post_actions", []):
-            if action.get("trigger") == "after_video_processed":
-                endpoint = action.get("endpoint")
-                payload = action.get("payload", {})
-                if endpoint:
-                    self.logger.info(f"{self.trace_id}--执行后置操作: {endpoint}")
-                    self._send_request(endpoint, payload=payload)
-
-    def run(self):
-        """执行爬取任务"""
-        self.aliyun_log.logging(
-            code=1003,
-            message="开始执行爬虫",
-            data=self.platform,
-            trace_id=self.trace_id
-        )
-        self.logger.info(f"{self.trace_id}--开始{self.platform}执行爬虫")
-
-        for loop in range(self.loop_times):
-            if self.has_enough_videos:
-                self.aliyun_log.logging(
-                    code=2000,
-                    message=f"[{self.platform}] 达到每日最大爬取量",
-                    data=self.platform,
-                    trace_id=self.trace_id
-                )
-                self.logger.info(f"{self.trace_id}--达到每日最大爬取量,停止爬虫")
-                break
-
-            self.logger.info(f"{self.trace_id}--开始第{loop + 1}/{self.loop_times}轮循环")
-            video_list = self._fetch_video_list()
-
-            if not video_list:
-                self.logger.warning(f"{self.trace_id}--视频列表为空,跳过本轮循环")
-                continue
-
-            for video_data in video_list:
-                if self.has_enough_videos:
-                    self.logger.info(f"{self.trace_id}--达到每日最大爬取量,停止处理")
-                    break
-
-                self._process_video(video_data)
-
-            # 执行额外操作(如曝光上报)
-            self._execute_post_actions()
-
-            # 添加循环间隔
-            loop_interval = self.config.get("loop_interval", 0)
-            if loop_interval > 0:
-                self.logger.info(f"{self.trace_id}--在下一轮循环前等待{loop_interval}秒")
-                time.sleep(loop_interval)
-
-        self.aliyun_log.logging(
-            code=0000,
-            message="爬虫执行完成",
-            data=self.platform,
-            trace_id=self.trace_id
-        )
-        self.logger.info(f"{self.trace_id}--平台{self.platform}的爬虫完成,已处理{self.download_cnt}个视频")
-
-
-if __name__ == '__main__':
-    cr = UniversalCrawler(
-        platform_config={
-            "platform": "benshanzhufu",
-            "mode": "recommend",
-            "method": "POST",
-            "url": "https://api.example.com/video/list",
-            "headers": {"Content-Type": "application/json"},
-            "request_body": {"page": 1, "size": 20},
-            "response_parse": {
-                "data_path": "$.data.items",
-                "fields": {
-                    "title": "$.title",
-                    "video_url": "$.videoUrl",
-                    "author": "$.author.name",
-                    "duration": "$.duration"
-                }
-            },
-            "download_limit": {"min": 200},
-            "loop_times": 3
-        },
-        rule_dict={
-            'videos_cnt': {'min': 500, 'max': 0},
-            'duration': {'min': 30, 'max': 1200}
-        },
-        user_list=[{"uid": 20631262, "link": "recommend_2060", "nick_name": "人老心不老"}],
-        trace_id=str(uuid.uuid4())
-    )
-
-    cr.run()

+ 0 - 0
application/functions/__init__.py → functions/__init__.py


+ 0 - 0
application/functions/clean_title.py → functions/clean_title.py


+ 0 - 0
application/functions/get_redirect_url.py → functions/get_redirect_url.py


+ 1 - 1
application/functions/mysql_service.py → functions/mysql_service.py

@@ -1,7 +1,7 @@
 import json
 import traceback
 
-from application.common import MysqlHelper, AliyunLogger,Local
+from application.config.common import MysqlHelper, AliyunLogger,Local
 
 
 class MysqlService:

+ 1 - 1
application/functions/read_mysql_config.py → functions/read_mysql_config.py

@@ -1,6 +1,6 @@
 import json
 
-from application.common.mysql import MysqlHelper
+from application.config.common import MysqlHelper
 
 
 def get_config_from_mysql(log_type, source, text):

+ 3 - 3
application/functions/zqkd_db_redis.py → functions/zqkd_db_redis.py

@@ -2,15 +2,15 @@ import os
 import sys
 import threading
 import traceback
-from datetime import datetime, timedelta
+from datetime import timedelta
 
 import redis
 
-from application.common import Local
+from application.config.common import Local
 
 sys.path.append(os.getcwd())
 
-from application.common.mysql import MysqlHelper
+from application.config.common import MysqlHelper
 
 
 class DatabaseOperations:

+ 73 - 72
main.py

@@ -1,129 +1,130 @@
-import importlib
 import json
 import time
 import traceback
-from concurrent.futures import ThreadPoolExecutor, as_completed, Future
-from typing import Dict
+from multiprocessing import Process, cpu_count
+from typing import List, Dict
+import asyncio
 
-from application.common.logger_manager import LoggerManager
-from application.common.trace_utils import generate_trace_id
-from application.common import get_consumer, ack_message
-from crawler_worker.universal_crawler import UniversalCrawler
-from application.config import TopicGroup
-from application.functions.mysql_service import MysqlService
-from utils.config_loader import ConfigLoader
+from application.config.common import LoggerManager
+from utils.trace_utils import generate_trace_id
+from application.config.common import get_consumer, ack_message
+from functions import MysqlService
+from application.spiders.spider_registry import get_spider_class, SPIDER_CLASS_MAP
 
 
-def import_custom_class(class_path: str):
-    """
-    动态导入爬虫类,例如 crawler_worker.custom.xx.Crawler
-    """
-    module_path, class_name = class_path.rsplit(".", 1)
-    module = importlib.import_module(module_path)
-    return getattr(module, class_name)
-
-
-def handle_message(topic: str, mode: str):
-    """
-    单线程消费指定 topic 消息的核心逻辑,会持续轮询 MQ
-    """
+# ------------------------------- Topic 协程处理核心 -------------------------------
+async def async_handle_topic(topic: str):
     consumer = get_consumer(topic_name=topic, group_id=topic)
-    platform_config = ConfigLoader().get_platform_config(topic)
+    logger = LoggerManager.get_logger(topic, "worker")
+    aliyun_logger = LoggerManager.get_aliyun_logger(topic, "worker")
 
     while True:
         try:
             messages = consumer.consume_message(wait_seconds=10, batch_size=1)
             if not messages:
+                await asyncio.sleep(1)
                 continue
 
             for message in messages:
                 trace_id = generate_trace_id()
-                body = message.message_body
-
                 try:
-                    payload = json.loads(body)
+                    payload = json.loads(message.message_body)
                     platform = payload["platform"]
                     mode = payload["mode"]
                     task_id = payload["id"]
 
-                    # 初始化日志
-                    logger = LoggerManager.get_logger(platform, mode)
-                    aliyun_logger = LoggerManager.get_aliyun_logger(platform, mode)
-                    logger.info(f"[trace_id={trace_id}] 收到任务: {body}")
-
-                    # 初始化配置、用户与规则
                     mysql_service = MysqlService(platform, mode, task_id)
                     user_list = mysql_service.get_user_list()
                     rule_dict = mysql_service.get_rule_dict()
-                    custom_class = platform_config.get("custom_class")
 
-                    # 实例化爬虫类
-                    CrawlerClass = import_custom_class(custom_class) if custom_class else UniversalCrawler
+                    CrawlerClass = get_spider_class(topic)
                     crawler = CrawlerClass(
-                        platform_config=platform_config,
                         rule_dict=rule_dict,
                         user_list=user_list,
                         trace_id=trace_id
                     )
-                    crawler.run()
 
-                    # 爬虫成功,确认消息
+                    await crawler.run()
+
                     ack_message(mode, platform, message, consumer, trace_id=trace_id)
                     aliyun_logger.logging(code="1000", message="任务成功完成并确认消息", trace_id=trace_id)
 
                 except Exception as e:
                     aliyun_logger.logging(
                         code="9001",
-                        message=f"处理消息失败(未确认 ack): {e}\n{traceback.format_exc()}",
+                        message=f"处理消息失败: {e}\n{traceback.format_exc()}",
                         trace_id=trace_id,
-                        data=body,
+                        data=message.message_body,
                     )
         except Exception as err:
-            logger = LoggerManager.get_logger(topic, mode)
             logger.error(f"[{topic}] 消费失败: {err}\n{traceback.format_exc()}")
-            time.sleep(5)  # 防止崩溃后频繁拉起
+            await asyncio.sleep(5)
 
 
-def monitor_and_restart(future: Future, topic: str, mode: str, pool: ThreadPoolExecutor, thread_map: Dict[str, Future]):
+async def run_all_topics(topics: List[str]):
     """
-    线程崩溃恢复监控器:线程挂掉后自动重启
+       启动当前进程内所有 topic 的协程任务。
     """
-    try:
-        future.result()  # 获取结果,触发异常
-    except Exception as e:
-        print(f"[监控] 线程 {topic} 异常退出:{e},5秒后尝试重启")
-        time.sleep(5)
-        # 重新提交任务
-        new_future = pool.submit(handle_message, topic, mode)
-        thread_map[topic] = new_future
-        # 注册新的回调
-        new_future.add_done_callback(lambda f: monitor_and_restart(f, topic, mode, pool, thread_map))
+    tasks = [asyncio.create_task(async_handle_topic(topic)) for topic in topics]
+    await asyncio.gather(*tasks)
 
 
-def main():
-    topic_list = TopicGroup()
-    print(f"监听 Topics:{topic_list}")
+def handle_topic_group(topics: List[str]):
+    """
+        子进程入口:运行一个事件循环,处理当前分组内的所有 topics。
+    """
+    asyncio.run(run_all_topics(topics))
+
+
+# ------------------------------- 主调度部分 -------------------------------
+def split_topics(topics: List[str], num_groups: int) -> List[List[str]]:
+    """
+        将所有 topic 平均分配为 num_groups 组(用于多个进程)。
+    """
+    return [topics[i::num_groups] for i in range(num_groups)]
+
 
-    # 限制最大线程数为 topic 数量
-    pool = ThreadPoolExecutor(max_workers=len(topic_list))
-    thread_map: Dict[str, Future] = {}
+def start_worker_process(group_id: int, topic_group: List[str], process_map: Dict[int, Process]):
+    """
+    启动一个新的子进程来处理一组 topic。
+    """
+    p = Process(target=handle_topic_group, args=(topic_group,), name=f"Worker-{group_id}")
+    p.start()
+    process_map[group_id] = p
+    print(f"[主进程] 启动进程 PID={p.pid} 处理 topics={topic_group}")
+
+
+def main():
+    # 获取所有已注册的爬虫 topic 列表
+    topic_list = list(SPIDER_CLASS_MAP.keys())
+    print(f"监听 Topics: {topic_list}")
 
-    for topic in topic_list:
-        mode = topic.split("_")[1]
-        future = pool.submit(handle_message, topic, mode)
-        thread_map[topic] = future
+    # 使用 CPU 核心数决定进程数
+    num_cpus = cpu_count()
+    topic_groups = split_topics(topic_list, num_cpus)
+    print(f"CPU 核心数: {num_cpus}, 启动进程数: {len(topic_groups)}")
 
-        # 设置监控器:任务崩溃后自动重启
-        future.add_done_callback(lambda f, t=topic, m=mode: monitor_and_restart(f, t, m, pool, thread_map))
+    # 启动子进程
+    process_map: Dict[int, Process] = {}
+    for group_id, topic_group in enumerate(topic_groups):
+        start_worker_process(group_id, topic_group, process_map)
 
-    # 阻塞主线程防止退出(线程池会维持所有子线程)
+    # 持续监控子进程状态,异常退出自动重启
     try:
         while True:
-            time.sleep(60)
+            time.sleep(5)
+            for group_id, p in list(process_map.items()):
+                if not p.is_alive():
+                    print(f"[监控] 进程 {p.name} PID={p.pid} 已崩溃,尝试重启中...")
+                    time.sleep(2)
+                    start_worker_process(group_id, topic_groups[group_id], process_map)
     except KeyboardInterrupt:
-        print("接收到退出指令,正在关闭线程池...")
-        pool.shutdown(wait=True)
+        print("接收到退出信号,终止所有进程...")
+        for p in process_map.values():
+            p.terminate()
+        for p in process_map.values():
+            p.join()
 
 
 if __name__ == '__main__':
-    main()
+    main()

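The round-robin slice in `split_topics` spreads topics evenly across the worker processes; a quick worked example:

```python
topics = ["t1", "t2", "t3", "t4", "t5"]
num_groups = 2
groups = [topics[i::num_groups] for i in range(num_groups)]
# groups == [["t1", "t3", "t5"], ["t2", "t4"]]
```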
+ 2 - 6
scheduler/scheduler_main.py

@@ -1,14 +1,10 @@
 # scheduler_main.py - 爬虫调度主程序
 import asyncio
-import json
-import time
 import traceback
 import sys
 import os
-from crawler_controller import CrawlerController
-from application.common.log import Local
-from application.common import AliyunLogger
-from crawler_worker.universal_crawler import AsyncCrawler
+from application.config.common import AliyunLogger
+from application.spiders.universal_crawler import AsyncCrawler
 
 
 async def main():

+ 0 - 0
test/__init__.py


+ 15 - 0
test/test1.py

@@ -0,0 +1,15 @@
+import asyncio
+import time
+
+
+def f1():
+    for i in range(100,10000):
+        time.sleep(10)
+        print(i)
+async def run():
+    print(1)
+    f1()
+    print(2)
+
+
+asyncio.run(run())

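test/test1.py calls a blocking, `time.sleep`-based function from inside a coroutine, which stalls the whole event loop. A small sketch of the usual workaround, offloading the blocking call with `asyncio.to_thread` (Python 3.9+), independent of the project's code:

```python
import asyncio
import time


def blocking_work() -> str:
    time.sleep(2)          # stands in for any blocking call
    return "done"


async def main() -> None:
    # Runs blocking_work in a worker thread so other coroutines keep running.
    result = await asyncio.to_thread(blocking_work)
    print(result)


asyncio.run(main())
```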
+ 17 - 0
utils/env_loader.py

@@ -0,0 +1,17 @@
+import os
+from dotenv import load_dotenv
+
+# 支持 .env 文件自动加载
+dotenv_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), ".env")
+load_dotenv(dotenv_path)
+
+def get_env(key: str, default: str = "") -> str:
+    """获取环境变量"""
+    return os.getenv(key, default)
+
+def get_int_env(key: str, default: int = 0) -> int:
+    """获取整数类型环境变量"""
+    try:
+        return int(os.getenv(key, default))
+    except ValueError:
+        return default

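A brief usage sketch of the loader against keys defined in the new .env (returned values depend on the actual environment):

```python
from utils.env_loader import get_env, get_int_env

db_host = get_env("DB_HOST", "127.0.0.1")
db_port = get_int_env("DB_PORT", 3306)
aliyun_log_enabled = get_env("ENABLE_ALIYUN_LOG", "false").lower() == "true"
```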
+ 25 - 2
utils/path_utils.py

@@ -1,10 +1,33 @@
 import os
 
+
 def get_project_path() -> str:
     """
     获取 AutoScraperX 项目根路径
+    支持从任何子模块中调用而不会路径错乱
     """
     return os.path.dirname(os.path.abspath(__file__)).split("AutoScraperX")[0] + "AutoScraperX"
 
-if __name__ == '__main__':
-    print( get_project_path())
+
+# 项目根目录
+project_root = get_project_path()
+
+# 配置路径
+config_dir = os.path.join(project_root, "configs")
+config_spiders_path = os.path.join(config_dir, "spiders_config.yaml")
+
+# 日志路径
+log_dir = os.path.join(project_root, "log_store")
+
+
+# 数据库配置路径(可选)
+# db_config_path = os.path.join(config_dir, "db.yaml")
+
+__all__ = [
+    "project_root",
+    "config_dir",
+    "config_spiders_path",
+    "log_dir",
+    "model_dir",
+    "tmp_dir",
+]

+ 0 - 31
utils/project_paths.py

@@ -1,31 +0,0 @@
-import os
-from utils.path_utils import get_project_path
-
-# 项目根目录
-project_root = get_project_path()
-
-# 配置文件路径
-config_dir = os.path.join(project_root, "configs")
-config_spiders_path = os.path.join(config_dir, "spiders_config.yaml")
-
-# 日志路径(根路径 + log_store)
-log_dir = os.path.join(project_root, "log_store")
-
-# 模型路径(如有)
-model_dir = os.path.join(project_root, "models")
-
-# 临时文件、缓存目录
-tmp_dir = os.path.join(project_root, "tmp")
-
-# 其他路径可按需添加
-# db_config_path = os.path.join(config_dir, "db.yaml")
-
-# 导出路径变量
-__all__ = [
-    "project_root",
-    "config_dir",
-    "config_spiders_path",
-    "log_dir",
-    "model_dir",
-    "tmp_dir",
-]