
first commit

zhangliang committed 2 weeks ago
parent commit 86f01a6b8f

+ 1 - 1
README.md

@@ -35,7 +35,7 @@ AutoScraperX/
 
 ## 🚀 功能特性
 
-- ✅ 多 Topic 单进程并发监听消费(使用线程+异步
+- ✅ 多 Topic 单进程并发监听消费(使用线程)
 - ✅ 根据消息动态获取 platform/mode,并注入 user_list、rule_dict
 - ✅ YAML 驱动爬虫逻辑,无需重复开发代码
 - ✅ 请求支持自动重试、动态分页、字段抽取
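The YAML-driven extraction mentioned in this list comes down to mapping JSONPath-style strings from the config onto fields of each response object. The project's real helper is `utils.extractors.safe_extract`, which is not shown in this commit; as a rough illustrative stand-in, the idea looks like this:

```python
# Illustrative only: a tiny dotted-path extractor standing in for utils.extractors.safe_extract,
# whose real implementation is not part of this commit.
from typing import Any

def extract_path(data: Any, path: str) -> Any:
    """Resolve a '$.a.b' style path against nested dicts, returning None on any miss."""
    current = data
    for key in path[2:].split("."):          # drop the leading '$.'
        if isinstance(current, dict) and key in current:
            current = current[key]
        else:
            return None
    return current

# Field mapping taken from the bszf_recommend_prod entry in configs/spiders_config.yaml below
fields = {"video_title": "$.title", "video_url": "$.video_url", "out_video_id": "$.nid"}
video = {"title": "demo", "video_url": "http://example.com/v.mp4", "nid": "123"}
print({name: extract_path(video, path) for name, path in fields.items()})
# {'video_title': 'demo', 'video_url': 'http://example.com/v.mp4', 'out_video_id': '123'}
```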

+ 5 - 3
application/common/log/local_log.py

@@ -2,6 +2,7 @@ import sys
 from datetime import date, timedelta, datetime
 from loguru import logger
 from pathlib import Path
+from utils.project_paths import log_dir
 
 class Local:
     # 日期常量
@@ -11,7 +12,7 @@ class Local:
     tomorrow = (today + timedelta(days=1)).strftime("%Y-%m-%d")
 
     @staticmethod
-    def init_logger(platform: str, mode: str = "prod", log_level: str = "INFO", log_to_console: bool = False,
+    def init_logger(platform: str, mode: str, log_level: str = "INFO", log_to_console: bool = False,
                     rotation: str = "00:00", retention: str = "10 days"):
         """
         初始化日志记录器
@@ -22,9 +23,10 @@ class Local:
         :param rotation: 日志文件切分策略(默认每天 00:00)
         :param retention: 日志保留时间(默认10天)
         """
+
         # 创建日志目录
-        log_dir = Path(f"./log_store/{platform}")
-        log_dir.mkdir(parents=True, exist_ok=True)
+        log_path = Path(f"{log_dir}/{platform}")
+        log_path.mkdir(parents=True, exist_ok=True)
 
         # 设置日志文件名
         log_filename = f"{platform}-{mode}-{Local.today.strftime('%Y-%m-%d')}.log"
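The hard-coded `./log_store/{platform}` directory is replaced by a `log_dir` imported from `utils.project_paths`, a module not included in this commit. A definition consistent with the new import might look like the sketch below (an assumption, not the committed code):

```python
# Hypothetical utils/project_paths.py, consistent with `from utils.project_paths import log_dir`;
# the real module is not part of this commit.
from pathlib import Path

project_root = Path(__file__).resolve().parent.parent  # assumes utils/ sits directly under the repo root
log_dir = project_root / "log_store"                    # init_logger then writes to <log_dir>/<platform>/

# Example call with the refactored signature (mode no longer has a default):
# Local.init_logger(platform="benshanzhufu", mode="recommend", log_to_console=True)
```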

+ 5 - 2
application/common/messageQueue/ack_message.py

@@ -1,3 +1,6 @@
+from application.common import Local
+
+
 def ack_message(mode, platform, recv_msgs, consumer, trace_id=None):
     """
     消费成功后确认消息
@@ -5,11 +8,11 @@ def ack_message(mode, platform, recv_msgs, consumer, trace_id=None):
     try:
         receipt_handle_list = [recv_msgs.receipt_handle]
         consumer.ack_message(receipt_handle_list)
-        Local.logger(platform, mode).info(
+        Local.init_logger(platform, mode).info(
             f"[trace_id={trace_id}] Ack {len(receipt_handle_list)} Message Succeed."
         )
 
     except MQExceptionBase as err:
-        Local.logger(platform, mode).error(
+        Local.init_logger(platform, mode).error(
             f"[trace_id={trace_id}] Ack Message Fail! Exception:{err}"
         )
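Since the commit swaps `Local.logger(...)` for `Local.init_logger(...)`, the two call sites above rely on `init_logger` returning a configured loguru logger that can be chained directly. An equivalent standalone use, with illustrative platform/mode/trace_id values:

```python
# init_logger (see local_log.py above) returns a loguru logger writing under <log_dir>/<platform>/;
# the platform/mode/trace_id values here are illustrative only.
from application.common import Local

logger = Local.init_logger("benshanzhufu", "recommend")
logger.info("[trace_id=demo-123] Ack 1 Message Succeed.")
```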

+ 1 - 54
application/common/mysql/mysql_helper.py

@@ -1,6 +1,3 @@
-# -*- coding: utf-8 -*-
-# @Author: luojunhui
-# @Time: 2023/12/19
 """
 数据库连接及操作
 """
@@ -69,54 +66,4 @@ class MysqlHelper(object):
         """
         关闭连接
         """
-        self.connection.close()
-
-
-
-class RedisHelper:
-    @classmethod
-    def connect_redis(cls, env):
-        if env == 'hk':
-            redis_pool = redis.ConnectionPool(
-                # host='r-bp154bpw97gptefiqk.redis.rds.aliyuncs.com',  # 内网地址
-                # host='r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com',  # 测试地址
-                host='r-bp1mb0v08fqi4hjffupd.redis.rds.aliyuncs.com',  # 外网地址
-                port=6379,
-                db=2,
-                password='Wqsd@2019'
-            )
-            redis_conn = redis.Redis(connection_pool=redis_pool)
-        elif env == 'prod':
-            redis_pool = redis.ConnectionPool(
-                host='r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com',  # 内网地址
-                # host='r-bp1mb0v08fqi4hjffupd.redis.rds.aliyuncs.com',  # 外网地址
-                port=6379,
-                db=2,
-                password='Wqsd@2019'
-            )
-            redis_conn = redis.Redis(connection_pool=redis_pool)
-        else:
-            redis_pool = redis.ConnectionPool(
-                # host='r-bp154bpw97gptefiqk.redis.rds.aliyuncs.com',  # 内网地址
-                host='r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com',  # 外网地址
-                port=6379,
-                db=2,
-                password='Qingqu2019'
-            )
-            redis_conn = redis.Redis(connection_pool=redis_pool)
-        return redis_conn
-
-    @classmethod
-    def redis_push(cls, env, task_key, data):
-        redis_conn = cls.connect_redis(env)
-        # print("开始写入数据")
-        redis_conn.lpush(task_key, data)
-        # print("数据写入完成")
-
-    @classmethod
-    def redis_pop(cls, env, task_key):
-        redis_conn = cls.connect_redis(env)
-        if redis_conn.llen(task_key) == 0:
-            return None
-        else:
-            return redis_conn.rpop(task_key)
+        self.connection.close()

+ 53 - 63
application/common/redis/redis_helper.py

@@ -1,67 +1,57 @@
 import redis
-from datetime import timedelta
-
-
-class SyncRedisHelper:
-    _pool: redis.ConnectionPool = None
-    _instance = None
-
-    def __init__(self):
-        if not self._instance:
-            self._pool = self._get_pool()
-            self._instance = self
-
-    def _get_pool(self) -> redis.ConnectionPool:
-        if self._pool is None:
-            self._pool = redis.ConnectionPool(
-                host="r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com",  # 内网地址
-                # host="r-bp1mb0v08fqi4hjffupd.redis.rds.aliyuncs.com",  # 外网地址
+import pymysql
+import os
+import sys
+
+sys.path.append(os.getcwd())
+
+from application.common.log import Local
+from application.config.mysql_config import env_dict
+
+class RedisHelper:
+    @classmethod
+    def connect_redis(cls, env):
+        if env == 'hk':
+            redis_pool = redis.ConnectionPool(
+                # host='r-bp154bpw97gptefiqk.redis.rds.aliyuncs.com',  # 内网地址
+                # host='r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com',  # 测试地址
+                host='r-bp1mb0v08fqi4hjffupd.redis.rds.aliyuncs.com',  # 外网地址
                 port=6379,
                 db=2,
-                password="Wqsd@2019",
-                # password="Qingqu2019",
-
+                password='Wqsd@2019'
             )
-        return self._pool
-
-    def get_client(self) -> redis.Redis:
-        pool = self._get_pool()
-        client = redis.Redis(connection_pool=pool)
-        return client
-
-    def close(self):
-        if self._pool:
-            self._pool.disconnect(inuse_connections=True)
-
-
-def store_data(platform, out_video_id, condition, day_time):
-    key = f"crawler:duplicate:{platform}:{out_video_id}"
-    value = 1
-    if condition:
-        timeout = timedelta(days=int(day_time))
-    else:
-        timeout = timedelta(hours=int(day_time))
-    helper = SyncRedisHelper()
-    client = helper.get_client()
-
-    client.set(key, value)
-    client.expire(key, timeout)
-
-
-def get_data(platform, out_video_id):
-    key = f"crawler:duplicate:{platform}:{out_video_id}"
-    helper = SyncRedisHelper()
-    client = helper.get_client()
-    value = client.exists(key)
-    return value
-
-
-# 示例:存储一个数据
-# store_data('xiaoniangao', '123457', True, 60)
-
-# # 示例:获取一个数据
-# value = get_data('xiaoniangao', '1234857')
-# if value is None:
-#     print("Value does not exist")
-# else:
-#     print(f"Retrieved value: {value}")
+            redis_conn = redis.Redis(connection_pool=redis_pool)
+        elif env == 'prod':
+            redis_pool = redis.ConnectionPool(
+                host='r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com',  # 内网地址
+                # host='r-bp1mb0v08fqi4hjffupd.redis.rds.aliyuncs.com',  # 外网地址
+                port=6379,
+                db=2,
+                password='Wqsd@2019'
+            )
+            redis_conn = redis.Redis(connection_pool=redis_pool)
+        else:
+            redis_pool = redis.ConnectionPool(
+                # host='r-bp154bpw97gptefiqk.redis.rds.aliyuncs.com',  # 内网地址
+                host='r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com',  # 外网地址
+                port=6379,
+                db=2,
+                password='Qingqu2019'
+            )
+            redis_conn = redis.Redis(connection_pool=redis_pool)
+        return redis_conn
+
+    @classmethod
+    def redis_push(cls, env, task_key, data):
+        redis_conn = cls.connect_redis(env)
+        # print("开始写入数据")
+        redis_conn.lpush(task_key, data)
+        # print("数据写入完成")
+
+    @classmethod
+    def redis_pop(cls, env, task_key):
+        redis_conn = cls.connect_redis(env)
+        if redis_conn.llen(task_key) == 0:
+            return None
+        else:
+            return redis_conn.rpop(task_key)
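For reference, the relocated `RedisHelper` is used as a plain list-backed queue. A minimal usage sketch, assuming the module path follows the new file location, with a made-up key and payload:

```python
# Illustrative usage of the relocated RedisHelper; the queue key and payload are made up.
import json
from application.common.redis.redis_helper import RedisHelper

RedisHelper.redis_push("prod", "crawler:task:benshanzhufu", json.dumps({"task_id": 1}))
msg = RedisHelper.redis_pop("prod", "crawler:task:benshanzhufu")  # bytes, or None when the list is empty
if msg is not None:
    print(json.loads(msg))
```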

+ 3 - 1
application/config/topic_group_queue.py

@@ -20,5 +20,7 @@ class TopicGroup:
 
 if __name__ == '__main__':
     tg = TopicGroup()
-    print(tg)
+    for i in tg:
+        mmode = i.split("_")[1]
+        print(mmode)
 
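The `split("_")[1]` lookup in this demo (and again in main.py below) assumes topic names of the form `<platform-abbrev>_<mode>_<env>`, e.g. the entries in configs/topic_map.yaml:

```python
# Topic naming convention assumed by split("_")[1]: <platform-abbrev>_<mode>_<env>
for topic in ["bszf_recommend_prod", "zqkd_recommend_prod"]:
    abbrev, mode, env = topic.split("_")
    print(topic, "->", mode)   # both resolve to "recommend"
```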

+ 0 - 26
application/functions/appium_tools.py

@@ -1,26 +0,0 @@
-"""
-Appium 的一些公共方法
-"""
-import time
-from selenium.webdriver.common.by import By
-from selenium.common.exceptions import NoSuchElementException
-
-
-def search_elements(driver, xpath):
-    """
-    获取元素
-    :param driver:
-    :param xpath:
-    :return:
-    """
-    time.sleep(1)
-    windowHandles = driver.window_handles
-    for handle in windowHandles:
-        driver.switch_to.window(handle)
-        time.sleep(1)
-        try:
-            elements = driver.find_elements(By.XPATH, xpath)
-            if elements:
-                return elements
-        except NoSuchElementException:
-            pass

+ 0 - 3
application/functions/crypt.py

@@ -1,3 +0,0 @@
-"""
-爬虫逆向加密算法
-"""

+ 147 - 0
application/functions/mysql_service.py

@@ -0,0 +1,147 @@
+import json
+import traceback
+
+from application.common import MysqlHelper, AliyunLogger,Local
+
+
+class MysqlService:
+    def __init__(self, task_id, mode, platform):
+        self.env = "prod"
+        self.task_id = task_id
+        self.mode = mode
+        self.platform = platform
+        self.MySQL = MysqlHelper(mode=self.mode, platform=self.platform, env=self.env)
+        self.aliyun_log = AliyunLogger(platform=self.platform, mode=self.mode)
+        self.logger = Local.init_logger(platform=self.platform, mode=self.mode, log_level="INFO", log_to_console=True)
+        Local.init_logger(self.platform,self.mode)
+
+
+    def get_rule_dict(self):
+        """
+        :return: 返回任务的规则, task_rule
+        """
+        rule_dict = {}
+        task_rule_sql = f"SELECT rule FROM crawler_task_v3 WHERE id = {self.task_id};"
+        data = self.MySQL.select(task_rule_sql)
+        if data:
+            rule_list = json.loads(data[0][0])
+            for item in rule_list:
+                for key in item:
+                    rule_dict[key] = item[key]
+        self.aliyun_log.logging(
+            code=1000,
+            message="抓取规则",
+            data=rule_dict
+        )
+        return rule_dict
+
+
+    def get_user_list(self):
+        """
+        :return: 返回用户列表
+        """
+        task_user_list_sql = f"SELECT uid, link, nick_name from crawler_user_v3 where task_id = {self.task_id};"
+        uid_list = self.MySQL.select(task_user_list_sql)
+        user_list = [{"uid": i[0], "link": i[1], "nick_name": i[2]} for i in uid_list] if uid_list else []
+        self.aliyun_log.logging(
+            code=1000,
+            message="用户列表",
+            data=user_list
+        )
+        return user_list
+
+    def check_user_id(self, uid):
+        """
+        检查指定用户ID是否存在于数据库的zqkd_uid表中。
+
+        :param uid:要检查的用户ID
+        :return:如果用户ID存在于表中返回True,否则返回False
+        """
+        try:
+            query_sql = f""" SELECT uid FROM zqkd_user WHERE uid = "{uid}"; """
+            result = self.mysql.select(sql=query_sql)
+            return bool(result)
+        except Exception as e:
+            tb = traceback.format_exc()
+            self.LocalLog.error(f"检查用户ID失败: {e}\n{tb}")
+            return False
+
+    def update_user(self, uid, user_name, avatar_url):
+        """
+        更新数据库中指定用户的用户名和头像URL。
+
+        :param uid:要更新信息的用户ID
+        :param user_name:新的用户名
+        :param avatar_url:新的头像URL
+        :return:如果更新操作成功,返回更新操作的结果(通常是影响的行数),失败则返回None或抛出异常
+        """
+        try:
+            update_sql = f""" UPDATE zqkd_user SET avatar_url = "{avatar_url}", user_name = "{user_name}" WHERE uid = "{uid}"; """
+            return self.MySQL.update(sql=update_sql)
+        except Exception as e:
+            tb = traceback.format_exc()
+            self.logger.error(f"更新用户信息失败: {e}\n{tb}")
+            return None
+
+    def insert_user(self, uid, user_name, avatar_url):
+        """
+        向数据库的zqkd_user表中插入或更新用户信息
+
+        :param uid: 用户ID(数值类型)
+        :param user_name: 用户名
+        :param avatar_url: 头像URL
+        :return: 成功返回影响的行数,失败返回None
+        """
+        try:
+            # 直接拼接SQL(不推荐,有SQL注入风险)
+            insert_sql = f"""
+                  INSERT INTO zqkd_user (uid, avatar_url, user_name) 
+                  VALUES ({uid}, '{avatar_url.replace("'", "''")}', '{user_name.replace("'", "''")}') 
+                  ON DUPLICATE KEY UPDATE 
+                  user_name = '{user_name.replace("'", "''")}', 
+                  avatar_url = '{avatar_url.replace("'", "''")}'
+              """
+            return self.MySQL.update(sql=insert_sql)
+        except Exception as e:
+            tb = traceback.format_exc()
+            self.logger.error(f"插入用户信息失败: {e}\n{tb}")
+            return None
+
+    def get_today_videos(self):
+        try:
+            # 手动转义单引号(仅缓解部分风险)
+
+            sql = """
+                          SELECT count(*) as cnt
+                          FROM crawler_video 
+                          WHERE create_time >= CURDATE() 
+                            AND create_time < CURDATE() + INTERVAL 1 DAY 
+                            AND platform = %s 
+                            AND strategy = %s
+                      """
+            result = self.MySQL.select_params(sql, (self.platform, self.mode))
+            if result and len(result) > 0:
+                return result[0][0]  # 返回第一行第一列的计数值
+            return 0  # 无结果时返回0
+        except Exception as e:
+            self.logger.error(f"查询失败: {e}")
+            return 0
+
+    def select_user(self, last_scanned_id=0):
+        """
+        根据last_scanned_id查询用户数据
+        :param last_scanned_id: 上次扫描的ID,0表示从头开始
+        :return: 查询结果列表
+        """
+        try:
+            # 构建查询(根据last_scanned_id过滤)
+            query = "SELECT id, uid FROM zqkd_user"
+            if last_scanned_id > 0:
+                query += f" WHERE id > {last_scanned_id}"
+            query += " ORDER BY id ASC"
+
+            return self.MySQL.select(query)
+        except Exception as e:
+            tb = traceback.format_exc()
+            self.logger.error(f"查询用户列表失败: {e}\n{tb}")
+            return []
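The comment inside `insert_user` above already flags the string-built SQL as injection-prone. A parameterized variant is sketched below with plain pymysql placeholders; only `select_params` is visible on `MysqlHelper` in this commit, so the connection handling here is an assumption rather than the project's API:

```python
# Sketch: parameterized INSERT ... ON DUPLICATE KEY UPDATE using pymysql placeholders.
# The connection object and its lifecycle are assumptions; MysqlHelper's own parameterized
# write path (if any) is not shown in this commit.
import pymysql

def upsert_user(conn: pymysql.connections.Connection, uid: int, user_name: str, avatar_url: str) -> int:
    sql = (
        "INSERT INTO zqkd_user (uid, avatar_url, user_name) "
        "VALUES (%s, %s, %s) "
        "ON DUPLICATE KEY UPDATE user_name = VALUES(user_name), avatar_url = VALUES(avatar_url)"
    )
    with conn.cursor() as cursor:
        affected = cursor.execute(sql, (uid, avatar_url, user_name))
    conn.commit()
    return affected  # number of affected rows, mirroring the MySQL.update return above
```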

+ 41 - 60
configs/codes.py

@@ -1,61 +1,42 @@
-# crawler_status/codes.py
-
-# 成功
-SUCCESS = "1000"
-
-# 参数配置错误
-CONFIG_MISSING = "2000"
-PARAM_REQUIRED = "2001"
-UNSUPPORTED_TYPE = "2002"
-URL_JOIN_FAILED = "2003"
-CUSTOM_CLASS_IMPORT_FAILED = "2004"
-CONFIG_LOAD_FAILED = "2005"
-
-# 抓取错误
-FETCH_EXCEPTION = "3000"
-FETCH_EMPTY = "3001"
-HTTP_ERROR = "3002"
-TIMEOUT = "3003"
-INVALID_FORMAT = "3004"
-BLOCKED = "3005"
-REDIRECT_ERROR = "3006"
-
-# 解析处理
-JSONPATH_FAIL = "3100"
-XPATH_FAIL = "3101"
-FIELD_MAP_ERROR = "3102"
-PARSE_EMPTY = "3103"
-FORMAT_INVALID = "3104"
-
-# 清洗转化
-CLEAN_MISMATCH = "3200"
-TRANSFORM_FAIL = "3201"
-MISSING_REQUIRED_FIELD = "3202"
-
-# 数据写入
-DB_WRITE_FAIL = "4000"
-DB_DUPLICATE = "4001"
-DB_CONN_FAIL = "4002"
-FILE_WRITE_FAIL = "4003"
-
-# ETL
-ETL_IMPORT_FAIL = "4100"
-ETL_RUN_FAIL = "4101"
-ETL_UNKNOWN_ERROR = "4102"
-
-# 系统
-UNKNOWN_ERROR = "5000"
-IMPORT_ERROR = "5001"
-DYNAMIC_LOAD_ERROR = "5002"
-FILE_NOT_FOUND = "5003"
-
-# 业务
-DATA_EXISTS = "6000"
-NO_UPDATE = "6001"
-FILTERED = "6002"
-
-# 重试
-RETRY = "7000"
-RETRY_MAX = "7001"
-
 
+# === 配置相关 ===
+CONFIG_LOAD_SUCCESS = "1000"
+CONFIG_MISSING_FIELD = "1001"
+CONFIG_FORMAT_ERROR = "1002"
+
+# === 启动流程 ===
+CRAWLER_START = "1100"
+CRAWLER_STOP_BY_LIMIT = "1101"
+
+# === 请求发送 ===
+REQUEST_SEND_SUCCESS = "1200"
+REQUEST_SEND_FAIL = "1201"
+REQUEST_TIMEOUT = "1202"
+REQUEST_RESPONSE_INVALID = "1203"
+
+# === 数据解析 ===
+DATA_PARSE_SUCCESS = "1300"
+DATA_PARSE_FAIL = "1301"
+DATA_FIELD_MISSING = "1302"
+
+# === 视频处理 ===
+VIDEO_PROCESS_SUCCESS = "1400"
+VIDEO_DURATION_INVALID = "1401"
+VIDEO_FIELD_ERROR = "1402"
+VIDEO_USER_CNT_INVALID = "1403"
+
+# === 管道处理 ===
+PIPELINE_SUCCESS = "1500"
+PIPELINE_FAIL = "1501"
+PIPELINE_PUSH_MQ = "1502"
+
+# === 下载限制判断 ===
+LIMIT_REACHED = "1600"
+LIMIT_RULE_CHECK_FAIL = "1601"
+
+# === 后置操作 ===
+POST_ACTION_TRIGGERED = "1700"
+POST_ACTION_FAILED = "1701"
+
+# === 兜底异常 ===
+SYSTEM_UNEXPECTED_ERROR = "9999"

+ 33 - 49
configs/messages.py

@@ -1,52 +1,36 @@
-# crawler_status/messages.py
-
-from .codes import *
+# messages.py
 
 MESSAGES = {
-    SUCCESS: "成功",
-    CONFIG_MISSING: "配置缺失或无效",
-    PARAM_REQUIRED: "缺少必要参数",
-    UNSUPPORTED_TYPE: "不支持的爬虫类型",
-    URL_JOIN_FAILED: "URL 拼接失败",
-    CUSTOM_CLASS_IMPORT_FAILED: "自定义类加载失败",
-    CONFIG_LOAD_FAILED: "配置文件读取失败",
-
-    FETCH_EXCEPTION: "抓取单条视频失败,请求异常",
-    FETCH_EMPTY: "抓取返回空数据",
-    HTTP_ERROR: "HTTP 状态码异常",
-    TIMEOUT: "请求超时",
-    INVALID_FORMAT: "无效的响应格式",
-    BLOCKED: "被目标站封禁或滑块验证",
-    REDIRECT_ERROR: "请求被重定向异常",
-
-    JSONPATH_FAIL: "JSONPath 提取失败",
-    XPATH_FAIL: "HTML XPath 提取失败",
-    FIELD_MAP_ERROR: "字段映射缺失或类型错误",
-    PARSE_EMPTY: "解析后结果为空",
-    FORMAT_INVALID: "数据格式校验失败",
-
-    CLEAN_MISMATCH: "清洗规则不匹配",
-    TRANSFORM_FAIL: "数据转化失败",
-    MISSING_REQUIRED_FIELD: "字段缺失导致中断",
-
-    DB_WRITE_FAIL: "写入数据库失败",
-    DB_DUPLICATE: "主键冲突或重复数据",
-    DB_CONN_FAIL: "数据库连接失败",
-    FILE_WRITE_FAIL: "写入本地文件失败",
-
-    ETL_IMPORT_FAIL: "ETL 模块导入失败",
-    ETL_RUN_FAIL: "process_video_obj 执行失败",
-    ETL_UNKNOWN_ERROR: "ETL 处理逻辑异常",
-
-    UNKNOWN_ERROR: "未知系统错误",
-    IMPORT_ERROR: "模块导入错误",
-    DYNAMIC_LOAD_ERROR: "动态类加载失败",
-    FILE_NOT_FOUND: "路径错误或文件不存在",
-
-    DATA_EXISTS: "视频内容已存在,跳过",
-    NO_UPDATE: "当前无更新内容",
-    FILTERED: "需人工校验的内容被过滤",
-
-    RETRY: "触发重试机制",
-    RETRY_MAX: "最大重试次数已达,终止任务",
+    "1000": "配置加载成功",
+    "1001": "配置缺失必要字段",
+    "1002": "配置格式错误",
+
+    "1100": "开始执行爬虫任务",
+    "1101": "任务终止:达到下载上限",
+
+    "1200": "请求发送成功",
+    "1201": "请求发送失败",
+    "1202": "请求超时",
+    "1203": "接口响应格式异常或返回非0",
+
+    "1300": "数据解析成功",
+    "1301": "数据解析失败",
+    "1302": "关键字段缺失",
+
+    "1400": "视频处理成功",
+    "1401": "视频时长不符合要求",
+    "1402": "视频字段提取失败",
+    "1403": "用户视频数量不满足要求",
+
+    "1500": "数据管道处理成功",
+    "1501": "数据管道处理失败",
+    "1502": "已推送至消息队列",
+
+    "1600": "达到下载数量限制",
+    "1601": "下载规则校验失败",
+
+    "1700": "已触发后置操作",
+    "1701": "后置操作执行失败",
+
+    "9999": "系统内部未知错误"
 }
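Taken together, the rewritten codes.py and messages.py form one lookup table keyed by the string codes. A small helper, hypothetical and not part of this commit, shows how the two are meant to be joined (the import paths assume `configs` is an importable package):

```python
# Hypothetical glue between configs/codes.py and configs/messages.py; not committed code.
from configs import codes
from configs.messages import MESSAGES

def describe(code: str) -> str:
    """Map a status code to its message, falling back to the catch-all 9999 entry."""
    return MESSAGES.get(code, MESSAGES[codes.SYSTEM_UNEXPECTED_ERROR])

print(describe(codes.REQUEST_TIMEOUT))  # -> 请求超时
print(describe("0000"))                 # -> 系统内部未知错误 (unknown codes fall through)
```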

+ 33 - 3
configs/spiders_config.yaml

@@ -3,14 +3,38 @@ default:
   request_timeout: 30
   headers:
     {"Content-Type": "application/json"}
-benshanzhufu:
+
+bszf_recommend_prod:
+  platform: benshanzhufu
   mode: recommend
   path: /crawler/ben_shan_zhu_fu/recommend
   method: post
   request_body:
     cursor: "1"
-  paging: true
-  max_pages: 5
+  loop_times: 2
+  loop_interval: 5
+  response_parse:
+    next_cursor: "$.data.next_cursor"
+    data_path: "$.data.data"
+    fields:
+      video_id: "$.nid"
+      video_title: "$.title"
+      play_cnt: 0
+      publish_time_stamp: "$.update_time"
+      out_user_id: "$.nid"
+      cover_url: "$.video_cover"
+      like_cnt: 0
+      video_url: "$.video_url"
+      out_video_id: "$.nid"
+
+xngtjl_recommend_prod:
+  platform: xiaoniangaotuijianliu
+  mode: recommend
+  path: /crawler/ben_shan_zhu_fu/recommend
+  method: post
+  request_body:
+    cursor: "1"
+  loop_times: 2
   etl_hook: "process_video_obj"
   response_parse:
     next_cursor: "$.data.next_cursor"
@@ -25,6 +49,12 @@ benshanzhufu:
       like_cnt: 0
       video_url: "$.video_url"
       out_video_id: "$.nid"
+  post_actions:
+    - trigger: after_video_processed
+      endpoint: "http://example.com/notify"
+      payload:
+
+
 
 
 zhongqingkandian:
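`utils/config_loader.py` is not part of this commit, but the YAML entries above only carry `path` while `UniversalCrawler` reads `config["url"]`, so the loader presumably merges the `default` block and joins `base_url` with `path`. A sketch of that assumed behavior:

```python
# Assumed shape of utils.config_loader.ConfigLoader; this is a sketch, not the committed implementation.
import yaml
from configs.config import base_url  # same import universal_crawler.py uses

class ConfigLoader:
    def __init__(self, config_path: str = "configs/spiders_config.yaml"):
        with open(config_path, "r", encoding="utf-8") as f:
            self._raw = yaml.safe_load(f)

    def get_platform_config(self, topic: str) -> dict:
        merged = dict(self._raw.get("default", {}))   # request_timeout, headers, ...
        merged.update(self._raw.get(topic, {}))       # e.g. bszf_recommend_prod
        if "path" in merged and "url" not in merged:
            merged["url"] = base_url.rstrip("/") + merged["path"]
        return merged

# ConfigLoader().get_platform_config("bszf_recommend_prod")
# -> {'platform': 'benshanzhufu', 'mode': 'recommend', 'url': ..., 'loop_times': 2, ...}
```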

+ 1 - 2
configs/topic_map.yaml

@@ -1,3 +1,2 @@
 topics:
-  - bszf_recommend_prod
-  - zqkd_recommend_prod
+  - bszf_recommend_prod

+ 331 - 86
crawler_worker/universal_crawler.py

@@ -6,11 +6,11 @@ import time
 import uuid
 import yaml
 import requests
-
+import cv2
 from datetime import datetime
-from typing import Dict, Any, List, Optional
-from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
-from utils.extractors import safe_extract,extract_multiple
+from typing import Dict, Any, List, Optional, Union
+from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type, RetryCallState
+from utils.extractors import safe_extract, extract_multiple
 
 # 添加公共模块路径
 sys.path.append(os.getcwd())
@@ -28,40 +28,80 @@ from application.common.log import Local
 from configs.config import base_url
 
 
+def before_send_log(retry_state: RetryCallState) -> None:
+    """请求重试前记录日志"""
+    attempt = retry_state.attempt_number
+    last_result = retry_state.outcome
+    if last_result.failed:
+        exc = last_result.exception()
+        logger = retry_state.kwargs.get('logger')
+        url = retry_state.args[0] if retry_state.args else "unknown"
+        if logger:
+            logger.warning(f"请求失败,准备重试 ({attempt}/3): {url}, 错误: {str(exc)}")
+
+
 class UniversalCrawler:
     """通用爬虫类,通过YAML配置驱动不同平台的爬取逻辑"""
 
-    def __init__(self, platform: str, mode: str, rule_dict: Dict, user_list: List, env: str = "prod"):
+    def __init__(self, platform_config: Dict, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
         """
         初始化爬虫
-        :param platform: 平台名称(对应YAML文件名)
+        :param platform_config: 平台配置字典
+        :param rule_dict: 规则字典
+        :param user_list: 用户列表
+        :param trace_id: 追踪ID
         :param env: 运行环境
         """
-        self.platform = platform
-        self.mode = mode
+        self.platform = platform_config["platform"]
+        self.mode = platform_config["mode"]
         self.rule_dict = rule_dict
         self.user_list = user_list
+        self.trace_id = trace_id
         self.env = env
-        self.config_path = "/Users/zhangliang/Documents/piaoquan/AutoScraperX/configs/spiders_config.yaml"
-        self.config = ConfigLoader().get_platform_config(self.platform)
-        self.aliyun_log = AliyunLogger(platform=platform, mode=self.config["mode"])
-        self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")
-        # self.mysql = MysqlHelper(mode=self.config["mode"], platform=platform)
+        self.config = platform_config
+        self.aliyun_log = AliyunLogger(platform=self.platform, mode=self.mode)
         self.logger = Local.init_logger(platform=self.platform, mode=self.mode, log_level="INFO", log_to_console=True)
-        self.download_cnt = 0
-        self.limit_flag = False
+        self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")
+
+        self.has_enough_videos = False
         self.base_api = base_url
+        self.loop_times = platform_config.get('loop_times', 1)
+
+        # 提取常用配置为类属性,提高可读性
+        self.request_method = self.config["method"].upper()
+        self.request_url = self.config["url"]
+        self.request_headers = self.config.get("headers", {})
+        self.request_body = self.config.get("request_body", {})
+        self.response_data_path = self.config["response_parse"]["data_path"]
+        self.video_fields_map = self.config["response_parse"]["fields"]
+
+        # 下载限制配置
+        self.download_min_limit = self.config.get("download_limit", {}).get("min", 200)
 
     @retry(
-        stop=stop_after_attempt(3),  # 最多重试 3 次
-        wait=wait_fixed(2),  # 每次重试间隔 2 秒
-        retry=retry_if_exception_type((requests.RequestException, ValueError))
+        stop=stop_after_attempt(3),  # 最多重试3次
+        wait=wait_fixed(2),  # 每次重试间隔2秒
+        retry=retry_if_exception_type((requests.RequestException, ValueError)),
+        before=before_send_log,  # 添加重试前日志
     )
-    def _send_request(self, method: str, url: str, headers, payload, timeout = 30) -> Optional[
-        Dict]:
-        """发送API请求,失败自动重试最多3次"""
+    def _send_request(self, url: str, method: str = None, headers: Dict = None,
+                      payload: Dict = None, timeout: int = 30) -> Optional[Dict]:
+        """
+        发送API请求,失败自动重试最多3次
+        :param url: 请求URL
+        :param method: 请求方法,默认使用配置中的方法
+        :param headers: 请求头,默认使用配置中的头
+        :param payload: 请求体,默认使用配置中的体
+        :param timeout: 超时时间
+        :return: 响应JSON数据或None
+        """
+        # 使用默认配置(如果未提供参数)
+        method = method or self.request_method
+        headers = headers or self.request_headers
+        payload = payload or self.request_body
 
         try:
+            self.logger.info(f"{self.trace_id}--正在发送请求: {url}")
             response = requests.request(
                 method=method,
                 url=url,
@@ -71,33 +111,179 @@ class UniversalCrawler:
             )
             response.raise_for_status()
             resp = response.json()
-            if resp["code"] == 0:
-                return response.json()
-            raise ValueError(f"接口响应非0:{resp}")
+            if resp.get("code") == 0:
+                return resp
+            self.logger.warning(f"{self.trace_id}--API响应非零状态码: {resp}")
+            raise ValueError(f"API响应错误: {resp}")
+        except requests.exceptions.Timeout:
+            self.logger.error(f"{self.trace_id}--请求超时: {url}")
+            raise
+        except requests.exceptions.RequestException as e:
+            self.logger.error(f"{self.trace_id}--请求异常: {e}")
+            raise
+        except json.JSONDecodeError as e:
+            self.logger.error(f"{self.trace_id}--解析JSON响应失败: {e}")
+            raise
         except Exception as e:
-            # 在最后一次失败时才记录日志
+            # 在最后一次失败时记录详细日志
             self.aliyun_log.logging(
                 code="3000",
                 message=f"请求失败: {url}",
-                data={"error": str(e)}
+                data={"error": str(e)},
+                trace_id=self.trace_id
             )
-            return
+            self.logger.error(f"{self.trace_id}--意外错误: {e}")
+            raise
+
+    def get_video_duration(self, video_url: str, timeout: int = 20) -> float:
+        """
+        获取网络视频的时长(秒),增加网络异常处理和超时控制
+        :param video_url: 视频URL
+        :param timeout: 超时时间
+        :return: 视频时长(秒),失败时返回0
+        """
+        # 检查URL是否可访问
+        try:
+            response = requests.head(video_url, timeout=timeout)
+            response.raise_for_status()  # 检查HTTP状态码
+        except requests.exceptions.RequestException as e:
+            self.logger.error(f"{self.trace_id}--网络错误: 无法访问视频URL - {e}")
+            return 0
+
+        cap = None
+        try:
+            # 创建VideoCapture对象
+            cap = cv2.VideoCapture(video_url)
+
+            # 设置缓冲区大小,减少延迟
+            cap.set(cv2.CAP_PROP_BUFFERSIZE, 3)
+
+            # 尝试打开视频流(最多尝试3次)
+            max_attempts = 3
+            for attempt in range(max_attempts):
+                if cap.isOpened():
+                    break
+                self.logger.info(f"{self.trace_id}--尝试打开视频流 ({attempt + 1}/{max_attempts})...")
+                time.sleep(1)
+
+            if not cap.isOpened():
+                self.logger.error(f"{self.trace_id}--错误: 无法打开视频流 {video_url}")
+                return 0
+
+            # 获取视频属性
+            fps = cap.get(cv2.CAP_PROP_FPS)
+            frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
+
+            if fps <= 0 or frame_count <= 0:
+                # 某些网络视频可能无法直接获取总帧数,尝试读取几帧估算
+                self.logger.info(f"{self.trace_id}--无法获取总帧数,尝试估算...")
+                frame_count = 0
+                start_time = time.time()
+
+                # 读取10帧估算帧率和时长
+                for _ in range(10):
+                    ret, frame = cap.read()
+                    if not ret:
+                        break
+                    frame_count += 1
+
+                elapsed_time = time.time() - start_time
+                if elapsed_time > 0:
+                    estimated_fps = frame_count / elapsed_time if elapsed_time > 0 else 0
+                    # 假设视频时长为30秒(可根据实际情况调整)
+                    estimated_duration = 30.0
+                    self.logger.info(f"{self.trace_id}--估算视频时长: {estimated_duration}秒 (基于{frame_count}帧)")
+                    return estimated_duration
+                else:
+                    self.logger.error(f"{self.trace_id}--错误: 无法估算视频时长")
+                    return 0
+
+            duration = frame_count / fps
+            self.logger.info(f"{self.trace_id}--视频时长: {duration}秒")
+            return duration
+
+        except Exception as e:
+            self.logger.error(f"{self.trace_id}--获取视频时长时发生异常: {e}")
+            return 0
+
+        finally:
+            if cap:
+                cap.release()  # 确保资源释放
+
+    def _extract_video_list(self, response: Dict) -> List[Dict]:
+        """从API响应中提取视频列表"""
+        return safe_extract(response, self.response_data_path) or []
+
+    def _is_video_valid(self, video_data: Dict) -> bool:
+        """
+        判断视频是否满足条件
+        :param video_data: 视频数据
+        :return: True if valid, False otherwise
+        """
+        if not self.rule_dict:
+            return True
+
+        # 检查视频时长规则
+        rule_duration = self.rule_dict.get("duration")
+        if rule_duration:
+            extract_video_url_path = self.video_fields_map.get("video_url")
+            if not extract_video_url_path:
+                self.logger.warning(f"{self.trace_id}--缺少视频URL字段映射")
+                return False
+
+            video_url = safe_extract(video_data, extract_video_url_path)
+            if not video_url:
+                self.logger.warning(f"{self.trace_id}--无法提取视频URL")
+                return False
+
+            video_duration = self.get_video_duration(video_url)
+            min_duration = rule_duration.get("min", 0)
+            max_duration = rule_duration.get("max", float('inf'))
+
+            if not (min_duration <= video_duration <= max_duration):
+                self.logger.info(f"{self.trace_id}--视频时长{video_duration}秒超出范围[{min_duration}, {max_duration}]")
+                return False
+
+        # 检查视频数量规则
+        rule_videos_cnt = self.rule_dict.get("videos_cnt")
+        if rule_videos_cnt:
+            # 这里应该查询数据库获取实际视频数量
+            # 示例代码,实际实现需要根据业务逻辑完善
+            video_count = self._get_video_count_from_db()  # 假设这是获取视频数量的方法
+            min_count = rule_videos_cnt.get("min", 0)
+            if video_count >= min_count:
+                self.logger.info(f"{self.trace_id}--视频数量{video_count}达到最小要求{min_count}")
+                return False
+
+        return True
+
+    def _get_video_count_from_db(self) -> int:
+        """从数据库获取视频数量(示例方法,需根据实际业务实现)"""
+        # 实际实现中应该查询数据库
+        return 0  # 占位符
 
     def _process_video(self, video_data: Dict) -> bool:
-        """处理单个视频数据"""
-        # 从配置中获取字段映射
-        field_map = self.config["response_parse"]["fields"]
+        """
+        处理单个视频数据
+        :param video_data: 视频数据
+        :return: 处理成功返回True,失败返回False
+        """
+        # 先判断视频是否符合条件
+        if not self._is_video_valid(video_data):
+            self.logger.info(f"{self.trace_id}--视频因验证不通过被跳过")
+            return False
 
         # 创建视频项
         item = VideoItem()
-        for field_name, path in field_map.items():
-            if isinstance(path, str) and path.startswith("$."):
 
-                match = safe_extract(video_data,path)
+        # 从配置中获取字段映射并填充数据
+        for field_name, path in self.video_fields_map.items():
+            if isinstance(path, str) and path.startswith("$."):
+                match = safe_extract(video_data, path)
                 item.add_video_info(field_name, match)
             else:
                 # 如果是固定值(int、str等),直接使用
-                item.add_video_info(field_name,path)
+                item.add_video_info(field_name, path)
 
         # 添加固定字段
         item.add_video_info("platform", self.platform)
@@ -109,7 +295,8 @@ class UniversalCrawler:
         item.add_video_info("user_id", our_user["uid"])
         item.add_video_info("user_name", our_user["nick_name"])
 
-        print(item)
+        video_title = item.get("title", "未知标题")
+        self.logger.info(f"{self.trace_id}--正在处理视频: {video_title}")
 
         # 处理管道
         trace_id = f"{self.platform}-{uuid.uuid4()}"
@@ -128,78 +315,136 @@ class UniversalCrawler:
             self.aliyun_log.logging(
                 code="1002",
                 message="成功发送至ETL",
-                data=item.produce_item()
+                data=item.produce_item(),
+                trace_id=self.trace_id
             )
+            self.logger.info(f"{self.trace_id}--视频处理完成并发送至消息队列,已处理总数: {self.download_cnt}")
 
             # 检查下载限制
-            min_limit = self.config.get("download_limit", {}).get("min", 200)
-            if self.download_cnt >= min_limit:
-                self.limit_flag = True
+            if self.download_cnt >= self.download_min_limit:
+                self.has_enough_videos = True
                 self.aliyun_log.logging(
                     code="2000",
-                    message=f"达到下载限制: {min_limit}",
+                    message=f"达到下载限制: {self.download_min_limit}",
+                    trace_id=self.trace_id
                 )
+                self.logger.info(f"{self.trace_id}--达到下载限制,停止进一步处理")
             return True
+
+        self.logger.warning(f"{self.trace_id}--通过管道处理视频失败")
         return False
 
+    def _fetch_video_list(self) -> Optional[List[Dict]]:
+        """
+        获取并解析视频列表
+        :return: 视频列表或None
+        """
+        self.logger.info(f"{self.trace_id}--从{self.request_url}获取视频列表")
+        response = self._send_request(
+            self.request_url,
+            self.request_method,
+            self.request_headers,
+            self.request_body
+        )
 
-    # --------------------- 自定义处理函数 ---------------------
-    def _func_current_timestamp(self, _) -> int:
-        """获取当前时间戳"""
-        return int(time.time())
+        if not response:
+            self.logger.error(f"{self.trace_id}--获取视频列表失败")
+            return None
 
-    def _func_formatted_time(self, _) -> str:
-        """获取格式化时间"""
-        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        video_list = self._extract_video_list(response)
+        self.logger.info(f"{self.trace_id}--获取到{len(video_list)}个视频")
+        return video_list
 
-    def _func_random_delay(self, _) -> None:
-        """随机延迟"""
-        min_delay = self.config.get("delay", {}).get("min", 3)
-        max_delay = self.config.get("delay", {}).get("max", 8)
-        time.sleep(random.randint(min_delay, max_delay))
-        return None
+    def _execute_post_actions(self):
+        """执行爬取后的额外操作(如曝光上报)"""
+        for action in self.config.get("post_actions", []):
+            if action.get("trigger") == "after_video_processed":
+                endpoint = action.get("endpoint")
+                payload = action.get("payload", {})
+                if endpoint:
+                    self.logger.info(f"{self.trace_id}--执行后置操作: {endpoint}")
+                    self._send_request(endpoint, payload=payload)
 
     def run(self):
         """执行爬取任务"""
-        self.logger.info(f"开始执行爬虫{self.platform}")
-
-        while not self.limit_flag:
-            # 获取初始列表数据
-            initial_data = self._send_request(
-                self.config["method"].upper(),
-                self.config["url"],
-                self.config.get("headers", {}),
-                self.config.get("request_body", {})
-            )
-            print(initial_data)
-
-            if not initial_data:
-                return
-            video_objs = safe_extract(initial_data,self.config["response_parse"]["data_path"])
-            self.logger.info(f"获取到的视频列表:{json.dumps(video_objs)}")
-
-            next_cursor = None
-            # 处理视频列表
-            video_list = safe_extract(
-                initial_data,
-                self.config["response_parse"]["data_path"]
-            )
+        self.aliyun_log.logging(
+            code=1003,
+            message="开始执行爬虫",
+            data=self.platform,
+            trace_id=self.trace_id
+        )
+        self.logger.info(f"{self.trace_id}--开始{self.platform}执行爬虫")
+
+        for loop in range(self.loop_times):
+            if self.has_enough_videos:
+                self.aliyun_log.logging(
+                    code=2000,
+                    message=f"[{self.platform}] 达到每日最大爬取量",
+                    data=self.platform,
+                    trace_id=self.trace_id
+                )
+                self.logger.info(f"{self.trace_id}--达到每日最大爬取量,停止爬虫")
+                break
+
+            self.logger.info(f"{self.trace_id}--开始第{loop + 1}/{self.loop_times}轮循环")
+            video_list = self._fetch_video_list()
+
+            if not video_list:
+                self.logger.warning(f"{self.trace_id}--视频列表为空,跳过本轮循环")
+                continue
 
             for video_data in video_list:
-                self.logger.info(f"视频对象{video_data}")
-                if self.limit_flag:
+                if self.has_enough_videos:
+                    self.logger.info(f"{self.trace_id}--达到每日最大爬取量,停止处理")
                     break
+
                 self._process_video(video_data)
 
-                # 执行额外操作(如曝光上报)
-                for action in self.config.get("post_actions", []):
-                    if action["trigger"] == "after_video_processed":
-                        self._send_request(action["endpoint"], action.get("payload", {}))
+            # 执行额外操作(如曝光上报)
+            self._execute_post_actions()
+
+            # 添加循环间隔
+            loop_interval = self.config.get("loop_interval", 0)
+            if loop_interval > 0:
+                self.logger.info(f"{self.trace_id}--在下一轮循环前等待{loop_interval}秒")
+                time.sleep(loop_interval)
+
+        self.aliyun_log.logging(
+            code=0000,
+            message="爬虫执行完成",
+            data=self.platform,
+            trace_id=self.trace_id
+        )
+        self.logger.info(f"{self.trace_id}--平台{self.platform}的爬虫完成,已处理{self.download_cnt}个视频")
 
 
 if __name__ == '__main__':
-    cr = UniversalCrawler("benshanzhufu", "recommend",
-                          rule_dict={'videos_cnt': {'min': 500, 'max': 0}, 'duration': {'min': 30, 'max': 1200}},
-                          user_list=[{"uid": 20631262, "link": "recommend_2060", "nick_name": "人老心不老"}])
+    cr = UniversalCrawler(
+        platform_config={
+            "platform": "benshanzhufu",
+            "mode": "recommend",
+            "method": "POST",
+            "url": "https://api.example.com/video/list",
+            "headers": {"Content-Type": "application/json"},
+            "request_body": {"page": 1, "size": 20},
+            "response_parse": {
+                "data_path": "$.data.items",
+                "fields": {
+                    "title": "$.title",
+                    "video_url": "$.videoUrl",
+                    "author": "$.author.name",
+                    "duration": "$.duration"
+                }
+            },
+            "download_limit": {"min": 200},
+            "loop_times": 3
+        },
+        rule_dict={
+            'videos_cnt': {'min': 500, 'max': 0},
+            'duration': {'min': 30, 'max': 1200}
+        },
+        user_list=[{"uid": 20631262, "link": "recommend_2060", "nick_name": "人老心不老"}],
+        trace_id=str(uuid.uuid4())
+    )
 
-    cr.run()
+    cr.run()

+ 51 - 17
main.py

@@ -1,3 +1,4 @@
+import importlib
 import threading
 import traceback
 import json
@@ -6,18 +7,30 @@ import uuid
 
 from application.common import AliyunLogger, get_consumer, ack_message
 from application.common.log import Local
-from application.spiders.universal_crawler import UniversalCrawler
+from crawler_worker.universal_crawler import UniversalCrawler
 from application.config import TopicGroup
-from application.service.user_service import get_user_list
-from application.service.rule_service import get_rule_dict
+from application.functions.mysql_service import MysqlService
+from utils.config_loader import ConfigLoader
+
 
 def generate_trace_id():
     return f"{uuid.uuid4().hex}{int(time.time() * 1000)}"
 
-def handle_message(topic: str):
-    consumer = get_consumer(topic)
-    logger = AliyunLogger(platform=topic, mode="unknown")
 
+def import_custom_class(class_path):
+    """
+    动态导入模块中的类,如 crawler_worker.universal_crawler.UniversalCrawler
+    """
+    module_path, class_name = class_path.rsplit(".", 1)
+    print(module_path, class_name)
+    module = importlib.import_module(module_path)
+    return getattr(module, class_name)
+
+
+def handle_message(topic: str, mode: str):
+    consumer = get_consumer(topic_name=topic, group_id=topic)
+    logger = AliyunLogger(platform=topic, mode=mode)
+    platform_config = ConfigLoader().get_platform_config(topic)
     while True:
         try:
             messages = consumer.consume_message(wait_seconds=10, batch_size=1)
@@ -31,17 +44,36 @@ def handle_message(topic: str):
                 try:
                     payload = json.loads(body)
                     platform = payload["platform"]
-                    mode = payload.get("mode", "recommend")
-                    logger = AliyunLogger(platform=platform, mode=mode)
-                    Local.logger(platform, mode).info(f"[trace_id={trace_id}] 收到任务: {body}")
+                    mode = payload["mode"]
+                    task_id = payload["id"]
+                    mysql_service = MysqlService(task_id, mode, platform)
+                    logger.logging(
+                        1001,
+                        "开始一轮抓取",
+                        data=payload,
+                        trace_id=trace_id
+                    )
+                    Local.init_logger(platform, mode).info(f"[trace_id={trace_id}] 收到任务: {body}")
 
                     # 加载 user_list 与 rule_dict
-                    user_list = get_user_list(platform, mode)
-                    rule_dict = get_rule_dict(platform, mode)
+                    user_list = mysql_service.get_user_list()
+                    rule_dict = mysql_service.get_rule_dict()
+                    custom_class = platform_config.get("custom_class")  # 自定义类
+                    try:
+                        if custom_class:
+                            CrawlerClass = import_custom_class(custom_class)
+                        else:
+                            CrawlerClass = UniversalCrawler
 
-                    # 同步执行 UniversalCrawler
-                    crawler = UniversalCrawler(platform, mode, rule_dict, user_list)
-                    crawler.run()
+                        crawler = CrawlerClass(
+                            platform_config=platform_config,  # 把整段配置传进去
+                            rule_dict=rule_dict,
+                            user_list=user_list,
+                            trace_id=trace_id
+                        )
+                        crawler.run()
+                    except Exception as e:
+                        print(f"[{topic}] 爬虫运行异常: {e}")
 
                     # 执行成功后 ack
                     ack_message(mode, platform, message, consumer, trace_id=trace_id)
@@ -57,20 +89,22 @@ def handle_message(topic: str):
                     # 不 ack,等待下次重试
         except Exception as err:
             logger.logging(code="9002", message=f"消费失败: {err}\n{traceback.format_exc()}")
-        time.sleep(2)
+
 
 def main():
-    topic_list = TopicGroup().topics
+    topic_list = TopicGroup()
     print(f"监听 Topics:{topic_list}")
 
     threads = []
     for topic in topic_list:
-        t = threading.Thread(target=handle_message, args=(topic,))
+        mode = topic.split("_")[1]
+        t = threading.Thread(target=handle_message, args=(topic, mode,))
         t.start()
         threads.append(t)
 
     for t in threads:
         t.join()
 
+
 if __name__ == '__main__':
     main()
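For completeness, `handle_message` only reads the `id`, `platform`, and `mode` keys from each MQ message body; an illustrative task payload (values made up) would be:

```python
# Illustrative task message for handle_message; only id/platform/mode are read, and the values are made up.
import json

task = {"id": 1, "platform": "benshanzhufu", "mode": "recommend"}
body = json.dumps(task, ensure_ascii=False)
# A producer publishes `body` to a topic such as bszf_recommend_prod; the consumer thread for that
# topic loads it, builds MysqlService(task_id, mode, platform), and hands the YAML platform_config,
# rule_dict and user_list to the crawler class.
```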