zqkd_db_redis.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. import os
  2. import sys
  3. import threading
  4. import traceback
  5. from datetime import datetime, timedelta
  6. import redis
  7. from application.common import Local
  8. sys.path.append(os.getcwd())
  9. from application.common.mysql import MysqlHelper
  10. class DatabaseOperations:
  11. def __init__(self, mode, platform):
  12. self.mysql = MysqlHelper(mode=mode, platform=platform)
  13. self.LocalLog = Local.logger(platform, mode)
  14. self.mode = mode
  15. self.platform = platform
  16. def check_user_id(self, uid):
  17. """
  18. 检查指定用户ID是否存在于数据库的zqkd_uid表中。
  19. :param uid:要检查的用户ID
  20. :return:如果用户ID存在于表中返回True,否则返回False
  21. """
  22. try:
  23. query_sql = f""" SELECT uid FROM zqkd_user WHERE uid = "{uid}"; """
  24. result = self.mysql.select(sql=query_sql)
  25. return bool(result)
  26. except Exception as e:
  27. tb = traceback.format_exc()
  28. self.LocalLog.error(f"检查用户ID失败: {e}\n{tb}")
  29. return False
  30. def update_user(self, uid, user_name, avatar_url):
  31. """
  32. 更新数据库中指定用户的用户名和头像URL。
  33. :param uid:要更新信息的用户ID
  34. :param user_name:新的用户名
  35. :param avatar_url:新的头像URL
  36. :return:如果更新操作成功,返回更新操作的结果(通常是影响的行数),失败则返回None或抛出异常
  37. """
  38. try:
  39. update_sql = f""" UPDATE zqkd_user SET avatar_url = "{avatar_url}", user_name = "{user_name}" WHERE uid = "{uid}"; """
  40. return self.mysql.update(sql=update_sql)
  41. except Exception as e:
  42. tb = traceback.format_exc()
  43. self.LocalLog.error(f"更新用户信息失败: {e}\n{tb}")
  44. return None
  45. def insert_user(self, uid, user_name, avatar_url):
  46. """
  47. 向数据库的zqkd_user表中插入或更新用户信息
  48. :param uid: 用户ID(数值类型)
  49. :param user_name: 用户名
  50. :param avatar_url: 头像URL
  51. :return: 成功返回影响的行数,失败返回None
  52. """
  53. try:
  54. # 直接拼接SQL(不推荐,有SQL注入风险)
  55. insert_sql = f"""
  56. INSERT INTO zqkd_user (uid, avatar_url, user_name)
  57. VALUES ({uid}, '{avatar_url.replace("'", "''")}', '{user_name.replace("'", "''")}')
  58. ON DUPLICATE KEY UPDATE
  59. user_name = '{user_name.replace("'", "''")}',
  60. avatar_url = '{avatar_url.replace("'", "''")}'
  61. """
  62. return self.mysql.update(sql=insert_sql)
  63. except Exception as e:
  64. tb = traceback.format_exc()
  65. self.LocalLog.error(f"插入用户信息失败: {e}\n{tb}")
  66. return None
  67. def get_today_videos(self):
  68. try:
  69. # 手动转义单引号(仅缓解部分风险)
  70. sql = """
  71. SELECT count(*) as cnt
  72. FROM crawler_video
  73. WHERE create_time >= CURDATE()
  74. AND create_time < CURDATE() + INTERVAL 1 DAY
  75. AND platform = %s
  76. AND strategy = %s
  77. """
  78. result = self.mysql.select_params(sql, (self.platform,self.mode))
  79. if result and len(result) > 0:
  80. return result[0][0] # 返回第一行第一列的计数值
  81. return 0 # 无结果时返回0
  82. except Exception as e:
  83. self.LocalLog.error(f"查询失败: {e}")
  84. return 0
  85. def select_user(self, last_scanned_id=0):
  86. """
  87. 根据last_scanned_id查询用户数据
  88. :param last_scanned_id: 上次扫描的ID,0表示从头开始
  89. :return: 查询结果列表
  90. """
  91. try:
  92. # 构建查询(根据last_scanned_id过滤)
  93. query = "SELECT id, uid FROM zqkd_user"
  94. if last_scanned_id > 0:
  95. query += f" WHERE id > {last_scanned_id}"
  96. query += " ORDER BY id ASC"
  97. return self.mysql.select(query)
  98. except Exception as e:
  99. tb = traceback.format_exc()
  100. self.LocalLog.error(f"查询用户列表失败: {e}\n{tb}")
  101. return []
  102. class RedisOperations:
  103. _pool: redis.ConnectionPool = None
  104. _instance = None
  105. _lock = threading.Lock() # 用于线程安全的单例创建
  106. @classmethod
  107. def get_instance(cls, mode="", platform=""):
  108. """线程安全的单例获取方法"""
  109. if not cls._instance:
  110. with cls._lock:
  111. if not cls._instance:
  112. cls._instance = cls(mode, platform)
  113. return cls._instance
  114. def __init__(self, mode, platform):
  115. # 私有构造函数,使用 get_instance() 获取实例
  116. self.mode = mode
  117. self.platform = platform
  118. self.LocalLog = Local.logger(self.platform, self.mode)
  119. if RedisOperations._instance is not None:
  120. raise Exception("请使用 get_instance() 获取实例")
  121. self._pool = self._get_pool()
  122. self.client = redis.Redis(connection_pool=self._pool, decode_responses=True) # 复用同一个客户端
  123. def _get_pool(self) -> redis.ConnectionPool:
  124. if self._pool is None:
  125. try:
  126. self._pool = redis.ConnectionPool(
  127. host="r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com",
  128. port=6379,
  129. db=0,
  130. password="Wqsd@2019",
  131. max_connections=50, # 增加最大连接数
  132. socket_timeout=10,
  133. retry_on_timeout=True
  134. )
  135. except Exception as e:
  136. tb = traceback.format_exc()
  137. self.LocalLog.error(f"创建Redis连接池失败: {e}\n{tb}")
  138. raise
  139. return self._pool
  140. def close(self):
  141. """关闭连接池"""
  142. try:
  143. if self._pool:
  144. self._pool.disconnect(inuse_connections=True)
  145. except Exception as e:
  146. tb = traceback.format_exc()
  147. self.LocalLog.error(f"关闭Redis连接池失败: {e}\n{tb}")
  148. def get_recommend_video(self, task="task:zqkd_video_id"):
  149. """从Redis的指定列表中弹出并返回最左边的视频ID"""
  150. try:
  151. value_bytes = self.client.rpop(task)
  152. value_str = value_bytes.decode('utf-8')
  153. return value_str
  154. except Exception as e:
  155. tb = traceback.format_exc()
  156. self.LocalLog.error(f"获取推荐视频ID失败: {e}\n{tb}")
  157. return None
  158. def check_video_id_exists(self, videoID):
  159. """检查指定的视频ID是否已经存在于Redis中"""
  160. try:
  161. key = f"crawler:zqkd:{videoID}"
  162. return self.client.exists(key)
  163. except Exception as e:
  164. tb = traceback.format_exc()
  165. self.LocalLog.error(f"检查视频ID是否存在失败: {e}\n{tb}")
  166. return False
  167. def save_video_id(self, videoID):
  168. """将视频ID存储到Redis中,并为其设置7天的过期时间"""
  169. try:
  170. key = f"crawler:zqkd:{videoID}"
  171. expiration_time = int(timedelta(days=7).total_seconds())
  172. self.client.setex(key, expiration_time, "1")
  173. except Exception as e:
  174. tb = traceback.format_exc()
  175. self.LocalLog.error(f"保存视频ID失败: {e}\n{tb}")
  176. def save_recommend_video(self, videoID):
  177. """将推荐视频ID添加到Redis的指定列表中,并为该列表设置2天的过期时间"""
  178. try:
  179. task = "task:zqkd_video_id"
  180. pipe = self.client.pipeline() # 使用管道执行多个命令
  181. pipe.rpush(task, videoID)
  182. pipe.expire(task, int(timedelta(days=2).total_seconds()))
  183. pipe.execute()
  184. # 检查数据是否写入成功
  185. list_length = self.client.llen(task)
  186. self.LocalLog.info(f"保存推荐视频ID成功,列表长度: {list_length}")
  187. except Exception as e:
  188. tb = traceback.format_exc()
  189. self.LocalLog.error(f"保存推荐视频ID失败: {e}\n{tb}")
  190. def get_last_scanned_id(self):
  191. """获取上次扫描的ID"""
  192. try:
  193. return self.client.get("zqkd_last_scanned_id").decode('utf-8')
  194. except Exception as e:
  195. tb = traceback.format_exc()
  196. self.LocalLog.error(f"获取上次扫描的ID失败: {e}\n{tb}")
  197. return None
  198. def set_last_scanned_id(self, last_scanned_id):
  199. """设置上次扫描的ID"""
  200. try:
  201. result = self.client.set("zqkd_last_scanned_id", last_scanned_id)
  202. if result:
  203. self.LocalLog.info(f"成功设置上次扫描的ID: {last_scanned_id}")
  204. except Exception as e:
  205. tb = traceback.format_exc()
  206. self.LocalLog.error(f"设置上次扫描的ID失败: {e}\n{tb}")
  207. return False
  208. if __name__ == '__main__':
  209. db = DatabaseOperations("recommend", "zhongqingkandian")
  210. print(db.get_today_videos())