@@ -0,0 +1,301 @@
+import pandas as pd
+import math
+import traceback
+from functools import reduce
+from odps import ODPS
+from threading import Timer
+from datetime import datetime, timedelta
+from get_data import get_data_from_odps
+from db_helper import RedisHelper
+from utils import filter_video_status, check_table_partition_exits, filter_video_status_app, send_msg_to_feishu
+from config import set_config
+from log import Log
+
+config_, _ = set_config()
+log_ = Log()
+
+
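+# Hourly (region-agnostic) recall update: read hour-level video features from an
+# ODPS table, score the videos, and write the ranked results into Redis zsets.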
+features = [
+    'apptype',
+    'videoid',
+    'lastonehour_preview',  # users with pre-exposure in the past hour - split by region
+    'lastonehour_view',  # users with exposure in the past hour - split by region
+    'lastonehour_play',  # users who played in the past hour - split by region
+    'lastonehour_share',  # users who shared in the past hour - split by region
+    'lastonehour_return',  # shared in the past hour, returning users in the past hour - split by region
+    'lastonehour_preview_total',  # pre-exposure count in the past hour - split by region
+    'lastonehour_view_total',  # exposure count in the past hour - split by region
+    'lastonehour_play_total',  # play count in the past hour - split by region
+    'lastonehour_share_total',  # share count in the past hour - split by region
+    'platform_return',
+    'lastonehour_show',  # not split by region
+    'lasttwohour_share',  # users who shared in hour h-2
+    'lasttwohour_return_now',  # shared in hour h-2, returning users in the past hour
+    'lasttwohour_return',  # shared in hour h-2, returning users in hour h-2
+    'lastthreehour_share',  # users who shared in hour h-3
+    'lastthreehour_return_now',  # shared in hour h-3, returning users in the past hour
+    'lastthreehour_return',  # shared in hour h-3, returning users in hour h-3
+
+    'lastonehour_return_new',  # shared in the past hour, returning users in the past hour (returns are attributed to shares from the given region; shares are region-restricted, returns are not)
+    'lasttwohour_return_now_new',  # shared in hour h-2, returning users in the past hour (same regional attribution as above)
+    'lasttwohour_return_new',  # shared in hour h-2, returning users in hour h-2 (same regional attribution as above)
+    'lastthreehour_return_now_new',  # shared in hour h-3, returning users in the past hour (same regional attribution as above)
+    'lastthreehour_return_new',  # shared in hour h-3, returning users in hour h-3 (same regional attribution as above)
+    'platform_return_new',  # platform-distributed returns (same regional attribution as above)
+]
+
+
+def h_data_check(project, table, now_date):
+    """Check whether the hourly data is ready."""
+    odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project=project,
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
+        connect_timeout=3000,
+        read_timeout=500000,
+        pool_maxsize=1000,
+        pool_connections=1000
+    )
+
+    try:
+        dt = datetime.strftime(now_date, '%Y%m%d%H')
+        check_res = check_table_partition_exits(date=dt, project=project, table=table)
+        if check_res:
+            sql = f'select * from {project}.{table} where dt = {dt}'
+            with odps.execute_sql(sql=sql).open_reader() as reader:
+                data_count = reader.count
+        else:
+            data_count = 0
+    except Exception as e:
+        # any failure counts as "data not ready"
+        data_count = 0
+    return data_count
+
+
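+# Redis zset key layout used below (the prefix comes from config):
+#   {config_.RECALL_KEY_NAME_PREFIX_BY_H_H}{data_key}:{rule_key}:{yyyymmdd}:{hour}
+# holding {videoid: score} members, expiring after 2 hours.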
+def h_rank_bottom(now_date, now_h, rule_params):
+    """Data was not updated on time: reuse the previous hour's result for the current hour."""
+    redis_helper = RedisHelper()
+    if now_h == 0:
+        redis_dt = datetime.strftime(now_date - timedelta(days=1), '%Y%m%d')
+        redis_h = 23
+    else:
+        redis_dt = datetime.strftime(now_date, '%Y%m%d')
+        redis_h = now_h - 1
+    key_prefix = config_.RECALL_KEY_NAME_PREFIX_BY_H_H
+
+    for param in rule_params.get('params_list'):
+        data_key = param.get('data')
+        rule_key = param.get('rule')
+        log_.info(f"data_key = {data_key}, rule_key = {rule_key}")
+        key_name = f"{key_prefix}{data_key}:{rule_key}:{redis_dt}:{redis_h}"
+        initial_data = redis_helper.get_all_data_from_zset(key_name=key_name, with_scores=True)
+        if initial_data is None:
+            initial_data = []
+        final_data = dict()
+        for video_id, score in initial_data:
+            final_data[video_id] = score
+        # write into the corresponding Redis key for the current hour
+        final_key_name = \
+            f"{key_prefix}{data_key}:{rule_key}:{datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+        if len(final_data) > 0:
+            redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=2 * 3600)
+
+
+def get_feature_data(project, table, now_date):
+    """Fetch feature data for the given hour and return it as a DataFrame."""
+    dt = datetime.strftime(now_date, '%Y%m%d%H')
+    # get_data_from_odps is assumed to yield record objects addressable by column name
+    records = get_data_from_odps(date=dt, project=project, table=table)
+    feature_data = []
+    for record in records:
+        item = {}
+        for feature_name in features:
+            item[feature_name] = record[feature_name]
+        feature_data.append(item)
+    feature_df = pd.DataFrame(feature_data)
+    return feature_df
+
+
+def cal_score(df, param):
+    # score = sharerate * backrate * LOG(lastonehour_return + 1) * K2
+    # sharerate = lastonehour_share / (lastonehour_play + 1000)
+    # backrate = lastonehour_return / (lastonehour_share + 10)
+    # ctr = lastonehour_play / (lastonehour_show + 1000), cap ctr at a maximum: K2 = 0.6 if ctr > 0.6 else ctr
+
+    df = df.fillna(0)
+    df['share_rate'] = df['lastonehour_share'] / (df['lastonehour_play'] + 1000)
+    df['back_rate'] = df['lastonehour_return'] / (df['lastonehour_share'] + 10)
+    df['log_back'] = (df['lastonehour_return'] + 1).apply(math.log)
+    if param.get('view_type', None) == 'video-show':
+        df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show'] + 1000)
+    else:
+        df['ctr'] = df['lastonehour_play'] / (df['lastonehour_preview'] + 1000)
+    df['K2'] = df['ctr'].apply(lambda x: 0.6 if x > 0.6 else x)
+
+    # note: lastonehour_return == 0 yields NaN (0/0) or inf here; pandas does not raise
+    df['platform_return_rate'] = df['platform_return'] / df['lastonehour_return']
+
+    df['score'] = df['share_rate'] * df['back_rate'] * df['log_back'] * df['K2']
+
+    df = df.sort_values(by=['score'], ascending=False)
+    return df
+
+
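+# Worked example with hypothetical counts: share=50, play=2000, return=30, show=5000
+#   share_rate = 50 / (2000 + 1000)   ≈ 0.0167
+#   back_rate  = 30 / (50 + 10)       = 0.5
+#   log_back   = ln(30 + 1)           ≈ 3.434
+#   ctr        = 2000 / (5000 + 1000) ≈ 0.333  ->  K2 = 0.333
+#   score      ≈ 0.0167 * 0.5 * 3.434 * 0.333 ≈ 0.0095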
+def merge_df(df_left, df_right):
+    """
+    Merge two dataframes on videoid and sum the corresponding features.
+    :param df_left:
+    :param df_right:
+    :return:
+    """
+    df_merged = pd.merge(df_left, df_right, on=['videoid'], how='outer', suffixes=['_x', '_y'])
+    df_merged.fillna(0, inplace=True)
+    feature_list = ['videoid']
+    for feature in features:
+        if feature in ['apptype', 'videoid']:
+            continue
+        df_merged[feature] = df_merged[f'{feature}_x'] + df_merged[f'{feature}_y']
+        feature_list.append(feature)
+    return df_merged[feature_list]
+
+
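+# e.g. merging {videoid: 1, lastonehour_share: 3} with {videoid: 1, lastonehour_share: 5}
+# yields {videoid: 1, lastonehour_share: 8}; videos present on only one side keep
+# their own counts (the missing side is filled with 0 by the outer merge).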
+def merge_df_with_score(df_left, df_right):
+    """
+    Merge two dataframes on videoid; sum platform returns, returns, and scores respectively.
+    :param df_left:
+    :param df_right:
+    :return:
+    """
+    df_merged = pd.merge(df_left, df_right, on=['videoid'], how='outer', suffixes=['_x', '_y'])
+    df_merged.fillna(0, inplace=True)
+    feature_list = ['videoid', 'lastonehour_return', 'platform_return', 'score']
+    for feature in feature_list[1:]:
+        df_merged[feature] = df_merged[f'{feature}_x'] + df_merged[f'{feature}_y']
+    return df_merged[feature_list]
+
+
+def video_rank_h(df, now_date, now_h, rule_key, param, data_key):
+    """
+    Select the videos that qualify for the recall pool.
+    """
+    redis_helper = RedisHelper()
+    log_.info(f"videos_count = {len(df)}")
+
+    # when a videoid appears more than once, keep the row with the higher score
+    df = df.sort_values(by=['score'], ascending=False)
+    df = df.drop_duplicates(subset=['videoid'], keep='first')
+    df['videoid'] = df['videoid'].astype(int)
+
+    # keep only videos whose platform return rate clears the threshold
+    platform_return_rate = param.get('platform_return_rate', 0)
+    h_recall_df = df[df['platform_return_rate'] > platform_return_rate]
+    h_recall_videos = h_recall_df['videoid'].to_list()
+    log_.info(f'h_recall videos count = {len(h_recall_videos)}')
+
+    # filter by video status
+    if data_key in ['data7', ]:
+        filtered_videos = filter_video_status_app(h_recall_videos)
+    else:
+        # filtered_videos = filter_video_status(h_recall_videos)
+        filtered_videos = h_recall_videos
+    log_.info('filtered_videos count = {}'.format(len(filtered_videos)))
+
+    # write the results to the corresponding Redis zset
+    now_dt = datetime.strftime(now_date, '%Y%m%d')
+    h_video_ids = []
+    h_recall_result = {}
+    for video_id in filtered_videos:
+        # exactly one row per videoid after drop_duplicates above
+        score = h_recall_df[h_recall_df['videoid'] == video_id]['score'].iloc[0]
+        h_recall_result[int(video_id)] = float(score)
+        h_video_ids.append(int(video_id))
+
+    h_recall_key_name = \
+        f"{config_.RECALL_KEY_NAME_PREFIX_BY_H_H}{data_key}:{rule_key}:{now_dt}:{now_h}"
+
+    if len(h_recall_result) > 0:
+        log_.info(f"count = {len(h_recall_result)}, key = {h_recall_key_name}")
+        redis_helper.add_data_with_zset(key_name=h_recall_key_name, data=h_recall_result, expire_time=2 * 3600)
+
+
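+# Assumed shape of rule_params (illustrative sketch; the real values live in
+# config_.RULE_PARAMS_H_APP_TYPE):
+#   {
+#       'data_params': {'data1': {0: 1, 4: 2}, ...},    # data_key -> {apptype: weight}
+#       'rule_params': {'rule1': {'view_type': 'video-show',
+#                                 'platform_return_rate': 0.001,
+#                                 'merge_func': 1}, ...},
+#       'params_list': [{'data': 'data1', 'rule': 'rule1'}, ...],
+#   }
+# merge_func == 2: score each apptype separately, weight the scores, then sum them;
+# otherwise: sum the raw features across apptypes first and score the merged frame once.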
+def rank_by_h(now_date, now_h, rule_params, project, table):
+    # fetch feature data
+    feature_df = get_feature_data(now_date=now_date, project=project, table=table)
+    feature_df['apptype'] = feature_df['apptype'].astype(int)
+    # rank
+    data_params_item = rule_params.get('data_params')
+    rule_params_item = rule_params.get('rule_params')
+
+    for param in rule_params.get('params_list'):
+        score_df_list = []
+        data_key = param.get('data')
+        data_param = data_params_item.get(data_key)
+        log_.info(f"data_key = {data_key}, data_param = {data_param}")
+        rule_key = param.get('rule')
+        rule_param = rule_params_item.get(rule_key)
+        log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
+        merge_func = rule_param.get('merge_func', 1)
+
+        if merge_func == 2:
+            for apptype, weight in data_param.items():
+                df = feature_df[feature_df['apptype'] == apptype]
+                # compute the score per apptype
+                score_df = cal_score(df=df, param=rule_param)
+                score_df['score'] = score_df['score'] * weight
+                score_df_list.append(score_df)
+            # merge the weighted scores
+            df_merged = reduce(merge_df_with_score, score_df_list)
+            # recompute the platform return rate on the merged counts
+            df_merged['platform_return_rate'] = df_merged['platform_return'] / df_merged['lastonehour_return']
+            video_rank_h(df=df_merged, now_date=now_date, now_h=now_h,
+                         rule_key=rule_key, param=rule_param, data_key=data_key)
+
+        else:
+            df_list = [feature_df[feature_df['apptype'] == apptype] for apptype, _ in data_param.items()]
+            df_merged = reduce(merge_df, df_list)
+            score_df = cal_score(df=df_merged, param=rule_param)
+            video_rank_h(df=score_df, now_date=now_date, now_h=now_h,
+                         rule_key=rule_key, param=rule_param, data_key=data_key)
+
+
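+# Scheduling: h_timer_check re-arms itself via threading.Timer every 60s until
+# the hour's partition has data; past minute 40 it gives up and falls back to
+# the previous hour's Redis result (h_rank_bottom). Hour 0 always uses the fallback.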
+def h_timer_check():
+    try:
+        project = config_.PROJECT_H_APP_TYPE
+        table = config_.TABLE_H_APP_TYPE
+        rule_params = config_.RULE_PARAMS_H_APP_TYPE
+        now_date = datetime.today()
+        log_.info(f"now_date: {datetime.strftime(now_date, '%Y%m%d%H')}")
+        now_min = datetime.now().minute
+        now_h = datetime.now().hour
+
+        if now_h == 0:
+            log_.info(f'now_h = {now_h} use bottom data!')
+            h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params)
+            log_.info("h_data end!")
+            return
+        # check whether this hour's data is ready
+        h_data_count = h_data_check(project=project, table=table, now_date=now_date)
+        if h_data_count > 0:
+            log_.info(f'h_data_count = {h_data_count}')
+            # data is ready: run the update
+            rank_by_h(now_date=now_date, now_h=now_h, rule_params=rule_params, project=project, table=table)
+            log_.info("h_data end!")
+        elif now_min > 40:
+            log_.info('h_recall data is None, use bottom data!')
+            h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params)
+            log_.info("h_data end!")
+        else:
+            # data not ready: re-check after 1 minute
+            Timer(60, h_timer_check).start()
+
+    except Exception as e:
+        log_.error(f"Hourly (region-agnostic) data update failed, exception: {e}, traceback: {traceback.format_exc()}")
+        send_msg_to_feishu(
+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+            msg_text=f"rov-offline{config_.ENV_TEXT} - hourly (region-agnostic) data update failed\n"
+                     f"exception: {e}\n"
+                     f"traceback: {traceback.format_exc()}"
+        )
+
+
+if __name__ == '__main__':
+    log_.info("h_data start...")
+    h_timer_check()