db_helper.py 18 KB


  1. import traceback
  2. import time
  3. import redis
  4. # import psycopg2
  5. import pymysql
  6. # from psycopg2 import pool as pgpool
  7. from config import set_config
  8. from log import Log
  # Project configuration and logger shared by every helper in this module.
  config_ = set_config()
  log_ = Log()

  # Process-wide Redis clients. Both start empty and are created lazily by
  # RedisHelper.connect() the first time each one is needed.
  conn_redis = conn_filter_redis = None
  class RedisHelper(object):
      """Thin wrapper around a shared redis-py client.

      Every public method obtains the client through :meth:`connect`, which
      caches one client per target (default / filter) in module-level globals,
      so creating many ``RedisHelper`` instances does not create many pools.
      """

      def __init__(self, params=None, redis_info=config_.REDIS_INFO):
          """
          Store the Redis connection settings.

          :param params: optional request context; stored on the instance but
              not read by any method of this class
          :param redis_info: connection settings, type-dict,
              {'host': '', 'port': '', 'password': ''}
          """
          self.redis_info = redis_info
          self.host = redis_info['host']
          self.port = redis_info['port']
          self.password = redis_info['password']
          self.params = params

      def _new_client(self):
          """Build a pooled client (responses decoded to str) from this instance's settings."""
          pool = redis.ConnectionPool(host=self.host,
                                      port=self.port,
                                      password=self.password,
                                      decode_responses=True)
          return redis.Redis(connection_pool=pool)

      def connect(self):
          """
          Return the shared Redis client, creating it on first use.

          When ``redis_info`` equals ``config_.REDIS_INFO_FILTER`` the "filter"
          client is used; any other ``redis_info`` shares the default client.

          :return: conn
          """
          global conn_redis, conn_filter_redis
          if self.redis_info == config_.REDIS_INFO_FILTER:
              if conn_filter_redis is None:
                  conn_filter_redis = self._new_client()
              return conn_filter_redis
          if conn_redis is None:
              conn_redis = self._new_client()
          return conn_redis

      def key_exists(self, key_name):
          """
          Check whether a key exists.

          :param key_name: key
          :return: the raw result of ``EXISTS`` (truthy when the key exists,
              falsy otherwise)
          """
          return self.connect().exists(key_name)

      def del_keys(self, key_name):
          """
          Delete a key.

          :param key_name: key
          :return: None
          """
          self.connect().delete(key_name)

      def get_data_from_redis(self, key_name):
          """
          Read a string value.

          :param key_name: key
          :return: data, or None when the key does not exist
          """
          # GET already yields None for a missing key, so no separate EXISTS
          # round trip is needed.
          return self.connect().get(key_name)

      def set_data_to_redis(self, key_name, value, expire_time=24*3600):
          """
          Set a string value together with its expiry.

          :param key_name: key
          :param value: value to store
          :param expire_time: time to live in seconds, default 1 day
          :return: None
          """
          self.connect().set(key_name, value, ex=int(expire_time))

      def add_data_with_zset(self, key_name, data, expire_time=7*24*3600):
          """
          Add members to a sorted set and (re)set the key's expiry.

          :param key_name: key
          :param data: members and their scores, type-dict {value: score}
          :param expire_time: time to live in seconds, default 7 days, type-int
          :return: None
          """
          conn = self.connect()
          conn.zadd(key_name, data)
          conn.expire(key_name, int(expire_time))

      def get_data_zset_with_index(self, key_name, start, end, desc=True, with_scores=False):
          """
          Fetch sorted-set members by index range.

          :param key_name: key
          :param start: first index, inclusive
          :param end: last index, inclusive
          :param desc: order by score from high to low when True (default)
          :param with_scores: also return scores; default False returns members only
          :return: None when the key does not exist. With ``with_scores`` the raw
              client result is returned unchanged; otherwise each member string
              is parsed as a Python literal (e.g. '123' -> 123).
          """
          import ast

          conn = self.connect()
          if not conn.exists(key_name):
              return None
          data = conn.zrange(key_name, start, end, desc=desc, withscores=with_scores)
          if with_scores:
              return data
          # NOTE(review): the original used eval() here, which would execute any
          # expression stored in Redis. literal_eval accepts only Python literals,
          # so numeric members parse the same way while code is rejected.
          return [ast.literal_eval(value) for value in data]

      def get_all_data_from_zset(self, key_name, desc=True, with_scores=False):
          """
          Fetch every member of a sorted set, reading it in pages of 100.

          :param key_name: key
          :param desc: order by score from high to low when True (default)
          :param with_scores: also return scores; default False returns members only
          :return: None when the key does not exist, otherwise the list of raw
              client results (no type conversion is applied)
          """
          conn = self.connect()
          if not conn.exists(key_name):
              return None
          data = []
          start = 0
          step = 100
          while True:
              # ZRANGE bounds are inclusive, hence the -1.
              page = conn.zrange(key_name, start, start + step - 1,
                                 desc=desc, withscores=with_scores)
              if not page:
                  break
              data.extend(page)
              start += step
          return data

      def get_score_with_value(self, key_name, value):
          """
          Look up the score of a sorted-set member.

          :param key_name: key
          :param value: member
          :return: score, or None when the key or the member does not exist
          """
          # ZSCORE returns None for a missing key as well as a missing member.
          return self.connect().zscore(key_name, value)

      def get_rank_with_value(self, key_name, value, desc=False):
          """
          Look up the rank of a sorted-set member.

          :param key_name: key
          :param value: member
          :param desc: type-bool, default False ranks by score from low to high;
              only the literal True switches to high-to-low
          :return: rank starting at 0, or None when the key or member is missing
          """
          conn = self.connect()
          # ZRANK / ZREVRANK return None for a missing key as well.
          if desc is True:
              return conn.zrevrank(key_name, value)
          return conn.zrank(key_name, value)

      def update_score_with_value(self, key_name, value, score, expire_time=24*3600):
          """
          Set the score of a sorted-set member.

          The expiry is only set when this call creates the key; an existing
          key keeps whatever expiry it already has.

          :param key_name: key
          :param value: member
          :param score: new score for the member
          :param expire_time: time to live in seconds, default 1 day, type-int
          """
          conn = self.connect()
          key_existed = conn.exists(key_name)
          conn.zadd(key_name, {value: score})
          if not key_existed:
              conn.expire(key_name, int(expire_time))

      def remove_value_from_zset(self, key_name, value):
          """
          Remove a member from a sorted set.

          :param key_name: key
          :param value: member
          :return: the raw result of ``ZREM``
          """
          return self.connect().zrem(key_name, value)

      def get_index_with_data(self, key_name, value):
          """
          Position of a member in a sorted set, ordered by score high to low.

          :param key_name: key
          :param value: member
          :return: idx position index (raw result of ``ZREVRANK``)
          """
          return self.connect().zrevrank(key_name, value)

      def get_data_from_set(self, key_name):
          """
          Fetch every member of a set, iterating with SSCAN.

          :param key_name: key
          :return: None when the key does not exist, otherwise a de-duplicated
              list of members (order is not defined)
          """
          conn = self.connect()
          if not conn.exists(key_name):
              return None
          data = []
          cursor = 0
          while True:
              cursor, batch = conn.sscan(key_name, cursor=cursor, count=2000)
              data.extend(batch)
              # A returned cursor of 0 marks the end of the scan.
              if cursor == 0:
                  break
          # The collected batches are de-duplicated before returning.
          return list(set(data))

      def add_data_with_set(self, key_name, values, expire_time=30*60):
          """
          Add members to a set and (re)set the key's expiry.

          :param key_name: key
          :param values: members to add, type-tuple
          :param expire_time: time to live in seconds, default 0.5 hour, type-int
          :return: None
          """
          conn = self.connect()
          conn.sadd(key_name, *values)
          conn.expire(key_name, int(expire_time))

      def data_exists_with_set(self, key_name, value):
          """
          Check whether ``value`` is a member of the set ``key_name``.

          :param key_name: key
          :param value: member to test
          :return: the raw result of ``SISMEMBER`` (truthy when present)
          """
          return self.connect().sismember(key_name, value)

      def get_data_with_count_from_set(self, key_name, count=1):
          """
          Pick random members from a set without removing them.

          :param key_name: key
          :param count: number of members to pick, default 1
          :return: the raw result of ``SRANDMEMBER``
          """
          return self.connect().srandmember(name=key_name, number=count)

      def remove_value_from_set(self, key_name, values):
          """
          Remove members from a set.

          :param key_name: key
          :param values: members to remove, type-tuple
          :return: None
          """
          self.connect().srem(key_name, *values)

      def decr_key(self, key_name, amount=1, expire_time=30*60):
          """
          Decrement a counter and (re)set the key's expiry.

          :param key_name: key
          :param amount: decrement, default 1, type-int
          :param expire_time: time to live in seconds, default 0.5 hour, type-int
          :return: None
          """
          conn = self.connect()
          conn.decr(name=key_name, amount=amount)
          conn.expire(key_name, int(expire_time))

      def incr_key(self, key_name, amount=1, expire_time=30*60):
          """
          Increment a counter and (re)set the key's expiry.

          :param key_name: key
          :param amount: increment, default 1, type-int
          :param expire_time: time to live in seconds, default 0.5 hour, type-int
          :return: None
          """
          conn = self.connect()
          conn.incr(name=key_name, amount=amount)
          conn.expire(key_name, int(expire_time))

      def setnx_key(self, key_name, value, expire_time=5*60):
          """
          Store ``value`` under ``key_name`` only when the key does not exist.

          An existing key is left completely untouched, including its expiry.

          :param key_name: key
          :param value: value
          :param expire_time: time to live in seconds, default 5 minutes, type-int
          :return: None
          """
          # A single SET with NX + EX is atomic. The previous SETNX followed by
          # an unconditional EXPIRE could leave a key without a TTL if the
          # process died between the two calls, and it refreshed the TTL of
          # keys that already existed.
          self.connect().set(name=key_name, value=value, ex=int(expire_time), nx=True)

      def get_batch_key(self, name_list):
          """
          Read several string keys in one round trip.

          :param name_list: keys to read
          :return: the raw result of ``MGET``
          """
          return self.connect().mget(name_list)

      def mget(self, keys):
          """
          Read several string keys in one round trip.

          :param keys: keys to read
          :return: the raw result of ``MGET``
          """
          return self.connect().mget(keys=keys)
  415. #hologres_info = config_.HOLOGRES_INFO
  416. #conn = psycopg2.connect(**hologres_info)
  417. #cur = conn.cursor()
  418. # holo连接
  419. # connectPool = pgpool.SimpleConnectionPool(1, 30, **config_.HOLOGRES_INFO)
  420. # class HologresHelper(object):
  421. # def __init__(self):
  422. # """初始化hologres连接信息"""
  423. # #self.hologres_info = config_.HOLOGRES_INFO
  424. #
  425. # def get_data(self, sql):
  426. # #global conn
  427. # # 连接Hologres
  428. # #conn = psycopg2.connect(**self.hologres_info)
  429. # conn = connectPool.getconn()
  430. # # 创建游标
  431. # cur = conn.cursor()
  432. # try:
  433. # # 查询数据
  434. # cur.execute(sql)
  435. # data = cur.fetchall()
  436. # cur.close()
  437. # connectPool.putconn(conn, close=False)
  438. # # 提交事务
  439. # #conn.commit()
  440. # # 释放资源
  441. # #cur.close()
  442. # #conn.close()
  443. # except Exception as e:
  444. # log_.error('hologress error...')
  445. # log_.error(traceback.format_exc())
  446. # cur.close()
  447. # connectPool.putconn(conn, close=False)
  448. # return []
  449. # return data
  class MysqlHelper(object):
      """Run read-only queries against the MySQL instance in ``config_.MYSQL_INFO``."""

      def __init__(self):
          """
          Store the MySQL connection settings.
          """
          self.mysql_info = config_.MYSQL_INFO

      def get_data(self, sql):
          """
          Execute a query and return all rows.

          A new connection is opened for every call and is always closed again,
          whether or not the query succeeds.

          :param sql: SQL statement
          :return: data (all fetched rows), or None when executing or fetching
              raised an exception
          """
          conn = pymysql.connect(**self.mysql_info)
          try:
              cursor = conn.cursor()
              try:
                  cursor.execute(sql)
                  return cursor.fetchall()
              except Exception:
                  # Callers still get None on failure, but the cause is now
                  # logged instead of being silently discarded.
                  log_.error('mysql error...')
                  log_.error(traceback.format_exc())
                  return None
              finally:
                  cursor.close()
          finally:
              # Previously skipped on the error path, which leaked the connection.
              conn.close()
  if __name__ == '__main__':
      # Ad-hoc manual check: remove one member from a recall sorted set on the
      # default Redis instance and print the result returned by the helper.
      helper = RedisHelper()
      removed = helper.remove_value_from_zset(
          key_name="recall:item:score:region:dup3:24h:110000:data1:rule4:20230315:14",
          value=111111,
      )
      print(removed)