import os import sys import threading import traceback from datetime import datetime, timedelta import redis from application.common import Local sys.path.append(os.getcwd()) from application.common.mysql import MysqlHelper class DatabaseOperations: def __init__(self, mode, platform): self.mysql = MysqlHelper(mode=mode, platform=platform) self.LocalLog = Local.logger(platform, mode) def check_user_id(self, uid): """ 检查指定用户ID是否存在于数据库的zqkd_uid表中。 :param uid:要检查的用户ID :return:如果用户ID存在于表中返回True,否则返回False """ try: query_sql = f""" SELECT uid FROM zqkd_uid WHERE uid = "{uid}"; """ result = self.mysql.select(sql=query_sql) return bool(result) except Exception as e: tb = traceback.format_exc() self.LocalLog.error(f"检查用户ID失败: {e}\n{tb}") return False def update_user(self, uid, user_name, avatar_url): """ 更新数据库中指定用户的用户名和头像URL。 :param uid:要更新信息的用户ID :param user_name:新的用户名 :param avatar_url:新的头像URL :return:如果更新操作成功,返回更新操作的结果(通常是影响的行数),失败则返回None或抛出异常 """ try: update_sql = f""" UPDATE zqkd_uid SET avatar_url = "{avatar_url}", user_name = "{user_name}" WHERE uid = "{uid}"; """ return self.mysql.update(sql=update_sql) except Exception as e: tb = traceback.format_exc() self.LocalLog.error(f"更新用户信息失败: {e}\n{tb}") return None def insert_user(self, uid, user_name, avatar_url): """ 向数据库的zqkd_uid表中插入新的用户信息,包含用户ID、用户名、头像URL和当前时间。 :param uid:新用户的ID :param user_name:新用户的用户名 :param avatar_url:新用户的头像URL :return:如果插入操作成功,返回插入操作的结果(通常是影响的行数),失败则返回None或抛出异常 """ try: current_time = datetime.now() formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S") insert_sql = f""" INSERT INTO zqkd_uid (uid, avatar_url, user_name, data_time) VALUES ('{uid}', '{avatar_url}', '{user_name}', '{formatted_time}'); """ return self.mysql.update(sql=insert_sql) except Exception as e: tb = traceback.format_exc() self.LocalLog.error(f"插入用户信息失败: {e}\n{tb}") return None def select_user(self, last_scanned_id=0): """ 根据last_scanned_id分页查询用户数据 :param last_scanned_id: 上次扫描的ID,0表示从头开始 :return: 查询结果列表 """ try: # 构建查询(根据last_scanned_id过滤) query = "SELECT id, uid FROM zqkd_uid" if last_scanned_id > 0: query += f" WHERE id > {last_scanned_id}" query += " ORDER BY id ASC" return self.mysql.select(query) except Exception as e: tb = traceback.format_exc() self.LocalLog.error(f"查询用户列表失败: {e}\n{tb}") return [] class RedisOperations: _pool: redis.ConnectionPool = None _instance = None _lock = threading.Lock() # 用于线程安全的单例创建 @classmethod def get_instance(cls, mode="", platform=""): """线程安全的单例获取方法""" if not cls._instance: with cls._lock: if not cls._instance: cls._instance = cls(mode, platform) return cls._instance def __init__(self, mode, platform): # 私有构造函数,使用 get_instance() 获取实例 self.mode = mode self.platform = platform self.LocalLog = Local.logger(self.platform, self.mode) if RedisOperations._instance is not None: raise Exception("请使用 get_instance() 获取实例") self._pool = self._get_pool() self.client = redis.Redis(connection_pool=self._pool, decode_responses=True) # 复用同一个客户端 def _get_pool(self) -> redis.ConnectionPool: if self._pool is None: try: self._pool = redis.ConnectionPool( host="r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com", port=6379, db=0, password="Wqsd@2019", max_connections=50, # 增加最大连接数 socket_timeout=10, retry_on_timeout=True ) except Exception as e: tb = traceback.format_exc() self.LocalLog.error(f"创建Redis连接池失败: {e}\n{tb}") raise return self._pool def close(self): """关闭连接池""" try: if self._pool: self._pool.disconnect(inuse_connections=True) except Exception as e: tb = traceback.format_exc() self.LocalLog.error(f"关闭Redis连接池失败: {e}\n{tb}") def get_recommend_video(self, task="task:zqkd_video_id"): """从Redis的指定列表中弹出并返回最左边的视频ID""" try: value_bytes = self.client.rpop(task) value_str = value_bytes.decode('utf-8') return value_str except Exception as e: tb = traceback.format_exc() self.LocalLog.error(f"获取推荐视频ID失败: {e}\n{tb}") return None def check_video_id_exists(self, videoID): """检查指定的视频ID是否已经存在于Redis中""" try: key = f"crawler:zqkd:{videoID}" return self.client.exists(key) except Exception as e: tb = traceback.format_exc() self.LocalLog.error(f"检查视频ID是否存在失败: {e}\n{tb}") return False def save_video_id(self, videoID): """将视频ID存储到Redis中,并为其设置7天的过期时间""" try: key = f"crawler:zqkd:{videoID}" expiration_time = int(timedelta(days=7).total_seconds()) self.client.setex(key, expiration_time, "1") except Exception as e: tb = traceback.format_exc() self.LocalLog.error(f"保存视频ID失败: {e}\n{tb}") def save_recommend_video(self, videoID): """将推荐视频ID添加到Redis的指定列表中,并为该列表设置2天的过期时间""" try: task = "task:zqkd_video_id" pipe = self.client.pipeline() # 使用管道执行多个命令 pipe.rpush(task, videoID) pipe.expire(task, int(timedelta(days=2).total_seconds())) pipe.execute() # 检查数据是否写入成功 list_length = self.client.llen(task) self.LocalLog.info(f"保存推荐视频ID成功,列表长度: {list_length}") except Exception as e: tb = traceback.format_exc() self.LocalLog.error(f"保存推荐视频ID失败: {e}\n{tb}") def get_last_scanned_id(self): """获取上次扫描的ID""" try: return self.client.get("zqkd_last_scanned_id").decode('utf-8') except Exception as e: tb = traceback.format_exc() self.LocalLog.error(f"获取上次扫描的ID失败: {e}\n{tb}") return None def set_last_scanned_id(self, last_scanned_id): """设置上次扫描的ID""" try: result = self.client.set("zqkd_last_scanned_id", last_scanned_id) if result: self.LocalLog.info(f"成功设置上次扫描的ID: {last_scanned_id}") except Exception as e: tb = traceback.format_exc() self.LocalLog.error(f"设置上次扫描的ID失败: {e}\n{tb}") return False if __name__ == '__main__': db = DatabaseOperations("12", "123") user = db.select_user(10000) print(user)