@@ -4,9 +4,11 @@
+import gevent
 import datetime
 import pandas as pd
 import math
+from functools import reduce
 from odps import ODPS
 from threading import Timer
 from utils import MysqlHelper, RedisHelper, get_data_from_odps, filter_video_status
@@ -20,6 +22,7 @@ log_ = Log()
 region_code = config_.REGION_CODE

 features = [
+    'apptype',
     'code',
     'videoid',
     'lastonehour_preview',
@@ -124,7 +127,7 @@ def cal_score(df, param):
     return df


-def video_rank(df, now_date, now_h, rule_key, param, region):
+def video_rank(df, now_date, now_h, rule_key, param, region, app_type, data_key):
     """
     Select the videos that qualify for this recall source and merge them with the daily-updated rov model result video list
     :param df:
@@ -163,21 +166,22 @@ def video_rank(df, now_date, now_h, rule_key, param, region):
             h_recall_result[int(video_id)] = float(score)
             h_video_ids.append(int(video_id))
     h_recall_key_name = \
-        f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region}.{rule_key}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
+        f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region}.{app_type}.{data_key}.{rule_key}." \
+        f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
     if len(h_recall_result) > 0:
         redis_helper.add_data_with_zset(key_name=h_recall_key_name, data=h_recall_result, expire_time=23 * 3600)

         update_limit_video_score(initial_videos=h_recall_result, key_name=h_recall_key_name)

-    redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER}{region}.{rule_key}")
+    redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER}{region}.{app_type}.{data_key}.{rule_key}")

     region_24h_rule_key = param.get('region_24h_rule_key', 'rule1')

     dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key,
-                 region_24h_rule_key=region_24h_rule_key, region=region)
+                 region_24h_rule_key=region_24h_rule_key, region=region, app_type=app_type, data_key=data_key)


-def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region_24h_rule_key, region):
+def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region_24h_rule_key, region, app_type, data_key):
     """Dedupe the region-grouped hourly data against the other recall video pools and store the result in the corresponding Redis keys"""
     redis_helper = RedisHelper()

@@ -202,7 +206,7 @@ def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region_24h_rule_key, re
     region_24h_key_name = \
-        f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H}{region}.{region_24h_rule_key}." \
+        f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H}{region}.{app_type}.{data_key}.{region_24h_rule_key}." \
         f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
     if redis_helper.key_exists(key_name=region_24h_key_name):
         region_24h_data = redis_helper.get_all_data_from_zset(key_name=region_24h_key_name, with_scores=True)
@@ -214,14 +218,14 @@ def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region_24h_rule_key, re
                 h_video_ids.append(int(video_id))
         log_.info(f"region 24h data dup count = {len(region_24h_dup)}")
         region_24h_dup_key_name = \
-            f"{config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H}{region}.{rule_key}." \
+            f"{config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H}{region}.{app_type}.{data_key}.{rule_key}." \
             f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
         if len(region_24h_dup) > 0:
             redis_helper.add_data_with_zset(key_name=region_24h_dup_key_name, data=region_24h_dup, expire_time=23 * 3600)

             update_limit_video_score(initial_videos=region_24h_dup, key_name=region_24h_dup_key_name)

-    redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER_24H}{region}.{rule_key}")
+

@@ -242,7 +246,7 @@ def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region_24h_rule_key, re
-    day_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H}rule2." \
+    day_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H}{app_type}.{data_key}.rule2." \
                    f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
     if redis_helper.key_exists(key_name=day_key_name):
         day_data = redis_helper.get_all_data_from_zset(key_name=day_key_name, with_scores=True)
@@ -254,14 +258,14 @@ def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region_24h_rule_key, re
                 h_video_ids.append(int(video_id))
         log_.info(f"24h data dup count = {len(day_dup)}")
         day_dup_key_name = \
-            f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H}{region}.{rule_key}." \
+            f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H}{region}.{app_type}.{data_key}.{rule_key}." \
             f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
         if len(day_dup) > 0:
             redis_helper.add_data_with_zset(key_name=day_dup_key_name, data=day_dup, expire_time=23 * 3600)

             update_limit_video_score(initial_videos=day_dup, key_name=day_dup_key_name)

-    redis_helper.del_keys(key_name=f"{config_.H_VIDEO_FILER_24H}{region}.{rule_key}")
+    redis_helper.del_keys(key_name=f"{config_.H_VIDEO_FILER_24H}{region}.{app_type}.{data_key}.{rule_key}")

     model_key_name = get_rov_redis_key(now_date=now_date)
@@ -274,7 +278,7 @@ def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region_24h_rule_key, re
             h_video_ids.append(int(video_id))
     log_.info(f"model data dup count = {len(model_data_dup)}")
     model_data_dup_key_name = \
-        f"{config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H}{region}.{rule_key}." \
+        f"{config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H}{region}.{app_type}.{data_key}.{rule_key}." \
         f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
     if len(model_data_dup) > 0:
         redis_helper.add_data_with_zset(key_name=model_data_dup_key_name, data=model_data_dup, expire_time=23 * 3600)
@@ -282,38 +286,77 @@ def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region_24h_rule_key, re
         update_limit_video_score(initial_videos=model_data_dup, key_name=model_data_dup_key_name)


+def merge_df(df_left, df_right):
+    """
+    Merge the two DataFrames on videoid and code, summing the corresponding feature columns
+    :param df_left:
+    :param df_right:
+    :return:
+    """
+    df_merged = pd.merge(df_left, df_right, on=['videoid', 'code'], how='outer', suffixes=['_x', '_y'])
+    df_merged.fillna(0, inplace=True)
+    feature_list = ['videoid', 'code']
+    for feature in features:
+        if feature in ['apptype', 'videoid', 'code']:
+            continue
+        df_merged[feature] = df_merged[f'{feature}_x'] + df_merged[f'{feature}_y']
+        feature_list.append(feature)
+    return df_merged[feature_list]
+
+
+def process_with_region(region, df_merged, app_type, data_key, rule_key, rule_param, now_date, now_h):
+    log_.info(f"region = {region}")
+
+    region_df = df_merged[df_merged['code'] == region]
+    log_.info(f'region_df count = {len(region_df)}')
+    score_df = cal_score(df=region_df, param=rule_param)
+    video_rank(df=score_df, now_date=now_date, now_h=now_h, rule_key=rule_key, param=rule_param,
+               region=region, app_type=app_type, data_key=data_key)
+
+
 def rank_by_h(project, table, now_date, now_h, rule_params, region_code_list):

     feature_df = get_feature_data(project=project, table=table, now_date=now_date)
-
-
+    feature_df['apptype'] = feature_df['apptype'].astype(int)
+    for app_type, params in rule_params.items():
+        log_.info(f"app_type = {app_type}")
+        for data_key, data_param in params['data_params'].items():
+            log_.info(f"data_key = {data_key}, data_param = {data_param}")
+            df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
+            df_merged = reduce(merge_df, df_list)
+            for rule_key, rule_param in params['rule_params'].items():
+                log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
+                task_list = [
+                    gevent.spawn(process_with_region, region, df_merged, app_type, data_key, rule_key, rule_param, now_date, now_h)
+                    for region in region_code_list
+                ]
+                gevent.joinall(task_list)
+

-    for key, value in rule_params.items():
-        log_.info(f"rule = {key}, param = {value}")
-        for region in region_code_list:
-            log_.info(f"region = {region}")
-
-            region_df = feature_df[feature_df['code'] == region]
-            log_.info(f'region_df count = {len(region_df)}')
-            score_df = cal_score(df=region_df, param=value)
-            video_rank(df=score_df, now_date=now_date, now_h=now_h, rule_key=key, param=value, region=region)
-
-            score_filename = f"score_{region}_{key}_{datetime.datetime.strftime(now_date, '%Y%m%d%H')}.csv"
-            score_df.to_csv(f'./data/{score_filename}')
-
-            log_.info({"date": datetime.datetime.strftime(now_date, '%Y%m%d%H'),
-                       "region_code": region,
-                       "redis_key_prefix": config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H,
-                       "rule_key": key,
-
-                       }
-                      )
-
-
-def h_rank_bottom(now_date, now_h, rule_key, region_code_list, param):
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+def h_rank_bottom(now_date, now_h, rule_params, region_code_list):
     """Data was not updated on time; use the previous hour's result as the current hour's data"""
-    log_.info(f"rule_key = {rule_key}")
-    region_24h_rule_key = param.get('region_24h_rule_key', 'rule1')

     redis_helper = RedisHelper()
     if now_h == 0:
@@ -334,41 +377,48 @@ def h_rank_bottom(now_date, now_h, rule_key, region_code_list, param):


     key_prefix = config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H
-    for region in region_code_list:
-        log_.info(f"region = {region}")
-        key_name = f"{key_prefix}{region}.{rule_key}.{redis_dt}.{redis_h}"
-        initial_data = redis_helper.get_all_data_from_zset(key_name=key_name, with_scores=True)
-        if initial_data is None:
-            initial_data = []
-        final_data = dict()
-        h_video_ids = []
-        for video_id, score in initial_data:
-            final_data[video_id] = score
-            h_video_ids.append(int(video_id))
-
-        final_key_name = \
-            f"{key_prefix}{region}.{rule_key}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
-        if len(final_data) > 0:
-            redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=23 * 3600)
-
-        redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER}{region}.{rule_key}")
-
-        dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h,
-                     rule_key=rule_key, region_24h_rule_key=region_24h_rule_key, region=region)
+    for app_type, params in rule_params.items():
+        log_.info(f"app_type = {app_type}")
+        for data_key, data_param in params['data_params'].items():
+            log_.info(f"data_key = {data_key}, data_param = {data_param}")
+            for rule_key, rule_param in params['rule_params'].items():
+                log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
+                region_24h_rule_key = rule_param.get('region_24h_rule_key', 'rule1')
+                for region in region_code_list:
+                    log_.info(f"region = {region}")
+                    key_name = f"{key_prefix}{region}.{app_type}.{data_key}.{rule_key}.{redis_dt}.{redis_h}"
+                    initial_data = redis_helper.get_all_data_from_zset(key_name=key_name, with_scores=True)
+                    if initial_data is None:
+                        initial_data = []
+                    final_data = dict()
+                    h_video_ids = []
+                    for video_id, score in initial_data:
+                        final_data[video_id] = score
+                        h_video_ids.append(int(video_id))
+
+                    final_key_name = \
+                        f"{key_prefix}{region}.{app_type}.{data_key}.{rule_key}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
+                    if len(final_data) > 0:
+                        redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=23 * 3600)
+
+                    redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER}{region}.{app_type}.{data_key}.{rule_key}")
+
+                    dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key,
+                                 region_24h_rule_key=region_24h_rule_key, region=region,
+                                 app_type=app_type, data_key=data_key)


 def h_timer_check():
-    rule_params = config_.RULE_PARAMS_REGION
-    project = config_.PROJECT_REGION
-    table = config_.TABLE_REGION
+    rule_params = config_.RULE_PARAMS_REGION_APP_TYPE
+    project = config_.PROJECT_REGION_APP_TYPE
+    table = config_.TABLE_REGION_APP_TYPE
     region_code_list = [code for region, code in region_code.items()]
-    now_date = datetime.datetime.today()
+    now_date = datetime.datetime.today() - datetime.timedelta(hours=1)
     log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
-    now_h = datetime.datetime.now().hour
+    now_h = datetime.datetime.now().hour - 1
     now_min = datetime.datetime.now().minute
     if now_h == 0:
-        for key, value in rule_params.items():
-            h_rank_bottom(now_date=now_date, now_h=now_h, rule_key=key, region_code_list=region_code_list, param=value)
+        h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list)
         return

     h_data_count = h_data_check(project=project, table=table, now_date=now_date)
@@ -379,8 +429,7 @@ def h_timer_check():
                  project=project, table=table, region_code_list=region_code_list)
     elif now_min > 50:
         log_.info('h_recall data is None, use bottom data!')
-        for key, value in rule_params.items():
-            h_rank_bottom(now_date=now_date, now_h=now_h, rule_key=key, region_code_list=region_code_list, param=value)
+        h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list)
     else:
         Timer(60, h_timer_check).start()
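
Editor's note (not part of the diff): the rewritten rank_by_h / h_rank_bottom iterate config_.RULE_PARAMS_REGION_APP_TYPE as a nested mapping of app_type -> {'data_params', 'rule_params'}. The diff only implies the shape: each data_param is an iterable of apptype values whose feature rows are merged with reduce(merge_df, ...), and each rule_param is passed on to cal_score / video_rank and may carry 'region_24h_rule_key'. The concrete keys and values below are illustrative assumptions, not taken from config.py.

# Hypothetical sketch of the expected config shape; values are placeholders, not real settings.
RULE_PARAMS_REGION_APP_TYPE = {
    0: {                                   # app_type
        'data_params': {
            'data1': [0],                  # apptype values merged into one DataFrame via reduce(merge_df, df_list)
            'data2': [0, 2],
        },
        'rule_params': {
            'rule1': {'region_24h_rule_key': 'rule1'},   # rule_param handed to cal_score() / video_rank()
        },
    },
}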