import json
from typing import Optional

import redis

from common.odps_data import OdpsDataCount
from common.sql_help import sqlCollect


class SyncRedisHelper:
    """Hand out synchronous Redis clients backed by one shared connection pool.

    The pool is stored on the class, so every ``SyncRedisHelper()`` created in
    this process reuses the same ``redis.ConnectionPool`` instead of building
    a fresh pool (and fresh connections) on each instantiation.
    """

    # Shared by all helper instances; built lazily by ``_get_pool``.
    _pool: Optional[redis.ConnectionPool] = None
    # The first helper constructed in this process.
    _instance: Optional["SyncRedisHelper"] = None

    def __init__(self) -> None:
        """Register the first instance and make sure the shared pool exists."""
        if SyncRedisHelper._instance is None:
            SyncRedisHelper._instance = self
        self._get_pool()

    def _get_pool(self) -> redis.ConnectionPool:
        """Return the shared connection pool, creating it on first use."""
        # Assign on the class, not on ``self``: an instance attribute would
        # shadow the class attribute and each helper would get its own pool.
        if SyncRedisHelper._pool is None:
            # NOTE(review): host and credentials are hard-coded in source;
            # consider moving them to configuration / environment variables.
            SyncRedisHelper._pool = redis.ConnectionPool(
                # host="r-bp1mb0v08fqi4hjffupd.redis.rds.aliyuncs.com",  # public network address
                host="r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com",  # internal network address
                port=6379,
                db=0,
                password="Wqsd@2019",
                # password="Qingqu2019",
            )
        return SyncRedisHelper._pool

    def get_client(self) -> redis.Redis:
        """Return a ``redis.Redis`` client bound to the shared pool."""
        return redis.Redis(connection_pool=self._get_pool())

    def close(self) -> None:
        """Disconnect every connection of the shared pool, including in-use ones."""
        if self._pool is not None:
            self._pool.disconnect(inuse_connections=True)


def install_video_data(dt, redis_task, table_name):
    """Push the videos that need tagging onto a Redis list.

    Args:
        dt: Passed through to ``OdpsDataCount.main`` as its second argument.
        redis_task: Key of the Redis list to append to.
        table_name: Passed through to ``OdpsDataCount.main`` as its first
            argument.

    Nothing is written when ``OdpsDataCount.main`` returns an empty/falsy
    result.
    """
    data = OdpsDataCount.main(table_name, dt)
    if not data:
        return
    # task = f"task:video_ai"
    client = SyncRedisHelper().get_client()
    # One variadic RPUSH appends all items in order in a single round trip.
    client.rpush(redis_task, *data)


def install_ad_video_data(redis_task):
    """Push the ad videos that need tagging onto a Redis list as JSON strings.

    Each row returned by ``sqlCollect.select_ad_list()`` is turned into a dict
    whose values are stringified, serialized with ``json.dumps`` and appended
    to the list stored at ``redis_task``.

    Args:
        redis_task: Key of the Redis list to append to.
    """
    rows = sqlCollect.select_ad_list()
    if not rows:
        return

    # Names assigned, in order, to the positional values of every row.
    field_names = (
        "ad_id",
        "creative_code",
        "creative_title",
        "material_address",
        "click_button_text",
        "creative_logo_address",
        "update_time",
    )
    # Redis cannot store dicts directly, so every row becomes a JSON string.
    payloads = [
        json.dumps({key: str(value) for key, value in zip(field_names, row)})
        for row in rows
    ]
    print(len(payloads))
    if not payloads:
        # RPUSH requires at least one value.
        return

    client = SyncRedisHelper().get_client()
    # Single RPUSH with all payloads: same resulting list order as pushing
    # one by one, but one network round trip instead of one per row.
    client.rpush(redis_task, *payloads)


def get_video_data(redis_task):
    """Pop one video that needs tagging from a Redis list.

    Items are appended with RPUSH by the producers in this module and removed
    here with RPOP, so the most recently pushed item is returned first.

    Args:
        redis_task: Key of the Redis list to pop from.

    Returns:
        The popped value as returned by the Redis client, or ``None`` when
        the list is empty or does not exist.
    """
    client = SyncRedisHelper().get_client()
    return client.rpop(redis_task)


def ad_in_video_data(ret):
    """Re-queue a video whose analysis failed.

    Args:
        ret: The value to append back to the ``task:ad_video_recommend`` list.
    """
    task = "task:ad_video_recommend"
    client = SyncRedisHelper().get_client()
    client.rpush(task, ret)