|
@@ -59,6 +59,9 @@ features = [
|
|
|
'lastthreehour_return_now_new', # h-3分享,过去1小时回流人数(回流统计为对应地域分享带回的回流,分享限制地域,回流不限制地域)
|
|
|
'lastthreehour_return_new', # h-3分享,h-3回流人数(回流统计为对应地域分享带回的回流,分享限制地域,回流不限制地域)
|
|
|
'platform_return_new', # 平台分发回流(回流统计为对应地域分享带回的回流,分享限制地域,回流不限制地域)
|
|
|
+
|
|
|
+ 'lastonehour_allreturn',
|
|
|
+ 'lastonehour_allreturn_sharecnt'
|
|
|
]
|
|
|
|
|
|
|
|
@@ -85,9 +88,12 @@ def h_data_check(project, table, now_date):
|
|
|
|
|
|
try:
|
|
|
dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
|
|
|
+ # TODO(review): leftover personal debug marker ("test - zhangbo") — remove before release
|
|
|
check_res = check_table_partition_exits(date=dt, project=project, table=table)
|
|
|
if check_res:
|
|
|
- sql = f'select * from {project}.{table} where dt = {dt}'
|
|
|
+ sql = f'select * from {project}.{table} where dt = "{dt}"'
|
|
|
+ print("zhangbo-sql-是否有数据")
|
|
|
+ print(sql)
|
|
|
with odps.execute_sql(sql=sql).open_reader() as reader:
|
|
|
data_count = reader.count
|
|
|
else:
|
|
@@ -127,7 +133,7 @@ def get_day_30day_videos(now_date, data_key, rule_key):
|
|
|
def get_feature_data(project, table, now_date):
|
|
|
"""获取特征数据"""
|
|
|
dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
|
|
|
- # dt = '2022041310'
|
|
|
+ # TODO(review): leftover personal test marker ("zhangbo test", replaced a hard-coded dt override) — remove before release
|
|
|
records = get_data_from_odps(date=dt, project=project, table=table)
|
|
|
feature_data = []
|
|
|
for record in records:
|
|
@@ -139,6 +145,37 @@ def get_feature_data(project, table, now_date):
|
|
|
return feature_df
|
|
|
|
|
|
|
|
|
def cal_score_initial_20240223(df, param):
    """
    Compute the ranking score for each video row (2024-02-23 variant).

    Adds smoothed rate columns and a final ``score`` column to ``df``,
    then returns the frame sorted by ``score`` descending. Compared with
    ``cal_score_initial``, this variant blends the "all return" signals
    (``lastonehour_allreturn`` / ``lastonehour_allreturn_sharecnt``) into
    both the back-rate and the log-return factors with a 0.01 weight.

    :param df: feature DataFrame; expected columns include
        ``lastonehour_share`` / ``lastonehour_play`` / ``lastonehour_return`` /
        ``lastonehour_allreturn`` / ``lastonehour_allreturn_sharecnt`` /
        ``platform_return`` and one of ``lastonehour_show`` /
        ``lastonehour_show_region`` / ``lastonehour_preview`` depending on
        ``view_type``.
    :param param: rule parameters; ``view_type`` selects the CTR denominator.
    :return: the same DataFrame with score columns added, sorted by score.
    """
    df = df.fillna(0)
    # Additive constants (1000 / 10) smooth the rates so low-volume videos
    # do not dominate on tiny denominators.
    df['share_rate'] = df['lastonehour_share'] / (df['lastonehour_play'] + 1000)
    df['back_rate'] = df['lastonehour_return'] / (df['lastonehour_share'] + 10)
    df['back_rate_new'] = (df['lastonehour_return'] + 1) / (df['lastonehour_share'] + 10)
    df['back_rate_all'] = df['lastonehour_allreturn'] / (df['lastonehour_allreturn_sharecnt'] + 10)
    df['log_back'] = (df['lastonehour_return'] + 1).apply(math.log)
    df['log_back_all'] = (df['lastonehour_allreturn'] + 1).apply(math.log)
    # CTR denominator depends on where impressions were counted for this view.
    view_type = param.get('view_type', None)
    if view_type == 'video-show':
        df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show'] + 1000)
    elif view_type == 'video-show-region':
        df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show_region'] + 1000)
    else:
        df['ctr'] = df['lastonehour_play'] / (df['lastonehour_preview'] + 1000)
    # Cap the CTR factor at 0.6 so it cannot dominate the score.
    df['K2'] = df['ctr'].apply(lambda x: 0.6 if x > 0.6 else x)
    # NOTE(review): denominator is NOT smoothed here — after fillna(0), rows
    # with zero returns produce NaN/inf (mirrors cal_score_initial); confirm
    # downstream filters tolerate that before adding a +1 guard.
    df['platform_return_rate'] = df['platform_return'] / df['lastonehour_return']
    df['score'] = df['share_rate'] * (
        df['back_rate_new'] + 0.01 * df['back_rate_all']
    ) * (
        df['log_back'] + 0.01 * df['log_back_all']
    ) * df['K2']
    df = df.sort_values(by=['score'], ascending=False)
    return df
|
|
|
+
|
|
|
def cal_score_initial(df, param):
|
|
|
"""
|
|
|
计算score
|
|
@@ -510,6 +547,8 @@ def cal_score(df, param):
|
|
|
df = cal_score_with_back_rate_exponential_weighting2(df=df, param=param)
|
|
|
elif param.get('score_func', None) == 'back_rate_rank_weighting':
|
|
|
df = cal_score_with_back_rate_by_rank_weighting(df=df, param=param)
|
|
|
+ elif param.get('score_func', None) == '20240223':
|
|
|
+ df = cal_score_initial_20240223(df=df, param=param)
|
|
|
else:
|
|
|
df = cal_score_initial(df=df, param=param)
|
|
|
return df
|
|
@@ -601,57 +640,58 @@ def video_rank(df, now_date, now_h, rule_key, param, region, data_key, rule_rank
|
|
|
return_count = param.get('return_count', 1)
|
|
|
score_value = param.get('score_rule', 0)
|
|
|
platform_return_rate = param.get('platform_return_rate', 0)
|
|
|
+ # h_recall_df = df[(df['lastonehour_return'] >= return_count) & (df['score'] >= score_value)
|
|
|
+ # & (df['platform_return_rate'] >= platform_return_rate)]
|
|
|
+ # h_recall_df = df[
|
|
|
+ # (df['lastonehour_return'] >= return_count) &
|
|
|
+ # (df['score'] >= score_value) &
|
|
|
+ # (df['platform_return_rate'] >= platform_return_rate)
|
|
|
+ # ]
|
|
|
h_recall_df = df[
|
|
|
- (df['lastonehour_return'] >= return_count) &
|
|
|
- (df['score'] >= score_value) &
|
|
|
- (df['platform_return_rate'] >= platform_return_rate)
|
|
|
- ]
|
|
|
- try:
|
|
|
- if "return_countv2" in param.keys() and "platform_return_ratev2" in param.keys():
|
|
|
- return_countv2 = param["return_countv2"]
|
|
|
- platform_return_ratev2 = param["platform_return_ratev2"]
|
|
|
- h_recall_df = h_recall_df[
|
|
|
- df['platform_return_rate'] >= platform_return_ratev2 |
|
|
|
- (df['platform_return_rate'] < platform_return_ratev2 & df['lastonehour_return'] > return_countv2)
|
|
|
- ]
|
|
|
- except Exception as e:
|
|
|
- log_.error("return_countv2 is wrong with{}".format(e))
|
|
|
-
|
|
|
+ (df['lastonehour_allreturn'] > 0)
|
|
|
+ ]
|
|
|
+ # try:
|
|
|
+ # if "return_countv2" in param.keys() and "platform_return_ratev2" in param.keys():
|
|
|
+ # return_countv2 = param["return_countv2"]
|
|
|
+ # platform_return_ratev2 = param["platform_return_ratev2"]
|
|
|
+ # h_recall_df = h_recall_df[
|
|
|
+ # df['platform_return_rate'] >= platform_return_ratev2 |
|
|
|
+ # (df['platform_return_rate'] < platform_return_ratev2 & df['lastonehour_return'] > return_countv2)
|
|
|
+ # ]
|
|
|
+ # except Exception as e:
|
|
|
+ # log_.error("return_countv2 is wrong with{}".format(e))
|
|
|
|
|
|
# videoid重复时,保留分值高
|
|
|
h_recall_df = h_recall_df.sort_values(by=['score'], ascending=False)
|
|
|
h_recall_df = h_recall_df.drop_duplicates(subset=['videoid'], keep='first')
|
|
|
h_recall_df['videoid'] = h_recall_df['videoid'].astype(int)
|
|
|
|
|
|
+ log_.info(f"各种规则过滤后,一共有多少个视频 = {len(h_recall_df)}")
|
|
|
# 增加打捞的优质视频
|
|
|
if add_videos_with_pre_h is True:
|
|
|
add_func = param.get('add_func', None)
|
|
|
h_recall_df = add_videos(initial_df=h_recall_df, now_date=now_date, rule_key=rule_key,
|
|
|
region=region, data_key=data_key, hour_count=hour_count, top=10, add_func=add_func)
|
|
|
-
|
|
|
+ log_.info(f"打捞优质视频完成")
|
|
|
h_recall_videos = h_recall_df['videoid'].to_list()
|
|
|
- # log_.info(f'h_recall videos count = {len(h_recall_videos)}')
|
|
|
-
|
|
|
+ log_.info(f"各种规则增加后,一共有多少个视频 = {len(h_recall_videos)}")
|
|
|
# 视频状态过滤
|
|
|
if data_key in ['data7', ]:
|
|
|
filtered_videos = filter_video_status_app(h_recall_videos)
|
|
|
else:
|
|
|
filtered_videos = filter_video_status(h_recall_videos)
|
|
|
- # log_.info('filtered_videos count = {}'.format(len(filtered_videos)))
|
|
|
|
|
|
# 屏蔽视频过滤
|
|
|
shield_config = param.get('shield_config', config_.SHIELD_CONFIG)
|
|
|
shield_key_name_list = shield_config.get(region, None)
|
|
|
if shield_key_name_list is not None:
|
|
|
filtered_videos = filter_shield_video(video_ids=filtered_videos, shield_key_name_list=shield_key_name_list)
|
|
|
- # log_.info(f"shield filtered_videos count = {len(filtered_videos)}")
|
|
|
|
|
|
# 涉政视频过滤
|
|
|
political_filter = param.get('political_filter', None)
|
|
|
if political_filter is True:
|
|
|
- # log_.info(f"political filter videos count = {len(filtered_videos)}")
|
|
|
filtered_videos = filter_political_videos(video_ids=filtered_videos)
|
|
|
- # log_.info(f"political filtered videos count = {len(filtered_videos)}")
|
|
|
+ log_.info(f"视频状态-涉政等-过滤后,一共有多少个视频 = {len(filtered_videos)}")
|
|
|
|
|
|
# 写入对应的redis
|
|
|
h_video_ids = []
|
|
@@ -673,8 +713,9 @@ def video_rank(df, now_date, now_h, rule_key, param, region, data_key, rule_rank
|
|
|
h_recall_key_name = \
|
|
|
f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region}:{data_key}:{rule_key}:" \
|
|
|
f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
|
|
|
+ log_.info("打印地域1小时的某个地域{},redis key:{}".format(region, h_recall_key_name))
|
|
|
if len(h_recall_result) > 0:
|
|
|
- # log_.info(f"h_recall_result count = {len(h_recall_result)}")
|
|
|
+ log_.info(f"开始写入头部数据:count = {len(h_recall_result)}, key = {h_recall_key_name}")
|
|
|
redis_helper.add_data_with_zset(key_name=h_recall_key_name, data=h_recall_result, expire_time=2 * 24 * 3600)
|
|
|
# 限流视频score调整
|
|
|
update_limit_video_score(initial_videos=h_recall_result, key_name=h_recall_key_name)
|
|
@@ -717,8 +758,7 @@ def merge_df_with_score(df_left, df_right):
|
|
|
|
|
|
def process_with_region(region, df_merged, data_key, rule_key, rule_param, now_date, now_h,
|
|
|
rule_rank_h_flag, add_videos_with_pre_h, hour_count):
|
|
|
- log_.info(f"region = {region} start...")
|
|
|
- # 计算score
|
|
|
+ log_.info(f"多协程的region = {region} 开始执行")
|
|
|
region_df = df_merged[df_merged['code'] == region]
|
|
|
log_.info(f'该区域region = {region}, 下有多少数据量 = {len(region_df)}')
|
|
|
score_df = cal_score(df=region_df, param=rule_param)
|
|
@@ -805,10 +845,8 @@ def process_with_param(param, data_params_item, rule_params_item, region_code_li
|
|
|
|
|
|
data_key = param.get('data')
|
|
|
data_param = data_params_item.get(data_key)
|
|
|
- log_.info(f"data_key = {data_key}, data_param = {data_param}")
|
|
|
rule_key = param.get('rule')
|
|
|
rule_param = rule_params_item.get(rule_key)
|
|
|
- log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
|
|
|
merge_func = rule_param.get('merge_func', None)
|
|
|
log_.info("数据采用:{},统计采用{}.".format(data_key, rule_key))
|
|
|
log_.info("具体的规则是:{}.".format(rule_param))
|
|
@@ -1079,11 +1117,12 @@ def h_timer_check():
|
|
|
table = config_.TABLE_REGION_APP_TYPE
|
|
|
region_code_list = [code for region, code in region_code.items()]
|
|
|
now_date = datetime.datetime.today()
|
|
|
- log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}, rule_rank_h_flag: {rule_rank_h_flag}")
|
|
|
+ log_.info(f"开始执行: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
|
|
|
now_h = datetime.datetime.now().hour
|
|
|
now_min = datetime.datetime.now().minute
|
|
|
redis_helper = RedisHelper()
|
|
|
if now_h == 0:
|
|
|
+ log_.info("当前时间{}小时,使用bottom的data,开始。".format(now_h))
|
|
|
h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list,
|
|
|
rule_rank_h_flag=rule_rank_h_flag)
|
|
|
log_.info(f"region_h_data end!")
|
|
@@ -1096,7 +1135,7 @@ def h_timer_check():
|
|
|
# 查看当前小时更新的数据是否已准备好
|
|
|
h_data_count = h_data_check(project=project, table=table, now_date=now_date)
|
|
|
if h_data_count > 0:
|
|
|
- log_.info(f'region_h_data_count = {h_data_count}')
|
|
|
+ log_.info('上游数据表查询数据条数 h_data_count = {},开始计算。'.format(h_data_count))
|
|
|
# 数据准备好,进行更新
|
|
|
rank_by_h(now_date=now_date, now_h=now_h, rule_params=rule_params,
|
|
|
project=project, table=table, region_code_list=region_code_list, rule_rank_h_flag=rule_rank_h_flag)
|
|
@@ -1118,6 +1157,7 @@ def h_timer_check():
|
|
|
log_.info(f"region_h_data status update to '1' finished!")
|
|
|
else:
|
|
|
# 数据没准备好,1分钟后重新检查
|
|
|
+ log_.info("上游数据未就绪,等待...")
|
|
|
Timer(60, h_timer_check).start()
|
|
|
|
|
|
except Exception as e:
|
|
@@ -1132,5 +1172,5 @@ def h_timer_check():
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
- log_.info(f"region_h_data start...")
|
|
|
+ log_.info(f"region-rule-rank-h-v2 start...")
|
|
|
h_timer_check()
|