123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216 |
- import os
- import sys
- import threading
- import traceback
- from datetime import datetime, timedelta
- import redis
- from application.common import Local
- sys.path.append(os.getcwd())
- from application.common.mysql import MysqlHelper
- class DatabaseOperations:
- def __init__(self, mode, platform):
- self.mysql = MysqlHelper(mode=mode, platform=platform)
- self.LocalLog = Local.logger(platform, mode)
- def check_user_id(self, uid):
- """
- 检查指定用户ID是否存在于数据库的zqkd_uid表中。
- :param uid:要检查的用户ID
- :return:如果用户ID存在于表中返回True,否则返回False
- """
- try:
- query_sql = f""" SELECT uid FROM zqkd_uid WHERE uid = "{uid}"; """
- result = self.mysql.select(sql=query_sql)
- return bool(result)
- except Exception as e:
- tb = traceback.format_exc()
- self.LocalLog.error(f"检查用户ID失败: {e}\n{tb}")
- return False
- def update_user(self, uid, user_name, avatar_url):
- """
- 更新数据库中指定用户的用户名和头像URL。
- :param uid:要更新信息的用户ID
- :param user_name:新的用户名
- :param avatar_url:新的头像URL
- :return:如果更新操作成功,返回更新操作的结果(通常是影响的行数),失败则返回None或抛出异常
- """
- try:
- update_sql = f""" UPDATE zqkd_uid SET avatar_url = "{avatar_url}", user_name = "{user_name}" WHERE uid = "{uid}"; """
- return self.mysql.update(sql=update_sql)
- except Exception as e:
- tb = traceback.format_exc()
- self.LocalLog.error(f"更新用户信息失败: {e}\n{tb}")
- return None
- def insert_user(self, uid, user_name, avatar_url):
- """
- 向数据库的zqkd_uid表中插入新的用户信息,包含用户ID、用户名、头像URL和当前时间。
- :param uid:新用户的ID
- :param user_name:新用户的用户名
- :param avatar_url:新用户的头像URL
- :return:如果插入操作成功,返回插入操作的结果(通常是影响的行数),失败则返回None或抛出异常
- """
- try:
- current_time = datetime.now()
- formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
- insert_sql = f""" INSERT INTO zqkd_uid (uid, avatar_url, user_name, data_time) VALUES ('{uid}', '{avatar_url}', '{user_name}', '{formatted_time}'); """
- return self.mysql.update(sql=insert_sql)
- except Exception as e:
- tb = traceback.format_exc()
- self.LocalLog.error(f"插入用户信息失败: {e}\n{tb}")
- return None
- def select_user(self, last_scanned_id=0):
- """
- 根据last_scanned_id分页查询用户数据
- :param last_scanned_id: 上次扫描的ID,0表示从头开始
- :return: 查询结果列表
- """
- try:
- # 构建查询(根据last_scanned_id过滤)
- query = "SELECT id, uid FROM zqkd_uid"
- if last_scanned_id > 0:
- query += f" WHERE id > {last_scanned_id}"
- query += " ORDER BY id ASC"
- return self.mysql.select(query)
- except Exception as e:
- tb = traceback.format_exc()
- self.LocalLog.error(f"查询用户列表失败: {e}\n{tb}")
- return []
- class RedisOperations:
- _pool: redis.ConnectionPool = None
- _instance = None
- _lock = threading.Lock() # 用于线程安全的单例创建
- @classmethod
- def get_instance(cls, mode="", platform=""):
- """线程安全的单例获取方法"""
- if not cls._instance:
- with cls._lock:
- if not cls._instance:
- cls._instance = cls(mode, platform)
- return cls._instance
- def __init__(self, mode, platform):
- # 私有构造函数,使用 get_instance() 获取实例
- self.mode = mode
- self.platform = platform
- self.LocalLog = Local.logger(self.platform, self.mode)
- if RedisOperations._instance is not None:
- raise Exception("请使用 get_instance() 获取实例")
- self._pool = self._get_pool()
- self.client = redis.Redis(connection_pool=self._pool, decode_responses=True) # 复用同一个客户端
- def _get_pool(self) -> redis.ConnectionPool:
- if self._pool is None:
- try:
- self._pool = redis.ConnectionPool(
- host="r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com",
- port=6379,
- db=0,
- password="Wqsd@2019",
- max_connections=50, # 增加最大连接数
- socket_timeout=10,
- retry_on_timeout=True
- )
- except Exception as e:
- tb = traceback.format_exc()
- self.LocalLog.error(f"创建Redis连接池失败: {e}\n{tb}")
- raise
- return self._pool
- def close(self):
- """关闭连接池"""
- try:
- if self._pool:
- self._pool.disconnect(inuse_connections=True)
- except Exception as e:
- tb = traceback.format_exc()
- self.LocalLog.error(f"关闭Redis连接池失败: {e}\n{tb}")
- def get_recommend_video(self, task="task:zqkd_video_id"):
- """从Redis的指定列表中弹出并返回最左边的视频ID"""
- try:
- value_bytes = self.client.rpop(task)
- value_str = value_bytes.decode('utf-8')
- return value_str
- except Exception as e:
- tb = traceback.format_exc()
- self.LocalLog.error(f"获取推荐视频ID失败: {e}\n{tb}")
- return None
- def check_video_id_exists(self, videoID):
- """检查指定的视频ID是否已经存在于Redis中"""
- try:
- key = f"crawler:zqkd:{videoID}"
- return self.client.exists(key)
- except Exception as e:
- tb = traceback.format_exc()
- self.LocalLog.error(f"检查视频ID是否存在失败: {e}\n{tb}")
- return False
- def save_video_id(self, videoID):
- """将视频ID存储到Redis中,并为其设置7天的过期时间"""
- try:
- key = f"crawler:zqkd:{videoID}"
- expiration_time = int(timedelta(days=7).total_seconds())
- self.client.setex(key, expiration_time, "1")
- except Exception as e:
- tb = traceback.format_exc()
- self.LocalLog.error(f"保存视频ID失败: {e}\n{tb}")
- def save_recommend_video(self, videoID):
- """将推荐视频ID添加到Redis的指定列表中,并为该列表设置2天的过期时间"""
- try:
- task = "task:zqkd_video_id"
- pipe = self.client.pipeline() # 使用管道执行多个命令
- pipe.rpush(task, videoID)
- pipe.expire(task, int(timedelta(days=2).total_seconds()))
- pipe.execute()
- # 检查数据是否写入成功
- list_length = self.client.llen(task)
- self.LocalLog.info(f"保存推荐视频ID成功,列表长度: {list_length}")
- except Exception as e:
- tb = traceback.format_exc()
- self.LocalLog.error(f"保存推荐视频ID失败: {e}\n{tb}")
- def get_last_scanned_id(self):
- """获取上次扫描的ID"""
- try:
- return self.client.get("zqkd_last_scanned_id").decode('utf-8')
- except Exception as e:
- tb = traceback.format_exc()
- self.LocalLog.error(f"获取上次扫描的ID失败: {e}\n{tb}")
- return None
- def set_last_scanned_id(self, last_scanned_id):
- """设置上次扫描的ID"""
- try:
- result = self.client.set("zqkd_last_scanned_id", last_scanned_id)
- if result:
- self.LocalLog.info(f"成功设置上次扫描的ID: {last_scanned_id}")
- except Exception as e:
- tb = traceback.format_exc()
- self.LocalLog.error(f"设置上次扫描的ID失败: {e}\n{tb}")
- return False
- if __name__ == '__main__':
- db = DatabaseOperations("12", "123")
- user = db.select_user(10000)
- print(user)
|