Browse Source

中青看点

zhangliang 6 days ago
parent
commit
b1b7d0c56a
2 changed files with 139 additions and 94 deletions
  1. 107 63
      application/functions/zqkd_db_redis.py
  2. 32 31
      spider/crawler_author/zhongqingkandian_author.py

+ 107 - 63
application/functions/zqkd_db_redis.py

@@ -1,9 +1,13 @@
 import os
 import sys
 import threading
+import traceback
 from datetime import datetime, timedelta
 
 import redis
+
+from application.common import Local
+
 sys.path.append(os.getcwd())
 
 from application.common.mysql import MysqlHelper
@@ -12,6 +16,7 @@ from application.common.mysql import MysqlHelper
 class DatabaseOperations:
     def __init__(self, mode, platform):
         self.mysql = MysqlHelper(mode=mode, platform=platform)
+        self.LocalLog = Local.logger(platform, mode)
 
     def check_user_id(self, uid):
         """
@@ -20,9 +25,14 @@ class DatabaseOperations:
         :param uid:要检查的用户ID
         :return:如果用户ID存在于表中返回True,否则返回False
         """
-        query_sql = f""" SELECT uid FROM zqkd_uid WHERE uid = "{uid}"; """
-        result = self.mysql.select(sql=query_sql)
-        return bool(result)
+        try:
+            query_sql = f""" SELECT uid FROM zqkd_uid WHERE uid = "{uid}"; """
+            result = self.mysql.select(sql=query_sql)
+            return bool(result)
+        except Exception as e:
+            tb = traceback.format_exc()
+            self.LocalLog.error(f"检查用户ID失败: {e}\n{tb}")
+            return False
 
     def update_user(self, uid, user_name, avatar_url):
         """
@@ -33,8 +43,13 @@ class DatabaseOperations:
         :param avatar_url:新的头像URL
         :return:如果更新操作成功,返回更新操作的结果(通常是影响的行数),失败则返回None或抛出异常
         """
-        update_sql = f""" UPDATE zqkd_uid SET avatar_url = "{avatar_url}", user_name = "{user_name}" WHERE uid = "{uid}"; """
-        return self.mysql.update(sql=update_sql)
+        try:
+            update_sql = f""" UPDATE zqkd_uid SET avatar_url = "{avatar_url}", user_name = "{user_name}" WHERE uid = "{uid}"; """
+            return self.mysql.update(sql=update_sql)
+        except Exception as e:
+            tb = traceback.format_exc()
+            self.LocalLog.error(f"更新用户信息失败: {e}\n{tb}")
+            return None
 
     def insert_user(self, uid, user_name, avatar_url):
         """
@@ -45,26 +60,34 @@ class DatabaseOperations:
         :param avatar_url:新用户的头像URL
         :return:如果插入操作成功,返回插入操作的结果(通常是影响的行数),失败则返回None或抛出异常
         """
-        current_time = datetime.now()
-        formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
-        insert_sql = f""" INSERT INTO zqkd_uid (uid, avatar_url, user_name, data_time) VALUES ('{uid}', '{avatar_url}', '{user_name}', '{formatted_time}'); """
-        return self.mysql.update(sql=insert_sql)
-
-    def select_user(self,last_scanned_id=0):
-        # 构建查询(根据last_scanned_id过滤)
-        query = "SELECT id, uid FROM zqkd_uid"
-        if last_scanned_id > 0:
-            query += f" WHERE id > {last_scanned_id}"
-        query += " ORDER BY id ASC"
-
-        return self.mysql.select(query)
-
-
-
-
-
+        try:
+            current_time = datetime.now()
+            formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
+            insert_sql = f""" INSERT INTO zqkd_uid (uid, avatar_url, user_name, data_time) VALUES ('{uid}', '{avatar_url}', '{user_name}', '{formatted_time}'); """
+            return self.mysql.update(sql=insert_sql)
+        except Exception as e:
+            tb = traceback.format_exc()
+            self.LocalLog.error(f"插入用户信息失败: {e}\n{tb}")
+            return None
 
+    def select_user(self, last_scanned_id=0):
+        """
+        查询id大于last_scanned_id的用户数据(按id升序返回,不做分页)
+        :param last_scanned_id: 上次扫描的ID,0表示从头开始
+        :return: 查询结果列表
+        """
+        try:
+            # 构建查询(根据last_scanned_id过滤)
+            query = "SELECT id, uid FROM zqkd_uid"
+            if last_scanned_id > 0:
+                query += f" WHERE id > {last_scanned_id}"
+            query += " ORDER BY id ASC"
 
+            return self.mysql.select(query)
+        except Exception as e:
+            tb = traceback.format_exc()
+            self.LocalLog.error(f"查询用户列表失败: {e}\n{tb}")
+            return []
 
 
 class RedisOperations:
@@ -73,82 +96,87 @@ class RedisOperations:
     _lock = threading.Lock()  # 用于线程安全的单例创建
 
     @classmethod
-    def get_instance(cls):
+    def get_instance(cls, mode="", platform=""):
         """线程安全的单例获取方法"""
         if not cls._instance:
             with cls._lock:
                 if not cls._instance:
-                    cls._instance = cls()
+                    cls._instance = cls(mode, platform)
         return cls._instance
 
-    def __init__(self):
+    def __init__(self, mode, platform):
         # 私有构造函数,使用 get_instance() 获取实例
+        self.mode = mode
+        self.platform = platform
+        self.LocalLog = Local.logger(self.platform, self.mode)
         if RedisOperations._instance is not None:
             raise Exception("请使用 get_instance() 获取实例")
 
         self._pool = self._get_pool()
-        self.client = redis.Redis(connection_pool=self._pool,decode_responses=True)  # 复用同一个客户端
-
+        self.client = redis.Redis(connection_pool=self._pool, decode_responses=True)  # 复用同一个客户端
 
     def _get_pool(self) -> redis.ConnectionPool:
         if self._pool is None:
-            self._pool = redis.ConnectionPool(
-                host="r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com",
-                port=6379,
-                db=0,
-                password="Wqsd@2019",
-                max_connections=50,  # 增加最大连接数
-                socket_timeout=10,
-                retry_on_timeout=True
-            )
+            try:
+                self._pool = redis.ConnectionPool(
+                    host="r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com",
+                    port=6379,
+                    db=0,
+                    password="Wqsd@2019",
+                    max_connections=50,  # 增加最大连接数
+                    socket_timeout=10,
+                    retry_on_timeout=True
+                )
+            except Exception as e:
+                tb = traceback.format_exc()
+                self.LocalLog.error(f"创建Redis连接池失败: {e}\n{tb}")
+                raise
         return self._pool
 
     def close(self):
         """关闭连接池"""
-        if self._pool:
-            self._pool.disconnect(inuse_connections=True)
+        try:
+            if self._pool:
+                self._pool.disconnect(inuse_connections=True)
+        except Exception as e:
+            tb = traceback.format_exc()
+            self.LocalLog.error(f"关闭Redis连接池失败: {e}\n{tb}")
 
     def get_recommend_video(self, task="task:zqkd_video_id"):
         """从Redis的指定列表中弹出并返回最右边(尾部)的视频ID"""
         try:
             value_bytes = self.client.rpop(task)
-            if value_bytes:  # 检查是否为空(列表可能已空)
-                value_str = value_bytes.decode('utf-8')
-                return value_str
+            value_str = value_bytes.decode('utf-8')
+            return value_str
         except Exception as e:
-            print("e")
+            tb = traceback.format_exc()
+            self.LocalLog.error(f"获取推荐视频ID失败: {e}\n{tb}")
             return None
 
-    # def add_user_data(self, task, key):
-    #     """将用户数据添加到Redis的指定列表"""
-    #     try:
-    #         self.logger.info(f"添加用户{key}到任务{task}")
-    #         self.client.rpush(task, key)
-    #         self.logger.info(f"用户数据写入Redis成功,数据: {key}")
-    #     except Exception as e:
-    #         self.logger.error(f"写入用户数据到Redis时出现异常: {e}")
-
     def check_video_id_exists(self, videoID):
         """检查指定的视频ID是否已经存在于Redis中"""
-        key = f"crawler:zqkd:{videoID}"
         try:
+            key = f"crawler:zqkd:{videoID}"
             return self.client.exists(key)
         except Exception as e:
+            tb = traceback.format_exc()
+            self.LocalLog.error(f"检查视频ID是否存在失败: {e}\n{tb}")
             return False
 
     def save_video_id(self, videoID):
         """将视频ID存储到Redis中,并为其设置7天的过期时间"""
-        key = f"crawler:zqkd:{videoID}"
         try:
+            key = f"crawler:zqkd:{videoID}"
             expiration_time = int(timedelta(days=7).total_seconds())
             self.client.setex(key, expiration_time, "1")
         except Exception as e:
-            print(f"保存视频ID {videoID}到Redis并设置过期时间时出现异常: {e}")
+            tb = traceback.format_exc()
+            self.LocalLog.error(f"保存视频ID失败: {e}\n{tb}")
 
     def save_recommend_video(self, videoID):
         """将推荐视频ID添加到Redis的指定列表中,并为该列表设置2天的过期时间"""
-        task = "task:zqkd_video_id"
         try:
+            task = "task:zqkd_video_id"
             pipe = self.client.pipeline()  # 使用管道执行多个命令
             pipe.rpush(task, videoID)
             pipe.expire(task, int(timedelta(days=2).total_seconds()))
@@ -156,18 +184,34 @@ class RedisOperations:
 
             # 检查数据是否写入成功
             list_length = self.client.llen(task)
+            self.LocalLog.info(f"保存推荐视频ID成功,列表长度: {list_length}")
         except Exception as e:
-            print(f"保存推荐视频 ID {videoID} 到 Redis 列表并设置过期时间时出现异常: {e}")
+            tb = traceback.format_exc()
+            self.LocalLog.error(f"保存推荐视频ID失败: {e}\n{tb}")
 
     def get_last_scanned_id(self):
-        return self.client.get("zqkd_last_scanned_id")
+        """获取上次扫描的ID"""
+        try:
+            return self.client.get("zqkd_last_scanned_id").decode('utf-8')
+        except Exception as e:
+            tb = traceback.format_exc()
+            self.LocalLog.error(f"获取上次扫描的ID失败: {e}\n{tb}")
+            return None
 
-    def set_last_scanned_id(self,last_scanned_id):
-        return self.client.set("zqkd_last_scanned_id",last_scanned_id)
+    def set_last_scanned_id(self, last_scanned_id):
+        """设置上次扫描的ID"""
+        try:
+            result = self.client.set("zqkd_last_scanned_id", last_scanned_id)
+            if result:
+                self.LocalLog.info(f"成功设置上次扫描的ID: {last_scanned_id}")
+            return result
+        except Exception as e:
+            tb = traceback.format_exc()
+            self.LocalLog.error(f"设置上次扫描的ID失败: {e}\n{tb}")
+            return False
 
 
 if __name__ == '__main__':
-    db = DatabaseOperations("12","123")
+    db = DatabaseOperations("12", "123")
     user = db.select_user()
-    print(user)
-
+    print(user)

+ 32 - 31
spider/crawler_author/zhongqingkandian_author.py

@@ -51,14 +51,15 @@ class ZhongQingKanDianAuthor:
         self.expire_flag = False
         self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
         self.db_ops = DatabaseOperations(mode=mode, platform=platform)
-        self.redis_ops = RedisOperations()
+        self.redis_ops = RedisOperations(mode=mode, platform=platform)
         data_rule = FsData()
         self.title_rule = data_rule.get_title_rule()
         self.LocalLog = Local.logger(self.platform, self.mode)
         self.curses = 1
         result = self.redis_ops.get_last_scanned_id()
-        self.last_scanned_id = 0 if result is None else result
-        self.user_list = self.db_ops.select_user(self.last_scanned_id )
+        self.last_scanned_id = 0 if result is None else int(result)
+        self.zqkd_user_list = self.db_ops.select_user(self.last_scanned_id)
+        self.LocalLog.info(f"获取到的用户列表:{self.zqkd_user_list} \n 昨天最后扫描的用户ID{self.last_scanned_id}")
 
     async def send_request(self, path, data):
         """
@@ -185,21 +186,21 @@ class ZhongQingKanDianAuthor:
             try:
                 self.LocalLog.info(f"开始用户视频列表的请求和处理流程,今日已爬 {self.download_cnt} 个视频")
 
-                if not self.user_list:
+                if not self.zqkd_user_list:
                     self.LocalLog.info("没有用户数据")
                     await asyncio.sleep(10)
                     continue
-                for user_info in self.user_list:
+                for user_info in self.zqkd_user_list:
                     current_id, user_id = user_info
                     author_resp = await self.req_user_list(user_id)
                     if current_id > self.last_scanned_id:
                         self.last_scanned_id = current_id
-
-                    self.LocalLog.info(f"获取的用户视频列表长度:{len(author_resp)}")
                     if not author_resp:
                         continue
-                    author_list = author_resp.get("data", {}).get("data", [])
-                    for author_obj in author_list:
+                    author_data = author_resp.get("data", {}).get("data", [])
+                    if not author_data["next_cursor"]:
+                        continue
+                    for author_obj in author_data:
                         author_content_link = author_obj.get("share_url")
                         if author_content_link:
                             await self.req_detail(author_content_link, **author_obj)
@@ -236,22 +237,22 @@ class ZhongQingKanDianAuthor:
             account_id = video_obj["channel_account_id"]
             account_name = video_obj["channel_account_name"]
             account_avatar = video_obj["avatar"]
-            # 检查用户ID是否存在
-            """
-            需要改为判断redis
-            """
-            is_repeat_user = self.db_ops.check_user_id(account_id)
-            if is_repeat_user:
-                # 更新用户信息,使用异步方法并等待结果
-                self.LocalLog.info(f"用户{account_id}已经存在数据库中")
-                self.db_ops.update_user(account_id, account_name, account_avatar)
-            else:
-                self.LocalLog.info(f"用户{account_id}没在数据库中")
-                # 插入用户信息,使用异步方法并等待结果
-                self.db_ops.insert_user(account_id, account_name, account_avatar)
-                self.redis_ops.add_user_data("task:zqkd_user_id", json.dumps({"uid": account_id}))
-                self.aliyun_log.logging(code="1007", message=f"用户数据写入成功,用户ID:{account_id}")
-                self.LocalLog.info(f"用户数据写入成功,用户ID: {account_id}")
+            # # 检查用户ID是否存在
+            # """
+            # 需要改为判断redis
+            # """
+            # is_repeat_user = self.db_ops.check_user_id(account_id)
+            # if is_repeat_user:
+            #     # 更新用户信息,使用异步方法并等待结果
+            #     self.LocalLog.info(f"用户{account_id}已经存在数据库中")
+            #     self.db_ops.update_user(account_id, account_name, account_avatar)
+            # else:
+            #     self.LocalLog.info(f"用户{account_id}没在数据库中")
+            #     # 插入用户信息,使用异步方法并等待结果
+            #     self.db_ops.insert_user(account_id, account_name, account_avatar)
+            #     self.redis_ops.add_user_data("task:zqkd_user_id", json.dumps({"uid": account_id}))
+            #     self.aliyun_log.logging(code="1007", message=f"用户数据写入成功,用户ID:{account_id}")
+            #     self.LocalLog.info(f"用户数据写入成功,用户ID: {account_id}")
 
             if video_duration > self.rule_dict.get("duration", {}).get("max",
                                                                        1200) or video_duration < self.rule_dict.get(
@@ -266,14 +267,14 @@ class ZhongQingKanDianAuthor:
 
             item.add_video_info("video_id", video_obj['channel_content_id'])
             item.add_video_info("video_title", video_obj["title"])
-            item.add_video_info("play_cnt", int(video_obj["read_num"]))
+            item.add_video_info("play_cnt", video_obj["read_num"])
             item.add_video_info("publish_time_stamp", int(int(video_obj["publish_timestamp"]) / 1000))
             item.add_video_info("out_user_id", video_obj["channel_account_id"])
             item.add_video_info("cover_url", video_obj["image_url_list"][0]['image_url'])
             item.add_video_info("like_cnt", 0)
-            item.add_video_info("collection_cnt", int(video_obj['collect_num']))
-            item.add_video_info("share_cnt", int(video_obj["share_num"]))
-            item.add_video_info("comment_cnt", int(video_obj["cmt_num"]))
+            item.add_video_info("collection_cnt", 0)
+            item.add_video_info("share_cnt", 0)
+            item.add_video_info("comment_cnt", 0)
             item.add_video_info("video_url", video_obj["video_url_list"][0]['video_url'])
             item.add_video_info("out_video_id", int(video_obj["channel_content_id"]))
             item.add_video_info("duration", video_obj["video_url_list"][0]['video_duration'])
@@ -340,7 +341,7 @@ class ZhongQingKanDianAuthor:
 
         :return: 无返回值,程序运行的主逻辑
         """
-        self.LocalLog.info("开始执行中青看点推荐抓取...")
+        self.LocalLog.info("开始执行中青看点用户视频抓取...")
         await asyncio.gather(
             self.control_request_author()
         )
@@ -351,5 +352,5 @@ if __name__ == '__main__':
         platform="zhongqingkandian",
         mode="author",
         rule_dict={"videos_cnt": {"min": 2, "max": 0}},
-        user_list=[{"uid": 81522822, "link": "中青看点推荐", "nick_name": "免不了俗"}]
+        user_list=[{"uid": 81525568, "link": "中青看点推荐", "nick_name": "芸芸众生"}]
     ).run())