123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213 |
- import pandas as pd
- from odps import ODPS
- from datetime import datetime, timedelta
- from threading import Timer
- from utils import get_data_from_odps
- from db_helper import RedisHelper
- from config import set_config
- from log import Log
- config_, _ = set_config()
- log_ = Log()
- features = [
- 'videoid',
- 'preview人数', # 过去1天预曝光人数
- 'view人数', # 过去1天曝光人数
- 'play人数', # 过去1天播放人数
- 'share人数', # 过去1天分享人数
- '回流人数', # 过去1天分享,过去1天回流人数
- 'preview次数', # 过去1天预曝光次数
- 'view次数', # 过去1天曝光次数
- 'play次数', # 过去1天播放次数
- 'share次数', # 过去1天分享次数
- ]
- def get_rov_redis_key(now_date):
- # 获取rov模型结果存放key
- redis_helper = RedisHelper()
- now_dt = datetime.strftime(now_date, '%Y%m%d')
- key_name = f'{config_.RECALL_KEY_NAME_PREFIX}{now_dt}'
- key_dt = now_dt
- if not redis_helper.key_exists(key_name=key_name):
- pre_dt = datetime.strftime(now_date - timedelta(days=1), '%Y%m%d')
- key_name = f'{config_.RECALL_KEY_NAME_PREFIX}{pre_dt}'
- key_dt = pre_dt
- return key_name, key_dt
- def day_data_check(project, table, now_date):
- """检查数据是否准备好"""
- odps = ODPS(
- access_id=config_.ODPS_CONFIG['ACCESSID'],
- secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
- project=project,
- endpoint=config_.ODPS_CONFIG['ENDPOINT'],
- connect_timeout=3000,
- read_timeout=500000,
- pool_maxsize=1000,
- pool_connections=1000
- )
- try:
- dt = datetime.strftime((now_date - timedelta(days=1)), '%Y%m%d')
- sql = f'select * from {project}.{table} where dt = {dt}'
- with odps.execute_sql(sql=sql).open_reader() as reader:
- data_count = reader.count
- except Exception as e:
- data_count = 0
- return data_count
- def get_feature_data(now_date, project, table):
- """获取特征数据"""
- dt = datetime.strftime((now_date - timedelta(days=1)), '%Y%m%d')
- # dt = '20220425'
- records = get_data_from_odps(date=dt, project=project, table=table)
- feature_data = []
- for record in records:
- item = {}
- for feature_name in features:
- item[feature_name] = record[feature_name]
- feature_data.append(item)
- feature_df = pd.DataFrame(feature_data)
- return feature_df
- def cal_score1(df):
- # score1计算公式: score = 回流人数/(view人数+10000)
- df = df.fillna(0)
- df['score'] = df['回流人数'] / (df['view人数'] + 1000)
- df = df.sort_values(by=['score'], ascending=False)
- return df
- def cal_score2(df):
- # score2计算公式: score = share次数/(view+1000)+0.01*return/(share次数+100)
- df = df.fillna(0)
- df['share_rate'] = df['share次数'] / (df['view人数'] + 1000)
- df['back_rate'] = df['回流人数'] / (df['share次数'] + 100)
- df['score'] = df['share_rate'] + 0.01 * df['back_rate']
- df = df.sort_values(by=['score'], ascending=False)
- return df
- def video_rank_day(df, now_date, rule_key, param):
- """
- 获取符合进入召回源条件的视频,与每日更新的rov模型结果视频列表进行合并
- :param df:
- :param now_date:
- :param rule_key: 天级规则数据进入条件
- :param param: 天级规则数据进入条件参数
- :return:
- """
- # 获取rov模型结果
- redis_helper = RedisHelper()
- key_name, key_dt = get_rov_redis_key(now_date=now_date)
- initial_data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1, with_scores=True)
- log_.info(f'initial data count = {len(initial_data)}, key_dt = {key_dt}')
- # 获取符合进入召回源条件的视频
- return_count = param.get('return_count')
- if return_count:
- day_recall_df = df[df['回流人数'] > return_count]
- else:
- day_recall_df = df
- day_recall_videos = day_recall_df['videoid'].to_list()
- log_.info(f'day_recall videos count = {len(day_recall_videos)}')
- # 写入对应的redis
- day_video_ids =[]
- day_recall_result = {}
- for video_id in day_recall_videos:
- score = day_recall_df[day_recall_df['videoid'] == video_id]['score']
- day_recall_result[int(video_id)] = float(score)
- day_video_ids.append(int(video_id))
- day_recall_key_name = \
- f"{config_.RECALL_KEY_NAME_PREFIX_BY_DAY}{rule_key}.{datetime.strftime(now_date, '%Y%m%d')}"
- if len(day_recall_result) > 0:
- redis_helper.add_data_with_zset(key_name=day_recall_key_name, data=day_recall_result, expire_time=7 * 24 * 3600)
- # 去重更新rov模型结果,并另存为redis中
- initial_data_dup = {}
- for video_id, score in initial_data:
- if int(video_id) not in day_video_ids:
- initial_data_dup[int(video_id)] = score
- log_.info(f"initial data dup count = {len(initial_data_dup)}")
- now_dt = datetime.strftime(now_date, '%Y%m%d')
- if key_dt == now_dt:
- initial_key_name_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP_DAY_NOW
- else:
- initial_key_name_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP_DAY_PRE
- initial_key_name = f"{initial_key_name_prefix}{rule_key}.{now_dt}"
- if len(initial_data_dup) > 0:
- redis_helper.add_data_with_zset(key_name=initial_key_name, data=initial_data_dup, expire_time=7 * 24 * 3600)
- def rank_by_day(now_date, rule_params, project, table):
- # 获取特征数据
- feature_df = get_feature_data(now_date=now_date, project=project, table=table)
- # rank
- for key, value in rule_params.items():
- log_.info(f"rule = {key}, param = {value}")
- # 计算score
- cal_score_func = value.get('cal_score_func', 1)
- if cal_score_func == 2:
- score_df = cal_score2(df=feature_df)
- else:
- score_df = cal_score1(df=feature_df)
- video_rank_day(df=score_df, now_date=now_date, rule_key=key, param=value)
- # to-csv
- score_filename = f"score_{key}_{datetime.strftime(now_date, '%Y%m%d')}.csv"
- score_df.to_csv(f'./data/{score_filename}')
- # to-logs
- log_.info({"date": datetime.strftime(now_date, '%Y%m%d%H'),
- "redis_key_prefix": config_.RECALL_KEY_NAME_PREFIX_BY_DAY,
- "rule_key": key,
- "score_df": score_df[['videoid', 'score']]})
- def day_rank_bottom(now_date, rule_key):
- """未按时更新数据,用模型召回数据作为当前的数据"""
- log_.info(f"rule_key = {rule_key}")
- now_dt = datetime.strftime(now_date, '%Y%m%d')
- redis_helper = RedisHelper()
- key_name, key_dt = get_rov_redis_key(now_date=now_date)
- initial_data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1, with_scores=True)
- final_data = dict()
- for video_id, score in initial_data:
- final_data[video_id] = score
- if key_dt == now_dt:
- key_name_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP_DAY_NOW
- else:
- key_name_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP_DAY_PRE
- key_name = f"{key_name_prefix}{rule_key}.{now_dt}"
- if len(final_data) > 0:
- redis_helper.add_data_with_zset(key_name=key_name, data=final_data, expire_time=7 * 24 * 3600)
- def day_timer_check():
- project = config_.PROJECT_DAY
- table = config_.TABLE_DAY
- rule_params = config_.RULE_PARAMS_DAY
- now_date = datetime.today()
- log_.info(f"now_date: {datetime.strftime(now_date, '%Y%m%d')}")
- now_min = datetime.now().minute
- # 查看当前天级更新的数据是否已准备好
- h_data_count = day_data_check(project=project, table=table, now_date=now_date)
- if h_data_count > 0:
- log_.info(f'h_data_count = {h_data_count}')
- # 数据准备好,进行更新
- rank_by_day(now_date=now_date, rule_params=rule_params, project=project, table=table)
- elif now_min > 50:
- log_.info('day_recall data is None!')
- for key, _ in rule_params.items():
- day_rank_bottom(now_date=now_date, rule_key=key)
- else:
- # 数据没准备好,1分钟后重新检查
- Timer(60, day_timer_check).start()
- if __name__ == '__main__':
- day_timer_check()
|