redis.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. import json
  2. import redis
  3. from common.odps_data import OdpsDataCount
  4. from common.sql_help import sqlCollect
  5. class SyncRedisHelper:
  6. _pool: redis.ConnectionPool = None
  7. _instance = None
  8. def __init__(self):
  9. if not self._instance:
  10. self._pool = self._get_pool()
  11. self._instance = self
  12. def _get_pool(self) -> redis.ConnectionPool:
  13. if self._pool is None:
  14. self._pool = redis.ConnectionPool(
  15. # host="r-bp1mb0v08fqi4hjffupd.redis.rds.aliyuncs.com", # 外网地址
  16. host="r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com", # 内网地址
  17. port=6379,
  18. db=0,
  19. password="Wqsd@2019",
  20. # password="Qingqu2019",
  21. )
  22. return self._pool
  23. def get_client(self) -> redis.Redis:
  24. pool = self._get_pool()
  25. client = redis.Redis(connection_pool=pool)
  26. return client
  27. def close(self):
  28. if self._pool:
  29. self._pool.disconnect(inuse_connections=True)
  30. def install_video_data(dt, redis_task, table_name):
  31. """写入redis需要打标签的视频"""
  32. data = OdpsDataCount.main(table_name, dt)
  33. if not data:
  34. return
  35. # task = f"task:video_ai"
  36. helper = SyncRedisHelper()
  37. client = helper.get_client()
  38. client.rpush(redis_task, *data)
  39. def install_ad_video_data(redis_task):
  40. """广告写入redis需要打标签的视频"""
  41. data = sqlCollect.select_ad_list()
  42. if not data:
  43. return
  44. data = list(data)
  45. # 字段名列表,用于创建字典
  46. field_names = ["ad_id", "creative_code", "creative_title", "material_address", "click_button_text", "creative_logo_address", "update_time"]
  47. # 将每个元组转换为字典,并存入新的列表 data_dicts 中
  48. data_dicts = [{key: str(value) for key, value in zip(field_names, item)} for item in data]
  49. print(len(data_dicts))
  50. helper = SyncRedisHelper()
  51. client = helper.get_client()
  52. for item in data_dicts:
  53. json_data = json.dumps(item)
  54. client.rpush(redis_task, json_data)
  55. # client.rpush(redis_task, *data_dicts)
  56. def get_video_data(redis_task):
  57. """获取一条需要打标签的视频"""
  58. helper = SyncRedisHelper()
  59. client = helper.get_client()
  60. ret = client.rpop(redis_task)
  61. return ret
  62. def ad_in_video_data(ret):
  63. """分析失败视频重新写入redis"""
  64. task = f"task:ad_video_recommend"
  65. helper = SyncRedisHelper()
  66. client = helper.get_client()
  67. client.rpush(task, ret)