123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- # -*- coding: utf-8 -*-
- # @ModuleName: app_rank_h
- # @Author: Liqian
- # @Time: 2022/3/18 下午2:03
- # @Software: PyCharm
- import datetime
- from datetime import datetime as dt
- from threading import Timer
- from log import Log
- from db_helper import RedisHelper
- from config import set_config
- from odps import ODPS
- log_ = Log()  # module-wide logger; the functions below report progress through log_.info
- config_, env = set_config()  # config_ supplies the ODPS credentials and the key/table names read below; env is not referenced in the visible part of this file
def op_data_check(project, table, hour):
    """Check whether the operations data for one hour partition is ready.

    Runs ``select * from {project}.{table} where hour = {hour}`` on ODPS and
    reports the record count exposed by the result reader.

    Args:
        project: ODPS project name; used for the client and inside the SQL.
        table: Table name inside ``project``.
        hour: Value compared against the ``hour`` column. It is interpolated
            into the SQL verbatim (the caller in this file passes a
            ``%Y%m%d%H`` string).

    Returns:
        The reader's record count, or 0 if the query or the read fails.
    """
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000,
    )
    try:
        sql = f'select * from {project}.{table} where hour = {hour}'
        with odps.execute_sql(sql=sql).open_reader() as reader:
            data_count = reader.count
    except Exception as e:
        # Best-effort probe: a failure is still reported as "no data" so the
        # caller keeps polling, but the cause is logged instead of being
        # silently discarded.
        log_.info(f'op data check failed, project: {project}, table: {table}, hour: {hour}, error: {e}')
        data_count = 0
    return data_count
def get_op_data(now_date, project, table):
    """Read the operations-provided rows for the hour of ``now_date``.

    Args:
        now_date: datetime whose ``%Y%m%d%H`` rendering selects the ``hour``
            partition to read.
        project: ODPS project name used to build the client.
        table: Name of the partitioned table to read.

    Returns:
        A list of ``{'videoid': ..., 'rank': ...}`` dicts, one per record, in
        the order the table reader yields them.
    """
    partition_hour = dt.strftime(now_date, '%Y%m%d%H')
    client = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000,
    )
    rows = client.read_table(name=table, partition='hour=%s' % partition_hour)
    # Keep only the two fields the ranking step consumes.
    return [{'videoid': row['videoid'], 'rank': row['rank']} for row in rows]
def app_rank_op(now_date, now_h):
    """Build the hourly app recommendation list from operations data.

    The operations-provided videos are merged with the ROV model result read
    from Redis, and the merged mapping is written to the final recall key
    ``{APP_FINAL_RECALL_KEY_NAME_PREFIX}{YYYYMMDD}.{now_h}``.

    Args:
        now_date: datetime used to pick the model key, the operations data
            partition and the date part of the output key.
        now_h: Hour value appended to the output key.

    Returns:
        None. The result is stored through ``RedisHelper.add_data_with_zset``
        with ``expire_time=24 * 3600``.
    """
    log_.info("now date: {}".format(now_date))

    # Load the ROV model result.
    redis_helper = RedisHelper()
    key_name = get_redis_key_date(now_date=now_date)
    initial_data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1, with_scores=True)
    log_.info(f'initial data count = {len(initial_data)}')

    # Load the operations data for the current hour.
    op_data = get_op_data(now_date=now_date, project=config_.APP_OP_PROJECT, table=config_.APP_OP_TABLE)
    # Sort by rank descending so that the smallest rank ends up with the
    # largest score below.
    op_data.sort(key=lambda x: int(x['rank']), reverse=True)
    op_video_ids = [int(item['videoid']) for item in op_data]
    log_.info(f'op video count = {len(op_video_ids)}')

    # Operations videos get scores 1001, 1002, ... in sorted order.
    # NOTE(review): presumably model scores stay below 1000 so these videos
    # rank first - confirm against the model output range.
    final_data = {video_id: 1000 + i + 1 for i, video_id in enumerate(op_video_ids)}

    # Merge in the model result, skipping videos already placed by operations.
    # A set makes each membership test O(1) instead of scanning the id list.
    op_video_id_set = set(op_video_ids)
    for video_id, score in initial_data:
        video_id = int(video_id)
        if video_id not in op_video_id_set:
            final_data[video_id] = score
    log_.info(f'final data count = {len(final_data)}')

    # Store the merged result under the hourly final recall key.
    final_key_name = f"{config_.APP_FINAL_RECALL_KEY_NAME_PREFIX}{dt.strftime(now_date, '%Y%m%d')}.{now_h}"
    redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=24 * 3600)
def get_redis_key_date(now_date):
    """Return the Redis key that holds the ROV model result.

    The key for ``now_date``'s day is used when it exists in Redis; otherwise
    the key for the previous day is returned (without checking that it
    exists).

    Args:
        now_date: datetime whose ``%Y%m%d`` rendering forms the key suffix.

    Returns:
        ``RECALL_KEY_NAME_PREFIX_APP`` followed by the chosen date string.
    """
    helper = RedisHelper()
    today_stamp = dt.strftime(now_date, '%Y%m%d')
    today_key = f'{config_.RECALL_KEY_NAME_PREFIX_APP}{today_stamp}'
    if helper.key_exists(key_name=today_key):
        return today_key

    # Today's result is not there: fall back to the previous day's key.
    previous_day = now_date - datetime.timedelta(days=1)
    previous_stamp = dt.strftime(previous_day, '%Y%m%d')
    return f'{config_.RECALL_KEY_NAME_PREFIX_APP}{previous_stamp}'
def app_rank_bottom(now_date, now_h):
    """Fallback: publish the ROV model result as the current hour's list.

    Used when the operations data was not updated in time. The model result
    is copied unchanged into the hourly final recall key.

    Args:
        now_date: datetime used to pick the model key and the date part of
            the output key.
        now_h: Hour value appended to the output key.

    Returns:
        None. The data is stored through ``RedisHelper.add_data_with_zset``
        with ``expire_time=24 * 3600``.
    """
    helper = RedisHelper()
    source_key = get_redis_key_date(now_date=now_date)
    model_rows = helper.get_data_zset_with_index(key_name=source_key, start=0, end=-1, with_scores=True)

    # Copy every (video id, score) pair as-is.
    fallback_scores = {vid: vid_score for vid, vid_score in model_rows}

    day_stamp = dt.strftime(now_date, '%Y%m%d')
    target_key = f"{config_.APP_FINAL_RECALL_KEY_NAME_PREFIX}{day_stamp}.{now_h}"
    helper.add_data_with_zset(key_name=target_key, data=fallback_scores, expire_time=24 * 3600)
def app_timer_check():
    """Update the hourly app list once the operations data is ready.

    Flow:
      * if the operations table has rows for the current hour, build the list
        from them via ``app_rank_op``;
      * otherwise, once the minute is past 50, publish the model result via
        ``app_rank_bottom``;
      * otherwise schedule another check in 60 seconds with ``Timer``.

    Returns:
        None.
    """
    # Read the clock once so the partition hour that is checked, the hour
    # used in the Redis key and the minute used for the fallback decision all
    # describe the same instant, even when the call straddles an hour change.
    now_date = dt.today()
    log_.info(f"now_date: {dt.strftime(now_date, '%Y%m%d')}")
    now_h = now_date.hour
    now_min = now_date.minute

    # Check whether the operations data for the current hour is ready.
    op_data_count = op_data_check(project=config_.APP_OP_PROJECT, table=config_.APP_OP_TABLE,
                                  hour=dt.strftime(now_date, '%Y%m%d%H'))
    if op_data_count > 0:
        # Data is ready: run the update.
        app_rank_op(now_date=now_date, now_h=now_h)
    elif now_min > 50:
        # Too late in the hour to keep waiting: use the model result instead.
        log_.info('op data is None, use bottom data!')
        app_rank_bottom(now_date=now_date, now_h=now_h)
    else:
        # Data is not ready yet: check again in one minute.
        Timer(60, app_timer_check).start()
if __name__ == '__main__':
    # Script entry point: run one readiness check; app_timer_check re-arms
    # itself with a Timer when the data is not ready yet.
    app_timer_check()
|