Browse Source

中青看点

zhangliang 6 days ago
parent
commit
6c86cffd98

+ 0 - 0
application/__init__.py


+ 2 - 2
application/common/messageQueue/mq.py

@@ -31,7 +31,7 @@ class MQ(object):
             message_key = "{}-{}-{}".format(platform, strategy, video_dict['out_video_id'])
             msg.set_message_key(message_key)
             re_msg = self.producer.publish_message(msg)
-            Local.logger(strategy, platform).info("Publish Message Succeed. MessageID:%s, BodyMD5:%s\n" %
+            Local.logger(platform,strategy).info("Publish Message Succeed. MessageID:%s, BodyMD5:%s\n" %
                                                   (re_msg.message_id, re_msg.message_body_md5))
         except MQExceptionBase as e:
-            Local.logger(strategy, platform).error("Publish Message Fail. Exception:%s\n" % e)
+            Local.logger(platform,strategy).error("Publish Message Fail. Exception:%s\n" % e)

+ 2 - 2
application/config/topic_group_queue.py

@@ -44,8 +44,8 @@ class TopicGroup(object):
             ('xlzf', 'recommend', 'xuanlanzhufu'),
             ('zzhxzfy', 'recommend', 'zhaozhaohuanxizhufuyu'),
             ('zqkd', 'recommend', 'zhongqingkandian'),
-            ('zqkd', 'recommend', 'zhongqingkandian'),
-            ('zqkd', 'related_recommend', 'zhongqingkandian')
+            ('zqkd', 'related_recommend', 'zhongqingkandian'),
+            ('zqkd', 'author', 'zhongqingkandian')
         ]
 
     def produce(self):

+ 173 - 0
application/functions/zqkd_db_redis.py

@@ -0,0 +1,173 @@
+import os
+import sys
+import threading
+from datetime import datetime, timedelta
+
+import redis
+sys.path.append(os.getcwd())
+
+from application.common.mysql import MysqlHelper
+
+
+class DatabaseOperations:
+    def __init__(self, mode, platform):
+        self.mysql = MysqlHelper(mode=mode, platform=platform)
+
+    def check_user_id(self, uid):
+        """
+        检查指定用户ID是否存在于数据库的zqkd_uid表中。
+
+        :param uid:要检查的用户ID
+        :return:如果用户ID存在于表中返回True,否则返回False
+        """
+        query_sql = f""" SELECT uid FROM zqkd_uid WHERE uid = "{uid}"; """
+        result = self.mysql.select(sql=query_sql)
+        return bool(result)
+
+    def update_user(self, uid, user_name, avatar_url):
+        """
+        更新数据库中指定用户的用户名和头像URL。
+
+        :param uid:要更新信息的用户ID
+        :param user_name:新的用户名
+        :param avatar_url:新的头像URL
+        :return:如果更新操作成功,返回更新操作的结果(通常是影响的行数),失败则返回None或抛出异常
+        """
+        update_sql = f""" UPDATE zqkd_uid SET avatar_url = "{avatar_url}", user_name = "{user_name}" WHERE uid = "{uid}"; """
+        return self.mysql.update(sql=update_sql)
+
+    def insert_user(self, uid, user_name, avatar_url):
+        """
+        向数据库的zqkd_uid表中插入新的用户信息,包含用户ID、用户名、头像URL和当前时间。
+
+        :param uid:新用户的ID
+        :param user_name:新用户的用户名
+        :param avatar_url:新用户的头像URL
+        :return:如果插入操作成功,返回插入操作的结果(通常是影响的行数),失败则返回None或抛出异常
+        """
+        current_time = datetime.now()
+        formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
+        insert_sql = f""" INSERT INTO zqkd_uid (uid, avatar_url, user_name, data_time) VALUES ('{uid}', '{avatar_url}', '{user_name}', '{formatted_time}'); """
+        return self.mysql.update(sql=insert_sql)
+
+    def select_user(self,last_scanned_id=0):
+        # 构建查询(根据last_scanned_id过滤)
+        query = "SELECT id, uid FROM zqkd_uid"
+        if last_scanned_id > 0:
+            query += f" WHERE id > {last_scanned_id}"
+        query += " ORDER BY id ASC"
+
+        return self.mysql.select(query)
+
+
+
+
+
+
+
+
+
+class RedisOperations:
+    _pool: redis.ConnectionPool = None
+    _instance = None
+    _lock = threading.Lock()  # 用于线程安全的单例创建
+
+    @classmethod
+    def get_instance(cls):
+        """线程安全的单例获取方法"""
+        if not cls._instance:
+            with cls._lock:
+                if not cls._instance:
+                    cls._instance = cls()
+        return cls._instance
+
+    def __init__(self):
+        # 私有构造函数,使用 get_instance() 获取实例
+        if RedisOperations._instance is not None:
+            raise Exception("请使用 get_instance() 获取实例")
+
+        self._pool = self._get_pool()
+        self.client = redis.Redis(connection_pool=self._pool,decode_responses=True)  # 复用同一个客户端
+
+
+    def _get_pool(self) -> redis.ConnectionPool:
+        if self._pool is None:
+            self._pool = redis.ConnectionPool(
+                host="r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com",
+                port=6379,
+                db=0,
+                password="Wqsd@2019",
+                max_connections=50,  # 增加最大连接数
+                socket_timeout=10,
+                retry_on_timeout=True
+            )
+        return self._pool
+
+    def close(self):
+        """关闭连接池"""
+        if self._pool:
+            self._pool.disconnect(inuse_connections=True)
+
+    def get_recommend_video(self, task="task:zqkd_video_id"):
+        """从Redis的指定列表中弹出并返回最左边的视频ID"""
+        try:
+            value_bytes = self.client.rpop(task)
+            if value_bytes:  # 检查是否为空(列表可能已空)
+                value_str = value_bytes.decode('utf-8')
+                return value_str
+        except Exception as e:
+            print("e")
+            return None
+
+    # def add_user_data(self, task, key):
+    #     """将用户数据添加到Redis的指定列表"""
+    #     try:
+    #         self.logger.info(f"添加用户{key}到任务{task}")
+    #         self.client.rpush(task, key)
+    #         self.logger.info(f"用户数据写入Redis成功,数据: {key}")
+    #     except Exception as e:
+    #         self.logger.error(f"写入用户数据到Redis时出现异常: {e}")
+
+    def check_video_id_exists(self, videoID):
+        """检查指定的视频ID是否已经存在于Redis中"""
+        key = f"crawler:zqkd:{videoID}"
+        try:
+            return self.client.exists(key)
+        except Exception as e:
+            return False
+
+    def save_video_id(self, videoID):
+        """将视频ID存储到Redis中,并为其设置7天的过期时间"""
+        key = f"crawler:zqkd:{videoID}"
+        try:
+            expiration_time = int(timedelta(days=7).total_seconds())
+            self.client.setex(key, expiration_time, "1")
+        except Exception as e:
+            print(f"保存视频ID {videoID}到Redis并设置过期时间时出现异常: {e}")
+
+    def save_recommend_video(self, videoID):
+        """将推荐视频ID添加到Redis的指定列表中,并为该列表设置2天的过期时间"""
+        task = "task:zqkd_video_id"
+        try:
+            pipe = self.client.pipeline()  # 使用管道执行多个命令
+            pipe.rpush(task, videoID)
+            pipe.expire(task, int(timedelta(days=2).total_seconds()))
+            pipe.execute()
+
+            # 检查数据是否写入成功
+            list_length = self.client.llen(task)
+        except Exception as e:
+            print(f"保存推荐视频 ID {videoID} 到 Redis 列表并设置过期时间时出现异常: {e}")
+
+    def get_last_scanned_id(self):
+        return self.client.get("zqkd_last_scanned_id")
+
+    def set_last_scanned_id(self,last_scanned_id):
+        return self.client.set("zqkd_last_scanned_id",last_scanned_id)
+
+
+if __name__ == '__main__':
+    db = DatabaseOperations("12","123")
+    user = db.select_user()
+    print(user)
+

+ 355 - 0
spider/crawler_author/zhongqingkandian_author.py

@@ -0,0 +1,355 @@
+import os
+import sys
+import asyncio
+import json
+import random
+import uuid
+import time
+import traceback
+from datetime import datetime
+import aiohttp
+
+sys.path.append(os.getcwd())
+from application.common.feishu import FsData
+from application.common.feishu.feishu_utils import FeishuUtils
+from application.common.gpt import GPT4oMini
+from application.common.messageQueue import MQ
+from application.common.log import AliyunLogger
+from application.functions.zqkd_db_redis import DatabaseOperations, RedisOperations
+from application.items import VideoItem
+from application.pipeline import PiaoQuanPipeline
+from application.common.log import Local
+
+
+class ZhongQingKanDianAuthor:
+    API_BASE_URL = "http://8.217.192.46:8889"
+    COMMON_HEADERS = {
+        "Content-Type": "application/json"
+    }
+    # 最大重试次数
+    MAX_RETRIES = 3
+    # 最大等待时长
+    TIMEOUT = 30
+
+    def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
+        """
+        初始化
+        :param platform: 平台名称 zhongqingkandian
+        :param mode: 运行模式  recommend
+        :param rule_dict: 规则字典,包含视频数量限制、时长限制等规则 [{"videos_cnt":{"min":100,"max":0}},{"duration":{"min":30,"max":1200}}]
+        :param user_list: 用户列表
+        :param env: 运行环境,默认为 "prod"
+        """
+        self.limit_flag = True
+        self.platform = platform
+        self.mode = mode
+        self.rule_dict = rule_dict
+        self.user_list = user_list
+        self.env = env
+        self.download_cnt = 0
+        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
+        self.expire_flag = False
+        self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
+        self.db_ops = DatabaseOperations(mode=mode, platform=platform)
+        self.redis_ops = RedisOperations()
+        data_rule = FsData()
+        self.title_rule = data_rule.get_title_rule()
+        self.LocalLog = Local.logger(self.platform, self.mode)
+        self.curses = 1
+        result = self.redis_ops.get_last_scanned_id()
+        self.last_scanned_id = 0 if result is None else result
+        self.user_list = self.db_ops.select_user(self.last_scanned_id )
+
+    async def send_request(self, path, data):
+        """
+        异步发送 POST 请求到指定路径,带有重试机制。
+        :param path: 请求的 API 路径
+        :param data: 请求的数据
+        :return: 响应的 JSON 数据,如果请求失败则返回 None
+        """
+        full_url = f"{self.API_BASE_URL}{path}"
+        async with aiohttp.ClientSession(headers=self.COMMON_HEADERS) as session:
+            for retry in range(self.MAX_RETRIES):
+                try:
+                    async with session.post(full_url, data=data, timeout=self.TIMEOUT) as response:
+                        response.raise_for_status()
+                        self.LocalLog.info(f"{path}响应数据:{await response.json()}")
+                        return await response.json()
+                except (aiohttp.ClientError, json.JSONDecodeError) as e:
+                    tb_info = traceback.format_exc()
+                    self.LocalLog.info(f"{path}请求失败:{e}  \n{tb_info}")
+                    self.aliyun_log.logging(
+                        code="3000",
+                        message=f"请求 {path} 失败,错误信息: {str(e)}",
+                        data={"path": path}
+                    )
+                    await asyncio.sleep(5)
+        return None
+
+    def is_response_valid(self, resp, url):
+        """
+        检查响应是否有效(状态码为 0 表示有效)。
+        :param resp: 响应数据
+        :param url: 请求的 URL
+        :return: 如果响应有效则返回响应数据,否则返回 None
+        """
+        try:
+            if resp and resp.get('code') != 0:
+                self.aliyun_log.logging(
+                    code="3000",
+                    message=f"抓取{url}失败,请求失败,响应:{resp}"
+                )
+                self.LocalLog.info(f"{url}请求失败,响应:{resp}")
+                return None
+            return resp
+        except Exception as e:
+            tb_info = traceback.format_exc()
+            self.aliyun_log.logging(
+                code="3000",
+                message=f"检查响应有效性时出错,错误信息: {str(e)}",
+                data={"url": url, "resp": resp}
+            )
+            self.LocalLog.info(f"检查 {url} 响应有效性时出错:{e} \n{tb_info}")
+            return None
+
+    async def req_user_list(self, account_id):
+        """
+        异步请求与指定内容 ID 相关的推荐列表。
+        :param
+        :return: 相关推荐视频列表的有效响应数据,如果请求失败则返回 None
+        """
+        try:
+
+            url = '/crawler/zhong_qing_kan_dian/blogger'
+            body = json.dumps({
+                 "account_id": f"{account_id}",
+                 "content_type": "全部",
+                 "cursor": f"{self.curses}"
+            })
+            self.LocalLog.info(f"开始请求用户视频列表{body}")
+            resp = await self.send_request(url, body)
+            return self.is_response_valid(resp, url)
+        except Exception as e:
+            tb_info = traceback.format_exc()
+            self.aliyun_log.logging(
+                code="1004",
+                message=f"请求相关推荐视频列表时发生异常,错误信息: {str(e)}",
+                data={"url": url}
+            )
+            self.LocalLog.info(f"请求相关推荐视频列表 {url} 时发生异常:{e}   \n{tb_info}")
+            return None
+
+    async def req_detail(self, content_link, **kwargs):
+        """
+        异步请求视频详情。
+        :param content_link: 视频内容链接
+        :param kwargs: 额外的视频信息
+        :return: 无返回值,处理视频详情信息
+        """
+        try:
+            self.LocalLog.info(f"开始请求视频详情,链接: {content_link}")
+            url = '/crawler/zhong_qing_kan_dian/detail'
+            body = json.dumps({
+                "content_link": content_link
+            })
+            resp = await self.send_request(url, body)
+            if not self.is_response_valid(resp, url):
+                return
+            data = resp.get("data", {}).get("data", {})
+            if data.get("content_type") != "video":
+                self.aliyun_log.logging(
+                    code="3003",
+                    message=f"跳过非视频内容",
+                    data={"content_link": content_link}
+                )
+                self.LocalLog.info(f"跳过非视频内容,链接: {content_link}")
+                return
+            self.LocalLog.info(f"{content_link} 是视频")
+            data.update(kwargs)
+            await self.process_video_obj(data)
+            await asyncio.sleep(10)
+        except Exception as e:
+            tb_info = traceback.format_exc()
+            self.aliyun_log.logging(
+                code="1005",
+                message=f"请求视频详情时发生异常,错误信息: {str(e)}",
+                data={"content_link": content_link}
+            )
+            self.LocalLog.error(f"请求视频详情,链接 {content_link} 时发生异常:{e}  \n{tb_info}")
+    async def control_request_author(self):
+        """
+        控制相关推荐视频列表的请求和处理流程。
+        :return: 无返回值,根据下载数量限制控制流程
+        """
+        while self.limit_flag:
+            try:
+                self.LocalLog.info(f"开始用户视频列表的请求和处理流程,今日已爬 {self.download_cnt} 个视频")
+
+                if not self.user_list:
+                    self.LocalLog.info("没有用户数据")
+                    await asyncio.sleep(10)
+                    continue
+                for user_info in self.user_list:
+                    current_id, user_id = user_info
+                    author_resp = await self.req_user_list(user_id)
+                    if current_id > self.last_scanned_id:
+                        self.last_scanned_id = current_id
+
+                    self.LocalLog.info(f"获取的用户视频列表长度:{len(author_resp)}")
+                    if not author_resp:
+                        continue
+                    author_list = author_resp.get("data", {}).get("data", [])
+                    for author_obj in author_list:
+                        author_content_link = author_obj.get("share_url")
+                        if author_content_link:
+                            await self.req_detail(author_content_link, **author_obj)
+            except Exception as e:
+                tb_info = traceback.format_exc()
+                self.aliyun_log.logging(
+                    code="3009",
+                    message=f"控制相关推荐视频请求和处理时发生异常,错误信息: {str(e)}",
+                    data={}
+                )
+                self.LocalLog.info(f"控制相关推荐视频请求和处理时发生异常:\n{tb_info}")
+
+    async def process_video_obj(self, video_obj):
+        """
+        处理视频对象,包括检查视频时长、用户信息、保存数据等操作。
+        :param video_obj: 视频对象,包含视频的各种信息
+        :return: 无返回值,完成视频对象的处理
+        """
+        try:
+            video_duration = video_obj["video_url_list"][0]['video_duration']
+            video_id = video_obj['channel_content_id']
+            # 检查视频ID是否存在
+            if self.redis_ops.check_video_id_exists(video_id):
+                self.aliyun_log.logging(
+                    code="3004",
+                    message=f"重复视频ID:{video_id}"
+                )
+                self.LocalLog.info(f"重复视频ID: {video_id}")
+                return
+            our_user = random.choice(self.user_list)
+            trace_id = self.platform + str(uuid.uuid1())
+            item = VideoItem()
+
+            account_id = video_obj["channel_account_id"]
+            account_name = video_obj["channel_account_name"]
+            account_avatar = video_obj["avatar"]
+            # 检查用户ID是否存在
+            """
+            需要改为判断redis
+            """
+            is_repeat_user = self.db_ops.check_user_id(account_id)
+            if is_repeat_user:
+                # 更新用户信息,使用异步方法并等待结果
+                self.LocalLog.info(f"用户{account_id}已经存在数据库中")
+                self.db_ops.update_user(account_id, account_name, account_avatar)
+            else:
+                self.LocalLog.info(f"用户{account_id}没在数据库中")
+                # 插入用户信息,使用异步方法并等待结果
+                self.db_ops.insert_user(account_id, account_name, account_avatar)
+                self.redis_ops.add_user_data("task:zqkd_user_id", json.dumps({"uid": account_id}))
+                self.aliyun_log.logging(code="1007", message=f"用户数据写入成功,用户ID:{account_id}")
+                self.LocalLog.info(f"用户数据写入成功,用户ID: {account_id}")
+
+            if video_duration > self.rule_dict.get("duration", {}).get("max",
+                                                                       1200) or video_duration < self.rule_dict.get(
+                    "duration", {}).get("min", 30):
+                self.aliyun_log.logging(
+                    code="3005",
+                    message=f"视频时长不满足条件[>=30s&<=1200s]视频ID:{video_obj['channel_content_id']},视频时长:{video_duration}"
+                )
+                self.LocalLog.info(
+                    f"视频时长不满足条件,视频ID: {video_obj['channel_content_id']}, 视频时长: {video_duration}")
+                return
+
+            item.add_video_info("video_id", video_obj['channel_content_id'])
+            item.add_video_info("video_title", video_obj["title"])
+            item.add_video_info("play_cnt", int(video_obj["read_num"]))
+            item.add_video_info("publish_time_stamp", int(int(video_obj["publish_timestamp"]) / 1000))
+            item.add_video_info("out_user_id", video_obj["channel_account_id"])
+            item.add_video_info("cover_url", video_obj["image_url_list"][0]['image_url'])
+            item.add_video_info("like_cnt", 0)
+            item.add_video_info("collection_cnt", int(video_obj['collect_num']))
+            item.add_video_info("share_cnt", int(video_obj["share_num"]))
+            item.add_video_info("comment_cnt", int(video_obj["cmt_num"]))
+            item.add_video_info("video_url", video_obj["video_url_list"][0]['video_url'])
+            item.add_video_info("out_video_id", int(video_obj["channel_content_id"]))
+            item.add_video_info("duration", video_obj["video_url_list"][0]['video_duration'])
+            item.add_video_info("platform", self.platform)
+            item.add_video_info("strategy", self.mode)
+            item.add_video_info("session", f"{self.platform}-{int(time.time())}")
+            item.add_video_info("user_id", our_user["uid"])
+            item.add_video_info("user_name", our_user["nick_name"])
+
+            mq_obj = item.produce_item()
+            pipeline = PiaoQuanPipeline(
+                platform=self.platform,
+                mode=self.mode,
+                rule_dict=self.rule_dict,
+                env=self.env,
+                item=mq_obj,
+                trace_id=trace_id
+            )
+            if pipeline.process_item():
+                title_list = self.title_rule.split(",")
+                title = video_obj["title"]
+                contains_keyword = any(keyword in title for keyword in title_list)
+                if contains_keyword:
+                    new_title = GPT4oMini.get_ai_mini_title(title)
+                    if new_title:
+                        item.add_video_info("video_title", new_title)
+                        current_time = datetime.now()
+                        formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
+                        values = [
+                            [
+                                video_obj["video_url_list"][0]['video_url'],
+                                video_obj["image_url_list"][0]['image_url'],
+                                title,
+                                new_title,
+                                formatted_time,
+                            ]
+                        ]
+                        FeishuUtils.insert_columns("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "v8S6nL", "ROWS", 1, 2)
+                        time.sleep(0.5)
+                        FeishuUtils.update_values("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "v8S6nL", "A2:Z2", values)
+            self.download_cnt += 1
+            self.mq.send_msg(mq_obj)
+            # 保存视频ID
+            self.redis_ops.save_video_id(video_obj['channel_content_id'])
+            if self.download_cnt >= self.rule_dict.get("videos_cnt", {}).get("min", 300):
+                # 记录轮训到的用户id
+                self.redis_ops.set_last_scanned_id(self.last_scanned_id)
+                self.limit_flag = False
+            else:
+                self.redis_ops.set_last_scanned_id(0)
+                self.curses += 1
+        except Exception as e:
+            tb_info = traceback.format_exc()
+            self.aliyun_log.logging(
+                code="1005",
+                message=f"处理视频对象时发生异常,错误信息: {str(e)}",
+                data={"video_obj": video_obj}
+            )
+            self.LocalLog.error(f"处理视频对象时发生异常: {e}\n{tb_info}")
+
+    async def run(self):
+        """
+        运行主流程,异步执行推荐视频和相关推荐视频的请求,直到达到下载数量限制。
+
+        :return: 无返回值,程序运行的主逻辑
+        """
+        self.LocalLog.info("开始执行中青看点推荐抓取...")
+        await asyncio.gather(
+            self.control_request_author()
+        )
+
+
+if __name__ == '__main__':
+    asyncio.run(ZhongQingKanDianAuthor(
+        platform="zhongqingkandian",
+        mode="author",
+        rule_dict={"videos_cnt": {"min": 2, "max": 0}},
+        user_list=[{"uid": 81522822, "link": "中青看点推荐", "nick_name": "免不了俗"}]
+    ).run())

+ 0 - 0
spider/crawler_offline/zhongqingkandian.py → spider/crawler_offline/zhongqingkandian_old.py


+ 279 - 271
spider/crawler_online/zhongqingkandian.py

@@ -1,54 +1,46 @@
-import asyncio
 import os
-import random
 import sys
-import time
-import uuid
+import asyncio
 import json
+import random
+import uuid
+import time
+import traceback
 from datetime import datetime
-
 import aiohttp
-import requests
 
+sys.path.append(os.getcwd())
 from application.common.feishu import FsData
 from application.common.feishu.feishu_utils import FeishuUtils
 from application.common.gpt import GPT4oMini
-from application.common.redis.redis_helper import SyncRedisHelper
-
-sys.path.append(os.getcwd())
-
-from application.items import VideoItem
-from application.pipeline import PiaoQuanPipeline
 from application.common.messageQueue import MQ
 from application.common.log import AliyunLogger
-from application.common.mysql import MysqlHelper
-
-
-
+from application.functions.zqkd_db_redis import DatabaseOperations, RedisOperations
+from application.items import VideoItem
+from application.pipeline import PiaoQuanPipeline
+from application.common.log import Local
 
 
 class ZhongQingKanDian:
-    # / recommend(列表11个id)
-    # ↓ 并发请求每个id的 / related(得到列表N个元素)
-    # ↓ 对每个元素并发请求 / detail
-    # ↓ 若为视频,写入Redis(键:detail_id,值:视频数据)
     API_BASE_URL = "http://8.217.192.46:8889"
     COMMON_HEADERS = {
         "Content-Type": "application/json"
     }
+    # 最大重试次数
     MAX_RETRIES = 3
-    TIMEOUT = 30  # 设置超时时间
-    max_recommend_count = 100  # 推荐抓取每日最大量
-    max_related_recommend_count = 200  # 相关推荐抓取每日最大量
-    max_author_video = 300  # 账号每日抓取视频最大量
-
-    """
-        中青看点推荐流
-        Topic:zqkd_recommend_prod
-        """
+    # 最大等待时长
+    TIMEOUT = 30
 
     def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
-        self.limit_flag = False
+        """
+        初始化
+        :param platform: 平台名称 zhongqingkandian
+        :param mode: 运行模式  recommend
+        :param rule_dict: 规则字典,包含视频数量限制、时长限制等规则 [{"videos_cnt":{"min":100,"max":0}},{"duration":{"min":30,"max":1200}}]
+        :param user_list: 用户列表
+        :param env: 运行环境,默认为 "prod"
+        """
+        self.limit_flag = True
         self.platform = platform
         self.mode = mode
         self.rule_dict = rule_dict
@@ -58,287 +50,303 @@ class ZhongQingKanDian:
         self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
         self.expire_flag = False
         self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
-        self.mysql = MysqlHelper(mode=self.mode, platform=self)
+        self.db_ops = DatabaseOperations(mode=mode, platform=platform)
+        self.redis_ops = RedisOperations()
         data_rule = FsData()
         self.title_rule = data_rule.get_title_rule()
+        self.LocalLog = Local.logger(self.platform, self.mode)
 
     async def send_request(self, path, data):
+        """
+        异步发送 POST 请求到指定路径,带有重试机制。
+        :param path: 请求的 API 路径
+        :param data: 请求的数据
+        :return: 响应的 JSON 数据,如果请求失败则返回 None
+        """
         full_url = f"{self.API_BASE_URL}{path}"
         async with aiohttp.ClientSession(headers=self.COMMON_HEADERS) as session:
             for retry in range(self.MAX_RETRIES):
                 try:
                     async with session.post(full_url, data=data, timeout=self.TIMEOUT) as response:
                         response.raise_for_status()
+                        self.LocalLog.info(f"{path}响应数据:{await response.json()}")
                         return await response.json()
-                except aiohttp.ClientError as e:
-                    if retry < self.MAX_RETRIES - 1:
-                        await asyncio.sleep(2)
-                except json.JSONDecodeError as e:
-                    if retry < self.MAX_RETRIES - 1:
-                        await asyncio.sleep(2)
+                except (aiohttp.ClientError, json.JSONDecodeError) as e:
+                    tb_info = traceback.format_exc()
+                    self.LocalLog.info(f"{path}请求失败:{e}  \n{tb_info}")
+                    self.aliyun_log.logging(
+                        code="3000",
+                        message=f"请求 {path} 失败,错误信息: {str(e)}",
+                        data={"path": path}
+                    )
+                    await asyncio.sleep(random.randint(5, 10))
         return None
 
-    def is_response_valid(self, resp):
-        if resp['code'] != 0:
+    def is_response_valid(self, resp, url):
+        """
+        检查响应是否有效(状态码为 0 表示有效)。
+        :param resp: 响应数据
+        :param url: 请求的 URL
+        :return: 如果响应有效则返回响应数据,否则返回 None
+        """
+        try:
+            if resp and resp.get('code') != 0:
+                self.aliyun_log.logging(
+                    code="3000",
+                    message=f"抓取{url}失败,请求失败,响应:{resp}"
+                )
+                self.LocalLog.info(f"{url}请求失败,响应:{resp}")
+                return None
+            return resp
+        except Exception as e:
+            tb_info = traceback.format_exc()
             self.aliyun_log.logging(
                 code="3000",
-                message="抓取单条视频失败,请求失败"
-            ),
-            return
-        return resp
+                message=f"检查响应有效性时出错,错误信息: {str(e)}",
+                data={"url": url, "resp": resp}
+            )
+            self.LocalLog.info(f"检查 {url} 响应有效性时出错:{e} \n{tb_info}")
+            return None
 
     async def req_recommend_list(self):
-        print("开始请求推荐")
-        '''
-        推荐请求
-        '''
-        url = '/crawler/zhong_qing_kan_dian/recommend'
-        body = json.dumps({"cursor": ""})
-        resp = await self.send_request(url, body)
-        return self.is_response_valid(resp)
-
-    async def req_related_recommend_list(self, content_id):
-        print("请求相关推荐")
-        '''
-         相关推荐请求
-        '''
-        url = '/crawler/zhong_qing_kan_dian/related'
-        body = json.dumps({
-            "content_id": str(content_id),
-            "cursor": ""
-        })
-        resp = await self.send_request(url, body)
-        return self.is_response_valid(resp)
-
+        """
+        异步请求推荐视频列表。
+        :return: 推荐视频列表的有效响应数据,如果请求失败则返回 None
+        """
+        try:
 
-    async def req_detail(self, content_link, label,**kwargs):
-        print("请求详情")
-        '''
-        请求详情
-        '''
-        url = '/crawler/zhong_qing_kan_dian/detail'
-        body = json.dumps({
-            "content_link": content_link
-        })
-        resp = await self.send_request(url, body)
-        if not self.is_response_valid(resp):
-            return
-        data = resp.get("data", {}).get("data", {})
-        if data.get("content_type") != "video":
+            url = '/crawler/zhong_qing_kan_dian/recommend'
+            body = json.dumps({"cursor": ""})
+            self.LocalLog.info(f"开始请求推荐{body}")
+            resp = await self.send_request(url, body)
+            return self.is_response_valid(resp, url)
+        except Exception as e:
+            tb_info = traceback.format_exc()
             self.aliyun_log.logging(
-                code="3003",
-                message=f"跳过非视频内容(label={label})",
-                data={"content_link": content_link}
+                code="1003",
+                message=f"请求推荐视频列表时发生异常,错误信息: {str(e)}\n{tb_info}",
+                data={"url": url}
             )
-            return
-        print("是视频")
-        # 将 kwargs 中的键值对更新到 data 字典中
-        data.update(kwargs)
-        self.process_video_obj(data, label)
-        await asyncio.sleep(10)
-
-    async def control_request(self):
-        print("开始处理")
-        """核心控制逻辑:顺序处理三个接口"""
-        recommend_resp = await self.req_recommend_list()
-        if not self.is_response_valid(recommend_resp):
-            return
-
-        recommend_list = recommend_resp.get("data", {}).get("data", [])
+            self.LocalLog.info(f"请求推荐视频列表 {url} 时发生异常:{str(e)}   \n{tb_info}")
+            return None
 
-        for video_obj in recommend_list:
-            content_link = video_obj.get("share_url")
-            content_id = video_obj.get("id")
 
-            if not (content_link and content_id):
-                continue
-            # 处理推荐视频详情
-            await self.req_detail(content_link, "recommend",**video_obj)
 
-            # # 处理相关推荐列表(间隔后执行)
-            # await asyncio.sleep(5)
-            # related_resp = await self.req_related_recommend_list(content_id)
-            # if not self.is_response_valid(related_resp):
-            #     continue
-            #
-            # related_list = related_resp.get("data", {}).get("data", [])
-            # for related_obj in related_list:
-            #     related_content_link = related_obj.get("share_url")
-            #     if related_content_link:
-            #         await self.req_detail(related_content_link, "related",**related_obj)
-    def process_video_obj(self, video_obj, label):
+    async def req_detail(self, content_link, **kwargs):
         """
-        处理视频
-        :param video_obj:
+        异步请求视频详情。
+        :param content_link: 视频内容链接
+        :param label: 视频标签(如 "recommend" 或 "related")
+        :param kwargs: 额外的视频信息
+        :return: 无返回值,处理视频详情信息
         """
+        try:
+            self.LocalLog.info(f"开始请求视频详情,链接: {content_link}")
+            url = '/crawler/zhong_qing_kan_dian/detail'
+            body = json.dumps({
+                "content_link": content_link
+            })
+            resp = await self.send_request(url, body)
+            if not self.is_response_valid(resp, url):
+                return
+            data = resp.get("data", {}).get("data", {})
+            if data.get("content_type") != "video":
+                self.aliyun_log.logging(
+                    code="3003",
+                    message=f"跳过非视频内容)",
+                    data={"content_link": content_link}
+                )
+                self.LocalLog.info(f"跳过非视频内容,链接: {content_link}")
+                return
+            self.LocalLog.info(f"{content_link} 是视频")
+            data.update(kwargs)
+            await self.process_video_obj(data)
+            await asyncio.sleep(10)
+        except Exception as e:
+            tb_info = traceback.format_exc()
+            self.aliyun_log.logging(
+                code="1005",
+                message=f"请求视频详情时发生异常,错误信息: {str(e)}",
+                data={"content_link": content_link}
+            )
+            self.LocalLog.error(f"请求视频详情,链接 {content_link} 时发生异常:{e}  \n{tb_info}")
 
-        if not self.save_video_id():
-
-        our_user = random.choice(self.user_list)
-        trace_id = self.platform + str(uuid.uuid1())
-        item = VideoItem()
+    async def control_request_recommend(self):
+        """
+        控制推荐视频列表的请求和处理流程。
+        :return: 无返回值,根据下载数量限制控制流程
+        """
+        while self.limit_flag:
+            try:
+                self.LocalLog.info(f"开始推荐视频列表的请求和处理流程,今日已爬推荐 {self.download_cnt} 个视频")
+                recommend_resp = await self.req_recommend_list()
+                if not recommend_resp:
+                    continue
+                recommend_list = recommend_resp.get("data", {}).get("data", [])
+                self.LocalLog.info(f"获取的推荐列表长度:{len(recommend_list)}")
+                for video_obj in recommend_list:
+                    content_link = video_obj.get("share_url")
+                    content_id = video_obj.get("id")
+                    self.LocalLog.info(f"content_link == {content_link} \n content_id == {content_id}")
+                    if not (content_link and content_id):
+                        continue
+                    # 当前内容id保存到redis
+                    self.redis_ops.save_recommend_video(content_id)
+                    await self.req_detail(content_link, **video_obj)
+            except Exception as e:
+                tb_info = traceback.format_exc()
+                self.aliyun_log.logging(
+                    code="3008",
+                    message=f"控制推荐视频请求和处理时发生异常,错误信息: {str(e)}",
+                    data={}
+                )
+                self.LocalLog.info(f"控制推荐视频请求和处理时发生异常:\n{tb_info}")
+        self.LocalLog.info(f"循环结束,当前 limit_flag 值为: {self.limit_flag}")
+
+    async def process_video_obj(self, video_obj):
+        """
+        处理视频对象,包括检查视频时长、用户信息、保存数据等操作。
+        :param video_obj: 视频对象,包含视频的各种信息
+        :return: 无返回值,完成视频对象的处理
+        """
         try:
+
+            video_duration = video_obj["video_url_list"][0]['video_duration']
             video_id = video_obj['channel_content_id']
+            # 检查视频ID是否存在
+            if self.redis_ops.check_video_id_exists(video_id):
+                self.aliyun_log.logging(
+                    code="3004",
+                    message=f"重复视频ID:{video_id}"
+                )
+                self.LocalLog.info(f"重复视频ID: {video_id}")
+                return
+            our_user = random.choice(self.user_list)
+            trace_id = self.platform + str(uuid.uuid1())
+            item = VideoItem()
+
             account_id = video_obj["channel_account_id"]
             account_name = video_obj["channel_account_name"]
             account_avatar = video_obj["avatar"]
-            is_repeat_user = self.select_id(account_id)
-            # 判断用户是否重复
+            # 检查用户ID是否存在
+            is_repeat_user = self.db_ops.check_user_id(account_id)
             if is_repeat_user:
-                self.update_name_url(account_id, account_name, account_avatar)
+                # 更新用户信息,使用异步方法并等待结果
+                self.LocalLog.info(f"用户{account_id}已经存在数据库中")
+                self.db_ops.update_user(account_id, account_name, account_avatar)
             else:
-                # 写表
-                self.insert_name_url(account_id, account_name, account_avatar)
-                # 写redis
-                self.write_redis_user_data(json.dumps({"uid": account_id}))
-                print("写入成功")
-        except Exception as e:
-            print(f"写入异常{e}")
-            pass
-        url = video_obj["video_url_list"][0]['video_url']
-        duration = video_obj["video_url_list"][0]['video_duration']
-        item.add_video_info("video_id", video_obj['channel_content_id'])
-        item.add_video_info("video_title", video_obj["title"])
-        item.add_video_info("play_cnt", int(video_obj["read_num"]))
-        item.add_video_info("publish_time_stamp", int(int(video_obj["publish_timestamp"])/1000))
-        item.add_video_info("out_user_id", video_obj["channel_account_id"])
-        item.add_video_info("cover_url", video_obj["image_url_list"][0]['image_url'])
-        item.add_video_info("like_cnt", 0)
-        item.add_video_info("collection_cnt", int(video_obj['collect_num']))
-        item.add_video_info("share_cnt", int(video_obj["share_num"]))
-        item.add_video_info("comment_cnt", int(video_obj["cmt_num"]))
-        item.add_video_info("video_url", video_obj["video_url_list"][0]['video_url'])
-        item.add_video_info("out_video_id", int(video_obj["channel_content_id"]))
-        item.add_video_info("duration", video_obj["video_url_list"][0]['video_duration'])
-        item.add_video_info("platform", self.platform)
-        item.add_video_info("strategy", self.mode)
-        item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
-        item.add_video_info("user_id", our_user["uid"])
-        item.add_video_info("user_name", our_user["nick_name"])
-
-        mq_obj = item.produce_item()
-        pipeline = PiaoQuanPipeline(
-            platform=self.platform,
-            mode=self.mode,
-            rule_dict=self.rule_dict,
-            env=self.env,
-            item=mq_obj,
-            trace_id=trace_id,
-        )
-        if pipeline.process_item():
-            title_list = self.title_rule.split(",")
-            title = video_obj["title"]
-            contains_keyword = any(keyword in title for keyword in title_list)
-            if contains_keyword:
-                new_title = GPT4oMini.get_ai_mini_title(title)
-                if new_title:
-                    item.add_video_info("video_title", new_title)
-                    current_time = datetime.now()
-                    formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
-                    values = [
-                        [
-                            video_obj["video_url_list"][0]['video_url'],
-                            video_obj["image_url_list"][0]['image_url'],
-                            title,
-                            new_title,
-                            formatted_time,
+                self.LocalLog.info(f"用户{account_id}没在数据库中")
+                # 插入用户信息,使用异步方法并等待结果
+                self.db_ops.insert_user(account_id, account_name, account_avatar)
+                self.aliyun_log.logging(code="1007", message=f"用户数据写入成功,用户ID:{account_id}")
+                self.LocalLog.info(f"用户数据写入成功,用户ID: {account_id}")
+            if video_duration > self.rule_dict.get("duration", {}).get("max",
+                                                                       1200) or video_duration < self.rule_dict.get(
+                    "duration", {}).get("min", 30):
+                self.aliyun_log.logging(
+                    code="3005",
+                    message=f"视频时长不满足条件[>=30s&<=1200s]视频ID:{video_obj['channel_content_id']},视频时长:{video_duration}"
+                )
+                self.LocalLog.info(
+                    f"视频时长不满足条件,视频ID: {video_obj['channel_content_id']}, 视频时长: {video_duration}")
+                return
+
+            item.add_video_info("video_id", video_obj['channel_content_id'])
+            item.add_video_info("video_title", video_obj["title"])
+            item.add_video_info("play_cnt", int(video_obj["read_num"]))
+            item.add_video_info("publish_time_stamp", int(int(video_obj["publish_timestamp"]) / 1000))
+            item.add_video_info("out_user_id", video_obj["channel_account_id"])
+            item.add_video_info("cover_url", video_obj["image_url_list"][0]['image_url'])
+            item.add_video_info("like_cnt", 0)
+            item.add_video_info("collection_cnt", int(video_obj['collect_num']))
+            item.add_video_info("share_cnt", int(video_obj["share_num"]))
+            item.add_video_info("comment_cnt", int(video_obj["cmt_num"]))
+            item.add_video_info("video_url", video_obj["video_url_list"][0]['video_url'])
+            item.add_video_info("out_video_id", int(video_obj["channel_content_id"]))
+            item.add_video_info("duration", video_obj["video_url_list"][0]['video_duration'])
+            item.add_video_info("platform", self.platform)
+            item.add_video_info("strategy", self.mode)
+            item.add_video_info("session", f"{self.platform}-{int(time.time())}")
+            item.add_video_info("user_id", our_user["uid"])
+            item.add_video_info("user_name", our_user["nick_name"])
+
+            mq_obj = item.produce_item()
+            pipeline = PiaoQuanPipeline(
+                platform=self.platform,
+                mode=self.mode,
+                rule_dict=self.rule_dict,
+                env=self.env,
+                item=mq_obj,
+                trace_id=trace_id
+            )
+            if pipeline.process_item():
+                title_list = self.title_rule.split(",")
+                title = video_obj["title"]
+                contains_keyword = any(keyword in title for keyword in title_list)
+                if contains_keyword:
+                    new_title = GPT4oMini.get_ai_mini_title(title)
+                    if new_title:
+                        item.add_video_info("video_title", new_title)
+                        current_time = datetime.now()
+                        formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
+                        values = [
+                            [
+                                video_obj["video_url_list"][0]['video_url'],
+                                video_obj["image_url_list"][0]['image_url'],
+                                title,
+                                new_title,
+                                formatted_time,
+                            ]
                         ]
-                    ]
-                    FeishuUtils.insert_columns("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "v8S6nL", "ROWS", 1, 2)
-                    time.sleep(0.5)
-                    FeishuUtils.update_values("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "v8S6nL", "A2:Z2", values)
-            self.download_cnt += 1
-            self.mq.send_msg(mq_obj)
-            self.aliyun_log.logging(code="1002", message="成功发送至 ETL", data=mq_obj)
-            if self.download_cnt >= int(
-                    self.rule_dict.get("videos_cnt", {}).get("min", 200)
-            ):
-                self.limit_flag = True
-            if label == "recommend":
-                key = f"crawler:zqkd:{video_id}"
-                self.save_video_id(key)
-
-
-
-    """
-      查询用户id是否存在
-      """
-
-    def select_id(self, uid):
-        sql = f""" select uid from zqkd_uid where uid = "{uid}"; """
-        db = MysqlHelper()
-        repeat_user = db.select(sql=sql)
-        if repeat_user:
-            return True
-        return False
-    def update_name_url(self, uid,user_name,avatar_url):
-        sql = f""" update zqkd_uid set avatar_url = "{avatar_url}", user_name="{user_name}" where uid = "{uid}"; """
-        db = MysqlHelper()
-        repeat_video = db.update(sql=sql)
-        if repeat_video:
-            return True
-        return False
-
-    def insert_name_url(self, uid, user_name, avatar_url):
-        current_time = datetime.now()
-        formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
-        insert_sql = f"""INSERT INTO zqkd_uid (uid, avatar_url, user_name, data_time) values ('{uid}' ,'{avatar_url}','{user_name}', '{formatted_time}')"""
-        db = MysqlHelper()
-        repeat_video = db.update(sql=insert_sql)
-        if repeat_video:
-            return True
-        return False
-
-    def get_redis_video_data(self):
-        """获取一条id"""
-        task = f"task:zqkd_video_id"
-        helper = SyncRedisHelper()
-        client = helper.get_client()
-
-        # 获取列表的长度
-        list_length = client.llen(task)
-        # 循环获取列表中的元素
-        for i in range(list_length):
-            # 使用 lrange 获取单个元素
-            element = client.lrange(task, i, i)
-            if element:
-                print(f"Element at index {i}: {element[0].decode('utf-8')}")
-                return element
-
-    def write_redis_user_data(self,key,ret):
-        """写入"""
-        task = f"task:zqkd_user_id"
-        helper = SyncRedisHelper()
-        client = helper.get_client()
-        client.rpush(task, ret)
+                        FeishuUtils.insert_columns("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "v8S6nL", "ROWS", 1, 2)
+                        time.sleep(0.5)
+                        FeishuUtils.update_values("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "v8S6nL", "A2:Z2", values)
+
+                self.mq.send_msg(mq_obj)
+                self.download_cnt += 1
+                self.aliyun_log.logging(
+                    code="2009",
+                    message=f"成功发送视频到etl",
+                    data={"video_obj": video_obj}
+                )
+                # 保存视频ID
+                self.redis_ops.save_video_id(video_obj['channel_content_id'])
+                if self.download_cnt >= self.rule_dict.get("videos_cnt", {}).get("min", 100):
+                    self.limit_flag = False
+        except Exception as e:
+            tb_info = traceback.format_exc()
+            self.aliyun_log.logging(
+                code="1005",
+                message=f"处理视频对象时发生异常,错误信息: {str(e)}",
+                data={"video_obj": video_obj}
+            )
+            self.LocalLog.error(f"处理视频对象时发生异常: {e}\n{tb_info}")
 
     async def run(self):
-        while True:
-            await self.control_request()
-    def save_video_id(self,key):
-        helper = SyncRedisHelper()
-        client = helper.get_client()
-        # 将视频ID存储到Redis中,并设置过期时间为7天
-        # 检查键是否存在
-
-        if client.exists(key):
-            return False
-        else:
-            expiration_time = int(timedelta(days=7).total_seconds())
-            client.setex(key, expiration_time, "1")
+        """
+        运行主流程,异步执行推荐视频和相关推荐视频的请求,直到达到下载数量限制。
 
+        :return: 无返回值,程序运行的主逻辑
+        """
+        self.LocalLog.info("开始执行中青看点推荐抓取...")
+        await asyncio.gather(
+            self.control_request_recommend()
+        )
 
-from datetime import datetime, timedelta
 
 if __name__ == '__main__':
+    asyncio.run(ZhongQingKanDian(
+        platform="zhongqingkandian",
+        mode="recommend",
+        rule_dict={"videos_cnt": {"min": 2, "max": 0}},
+        user_list=[{"uid": 81522822, "link": "中青看点推荐", "nick_name": "免不了俗"}]
+    ).run())
+    # content_link = "https://vol.youth.cn/4X32ftEV6SsA9Mq9?signature=6y30XlmbkL9oxwAjJd1PXOBX0idx0ZD1gMQE2nZKW8RNpvPrqz"
     # asyncio.run(ZhongQingKanDian(
     #     platform="zhongqingkandian",
     #     mode="recommend",
-    #     rule_dict={},
-    #     user_list=[{"uid": 81522822, "link": "中青看点推荐", "nick_name": "免不了俗"},
-    #                ]
-    #
-    # ).run())
-    save_video_id("1234")
-
-
+    #     rule_dict={
+    #         {"videos_cnt":{"min":100,"max":0}},{"duration":{"min":30,"max":1200}}
+    #     },
+    #     user_list=[{"uid": 81522822, "link": "中青看点推荐", "nick_name": "免不了俗"}]
+    # ).req_detail(content_link,"测试"))

+ 366 - 0
spider/crawler_online/zhongqingkandian_related_recommend.py

@@ -0,0 +1,366 @@
+import os
+import sys
+import asyncio
+import json
+import random
+import uuid
+import time
+import traceback
+from datetime import datetime
+import aiohttp
+
+sys.path.append(os.getcwd())
+from application.common.feishu import FsData
+from application.common.feishu.feishu_utils import FeishuUtils
+from application.common.gpt import GPT4oMini
+from application.common.messageQueue import MQ
+from application.common.log import AliyunLogger
+from application.functions.zqkd_db_redis import DatabaseOperations, RedisOperations
+from application.items import VideoItem
+from application.pipeline import PiaoQuanPipeline
+from application.common.log import Local
+
+
+class ZhongQingKanDianRelated:
+    API_BASE_URL = "http://8.217.192.46:8889"
+    COMMON_HEADERS = {
+        "Content-Type": "application/json"
+    }
+    # 最大重试次数
+    MAX_RETRIES = 3
+    # 最大等待时长
+    TIMEOUT = 30
+
+    def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
+        """
+        初始化
+        :param platform: 平台名称 zhongqingkandian
+        :param mode: 运行模式  recommend
+        :param rule_dict: 规则字典,包含视频数量限制、时长限制等规则 [{"videos_cnt":{"min":100,"max":0}},{"duration":{"min":30,"max":1200}}]
+        :param user_list: 用户列表
+        :param env: 运行环境,默认为 "prod"
+        """
+        self.limit_flag = True
+        self.platform = platform
+        self.mode = mode
+        self.rule_dict = rule_dict
+        self.user_list = user_list
+        self.env = env
+        self.download_cnt = 0
+        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
+        self.expire_flag = False
+        self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
+        self.db_ops = DatabaseOperations(mode=mode, platform=platform)
+        self.redis_ops = RedisOperations()
+        data_rule = FsData()
+        self.title_rule = data_rule.get_title_rule()
+        self.LocalLog = Local.logger(self.platform, self.mode)
+
+
+    async def send_request(self, path, data):
+        """
+        异步发送 POST 请求到指定路径,带有重试机制。
+        :param path: 请求的 API 路径
+        :param data: 请求的数据
+        :return: 响应的 JSON 数据,如果请求失败则返回 None
+        """
+        full_url = f"{self.API_BASE_URL}{path}"
+        async with aiohttp.ClientSession(headers=self.COMMON_HEADERS) as session:
+            for retry in range(self.MAX_RETRIES):
+                try:
+                    async with session.post(full_url, data=data, timeout=self.TIMEOUT) as response:
+                        response.raise_for_status()
+                        self.LocalLog.info(f"{path}响应数据:{await response.json()}")
+                        return await response.json()
+                except (aiohttp.ClientError, json.JSONDecodeError) as e:
+                    tb_info = traceback.format_exc()
+                    self.LocalLog.info(f"{path}请求失败:{e}  \n{tb_info}")
+                    self.aliyun_log.logging(
+                        code="3000",
+                        message=f"请求 {path} 失败,错误信息: {str(e)}",
+                        data={"path": path}
+                    )
+                    await asyncio.sleep(5)
+        return None
+
+    def is_response_valid(self, resp, url):
+        """
+        检查响应是否有效(状态码为 0 表示有效)。
+        :param resp: 响应数据
+        :param url: 请求的 URL
+        :return: 如果响应有效则返回响应数据,否则返回 None
+        """
+        try:
+            if resp and resp.get('code') != 0:
+                self.aliyun_log.logging(
+                    code="3000",
+                    message=f"抓取{url}失败,请求失败,响应:{resp}"
+                )
+                self.LocalLog.info(f"{url}请求失败,响应:{resp}")
+                return None
+            return resp
+        except Exception as e:
+            tb_info = traceback.format_exc()
+            self.aliyun_log.logging(
+                code="3000",
+                message=f"检查响应有效性时出错,错误信息: {str(e)}",
+                data={"url": url, "resp": resp}
+            )
+            self.LocalLog.info(f"检查 {url} 响应有效性时出错:{e} \n{tb_info}")
+            return None
+
+    async def req_related_recommend_list(self, content_id):
+        """
+        异步请求与指定内容 ID 相关的推荐列表。
+        :param
+        :return: 相关推荐视频列表的有效响应数据,如果请求失败则返回 None
+        """
+        try:
+
+            url = '/crawler/zhong_qing_kan_dian/related'
+            body = json.dumps({
+                "content_id": f"{content_id}",
+                "cursor": ""
+            })
+            self.LocalLog.info(f"开始请求相关推荐{body}")
+            resp = await self.send_request(url, body)
+            return self.is_response_valid(resp, url)
+        except Exception as e:
+            tb_info = traceback.format_exc()
+            self.aliyun_log.logging(
+                code="1004",
+                message=f"请求相关推荐视频列表时发生异常,错误信息: {str(e)}",
+                data={"url": url}
+            )
+            self.LocalLog.info(f"请求相关推荐视频列表 {url} 时发生异常:{e}   \n{tb_info}")
+            return None
+
+    async def req_detail(self, content_link, **kwargs):
+        """
+        异步请求视频详情。
+        :param content_link: 视频内容链接
+        :param kwargs: 额外的视频信息
+        :return: 无返回值,处理视频详情信息
+        """
+        try:
+            self.LocalLog.info(f"开始请求视频详情,链接: {content_link}")
+            url = '/crawler/zhong_qing_kan_dian/detail'
+            body = json.dumps({
+                "content_link": content_link
+            })
+            resp = await self.send_request(url, body)
+            if not self.is_response_valid(resp, url):
+                return
+            data = resp.get("data", {}).get("data", {})
+            if data.get("content_type") != "video":
+                self.aliyun_log.logging(
+                    code="3003",
+                    message=f"跳过非视频内容",
+                    data={"content_link": content_link}
+                )
+                self.LocalLog.info(f"跳过非视频内容,链接: {content_link}")
+                return
+            self.LocalLog.info(f"{content_link} 是视频")
+            data.update(kwargs)
+            await self.process_video_obj(data)
+            await asyncio.sleep(10)
+        except Exception as e:
+            tb_info = traceback.format_exc()
+            self.aliyun_log.logging(
+                code="1005",
+                message=f"请求视频详情时发生异常,错误信息: {str(e)}",
+                data={"content_link": content_link}
+            )
+            self.LocalLog.error(f"请求视频详情,链接 {content_link} 时发生异常:{e}  \n{tb_info}")
+
+    async def control_request_related(self):
+        """
+        控制相关推荐视频列表的请求和处理流程。
+        :return: 无返回值,根据下载数量限制控制流程
+        """
+        while self.limit_flag:
+            try:
+                self.LocalLog.info(f"开始推荐视频列表的请求和处理流程,今日已爬 {self.download_cnt} 个视频")
+                content_id = self.redis_ops.get_recommend_video()
+                if not content_id:
+                    self.LocalLog.info("缓存中【task:zqkd_video_id】没有数据")
+                    await asyncio.sleep(10)
+                    continue
+                related_resp = await self.req_related_recommend_list(content_id)
+                self.LocalLog.info(f"获取的推荐列表长度:{len(related_resp)}")
+                if not related_resp:
+                    continue
+                related_list = related_resp.get("data", {}).get("data", [])
+                for related_obj in related_list:
+                    related_content_link = related_obj.get("share_url")
+                    if related_content_link:
+                        await self.req_detail(related_content_link, **related_obj)
+            except Exception as e:
+                tb_info = traceback.format_exc()
+                self.aliyun_log.logging(
+                    code="3009",
+                    message=f"控制相关推荐视频请求和处理时发生异常,错误信息: {str(e)}",
+                    data={}
+                )
+                self.LocalLog.info(f"控制相关推荐视频请求和处理时发生异常:\n{tb_info}")
+
+    async def process_video_obj(self, video_obj):
+        """
+        处理视频对象,包括检查视频时长、用户信息、保存数据等操作。
+        :param video_obj: 视频对象,包含视频的各种信息
+        :return: 无返回值,完成视频对象的处理
+        """
+        try:
+            video_duration = video_obj["video_url_list"][0]['video_duration']
+            video_id = video_obj['channel_content_id']
+            # 检查视频ID是否存在
+            if self.redis_ops.check_video_id_exists(video_id):
+                self.aliyun_log.logging(
+                    code="3004",
+                    message=f"重复视频ID:{video_id}"
+                )
+                self.LocalLog.info(f"重复视频ID: {video_id}")
+                return
+            our_user = random.choice(self.user_list)
+            trace_id = self.platform + str(uuid.uuid1())
+            item = VideoItem()
+            account_id = video_obj["channel_account_id"]
+            account_name = video_obj["channel_account_name"]
+            account_avatar = video_obj["avatar"]
+            # 检查用户ID是否存在
+            """
+            需要改为判断redis
+            """
+            is_repeat_user = self.db_ops.check_user_id(account_id)
+            if is_repeat_user:
+                # 更新用户信息,使用异步方法并等待结果
+                self.LocalLog.info(f"用户{account_id}已经存在数据库中")
+                self.db_ops.update_user(account_id, account_name, account_avatar)
+            else:
+                self.LocalLog.info(f"用户{account_id}没在数据库中")
+                # 插入用户信息,使用异步方法并等待结果
+                self.db_ops.insert_user(account_id, account_name, account_avatar)
+                self.aliyun_log.logging(code="1007", message=f"用户数据写入成功,用户ID:{account_id}")
+                self.LocalLog.info(f"用户数据写入成功,用户ID: {account_id}")
+
+            if video_duration > self.rule_dict.get("duration", {}).get("max",
+                                                                       1200) or video_duration < self.rule_dict.get(
+                "duration", {}).get("min", 30):
+                self.aliyun_log.logging(
+                    code="3005",
+                    message=f"视频时长不满足条件[>=30s&<=1200s]视频ID:{video_obj['channel_content_id']},视频时长:{video_duration}"
+                )
+                self.LocalLog.info(
+                    f"视频时长不满足条件,视频ID: {video_obj['channel_content_id']}, 视频时长: {video_duration}")
+                return
+
+            item.add_video_info("video_id", video_obj['channel_content_id'])
+            item.add_video_info("video_title", video_obj["title"])
+            item.add_video_info("play_cnt", int(video_obj["read_num"]))
+            item.add_video_info("publish_time_stamp", int(int(video_obj["publish_timestamp"]) / 1000))
+            item.add_video_info("out_user_id", video_obj["channel_account_id"])
+            item.add_video_info("cover_url", video_obj["image_url_list"][0]['image_url'])
+            item.add_video_info("like_cnt", 0)
+            item.add_video_info("collection_cnt", int(video_obj['collect_num']))
+            item.add_video_info("share_cnt", int(video_obj["share_num"]))
+            item.add_video_info("comment_cnt", int(video_obj["cmt_num"]))
+            item.add_video_info("video_url", video_obj["video_url_list"][0]['video_url'])
+            item.add_video_info("out_video_id", int(video_obj["channel_content_id"]))
+            item.add_video_info("duration", video_obj["video_url_list"][0]['video_duration'])
+            item.add_video_info("platform", self.platform)
+            item.add_video_info("strategy", self.mode)
+            item.add_video_info("session", f"{self.platform}-{int(time.time())}")
+            item.add_video_info("user_id", our_user["uid"])
+            item.add_video_info("user_name", our_user["nick_name"])
+
+            mq_obj = item.produce_item()
+            pipeline = PiaoQuanPipeline(
+                platform=self.platform,
+                mode=self.mode,
+                rule_dict=self.rule_dict,
+                env=self.env,
+                item=mq_obj,
+                trace_id=trace_id
+            )
+            if pipeline.process_item():
+                title_list = self.title_rule.split(",")
+                title = video_obj["title"]
+                contains_keyword = any(keyword in title for keyword in title_list)
+                if contains_keyword:
+                    new_title = GPT4oMini.get_ai_mini_title(title)
+                    if new_title:
+                        item.add_video_info("video_title", new_title)
+                        current_time = datetime.now()
+                        formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
+                        values = [
+                            [
+                                video_obj["video_url_list"][0]['video_url'],
+                                video_obj["image_url_list"][0]['image_url'],
+                                title,
+                                new_title,
+                                formatted_time,
+                            ]
+                        ]
+                        FeishuUtils.insert_columns("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "v8S6nL", "ROWS", 1, 2)
+                        time.sleep(0.5)
+                        FeishuUtils.update_values("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "v8S6nL", "A2:Z2", values)
+            self.download_cnt += 1
+            self.mq.send_msg(mq_obj)
+            self.aliyun_log.logging(
+                code="2009",
+                message=f"成功发送视频到etl",
+                data={"video_obj": video_obj}
+            )
+            self.LocalLog.info(f"成功发送etl")
+            # 保存视频ID
+            self.redis_ops.save_video_id(video_obj['channel_content_id'])
+            if self.download_cnt >= self.rule_dict.get("videos_cnt", {}).get("min", 100):
+                self.limit_flag = False
+        except Exception as e:
+            tb_info = traceback.format_exc()
+            self.aliyun_log.logging(
+                code="1005",
+                message=f"处理视频对象时发生异常,错误信息: {str(e)}",
+                data={"video_obj": video_obj}
+            )
+            self.LocalLog.error(f"处理视频对象时发生异常: {e}\n{tb_info}")
+
+    async def run(self):
+        """
+        运行主流程,异步执行推荐视频和相关推荐视频的请求,直到达到下载数量限制。
+
+        :return: 无返回值,程序运行的主逻辑
+        """
+        self.LocalLog.info("开始执行中青看点推荐抓取...")
+        await asyncio.gather(
+            self.control_request_related()
+        )
+
+    async def run(self):
+        """
+        运行主流程,异步执行推荐视频和相关推荐视频的请求,直到达到下载数量限制。
+
+        :return: 无返回值,程序运行的主逻辑
+        """
+        self.LocalLog.info("开始执行中青看点推荐抓取...")
+        await asyncio.gather(
+            self.control_request_related()
+        )
+
+
+if __name__ == '__main__':
+    asyncio.run(ZhongQingKanDianRelated(
+        platform="zhongqingkandian",
+        mode="recommend",
+        rule_dict={"videos_cnt": {"min": 2, "max": 0}},
+        user_list=[{"uid": 81522822, "link": "中青看点推荐", "nick_name": "免不了俗"}]
+    ).run())
+    # content_link = "https://vol.youth.cn/4X32ftEV6SsA9Mq9?signature=6y30XlmbkL9oxwAjJd1PXOBX0idx0ZD1gMQE2nZKW8RNpvPrqz"
+    # asyncio.run(ZhongQingKanDian(
+    #     platform="zhongqingkandian",
+    #     mode="recommend",
+    #     rule_dict={
+    #         {"videos_cnt":{"min":100,"max":0}},{"duration":{"min":30,"max":1200}}
+    #     },
+    #     user_list=[{"uid": 81522822, "link": "中青看点推荐", "nick_name": "免不了俗"}]
+    # ).req_detail(content_link,"测试"))

+ 15 - 1
spider/spider_map.py

@@ -30,7 +30,9 @@ from spider.crawler_online.zhufuquanzituijianliu import ZFQZTJLRecommend
 from spider.crawler_online.chaojipiaoquan import CJPQRecommend
 from spider.crawler_online.xuanlanzhufu import XLZFRecommend
 from spider.crawler_online.zhaozhaohuanxizhufuyu import ZZHXZFYRecommend
-
+from spider.crawler_online.zhongqingkandian import ZhongQingKanDian
+from spider.crawler_online.zhongqingkandian_related_recommend import ZhongQingKanDianRelated
+from spider.crawler_author.zhongqingkandian_author import ZhongQingKanDianAuthor
 spider_map = {
     # 祝万物复苏
     "zhuwanwufusunew": {
@@ -188,4 +190,16 @@ spider_map = {
     "zhaozhaohuanxizhufuyu": {
         "recommend": ZZHXZFYRecommend
     },
+    # 中青看点推荐
+    "zhongqingkandian": {
+        "recommend": ZhongQingKanDian
+    },
+    # 中青看点相关推荐
+    "zhongqingkandian":{
+        "related_recommend": ZhongQingKanDianRelated
+    },
+    # 中青看点用户
+    "zhongqingkandian":{
+        "author": ZhongQingKanDianAuthor
+    }
 }