zqkd_db_redis.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. import os
  2. import sys
  3. import threading
  4. import traceback
  5. from datetime import datetime, timedelta
  6. import redis
  7. from application.common import Local
  8. sys.path.append(os.getcwd())
  9. from application.common.mysql import MysqlHelper
  10. class DatabaseOperations:
  11. def __init__(self, mode, platform):
  12. self.mysql = MysqlHelper(mode=mode, platform=platform)
  13. self.LocalLog = Local.logger(platform, mode)
  14. def check_user_id(self, uid):
  15. """
  16. 检查指定用户ID是否存在于数据库的zqkd_uid表中。
  17. :param uid:要检查的用户ID
  18. :return:如果用户ID存在于表中返回True,否则返回False
  19. """
  20. try:
  21. query_sql = f""" SELECT uid FROM zqkd_uid WHERE uid = "{uid}"; """
  22. result = self.mysql.select(sql=query_sql)
  23. return bool(result)
  24. except Exception as e:
  25. tb = traceback.format_exc()
  26. self.LocalLog.error(f"检查用户ID失败: {e}\n{tb}")
  27. return False
  28. def update_user(self, uid, user_name, avatar_url):
  29. """
  30. 更新数据库中指定用户的用户名和头像URL。
  31. :param uid:要更新信息的用户ID
  32. :param user_name:新的用户名
  33. :param avatar_url:新的头像URL
  34. :return:如果更新操作成功,返回更新操作的结果(通常是影响的行数),失败则返回None或抛出异常
  35. """
  36. try:
  37. update_sql = f""" UPDATE zqkd_uid SET avatar_url = "{avatar_url}", user_name = "{user_name}" WHERE uid = "{uid}"; """
  38. return self.mysql.update(sql=update_sql)
  39. except Exception as e:
  40. tb = traceback.format_exc()
  41. self.LocalLog.error(f"更新用户信息失败: {e}\n{tb}")
  42. return None
  43. def insert_user(self, uid, user_name, avatar_url):
  44. """
  45. 向数据库的zqkd_uid表中插入新的用户信息,包含用户ID、用户名、头像URL和当前时间。
  46. :param uid:新用户的ID
  47. :param user_name:新用户的用户名
  48. :param avatar_url:新用户的头像URL
  49. :return:如果插入操作成功,返回插入操作的结果(通常是影响的行数),失败则返回None或抛出异常
  50. """
  51. try:
  52. current_time = datetime.now()
  53. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  54. insert_sql = f""" INSERT INTO zqkd_uid (uid, avatar_url, user_name, data_time) VALUES ('{uid}', '{avatar_url}', '{user_name}', '{formatted_time}'); """
  55. return self.mysql.update(sql=insert_sql)
  56. except Exception as e:
  57. tb = traceback.format_exc()
  58. self.LocalLog.error(f"插入用户信息失败: {e}\n{tb}")
  59. return None
  60. def select_user(self, last_scanned_id=0):
  61. """
  62. 根据last_scanned_id分页查询用户数据
  63. :param last_scanned_id: 上次扫描的ID,0表示从头开始
  64. :return: 查询结果列表
  65. """
  66. try:
  67. # 构建查询(根据last_scanned_id过滤)
  68. query = "SELECT id, uid FROM zqkd_uid"
  69. if last_scanned_id > 0:
  70. query += f" WHERE id > {last_scanned_id}"
  71. query += " ORDER BY id ASC"
  72. return self.mysql.select(query)
  73. except Exception as e:
  74. tb = traceback.format_exc()
  75. self.LocalLog.error(f"查询用户列表失败: {e}\n{tb}")
  76. return []
  77. class RedisOperations:
  78. _pool: redis.ConnectionPool = None
  79. _instance = None
  80. _lock = threading.Lock() # 用于线程安全的单例创建
  81. @classmethod
  82. def get_instance(cls, mode="", platform=""):
  83. """线程安全的单例获取方法"""
  84. if not cls._instance:
  85. with cls._lock:
  86. if not cls._instance:
  87. cls._instance = cls(mode, platform)
  88. return cls._instance
  89. def __init__(self, mode, platform):
  90. # 私有构造函数,使用 get_instance() 获取实例
  91. self.mode = mode
  92. self.platform = platform
  93. self.LocalLog = Local.logger(self.platform, self.mode)
  94. if RedisOperations._instance is not None:
  95. raise Exception("请使用 get_instance() 获取实例")
  96. self._pool = self._get_pool()
  97. self.client = redis.Redis(connection_pool=self._pool, decode_responses=True) # 复用同一个客户端
  98. def _get_pool(self) -> redis.ConnectionPool:
  99. if self._pool is None:
  100. try:
  101. self._pool = redis.ConnectionPool(
  102. host="r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com",
  103. port=6379,
  104. db=0,
  105. password="Wqsd@2019",
  106. max_connections=50, # 增加最大连接数
  107. socket_timeout=10,
  108. retry_on_timeout=True
  109. )
  110. except Exception as e:
  111. tb = traceback.format_exc()
  112. self.LocalLog.error(f"创建Redis连接池失败: {e}\n{tb}")
  113. raise
  114. return self._pool
  115. def close(self):
  116. """关闭连接池"""
  117. try:
  118. if self._pool:
  119. self._pool.disconnect(inuse_connections=True)
  120. except Exception as e:
  121. tb = traceback.format_exc()
  122. self.LocalLog.error(f"关闭Redis连接池失败: {e}\n{tb}")
  123. def get_recommend_video(self, task="task:zqkd_video_id"):
  124. """从Redis的指定列表中弹出并返回最左边的视频ID"""
  125. try:
  126. value_bytes = self.client.rpop(task)
  127. value_str = value_bytes.decode('utf-8')
  128. return value_str
  129. except Exception as e:
  130. tb = traceback.format_exc()
  131. self.LocalLog.error(f"获取推荐视频ID失败: {e}\n{tb}")
  132. return None
  133. def check_video_id_exists(self, videoID):
  134. """检查指定的视频ID是否已经存在于Redis中"""
  135. try:
  136. key = f"crawler:zqkd:{videoID}"
  137. return self.client.exists(key)
  138. except Exception as e:
  139. tb = traceback.format_exc()
  140. self.LocalLog.error(f"检查视频ID是否存在失败: {e}\n{tb}")
  141. return False
  142. def save_video_id(self, videoID):
  143. """将视频ID存储到Redis中,并为其设置7天的过期时间"""
  144. try:
  145. key = f"crawler:zqkd:{videoID}"
  146. expiration_time = int(timedelta(days=7).total_seconds())
  147. self.client.setex(key, expiration_time, "1")
  148. except Exception as e:
  149. tb = traceback.format_exc()
  150. self.LocalLog.error(f"保存视频ID失败: {e}\n{tb}")
  151. def save_recommend_video(self, videoID):
  152. """将推荐视频ID添加到Redis的指定列表中,并为该列表设置2天的过期时间"""
  153. try:
  154. task = "task:zqkd_video_id"
  155. pipe = self.client.pipeline() # 使用管道执行多个命令
  156. pipe.rpush(task, videoID)
  157. pipe.expire(task, int(timedelta(days=2).total_seconds()))
  158. pipe.execute()
  159. # 检查数据是否写入成功
  160. list_length = self.client.llen(task)
  161. self.LocalLog.info(f"保存推荐视频ID成功,列表长度: {list_length}")
  162. except Exception as e:
  163. tb = traceback.format_exc()
  164. self.LocalLog.error(f"保存推荐视频ID失败: {e}\n{tb}")
  165. def get_last_scanned_id(self):
  166. """获取上次扫描的ID"""
  167. try:
  168. return self.client.get("zqkd_last_scanned_id").decode('utf-8')
  169. except Exception as e:
  170. tb = traceback.format_exc()
  171. self.LocalLog.error(f"获取上次扫描的ID失败: {e}\n{tb}")
  172. return None
  173. def set_last_scanned_id(self, last_scanned_id):
  174. """设置上次扫描的ID"""
  175. try:
  176. result = self.client.set("zqkd_last_scanned_id", last_scanned_id)
  177. if result:
  178. self.LocalLog.info(f"成功设置上次扫描的ID: {last_scanned_id}")
  179. return result
  180. except Exception as e:
  181. tb = traceback.format_exc()
  182. self.LocalLog.error(f"设置上次扫描的ID失败: {e}\n{tb}")
  183. return False
  184. if __name__ == '__main__':
  185. db = DatabaseOperations("12", "123")
  186. user = db.select_user()
  187. print(user)