|
@@ -7,6 +7,7 @@
|
|
import datetime
|
|
import datetime
|
|
import pandas as pd
|
|
import pandas as pd
|
|
import math
|
|
import math
|
|
|
|
+from functools import reduce
|
|
from odps import ODPS
|
|
from odps import ODPS
|
|
from threading import Timer
|
|
from threading import Timer
|
|
from utils import RedisHelper, get_data_from_odps, filter_video_status
|
|
from utils import RedisHelper, get_data_from_odps, filter_video_status
|
|
@@ -16,45 +17,10 @@ from log import Log
|
|
config_, _ = set_config()
|
|
config_, _ = set_config()
|
|
log_ = Log()
|
|
log_ = Log()
|
|
|
|
|
|
-region_code = {
|
|
|
|
- '河北省': '130000',
|
|
|
|
- '山西省': '140000',
|
|
|
|
- '辽宁省': '210000',
|
|
|
|
- '吉林省': '220000',
|
|
|
|
- '黑龙江省': '230000',
|
|
|
|
- '江苏省': '320000',
|
|
|
|
- '浙江省': '330000',
|
|
|
|
- '安徽省': '340000',
|
|
|
|
- '福建省': '350000',
|
|
|
|
- '江西省': '360000',
|
|
|
|
- '山东省': '370000',
|
|
|
|
- '河南省': '410000',
|
|
|
|
- '湖北省': '420000',
|
|
|
|
- '湖南省': '430000',
|
|
|
|
- '广东省': '440000',
|
|
|
|
- '海南省': '460000',
|
|
|
|
- '四川省': '510000',
|
|
|
|
- '贵州省': '520000',
|
|
|
|
- '云南省': '530000',
|
|
|
|
- '陕西省': '610000',
|
|
|
|
- '甘肃省': '620000',
|
|
|
|
- '青海省': '630000',
|
|
|
|
- '台湾省': '710000',
|
|
|
|
- '北京': '110000',
|
|
|
|
- '天津': '120000',
|
|
|
|
- '内蒙古': '150000',
|
|
|
|
- '上海': '310000',
|
|
|
|
- '广西': '450000',
|
|
|
|
- '重庆': '500000',
|
|
|
|
- '西藏': '540000',
|
|
|
|
- '宁夏': '640000',
|
|
|
|
- '新疆': '650000',
|
|
|
|
- '香港': '810000',
|
|
|
|
- '澳门': '820000',
|
|
|
|
- 'None': '-1'
|
|
|
|
-}
|
|
|
|
|
|
+region_code = config_.REGION_CODE
|
|
|
|
|
|
features = [
|
|
features = [
|
|
|
|
+ 'apptype',
|
|
'code', # 省份编码
|
|
'code', # 省份编码
|
|
'videoid',
|
|
'videoid',
|
|
'lastday_preview', # 昨日预曝光人数
|
|
'lastday_preview', # 昨日预曝光人数
|
|
@@ -153,7 +119,7 @@ def cal_score(df, param):
|
|
return df
|
|
return df
|
|
|
|
|
|
|
|
|
|
-def video_rank(df, now_date, now_h, rule_key, param, region):
|
|
|
|
|
|
+def video_rank(df, now_date, now_h, rule_key, param, region, app_type, data_key):
|
|
"""
|
|
"""
|
|
获取符合进入召回源条件的视频
|
|
获取符合进入召回源条件的视频
|
|
:param df:
|
|
:param df:
|
|
@@ -192,39 +158,78 @@ def video_rank(df, now_date, now_h, rule_key, param, region):
|
|
h_video_ids.append(int(video_id))
|
|
h_video_ids.append(int(video_id))
|
|
|
|
|
|
day_recall_key_name = \
|
|
day_recall_key_name = \
|
|
- f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H}{region}.{rule_key}." \
|
|
|
|
|
|
+ f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H}{app_type}.{data_key}.{region}.{rule_key}." \
|
|
f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
|
|
f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
|
|
if len(day_recall_result) > 0:
|
|
if len(day_recall_result) > 0:
|
|
redis_helper.add_data_with_zset(key_name=day_recall_key_name, data=day_recall_result, expire_time=23 * 3600)
|
|
redis_helper.add_data_with_zset(key_name=day_recall_key_name, data=day_recall_result, expire_time=23 * 3600)
|
|
# 清空线上过滤应用列表
|
|
# 清空线上过滤应用列表
|
|
- redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER_24H}{region}.{rule_key}")
|
|
|
|
|
|
+ redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER_24H}{app_type}.{data_key}.{region}.{rule_key}")
|
|
|
|
+
|
|
# 与其他召回视频池去重,存入对应的redis
|
|
# 与其他召回视频池去重,存入对应的redis
|
|
# dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key, region=region)
|
|
# dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key, region=region)
|
|
|
|
|
|
|
|
|
|
|
|
def merge_df(df_left, df_right):
    """Merge two feature DataFrames on (videoid, code) and sum each feature.

    Performs an outer join so rows present in only one frame are kept; the
    missing side contributes 0 (NaNs are filled before summing). The key
    columns 'videoid' / 'code' and the non-summable 'apptype' column are
    excluded from the summation.

    :param df_left: feature DataFrame with 'videoid' and 'code' columns
    :param df_right: feature DataFrame with 'videoid' and 'code' columns
    :return: DataFrame with columns ['videoid', 'code'] + summed feature columns
    """
    merged = df_left.merge(df_right, on=['videoid', 'code'], how='outer', suffixes=['_x', '_y'])
    merged = merged.fillna(0)
    keep_cols = ['videoid', 'code']
    for col in features:
        if col in ('apptype', 'videoid', 'code'):
            continue
        # Sum the left/right suffixed copies produced by the merge.
        merged[col] = merged[f'{col}_x'] + merged[f'{col}_y']
        keep_cols.append(col)
    return merged[keep_cols]
|
|
|
|
+
|
|
|
|
+
|
|
def rank_by_24h(project, table, now_date, now_h, rule_params, region_code_list):
    """Compute and persist 24h regional recall rankings.

    For every app_type in rule_params: merge the feature rows of the app
    types listed in each data_param (features summed via merge_df), then for
    every rule_key and region slice compute scores with cal_score and write
    the ranked result to Redis through video_rank.

    :param project: ODPS project holding the feature table
    :param table: ODPS feature table name
    :param now_date: datetime of the current run (used for key naming)
    :param now_h: current hour of the run
    :param rule_params: {app_type: {'data_params': {data_key: [apptype, ...]},
                                    'rule_params': {rule_key: rule_param}}}
    :param region_code_list: iterable of region codes to rank
    """
    # Fetch the day's feature data once; all groupings below reuse it.
    feature_df = get_feature_data(project=project, table=table, now_date=now_date)
    # Normalize apptype for the integer equality filters below.
    feature_df['apptype'] = feature_df['apptype'].astype(int)
    for app_type, params in rule_params.items():
        log_.info(f"app_type = {app_type}")
        for data_key, data_param in params['data_params'].items():
            log_.info(f"data_key = {data_key}, data_param = {data_param}")
            # One sub-frame per contributing app type, merged pairwise with
            # features summed (merge_df) into a single frame for this data_key.
            df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
            df_merged = reduce(merge_df, df_list)
            for rule_key, rule_param in params['rule_params'].items():
                log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
                for region in region_code_list:
                    log_.info(f"region = {region}")
                    # Score this region's slice and push the ranking to Redis.
                    region_df = df_merged[df_merged['code'] == region]
                    log_.info(f'region_df count = {len(region_df)}')
                    score_df = cal_score(df=region_df, param=rule_param)
                    video_rank(df=score_df, now_date=now_date, now_h=now_h,
                               rule_key=rule_key, param=rule_param, region=region,
                               app_type=app_type, data_key=data_key)
|
|
|
|
|
|
|
|
|
|
def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region):
|
|
def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region):
|
|
@@ -265,10 +270,8 @@ def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region):
|
|
redis_helper.add_data_with_zset(key_name=model_data_dup_key_name, data=model_data_dup, expire_time=23 * 3600)
|
|
redis_helper.add_data_with_zset(key_name=model_data_dup_key_name, data=model_data_dup, expire_time=23 * 3600)
|
|
|
|
|
|
|
|
|
|
-def h_rank_bottom(now_date, now_h, rule_key, region_code_list):
|
|
|
|
|
|
+def h_rank_bottom(now_date, now_h, rule_params, region_code_list):
|
|
"""未按时更新数据,用上一小时结果作为当前小时的数据"""
|
|
"""未按时更新数据,用上一小时结果作为当前小时的数据"""
|
|
- log_.info(f"rule_key = {rule_key}")
|
|
|
|
- # 获取rov模型结果
|
|
|
|
redis_helper = RedisHelper()
|
|
redis_helper = RedisHelper()
|
|
if now_h == 0:
|
|
if now_h == 0:
|
|
redis_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
|
|
redis_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
|
|
@@ -279,35 +282,42 @@ def h_rank_bottom(now_date, now_h, rule_key, region_code_list):
|
|
|
|
|
|
# 以上一小时的地域分组数据作为当前小时的数据
|
|
# 以上一小时的地域分组数据作为当前小时的数据
|
|
key_prefix = config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H
|
|
key_prefix = config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H
|
|
- for region in region_code_list:
|
|
|
|
- log_.info(f"region = {region}")
|
|
|
|
- key_name = f"{key_prefix}{region}.{rule_key}.{redis_dt}.{redis_h}"
|
|
|
|
- initial_data = redis_helper.get_all_data_from_zset(key_name=key_name, with_scores=True)
|
|
|
|
- if initial_data is None:
|
|
|
|
- initial_data = []
|
|
|
|
- final_data = dict()
|
|
|
|
- h_video_ids = []
|
|
|
|
- for video_id, score in initial_data:
|
|
|
|
- final_data[video_id] = score
|
|
|
|
- h_video_ids.append(int(video_id))
|
|
|
|
- # 存入对应的redis
|
|
|
|
- final_key_name = \
|
|
|
|
- f"{key_prefix}{region}.{rule_key}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
|
|
|
|
- if len(final_data) > 0:
|
|
|
|
- redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=23 * 3600)
|
|
|
|
- # 清空线上过滤应用列表
|
|
|
|
- redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER_24H}{region}.{rule_key}")
|
|
|
|
- # 与其他召回视频池去重,存入对应的redis
|
|
|
|
- dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key, region=region)
|
|
|
|
|
|
+ for app_type, params in rule_params.items():
|
|
|
|
+ log_.info(f"app_type = {app_type}")
|
|
|
|
+ for data_key, data_param in params['data_params'].items():
|
|
|
|
+ log_.info(f"data_key = {data_key}, data_param = {data_param}")
|
|
|
|
+ for rule_key, rule_param in params['rule_params'].items():
|
|
|
|
+ log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
|
|
|
|
+ for region in region_code_list:
|
|
|
|
+ log_.info(f"region = {region}")
|
|
|
|
+ key_name = f"{key_prefix}{app_type}.{data_key}.{region}.{rule_key}.{redis_dt}.{redis_h}"
|
|
|
|
+ initial_data = redis_helper.get_all_data_from_zset(key_name=key_name, with_scores=True)
|
|
|
|
+ if initial_data is None:
|
|
|
|
+ initial_data = []
|
|
|
|
+ final_data = dict()
|
|
|
|
+ h_video_ids = []
|
|
|
|
+ for video_id, score in initial_data:
|
|
|
|
+ final_data[video_id] = score
|
|
|
|
+ h_video_ids.append(int(video_id))
|
|
|
|
+ # 存入对应的redis
|
|
|
|
+ final_key_name = \
|
|
|
|
+ f"{key_prefix}{app_type}.{data_key}.{region}.{rule_key}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
|
|
|
|
+ if len(final_data) > 0:
|
|
|
|
+ redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=23 * 3600)
|
|
|
|
+ # 清空线上过滤应用列表
|
|
|
|
+ redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER_24H}{app_type}.{data_key}.{region}.{rule_key}")
|
|
|
|
+
|
|
|
|
+ # 与其他召回视频池去重,存入对应的redis
|
|
|
|
+ # dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key, region=region)
|
|
|
|
|
|
|
|
|
|
def h_timer_check():
|
|
def h_timer_check():
|
|
- rule_params = config_.RULE_PARAMS_REGION_24H
|
|
|
|
- project = config_.PROJECT_REGION_24H
|
|
|
|
- table = config_.TABLE_REGION_24H
|
|
|
|
|
|
+ rule_params = config_.RULE_PARAMS_REGION_24H_APP_TYPE
|
|
|
|
+ project = config_.PROJECT_REGION_24H_APP_TYPE
|
|
|
|
+ table = config_.TABLE_REGION_24H_APP_TYPE
|
|
region_code_list = [code for region, code in region_code.items()]
|
|
region_code_list = [code for region, code in region_code.items()]
|
|
- now_date = datetime.datetime.today()
|
|
|
|
- now_h = datetime.datetime.now().hour
|
|
|
|
|
|
+ now_date = datetime.datetime.today() - datetime.timedelta(hours=7)
|
|
|
|
+ now_h = datetime.datetime.now().hour - 7
|
|
now_min = datetime.datetime.now().minute
|
|
now_min = datetime.datetime.now().minute
|
|
log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
|
|
log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
|
|
# 查看当天更新的数据是否已准备好
|
|
# 查看当天更新的数据是否已准备好
|
|
@@ -320,7 +330,7 @@ def h_timer_check():
|
|
elif now_min > 50:
|
|
elif now_min > 50:
|
|
log_.info('24h_recall data is None, use bottom data!')
|
|
log_.info('24h_recall data is None, use bottom data!')
|
|
for key, _ in rule_params.items():
|
|
for key, _ in rule_params.items():
|
|
- h_rank_bottom(now_date=now_date, now_h=now_h, rule_key=key, region_code_list=region_code_list)
|
|
|
|
|
|
+ h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list)
|
|
else:
|
|
else:
|
|
# 数据没准备好,1分钟后重新检查
|
|
# 数据没准备好,1分钟后重新检查
|
|
Timer(60, h_timer_check).start()
|
|
Timer(60, h_timer_check).start()
|