1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- import json
- import redis
- from common.odps_data import OdpsDataCount
- from common.sql_help import sqlCollect
class SyncRedisHelper:
    """Hand out synchronous Redis clients backed by one shared connection pool.

    The pool is stored on the class, so constructing the helper repeatedly
    (as the module-level functions do) reuses a single
    ``redis.ConnectionPool`` instead of building a new pool per helper.
    """

    # Shared by every helper; populated on first use by ``_get_pool``.
    _pool: redis.ConnectionPool = None
    # First helper that was constructed; kept for backward compatibility.
    _instance = None

    def __init__(self):
        # Assign on the class, not on ``self``: an instance attribute would
        # shadow the class attribute and defeat the sharing.
        if SyncRedisHelper._instance is None:
            SyncRedisHelper._instance = self
        self._get_pool()

    def _get_pool(self) -> redis.ConnectionPool:
        """Return the shared connection pool, creating it on first call."""
        if SyncRedisHelper._pool is None:
            # NOTE(review): credentials are hard-coded in source; consider
            # moving them to configuration or environment variables.
            SyncRedisHelper._pool = redis.ConnectionPool(
                # host="r-bp1mb0v08fqi4hjffupd.redis.rds.aliyuncs.com",  # public address
                host="r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com",  # intranet address
                port=6379,
                db=0,
                password="Wqsd@2019",
                # password="Qingqu2019",
            )
        return SyncRedisHelper._pool

    def get_client(self) -> redis.Redis:
        """Return a new ``redis.Redis`` client bound to the shared pool."""
        return redis.Redis(connection_pool=self._get_pool())

    def close(self):
        """Disconnect every connection of the shared pool, including in-use ones.

        The pool is shared, so this affects all clients obtained from any
        helper. The pool object itself is kept.
        """
        pool = SyncRedisHelper._pool
        if pool is not None:
            pool.disconnect(inuse_connections=True)
def install_video_data(dt, redis_task, table_name):
    """Push the videos that need tagging onto a Redis list.

    The records come from ``OdpsDataCount.main(table_name, dt)``; each
    element of that result is appended to the list as a separate entry.

    Args:
        dt: Passed through to ``OdpsDataCount.main`` as its second argument
            (presumably a date partition -- confirm against that helper).
        redis_task: Key of the Redis list to append to.
        table_name: Passed through to ``OdpsDataCount.main`` as its first
            argument.

    Returns:
        None. Nothing is pushed when the lookup result is empty or falsy.
    """
    videos = OdpsDataCount.main(table_name, dt)
    if videos:
        SyncRedisHelper().get_client().rpush(redis_task, *videos)
def install_ad_video_data(redis_task):
    """Push the ad creatives that need tagging onto a Redis list.

    Rows come from ``sqlCollect.select_ad_list()``. Each row is paired
    positionally with the field names below (columns beyond the listed names
    are ignored), every value is converted with ``str()``, and the resulting
    dict is stored as one JSON string per list entry.

    Args:
        redis_task: Key of the Redis list to append to.

    Returns:
        None. Nothing is pushed when the query yields no rows.
    """
    rows = sqlCollect.select_ad_list()
    if not rows:
        return

    # Column names used as the keys of each JSON object.
    field_names = (
        "ad_id",
        "creative_code",
        "creative_title",
        "material_address",
        "click_button_text",
        "creative_logo_address",
        "update_time",
    )
    # Serialize every row up front so all entries go out in one command.
    payloads = [
        json.dumps({key: str(value) for key, value in zip(field_names, row)})
        for row in rows
    ]
    print(len(payloads))
    if not payloads:
        # A variadic RPUSH with no values would be rejected by the server.
        return

    # One variadic RPUSH instead of one network round trip per row.
    SyncRedisHelper().get_client().rpush(redis_task, *payloads)
def get_video_data(redis_task):
    """Pop one pending video entry from the tail of a Redis list.

    Args:
        redis_task: Key of the Redis list to pop from.

    Returns:
        Whatever ``rpop`` returns for that key.
    """
    redis_client = SyncRedisHelper().get_client()
    return redis_client.rpop(redis_task)
def ad_in_video_data(ret):
    """Write a video whose analysis failed back onto the Redis list.

    Args:
        ret: The entry to append to the ``task:ad_video_recommend`` list.
    """
    redis_client = SyncRedisHelper().get_client()
    redis_client.rpush("task:ad_video_recommend", ret)
|