# db_helper.py - Redis and MySQL access helpers.

import ast
import time
import traceback

import pymysql
import redis
# import psycopg2
# from psycopg2 import pool as pgpool

from config import set_config
from log import Log

  9. config_ = set_config()
  10. log_ = Log()
  11. conn_redis = None
  12. conn_filter_redis = None
  13. class RedisHelper(object):
  14. def __init__(self, params=None, redis_info=config_.REDIS_INFO):
  15. """
  16. 初始化redis连接信息
  17. redis_info: redis连接信息, 格式:dict, {'host': '', 'port': '', 'password': ''}
  18. """
  19. self.redis_info = redis_info
  20. self.host = redis_info['host']
  21. self.port = redis_info['port']
  22. self.password = redis_info['password']
  23. self.params = params
  24. def connect(self):
  25. """
  26. 连接redis
  27. :return: conn
  28. """
  29. if self.redis_info == config_.REDIS_INFO_FILTER:
  30. global conn_filter_redis
  31. if conn_filter_redis is None:
  32. pool = redis.ConnectionPool(host=self.host,
  33. port=self.port,
  34. password=self.password,
  35. decode_responses=True)
  36. conn = redis.Redis(connection_pool=pool)
  37. conn_filter_redis = conn
  38. return conn_filter_redis
  39. else:
  40. global conn_redis
  41. if conn_redis is None:
  42. pool = redis.ConnectionPool(host=self.host,
  43. port=self.port,
  44. password=self.password,
  45. decode_responses=True)
  46. conn = redis.Redis(connection_pool=pool)
  47. conn_redis = conn
  48. return conn_redis
  49. def key_exists(self, key_name):
  50. """
  51. 判断key是否存在
  52. :param key_name: key
  53. :return: 存在-True, 不存在-False
  54. """
  55. # start_time = time.time()
  56. conn = self.connect()
  57. res = conn.exists(key_name)
  58. # if self.params is not None:
  59. # log_.info({
  60. # 'logTimestamp': int(time.time() * 1000),
  61. # 'request_id': self.params.request_id,
  62. # 'operation': 'get_data_from_redis',
  63. # 'executeTime': (time.time() - start_time) * 1000
  64. # })
  65. return res
  66. def del_keys(self, key_name):
  67. """
  68. 删除key
  69. :param key_name: key
  70. :return: None
  71. """
  72. conn = self.connect()
  73. conn.delete(key_name)
  74. def get_data_from_redis(self, key_name):
  75. """
  76. 读取redis中的数据
  77. :param key_name: key
  78. :return: data
  79. """
  80. # start_time = time.time()
  81. conn = self.connect()
  82. if not conn.exists(key_name):
  83. # key不存在
  84. return None
  85. data = conn.get(key_name)
  86. # if self.params is not None:
  87. # log_.info({
  88. # 'logTimestamp': int(time.time() * 1000),
  89. # 'request_id': self.params.request_id,
  90. # 'operation': 'get_data_from_redis',
  91. # 'executeTime': (time.time() - start_time) * 1000
  92. # })
  93. return data
  94. def set_data_to_redis(self, key_name, value, expire_time=24*3600):
  95. """
  96. 新增数据
  97. :param key_name: key
  98. :param value: 元素的值 videoId
  99. :param expire_time: 过期时间,单位:s,默认1天
  100. :return: None
  101. """
  102. # start_time = time.time()
  103. conn = self.connect()
  104. conn.set(key_name, value, ex=int(expire_time))
  105. # if self.params is not None:
  106. # log_.info({
  107. # 'logTimestamp': int(time.time() * 1000),
  108. # 'request_id': self.params.request_id,
  109. # 'operation': 'set_data_to_redis',
  110. # 'executeTime': (time.time() - start_time) * 1000})
  111. def add_data_with_zset(self, key_name, data, expire_time=7*24*3600):
  112. """
  113. 新增数据,有序set
  114. :param key_name: key
  115. :param data: 元素的值及对应分数 type-dict {value: score}
  116. :param expire_time: 过期时间,单位:s,默认7天,type-int
  117. :return: None
  118. """
  119. # start_time = time.time()
  120. conn = self.connect()
  121. conn.zadd(key_name, data)
  122. # 设置过期时间
  123. conn.expire(key_name, int(expire_time))
  124. # if self.params is not None:
  125. # log_.info({
  126. # 'logTimestamp': int(time.time() * 1000),
  127. # 'request_id': self.params.request_id,
  128. # 'operation': 'add_data_with_zset',
  129. # 'executeTime': (time.time() - start_time) * 1000
  130. # })
  131. def get_data_zset_with_index(self, key_name, start, end, desc=True, with_scores=False):
  132. """
  133. 根据索引位置获取元素的值
  134. :param key_name: key
  135. :param start: 索引起始点 闭区间,包含start
  136. :param end: 索引结束点 闭区间,包含end
  137. :param desc: 分数排序方式,默认从大到小
  138. :param with_scores: 是否获取元素的分数,默认 False,只获取元素的值
  139. :return: data 元素值列表(不包含分数),value(videoId)类型转换为int, 包含分数时不进行类型转换
  140. """
  141. # start_time = time.time()
  142. conn = self.connect()
  143. if not conn.exists(key_name):
  144. return None
  145. data = conn.zrange(key_name, start, end, desc, with_scores)
  146. if with_scores:
  147. data = data
  148. else:
  149. data = [eval(value) for value in data]
  150. # if self.params is not None:
  151. # log_.info({
  152. # 'logTimestamp': int(time.time() * 1000),
  153. # 'request_id': self.params.request_id,
  154. # 'operation': 'get_data_zset_with_index',
  155. # 'executeTime': (time.time() - start_time) * 1000
  156. # })
  157. return data
  158. def get_all_data_from_zset(self, key_name, desc=True, with_scores=False):
  159. """
  160. 获取zset中所有元素的值
  161. :param key_name: key
  162. :param desc: 分数排序方式,默认从大到小
  163. :param with_scores: 是否获取元素的分数,默认 False,只获取元素的值
  164. :return: data 元素值列表(不包含分数),value(videoId)类型转换为int, 包含分数时不进行类型转换
  165. """
  166. conn = self.connect()
  167. if not conn.exists(key_name):
  168. return None
  169. data = []
  170. start = 0
  171. step = 100
  172. while True:
  173. end = start + step - 1
  174. temp = conn.zrange(key_name, start, end, desc, with_scores)
  175. if not temp:
  176. break
  177. data.extend(temp)
  178. start += step
  179. return data
  180. def get_score_with_value(self, key_name, value):
  181. """
  182. 在zset中,根据元素的value获取对应的score
  183. :param key_name: key
  184. :param value: 元素的值
  185. :return: score value对应的score
  186. """
  187. conn = self.connect()
  188. if not conn.exists(key_name):
  189. return None
  190. return conn.zscore(key_name, value)
  191. def get_rank_with_value(self, key_name, value, desc=False):
  192. """
  193. 在zset中,根据元素的value获取对应排名
  194. :param key_name: key
  195. :param value: 元素的值
  196. :param desc: 是否倒序 type-bool 默认:False-按照score从小到大
  197. :return: rank value对应的rank,从0开始,不存在返回None
  198. """
  199. conn = self.connect()
  200. if not conn.exists(key_name):
  201. return None
  202. if desc is True:
  203. return conn.zrevrank(key_name, value)
  204. else:
  205. return conn.zrank(key_name, value)
  206. def update_score_with_value(self, key_name, value, score, expire_time=24*3600):
  207. """
  208. 在zset中,修改元素value对应的score
  209. :param key_name: key
  210. :param value: 元素的值
  211. :param score: value对应的score更新值
  212. :param expire_time: 过期时间,单位:s,默认1天,type-int
  213. """
  214. conn = self.connect()
  215. if conn.exists(key_name):
  216. conn.zadd(key_name, {value: score})
  217. else:
  218. # key不存在时,需设置过期时间
  219. conn.zadd(key_name, {value: score})
  220. conn.expire(key_name, int(expire_time))
  221. def remove_value_from_zset(self, key_name, value):
  222. """
  223. 删除zset中的指定元素
  224. :param key_name: key
  225. :param value: 元素的值
  226. :return: None
  227. """
  228. # start_time = time.time()
  229. conn = self.connect()
  230. res = conn.zrem(key_name, value)
  231. # if self.params is not None:
  232. # log_.info({
  233. # 'logTimestamp': int(time.time() * 1000),
  234. # 'request_id': self.params.request_id,
  235. # 'operation': 'remove_value_from_zset',
  236. # 'executeTime': (time.time() - start_time) * 1000
  237. # })
  238. return res
  239. def get_index_with_data(self, key_name, value):
  240. """
  241. 根据元素的值获取在有序set中的位置,按照分数倒序(从大到小)
  242. :param key_name: key
  243. :param value: 元素的值
  244. :return: idx 位置索引
  245. """
  246. # start_time = time.time()
  247. conn = self.connect()
  248. res = conn.zrevrank(key_name, value)
  249. # if self.params is not None:
  250. # log_.info({
  251. # 'logTimestamp': int(time.time() * 1000),
  252. # 'request_id': self.params.request_id,
  253. # 'operation': 'get_index_with_data',
  254. # 'executeTime': (time.time() - start_time) * 1000
  255. # })
  256. return res
  257. def get_data_from_set(self, key_name):
  258. """
  259. 获取set中的所有数据
  260. :param key_name: key
  261. :return: data
  262. """
  263. # start_time = time.time()
  264. conn = self.connect()
  265. if not conn.exists(key_name):
  266. # key不存在
  267. return None
  268. data = []
  269. cursor = 0
  270. while True:
  271. cur, temp = conn.sscan(key_name, cursor=cursor, count=2000)
  272. data.extend(temp)
  273. if cur == 0:
  274. break
  275. cursor = cur
  276. # if self.params is not None:
  277. # log_.info({
  278. # 'logTimestamp': int(time.time() * 1000),
  279. # 'request_id': self.params.request_id,
  280. # 'operation': 'get_data_from_set',
  281. # 'executeTime': (time.time() - start_time) * 1000
  282. # })
  283. return list(set(data))
  284. def add_data_with_set(self, key_name, values, expire_time=30*60):
  285. """
  286. 新增数据,set
  287. :param key_name: key
  288. :param values: 要添加的元素 类型-tuple
  289. :param expire_time: 过期时间,单位:s,默认0.5小时 type-int
  290. :return: None
  291. """
  292. # start_time = time.time()
  293. conn = self.connect()
  294. conn.sadd(key_name, *values)
  295. # 设置过期时间
  296. conn.expire(key_name, int(expire_time))
  297. # if self.params is not None:
  298. # log_.info({
  299. # 'logTimestamp': int(time.time() * 1000),
  300. # 'request_id': self.params.request_id,
  301. # 'operation': 'add_data_with_set',
  302. # 'executeTime': (time.time() - start_time) * 1000
  303. # })
  304. def data_exists_with_set(self, key_name, value):
  305. """
  306. 判断元素value是否在集合key_name中
  307. :param key_name: key
  308. :param value: 需判断的元素
  309. :return: 存在-True, 不存在-False
  310. """
  311. # start_time = time.time()
  312. conn = self.connect()
  313. res = conn.sismember(key_name, value)
  314. # if self.params is not None:
  315. # log_.info({
  316. # 'logTimestamp': int(time.time() * 1000),
  317. # 'request_id': self.params.request_id,
  318. # 'operation': 'data_exists_with_set',
  319. # 'executeTime': (time.time() - start_time) * 1000
  320. # })
  321. return res
  322. def get_data_with_count_from_set(self, key_name, count=1):
  323. """
  324. 从set中随机获取元素,并放回
  325. :param key_name: key
  326. :param count: 获取个数, 默认为1
  327. :return:
  328. """
  329. conn = self.connect()
  330. data = conn.srandmember(name=key_name, number=count)
  331. return data
  332. def remove_value_from_set(self, key_name, values):
  333. """
  334. 删除set中的指定元素
  335. :param key_name: key
  336. :param values: 元素的值, 类型-tuple
  337. :return: None
  338. """
  339. # start_time = time.time()
  340. conn = self.connect()
  341. res = conn.srem(key_name, *values)
  342. # if self.params is not None:
  343. # log_.info({
  344. # 'logTimestamp': int(time.time() * 1000),
  345. # 'request_id': self.params.request_id,
  346. # 'operation': 'remove_value_from_set',
  347. # 'executeTime': (time.time() - start_time) * 1000
  348. # })
  349. return res
  350. def decr_key(self, key_name, amount=1, expire_time=30*60):
  351. """
  352. redis自减
  353. :param key_name: key
  354. :param amount: 自减数,默认为1,type-int
  355. :param expire_time: 过期时间,单位:s,默认0.5小时 type-int
  356. :return: None
  357. """
  358. # start_time = time.time()
  359. conn = self.connect()
  360. conn.decr(name=key_name, amount=amount)
  361. conn.expire(key_name, int(expire_time))
  362. # if self.params is not None:
  363. # log_.info({
  364. # 'logTimestamp': int(time.time() * 1000),
  365. # 'request_id': self.params.request_id,
  366. # 'operation': 'decr_key',
  367. # 'executeTime': (time.time() - start_time) * 1000
  368. # })
  369. def incr_key(self, key_name, amount=1, expire_time=30*60):
  370. """
  371. redis自增
  372. :param key_name: key
  373. :param amount: 自增数,默认为1,type-int
  374. :param expire_time: 过期时间,单位:s,默认0.5小时 type-int
  375. :return: None
  376. """
  377. # start_time = time.time()
  378. conn = self.connect()
  379. conn.incr(name=key_name, amount=amount)
  380. conn.expire(key_name, int(expire_time))
  381. # if self.params is not None:
  382. # log_.info({
  383. # 'logTimestamp': int(time.time() * 1000),
  384. # 'request_id': self.params.request_id,
  385. # 'operation': 'incr_key',
  386. # 'executeTime': (time.time() - start_time) * 1000
  387. # })
  388. def setnx_key(self, key_name, value, expire_time=5*60):
  389. """
  390. 当key不存在时,将value塞入key中,key存在时不做操作
  391. :param key_name: key
  392. :param value: value
  393. :return: 过期时间,单位:s,默认5分钟 type-int
  394. """
  395. # start_time = time.time()
  396. conn = self.connect()
  397. conn.setnx(name=key_name, value=value)
  398. conn.expire(name=key_name, time=int(expire_time))
  399. # if self.params is not None:
  400. # log_.info({
  401. # 'logTimestamp': int(time.time() * 1000),
  402. # 'request_id': self.params.request_id,
  403. # 'operation': 'setnx_key',
  404. # 'executeTime': (time.time() - start_time) * 1000
  405. # })
  406. def get_batch_key(self, name_list):
  407. conn = self.connect()
  408. res = conn.mget(name_list)
  409. return res
  410. def mget(self, keys):
  411. st_time = time.time()
  412. conn = self.connect()
  413. data = conn.mget(keys=keys)
  414. #print(f"mget time: {(time.time() - st_time) * 1000}")
  415. return data
# hologres_info = config_.HOLOGRES_INFO
# conn = psycopg2.connect(**hologres_info)
# cur = conn.cursor()

# Hologres connection (disabled)
# connectPool = pgpool.SimpleConnectionPool(1, 30, **config_.HOLOGRES_INFO)


# class HologresHelper(object):
#     def __init__(self):
#         """Initialize the Hologres connection info."""
#         # self.hologres_info = config_.HOLOGRES_INFO
#
#     def get_data(self, sql):
#         # global conn
#         # Connect to Hologres
#         # conn = psycopg2.connect(**self.hologres_info)
#         conn = connectPool.getconn()
#         # Create a cursor
#         cur = conn.cursor()
#         try:
#             # Query the data
#             cur.execute(sql)
#             data = cur.fetchall()
#             cur.close()
#             connectPool.putconn(conn, close=False)
#             # Commit the transaction
#             # conn.commit()
#             # Release resources
#             # cur.close()
#             # conn.close()
#         except Exception as e:
#             log_.error('hologress error...')
#             log_.error(traceback.format_exc())
#             cur.close()
#             connectPool.putconn(conn, close=False)
#             return []
#         return data


  451. class MysqlHelper(object):
  452. def __init__(self):
  453. """
  454. 初始化mysql连接信息
  455. """
  456. self.mysql_info = config_.MYSQL_INFO
  457. def get_data(self, sql):
  458. """
  459. 查询数据
  460. :param sql: sql语句
  461. :return: data
  462. """
  463. # 连接数据库
  464. conn = pymysql.connect(**self.mysql_info)
  465. # 创建游标
  466. cursor = conn.cursor()
  467. try:
  468. # 执行SQL语句
  469. cursor.execute(sql)
  470. # 获取查询的所有记录
  471. data = cursor.fetchall()
  472. except Exception as e:
  473. return None
  474. # 关闭游标对象
  475. cursor.close()
  476. # 关闭数据库连接
  477. conn.close()
  478. return data
  479. if __name__ == '__main__':
  480. redis_helper = RedisHelper()
  481. # key = 'flow.video.12345.123#112'
  482. # redis_helper.decr_key(key_name=key)
  483. # res = redis_helper.get_data_from_redis(key_name=key)
  484. # print(int(res), type(int(res)))
  485. # data = redis_helper.get_data_zset_with_index(key_name=config_.BOTTOM_KEY_NAME, start=0, end=-1)
  486. # print(len(data))
  487. # key_name = 'com.weiqu.video.hot.recommend.previewed.4.weixin_openid_otjoB5VG780SB4aVjYqBBNLb - X6M'
  488. # values = (6134455, 9772930, 9912678, 9901969, 9926876, 9904203, 2384831, 9932272, 9737653, 9925240)
  489. # key_name = 'com.weiqu.video.hot.recommend.previewed.4.abcd1'
  490. # values = (9902612, 9905573, 9928264, 9932148, 9809440, 9919900, 6093379, 9917093, 9793537, 9814345)
  491. # redis_helper.add_data_with_set(key_name=key_name, values=values, expire_time=30 * 60)
  492. # res = redis_helper.get_data_from_set(key_name=key_name)
  493. key_name = f"previewed:videos:5:aan7"
  494. res = redis_helper.get_data_with_count_from_set(
  495. key_name=key_name,
  496. count=20)
  497. print(res)
  498. res1 = redis_helper.remove_value_from_set(key_name=key_name, values=tuple({'2881413'}))
  499. print(res1)
  500. res = redis_helper.get_data_with_count_from_set(
  501. key_name=key_name,
  502. count=20)
  503. print(res)