123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- import os
- import sys
- import threading
- from datetime import datetime, timedelta
- import redis
- sys.path.append(os.getcwd())
- from application.common.mysql import MysqlHelper
- class DatabaseOperations:
- def __init__(self, mode, platform):
- self.mysql = MysqlHelper(mode=mode, platform=platform)
- def check_user_id(self, uid):
- """
- 检查指定用户ID是否存在于数据库的zqkd_uid表中。
- :param uid:要检查的用户ID
- :return:如果用户ID存在于表中返回True,否则返回False
- """
- query_sql = f""" SELECT uid FROM zqkd_uid WHERE uid = "{uid}"; """
- result = self.mysql.select(sql=query_sql)
- return bool(result)
- def update_user(self, uid, user_name, avatar_url):
- """
- 更新数据库中指定用户的用户名和头像URL。
- :param uid:要更新信息的用户ID
- :param user_name:新的用户名
- :param avatar_url:新的头像URL
- :return:如果更新操作成功,返回更新操作的结果(通常是影响的行数),失败则返回None或抛出异常
- """
- update_sql = f""" UPDATE zqkd_uid SET avatar_url = "{avatar_url}", user_name = "{user_name}" WHERE uid = "{uid}"; """
- return self.mysql.update(sql=update_sql)
- def insert_user(self, uid, user_name, avatar_url):
- """
- 向数据库的zqkd_uid表中插入新的用户信息,包含用户ID、用户名、头像URL和当前时间。
- :param uid:新用户的ID
- :param user_name:新用户的用户名
- :param avatar_url:新用户的头像URL
- :return:如果插入操作成功,返回插入操作的结果(通常是影响的行数),失败则返回None或抛出异常
- """
- current_time = datetime.now()
- formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
- insert_sql = f""" INSERT INTO zqkd_uid (uid, avatar_url, user_name, data_time) VALUES ('{uid}', '{avatar_url}', '{user_name}', '{formatted_time}'); """
- return self.mysql.update(sql=insert_sql)
- def select_user(self,last_scanned_id=0):
- # 构建查询(根据last_scanned_id过滤)
- query = "SELECT id, uid FROM zqkd_uid"
- if last_scanned_id > 0:
- query += f" WHERE id > {last_scanned_id}"
- query += " ORDER BY id ASC"
- return self.mysql.select(query)
- class RedisOperations:
- _pool: redis.ConnectionPool = None
- _instance = None
- _lock = threading.Lock() # 用于线程安全的单例创建
- @classmethod
- def get_instance(cls):
- """线程安全的单例获取方法"""
- if not cls._instance:
- with cls._lock:
- if not cls._instance:
- cls._instance = cls()
- return cls._instance
- def __init__(self):
- # 私有构造函数,使用 get_instance() 获取实例
- if RedisOperations._instance is not None:
- raise Exception("请使用 get_instance() 获取实例")
- self._pool = self._get_pool()
- self.client = redis.Redis(connection_pool=self._pool,decode_responses=True) # 复用同一个客户端
- def _get_pool(self) -> redis.ConnectionPool:
- if self._pool is None:
- self._pool = redis.ConnectionPool(
- host="r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com",
- port=6379,
- db=0,
- password="Wqsd@2019",
- max_connections=50, # 增加最大连接数
- socket_timeout=10,
- retry_on_timeout=True
- )
- return self._pool
- def close(self):
- """关闭连接池"""
- if self._pool:
- self._pool.disconnect(inuse_connections=True)
- def get_recommend_video(self, task="task:zqkd_video_id"):
- """从Redis的指定列表中弹出并返回最左边的视频ID"""
- try:
- value_bytes = self.client.rpop(task)
- if value_bytes: # 检查是否为空(列表可能已空)
- value_str = value_bytes.decode('utf-8')
- return value_str
- except Exception as e:
- print("e")
- return None
- # def add_user_data(self, task, key):
- # """将用户数据添加到Redis的指定列表"""
- # try:
- # self.logger.info(f"添加用户{key}到任务{task}")
- # self.client.rpush(task, key)
- # self.logger.info(f"用户数据写入Redis成功,数据: {key}")
- # except Exception as e:
- # self.logger.error(f"写入用户数据到Redis时出现异常: {e}")
- def check_video_id_exists(self, videoID):
- """检查指定的视频ID是否已经存在于Redis中"""
- key = f"crawler:zqkd:{videoID}"
- try:
- return self.client.exists(key)
- except Exception as e:
- return False
- def save_video_id(self, videoID):
- """将视频ID存储到Redis中,并为其设置7天的过期时间"""
- key = f"crawler:zqkd:{videoID}"
- try:
- expiration_time = int(timedelta(days=7).total_seconds())
- self.client.setex(key, expiration_time, "1")
- except Exception as e:
- print(f"保存视频ID {videoID}到Redis并设置过期时间时出现异常: {e}")
- def save_recommend_video(self, videoID):
- """将推荐视频ID添加到Redis的指定列表中,并为该列表设置2天的过期时间"""
- task = "task:zqkd_video_id"
- try:
- pipe = self.client.pipeline() # 使用管道执行多个命令
- pipe.rpush(task, videoID)
- pipe.expire(task, int(timedelta(days=2).total_seconds()))
- pipe.execute()
- # 检查数据是否写入成功
- list_length = self.client.llen(task)
- except Exception as e:
- print(f"保存推荐视频 ID {videoID} 到 Redis 列表并设置过期时间时出现异常: {e}")
- def get_last_scanned_id(self):
- return self.client.get("zqkd_last_scanned_id")
- def set_last_scanned_id(self,last_scanned_id):
- return self.client.set("zqkd_last_scanned_id",last_scanned_id)
- if __name__ == '__main__':
- db = DatabaseOperations("12","123")
- user = db.select_user()
- print(user)
|