# -*- coding: utf-8 -*-
# @ModuleName: app_rank_h
# @Author: Liqian
# @Time: 2022/3/18 14:03
# @Software: PyCharm
"""Hourly update of the App ("票圈视频") recommendation list.

Every hour the operator-provided video ranking (read from an ODPS table) is
merged on top of the ROV model results stored in Redis, and the merged list is
written to an hourly Redis zset. If the operator data is still not available
once the current minute is past ``_BOTTOM_CUTOFF_MINUTE``, the ROV model
results alone are written for that hour instead.
"""
import datetime
from datetime import datetime as dt
from threading import Timer

from log import Log
from db_helper import RedisHelper
from my_config import set_config
from odps import ODPS

log_ = Log()
config_, env = set_config()

# Base score for operator videos; operator video i (0-based, after sorting)
# gets _OP_SCORE_BASE + i + 1. NOTE(review): presumably chosen to sit above
# all ROV model scores so operator videos rank first - confirm.
_OP_SCORE_BASE = 1000
# expire_time passed to the final hourly zset (24 hours, in seconds).
_FINAL_KEY_EXPIRE_SECONDS = 24 * 3600
# Seconds to wait before re-checking whether the operator data is ready.
_RETRY_INTERVAL_SECONDS = 60
# Past this minute of the hour, stop waiting and fall back to ROV results.
_BOTTOM_CUTOFF_MINUTE = 50


def _get_odps_client(project):
    """Build an ODPS client for *project* from the shared ODPS config."""
    return ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )


def _get_final_key_name(now_date, now_h):
    """Return the hourly Redis key the final recall list is written to."""
    return f"{config_.APP_FINAL_RECALL_KEY_NAME_PREFIX}{dt.strftime(now_date, '%Y%m%d')}.{now_h}"


def op_data_check(project, table, hour):
    """Check whether the operator data for *hour* is ready.

    :param project: ODPS project name
    :param table: ODPS table name inside *project*
    :param hour: value of the ``hour`` partition column, e.g. '2022031814'
    :return: number of rows found for *hour*; 0 if the query fails
    """
    odps = _get_odps_client(project)
    sql = f'select * from {project}.{table} where hour = {hour}'
    try:
        with odps.execute_sql(sql=sql).open_reader() as reader:
            data_count = reader.count
    except Exception as e:
        # A failed query is treated as "not ready yet" so the caller keeps
        # retrying, but record the reason instead of swallowing it silently.
        log_.info(f'op data check failed: {e}')
        data_count = 0
    return data_count


def get_op_data(now_date, project, table):
    """Fetch the operator-provided video ranking for the hour of *now_date*.

    :param now_date: datetime whose hour selects the ``hour`` partition
    :param project: ODPS project name
    :param table: ODPS table name
    :return: list of ``{'videoid': ..., 'rank': ...}`` dicts, one per record
    """
    hour = dt.strftime(now_date, '%Y%m%d%H')
    odps = _get_odps_client(project)
    records = odps.read_table(name=table, partition='hour=%s' % hour)
    return [{'videoid': record['videoid'], 'rank': record['rank']} for record in records]


def app_rank_op(now_date, now_h):
    """Merge operator data over the ROV model results and store it in Redis.

    Operator videos are sorted by ``rank`` in descending order and then given
    increasing scores ``_OP_SCORE_BASE + 1, + 2, ...``. As a result, the
    largest rank value gets the lowest score and the smallest rank value gets
    the highest. ROV model videos that do not appear in the operator data keep
    their original scores.

    :param now_date: datetime of the current run
    :param now_h: hour used in the final Redis key name
    """
    log_.info("now date: {}".format(now_date))
    redis_helper = RedisHelper()

    # ROV model results (today's key, or yesterday's if today's is missing).
    key_name = get_redis_key_date(now_date=now_date)
    initial_data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1, with_scores=True)
    log_.info(f'initial data count = {len(initial_data)}')

    # Operator data for the current hour, sorted by rank descending.
    op_data = get_op_data(now_date=now_date, project=config_.APP_OP_PROJECT, table=config_.APP_OP_TABLE)
    op_data.sort(key=lambda x: int(x['rank']), reverse=True)
    op_video_ids = [int(item['videoid']) for item in op_data]
    log_.info(f'op video count = {len(op_video_ids)}')

    final_data = {video_id: _OP_SCORE_BASE + i + 1 for i, video_id in enumerate(op_video_ids)}

    # Merge: ROV videos not overridden by the operator keep their scores.
    # A set makes each membership test O(1) instead of a list scan.
    op_video_id_set = set(op_video_ids)
    for video_id, score in initial_data:
        if int(video_id) not in op_video_id_set:
            final_data[int(video_id)] = score
    log_.info(f'final data count = {len(final_data)}')

    redis_helper.add_data_with_zset(key_name=_get_final_key_name(now_date, now_h), data=final_data,
                                    expire_time=_FINAL_KEY_EXPIRE_SECONDS)


def get_redis_key_date(now_date):
    """Return the Redis key holding the ROV model results.

    Uses today's key if it exists in Redis; otherwise falls back to the key
    for the previous day.
    """
    redis_helper = RedisHelper()
    now_dt = dt.strftime(now_date, '%Y%m%d')
    key_name = f'{config_.RECALL_KEY_NAME_PREFIX_APP}{now_dt}'
    if not redis_helper.key_exists(key_name=key_name):
        pre_dt = dt.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
        key_name = f'{config_.RECALL_KEY_NAME_PREFIX_APP}{pre_dt}'
    return key_name


def app_rank_bottom(now_date, now_h):
    """Fallback when the operator data is late: use the ROV results as-is.

    :param now_date: datetime of the current run
    :param now_h: hour used in the final Redis key name
    """
    redis_helper = RedisHelper()
    key_name = get_redis_key_date(now_date=now_date)
    initial_data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1, with_scores=True)
    final_data = dict(initial_data)
    redis_helper.add_data_with_zset(key_name=_get_final_key_name(now_date, now_h), data=final_data,
                                    expire_time=_FINAL_KEY_EXPIRE_SECONDS)


def app_timer_check():
    """Update the current hour's list, retrying until the operator data is ready.

    - If the operator data for this hour exists, merge it (``app_rank_op``).
    - Otherwise, if the minute is past ``_BOTTOM_CUTOFF_MINUTE``, write the ROV
      results alone (``app_rank_bottom``).
    - Otherwise, schedule another check in ``_RETRY_INTERVAL_SECONDS`` seconds.
    """
    now_date = dt.today()
    log_.info(f"now_date: {dt.strftime(now_date, '%Y%m%d')}")
    # Derive hour/minute from the same timestamp as now_date. Separate
    # dt.now() calls could cross an hour or day boundary, making the checked
    # partition and the written key refer to different hours.
    now_h = now_date.hour
    now_min = now_date.minute

    op_data_count = op_data_check(project=config_.APP_OP_PROJECT, table=config_.APP_OP_TABLE,
                                  hour=dt.strftime(now_date, '%Y%m%d%H'))
    if op_data_count > 0:
        # Data is ready: merge and update.
        app_rank_op(now_date=now_date, now_h=now_h)
    elif now_min > _BOTTOM_CUTOFF_MINUTE:
        log_.info('op data is None, use bottom data!')
        app_rank_bottom(now_date=now_date, now_h=now_h)
    else:
        # Data not ready yet: check again later.
        Timer(_RETRY_INTERVAL_SECONDS, app_timer_check).start()


if __name__ == '__main__':
    app_timer_check()