|
@@ -1,12 +1,40 @@
|
|
|
import datetime
|
|
|
import pandas as pd
|
|
|
-from utils import get_data_from_odps
|
|
|
+from odps import ODPS
|
|
|
+from utils import get_data_from_odps, RedisHelper, check_table_partition_exits
|
|
|
from config import set_config
|
|
|
from log import Log
|
|
|
config_, _ = set_config()
|
|
|
log_ = Log()
|
|
|
|
|
|
|
|
|
def h_data_check(project, table, now_date):
    """Check whether the data for ``now_date`` is ready.

    Formats ``now_date`` as ``YYYYMMDD``, asks ``check_table_partition_exits``
    whether that partition exists and, if so, runs a query against
    ``project.table`` filtered on ``dt`` and reports the reader's row count.

    Args:
        project: ODPS project name; also used to qualify the table in the SQL.
        table: Name of the table to check.
        now_date: ``datetime.datetime`` used to build the ``dt`` value.

    Returns:
        The row count reported by the ODPS reader, or ``0`` when the
        partition check is falsy or any step of the check raises.
    """
    # Local import keeps this block self-contained; the stdlib logger is used
    # only to surface failures that were previously discarded.
    import logging

    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )

    try:
        dt = datetime.datetime.strftime(now_date, '%Y%m%d')
        if not check_table_partition_exits(date=dt, project=project, table=table):
            return 0
        sql = f'select * from {project}.{table} where dt = {dt}'
        with odps.execute_sql(sql=sql).open_reader() as reader:
            return reader.count
    except Exception:
        # Still best-effort (callers get 0), but record the traceback so a
        # broken connection or bad credentials is distinguishable from
        # "data not ready yet".
        logging.getLogger(__name__).exception(
            'h_data_check failed for %s.%s', project, table
        )
        return 0
|
|
|
+
|
|
|
+
|
|
|
def get_feature_data(project, table, features, now_date):
|
|
|
"""获取特征数据"""
|
|
|
dt = datetime.datetime.strftime(now_date, '%Y%m%d')
|
|
@@ -32,7 +60,6 @@ def predict_user_group_share_rate(now_date):
|
|
|
'sharerate_all',
|
|
|
'sharerate_ad'
|
|
|
]
|
|
|
-
|
|
|
user_group_df = get_feature_data(project=project, table=table, features=features, now_date=now_date)
|
|
|
user_group_df['sharerate_all'] = user_group_df['sharerate_all'].astype(float)
|
|
|
user_group_df['sharerate_ad'] = user_group_df['sharerate_ad'].astype(float)
|
|
@@ -42,6 +69,14 @@ def predict_user_group_share_rate(now_date):
|
|
|
# 计算用户组有广告时分享率
|
|
|
user_group_df['group_ad_share_rate'] = \
|
|
|
user_group_df['sharerate_ad'] * float(ad_all_group_share_rate) / user_group_df['sharerate_all']
|
|
|
+ # 结果写入redis
|
|
|
+ # key_name = f"{config_.KEY_NAME_PREFIX_AD_GROUP}{datetime.datetime.strftime(now_date, '%Y%m%d')}"
|
|
|
+ # redis_data = {}
|
|
|
+ # for item in user_group_df:
|
|
|
+ # redis_data[item['group']] = item['group_ad_share_rate']
|
|
|
+ # if len(redis_data) > 0:
|
|
|
+ # redis_helper = RedisHelper()
|
|
|
+ # redis_helper.add_data_with_zset(key_name=key_name, data=redis_data, expire_time=2 * 24 * 3600)
|
|
|
return user_group_df
|
|
|
|
|
|
|
|
@@ -56,7 +91,6 @@ def predict_video_share_rate(now_date):
|
|
|
'sharerate_all',
|
|
|
'sharerate_ad'
|
|
|
]
|
|
|
-
|
|
|
video_df = get_feature_data(project=project, table=table, features=features, now_date=now_date)
|
|
|
video_df['sharerate_all'] = video_df['sharerate_all'].astype(float)
|
|
|
video_df['sharerate_ad'] = video_df['sharerate_ad'].astype(float)
|
|
@@ -66,6 +100,27 @@ def predict_video_share_rate(now_date):
|
|
|
# 计算视频有广告时分享率
|
|
|
video_df['video_ad_share_rate'] = \
|
|
|
video_df['sharerate_ad'] * float(ad_all_videos_share_rate) / video_df['sharerate_all']
|
|
|
+ # 结果写入redis
|
|
|
+ # key_name = f"{config_.KEY_NAME_PREFIX_AD_VIDEO}{datetime.datetime.strftime(now_date, '%Y%m%d')}"
|
|
|
+ # redis_data = {}
|
|
|
+ # for item in video_df:
|
|
|
+ # redis_data[item['videoid']] = item['video_ad_share_rate']
|
|
|
+ # if len(redis_data) > 0:
|
|
|
+ # redis_helper = RedisHelper()
|
|
|
+ # redis_helper.add_data_with_zset(key_name=key_name, data=redis_data, expire_time=2 * 24 * 3600)
|
|
|
return video_df
|
|
|
|
|
|
|
|
|
def predict_ad_group_video(now_date):
    """Combine per-group and per-video ad share rates into one table.

    Calls ``predict_user_group_share_rate`` and ``predict_video_share_rate``
    for ``now_date``, then adds one column per user group to a copy of the
    video frame. Each new column is named after the group and holds
    ``video_ad_share_rate * group_ad_share_rate`` for every video row.

    Args:
        now_date: Date passed through unchanged to both predictors.

    Returns:
        A new DataFrame with the video frame's columns plus one column per
        value in the user-group frame's ``group`` column.
    """
    user_group_df = predict_user_group_share_rate(now_date)
    video_df = predict_video_share_rate(now_date)

    # Work on a copy so the frame returned by the video predictor is not
    # mutated through an alias.
    predict_df = video_df.copy()

    # Iterating a DataFrame directly yields column labels, not rows, so walk
    # the two needed columns in lockstep instead.
    group_rates = zip(user_group_df['group'], user_group_df['group_ad_share_rate'])
    for group, group_ad_share_rate in group_rates:
        # Scale the video's share rate (not its id) by the group's rate.
        predict_df[group] = predict_df['video_ad_share_rate'] * group_ad_share_rate
    return predict_df
|
|
|
+
|
|
|
+
|
|
|
if __name__ == '__main__':
    # Script entry point: run the combined group/video prediction for the
    # current local date and time.
    now_date = datetime.datetime.now()
    predict_df = predict_ad_group_video(now_date=now_date)
|
|
|
+
|