zqkd_db_redis.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. import os
  2. import sys
  3. import threading
  4. from datetime import datetime, timedelta
  5. import redis
  6. sys.path.append(os.getcwd())
  7. from application.common.mysql import MysqlHelper
  8. class DatabaseOperations:
  9. def __init__(self, mode, platform):
  10. self.mysql = MysqlHelper(mode=mode, platform=platform)
  11. def check_user_id(self, uid):
  12. """
  13. 检查指定用户ID是否存在于数据库的zqkd_uid表中。
  14. :param uid:要检查的用户ID
  15. :return:如果用户ID存在于表中返回True,否则返回False
  16. """
  17. query_sql = f""" SELECT uid FROM zqkd_uid WHERE uid = "{uid}"; """
  18. result = self.mysql.select(sql=query_sql)
  19. return bool(result)
  20. def update_user(self, uid, user_name, avatar_url):
  21. """
  22. 更新数据库中指定用户的用户名和头像URL。
  23. :param uid:要更新信息的用户ID
  24. :param user_name:新的用户名
  25. :param avatar_url:新的头像URL
  26. :return:如果更新操作成功,返回更新操作的结果(通常是影响的行数),失败则返回None或抛出异常
  27. """
  28. update_sql = f""" UPDATE zqkd_uid SET avatar_url = "{avatar_url}", user_name = "{user_name}" WHERE uid = "{uid}"; """
  29. return self.mysql.update(sql=update_sql)
  30. def insert_user(self, uid, user_name, avatar_url):
  31. """
  32. 向数据库的zqkd_uid表中插入新的用户信息,包含用户ID、用户名、头像URL和当前时间。
  33. :param uid:新用户的ID
  34. :param user_name:新用户的用户名
  35. :param avatar_url:新用户的头像URL
  36. :return:如果插入操作成功,返回插入操作的结果(通常是影响的行数),失败则返回None或抛出异常
  37. """
  38. current_time = datetime.now()
  39. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  40. insert_sql = f""" INSERT INTO zqkd_uid (uid, avatar_url, user_name, data_time) VALUES ('{uid}', '{avatar_url}', '{user_name}', '{formatted_time}'); """
  41. return self.mysql.update(sql=insert_sql)
  42. def select_user(self,last_scanned_id=0):
  43. # 构建查询(根据last_scanned_id过滤)
  44. query = "SELECT id, uid FROM zqkd_uid"
  45. if last_scanned_id > 0:
  46. query += f" WHERE id > {last_scanned_id}"
  47. query += " ORDER BY id ASC"
  48. return self.mysql.select(query)
  49. class RedisOperations:
  50. _pool: redis.ConnectionPool = None
  51. _instance = None
  52. _lock = threading.Lock() # 用于线程安全的单例创建
  53. @classmethod
  54. def get_instance(cls):
  55. """线程安全的单例获取方法"""
  56. if not cls._instance:
  57. with cls._lock:
  58. if not cls._instance:
  59. cls._instance = cls()
  60. return cls._instance
  61. def __init__(self):
  62. # 私有构造函数,使用 get_instance() 获取实例
  63. if RedisOperations._instance is not None:
  64. raise Exception("请使用 get_instance() 获取实例")
  65. self._pool = self._get_pool()
  66. self.client = redis.Redis(connection_pool=self._pool,decode_responses=True) # 复用同一个客户端
  67. def _get_pool(self) -> redis.ConnectionPool:
  68. if self._pool is None:
  69. self._pool = redis.ConnectionPool(
  70. host="r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com",
  71. port=6379,
  72. db=0,
  73. password="Wqsd@2019",
  74. max_connections=50, # 增加最大连接数
  75. socket_timeout=10,
  76. retry_on_timeout=True
  77. )
  78. return self._pool
  79. def close(self):
  80. """关闭连接池"""
  81. if self._pool:
  82. self._pool.disconnect(inuse_connections=True)
  83. def get_recommend_video(self, task="task:zqkd_video_id"):
  84. """从Redis的指定列表中弹出并返回最左边的视频ID"""
  85. try:
  86. value_bytes = self.client.rpop(task)
  87. if value_bytes: # 检查是否为空(列表可能已空)
  88. value_str = value_bytes.decode('utf-8')
  89. return value_str
  90. except Exception as e:
  91. print("e")
  92. return None
  93. # def add_user_data(self, task, key):
  94. # """将用户数据添加到Redis的指定列表"""
  95. # try:
  96. # self.logger.info(f"添加用户{key}到任务{task}")
  97. # self.client.rpush(task, key)
  98. # self.logger.info(f"用户数据写入Redis成功,数据: {key}")
  99. # except Exception as e:
  100. # self.logger.error(f"写入用户数据到Redis时出现异常: {e}")
  101. def check_video_id_exists(self, videoID):
  102. """检查指定的视频ID是否已经存在于Redis中"""
  103. key = f"crawler:zqkd:{videoID}"
  104. try:
  105. return self.client.exists(key)
  106. except Exception as e:
  107. return False
  108. def save_video_id(self, videoID):
  109. """将视频ID存储到Redis中,并为其设置7天的过期时间"""
  110. key = f"crawler:zqkd:{videoID}"
  111. try:
  112. expiration_time = int(timedelta(days=7).total_seconds())
  113. self.client.setex(key, expiration_time, "1")
  114. except Exception as e:
  115. print(f"保存视频ID {videoID}到Redis并设置过期时间时出现异常: {e}")
  116. def save_recommend_video(self, videoID):
  117. """将推荐视频ID添加到Redis的指定列表中,并为该列表设置2天的过期时间"""
  118. task = "task:zqkd_video_id"
  119. try:
  120. pipe = self.client.pipeline() # 使用管道执行多个命令
  121. pipe.rpush(task, videoID)
  122. pipe.expire(task, int(timedelta(days=2).total_seconds()))
  123. pipe.execute()
  124. # 检查数据是否写入成功
  125. list_length = self.client.llen(task)
  126. except Exception as e:
  127. print(f"保存推荐视频 ID {videoID} 到 Redis 列表并设置过期时间时出现异常: {e}")
  128. def get_last_scanned_id(self):
  129. return self.client.get("zqkd_last_scanned_id")
  130. def set_last_scanned_id(self,last_scanned_id):
  131. return self.client.set("zqkd_last_scanned_id",last_scanned_id)
  132. if __name__ == '__main__':
  133. db = DatabaseOperations("12","123")
  134. user = db.select_user()
  135. print(user)