@@ -0,0 +1,219 @@
+# -*- coding: utf-8 -*-
+# @ModuleName: region_rule_rank_h
+# @Author: Liqian
+# @Time: 2022/5/5 15:54
+# @Software: PyCharm
+
+import datetime
+import math
+from threading import Timer
+
+import pandas as pd
+from odps import ODPS
+
+from utils import RedisHelper, get_data_from_odps, filter_video_status
+from config import set_config
+from log import Log
+
+config_, _ = set_config()
+log_ = Log()
+
+region_code = {
+    '河北省': '130000',
+    '山西省': '140000',
+    '辽宁省': '210000',
+    '吉林省': '220000',
+    '黑龙江省': '230000',
+    '江苏省': '320000',
+    '浙江省': '330000',
+    '安徽省': '340000',
+    '福建省': '350000',
+    '江西省': '360000',
+    '山东省': '370000',
+    '河南省': '410000',
+    '湖北省': '420000',
+    '湖南省': '430000',
+    '广东省': '440000',
+    '海南省': '460000',
+    '四川省': '510000',
+    '贵州省': '520000',
+    '云南省': '530000',
+    '陕西省': '610000',
+    '甘肃省': '620000',
+    '青海省': '630000',
+    '台湾省': '710000',
+    '北京': '110000',
+    '天津': '120000',
+    '内蒙古': '150000',
+    '上海': '310000',
+    '广西': '450000',
+    '重庆': '500000',
+    '西藏': '540000',
+    '宁夏': '640000',
+    '新疆': '650000',
+    '香港': '810000',
+    '澳门': '820000',
+}
+
+features = [
+    'code',  # province code
+    'videoid',
+    'lastday_preview',  # users pre-exposed yesterday
+    'lastday_view',  # users exposed yesterday
+    'lastday_play',  # users who played yesterday
+    'lastday_share',  # users who shared yesterday
+    'lastday_return',  # users who returned yesterday
+    'lastday_preview_total',  # pre-exposure count yesterday
+    'lastday_view_total',  # exposure count yesterday
+    'lastday_play_total',  # play count yesterday
+    'lastday_share_total',  # share count yesterday
+]
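+# NOTE: each feature name above is expected to match a column of the source
+# ODPS table; get_feature_data below copies these fields verbatim per record.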
+
+
+def data_check(project, table, now_date):
+    """Check whether the upstream data for the given hour is ready; return its row count."""
+    odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project=project,
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
+        connect_timeout=3000,
+        read_timeout=500000,
+        pool_maxsize=1000,
+        pool_connections=1000
+    )
+    try:
+        dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
+        sql = f'select * from {project}.{table} where dt = {dt}'
+        with odps.execute_sql(sql=sql).open_reader() as reader:
+            data_count = reader.count
+    except Exception as e:
+        log_.info(f'data_check error: {e}')
+        data_count = 0
+    return data_count
+
+
+def get_feature_data(project, table, now_date):
+    """Fetch the feature records for the given hour and return them as a DataFrame."""
+    dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
+    # dt = '2022041310'
+    records = get_data_from_odps(date=dt, project=project, table=table)
+    feature_data = []
+    for record in records:
+        item = {}
+        for feature_name in features:
+            item[feature_name] = record[feature_name]
+        feature_data.append(item)
+    feature_df = pd.DataFrame(feature_data)
+    return feature_df
+
+
+def cal_score(df):
+    """
+    Compute a score for each video.
+    :param df: feature data
+    :return: df with a 'score' column, sorted by score in descending order
+    """
+    # score formula: sharerate * backrate * logback * ctr
+    # sharerate = lastday_share / (lastday_play + 1000)
+    # backrate = lastday_return / (lastday_share + 10)
+    # ctr = lastday_play / (lastday_preview + 1000), capped at 0.6: K2 = 0.6 if ctr > 0.6 else ctr
+    # score = sharerate * backrate * LOG(lastday_return + 1) * K2
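+    # Worked example with made-up numbers (illustration only): for a row with
+    # lastday_share=50, lastday_play=4000, lastday_return=200, lastday_preview=6000:
+    #   sharerate = 50 / (4000 + 1000) = 0.01
+    #   backrate  = 200 / (50 + 10)    ≈ 3.333
+    #   logback   = ln(200 + 1)        ≈ 5.303
+    #   ctr       = 4000 / (6000 + 1000) ≈ 0.571, below the 0.6 cap, so K2 ≈ 0.571
+    #   score     ≈ 0.01 * 3.333 * 5.303 * 0.571 ≈ 0.101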
+
+    df = df.fillna(0)
+    df['share_rate'] = df['lastday_share'] / (df['lastday_play'] + 1000)
+    df['back_rate'] = df['lastday_return'] / (df['lastday_share'] + 10)
+    df['log_back'] = (df['lastday_return'] + 1).apply(math.log)
+    df['ctr'] = df['lastday_play'] / (df['lastday_preview'] + 1000)
+    df['K2'] = df['ctr'].apply(lambda x: 0.6 if x > 0.6 else x)
+    df['score'] = df['share_rate'] * df['back_rate'] * df['log_back'] * df['K2']
+    df = df.sort_values(by=['score'], ascending=False)
+    return df
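+
+
+def _demo_cal_score():
+    """Minimal runnable sketch of cal_score (illustration only, hypothetical
+    numbers; not part of the pipeline). Uses the row from the worked example
+    above, so the printed score is roughly 0.101."""
+    sample = pd.DataFrame([{
+        'videoid': 123,
+        'lastday_preview': 6000,
+        'lastday_play': 4000,
+        'lastday_share': 50,
+        'lastday_return': 200,
+    }])
+    print(cal_score(sample)[['videoid', 'score']])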
+
+
+def video_rank(df, now_date, now_h, rule_key, param, region):
+    """
+    Select the videos that qualify to enter the recall pool.
+    :param df: scored feature data
+    :param now_date: current date
+    :param now_h: current hour
+    :param rule_key: name of the entry rule applied to the hourly data
+    :param param: parameters of the entry rule
+    :param region: region the data belongs to
+    :return:
+    """
+    redis_helper = RedisHelper()
+    # select the videos that satisfy the entry rule
+    return_count = param.get('return_count', 1)
+    score_value = param.get('score_rule', 0)
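+    # `param` comes from config_.RULE_PARAMS_REGION_24H; given the two lookups
+    # above, each rule's param is expected to look like (hypothetical values):
+    #   {'return_count': 100, 'score_rule': 0.5}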
+    h_recall_df = df[(df['lastday_return'] >= return_count) & (df['score'] >= score_value)]
+    # when a videoid appears more than once, keep the row with the highest score
+    h_recall_df = h_recall_df.sort_values(by=['score'], ascending=False)
+    h_recall_df = h_recall_df.drop_duplicates(subset=['videoid'], keep='first')
+    h_recall_df['videoid'] = h_recall_df['videoid'].astype(int)
+    h_recall_videos = h_recall_df['videoid'].to_list()
+    log_.info(f'day_recall videos count = {len(h_recall_videos)}')
+
+    # filter by video status
+    filtered_videos = filter_video_status(h_recall_videos)
+    log_.info(f'filtered_videos count = {len(filtered_videos)}')
+
+    # write the result to the corresponding redis zset
+    day_recall_result = {}
+    for video_id in filtered_videos:
+        score = h_recall_df[h_recall_df['videoid'] == video_id]['score'].iloc[0]
+        day_recall_result[int(video_id)] = float(score)
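+    # The key built below has the shape
+    #   {RECALL_KEY_NAME_PREFIX_REGION_BY_24H}{region}.{rule_key}.{yyyymmdd}.{hour}
+    # e.g. "<prefix>110000.rule1.20220505.10" (hypothetical prefix and rule key).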
+    day_recall_key_name = \
+        f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H}{region}.{rule_key}." \
+        f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
+    if len(day_recall_result) > 0:
+        redis_helper.add_data_with_zset(key_name=day_recall_key_name, data=day_recall_result, expire_time=23 * 3600)
+        # clear the online filter list
+        redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER_24H}{region}.{rule_key}")
+
+
+def rank_by_24h(project, table, now_date, now_h, rule_params, region_code_list):
+    # fetch the feature data
+    feature_df = get_feature_data(project=project, table=table, now_date=now_date)
+    # rank
+    for key, value in rule_params.items():
+        log_.info(f"rule = {key}, param = {value}")
+        for region in region_code_list:
+            log_.info(f"region = {region}")
+            # compute scores
+            region_df = feature_df[feature_df['code'] == region]
+            log_.info(f'region_df count = {len(region_df)}')
+            score_df = cal_score(df=region_df)
+            video_rank(df=score_df, now_date=now_date, now_h=now_h, rule_key=key, param=value, region=region)
+            # to-csv
+            score_filename = f"score_24h_{region}_{key}_{datetime.datetime.strftime(now_date, '%Y%m%d%H')}.csv"
+            score_df.to_csv(f'./data/{score_filename}')
+            # to-logs
+            log_.info({"date": datetime.datetime.strftime(now_date, '%Y%m%d%H'),
+                       "region_code": region,
+                       "redis_key_prefix": config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H,
+                       "rule_key": key,
+                       "score_df": score_df[['videoid', 'score']]})
+
+
+def h_timer_check():
+    rule_params = config_.RULE_PARAMS_REGION_24H
+    project = config_.PROJECT_REGION_24H
+    table = config_.TABLE_REGION_24H
+    region_code_list = list(region_code.values())
+    now_date = datetime.datetime.today()
+    now_h = datetime.datetime.now().hour
+    log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
+    # check whether today's updated data is ready
+    h_data_count = data_check(project=project, table=table, now_date=now_date)
+    if h_data_count > 0:
+        log_.info(f'day_data_count = {h_data_count}')
+        # data is ready, run the update
+        rank_by_24h(now_date=now_date, now_h=now_h, rule_params=rule_params,
+                    project=project, table=table, region_code_list=region_code_list)
+    else:
+        # data is not ready yet, check again in 1 minute
+        Timer(60, h_timer_check).start()
+
+
+if __name__ == '__main__':
+    h_timer_check()