import datetime import pandas as pd from odps import ODPS from utils import get_data_from_odps, RedisHelper, check_table_partition_exits from config import set_config from log import Log config_, _ = set_config() log_ = Log() def h_data_check(project, table, now_date): """检查数据是否准备好""" odps = ODPS( access_id=config_.ODPS_CONFIG['ACCESSID'], secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'], project=project, endpoint=config_.ODPS_CONFIG['ENDPOINT'], connect_timeout=3000, read_timeout=500000, pool_maxsize=1000, pool_connections=1000 ) try: dt = datetime.datetime.strftime(now_date, '%Y%m%d') check_res = check_table_partition_exits(date=dt, project=project, table=table) if check_res: sql = f'select * from {project}.{table} where dt = {dt}' with odps.execute_sql(sql=sql).open_reader() as reader: data_count = reader.count else: data_count = 0 except Exception as e: data_count = 0 return data_count def get_feature_data(project, table, features, now_date): """获取特征数据""" dt = datetime.datetime.strftime(now_date, '%Y%m%d') records = get_data_from_odps(date=dt, project=project, table=table) feature_data = [] for record in records: item = {} for feature_name in features: item[feature_name] = record[feature_name] feature_data.append(item) feature_df = pd.DataFrame(feature_data) return feature_df def predict_user_group_share_rate(now_date): """预估用户组对应的有广告时分享率""" # 获取用户组特征 project = config_.ad_model_data['users_share_rate'].get('project') table = config_.ad_model_data['users_share_rate'].get('table') features = [ 'apptype', 'group', 'sharerate_all', 'sharerate_ad' ] user_group_df = get_feature_data(project=project, table=table, features=features, now_date=now_date) user_group_df['sharerate_all'] = user_group_df['sharerate_all'].astype(float) user_group_df['sharerate_ad'] = user_group_df['sharerate_ad'].astype(float) # 获取有广告时所有用户组近30天的分享率 ad_all_group_share_rate = user_group_df[user_group_df['group'] == 'allmids']['sharerate_ad'] user_group_df = user_group_df[user_group_df['group'] != 'allmids'] # 计算用户组有广告时分享率 user_group_df['group_ad_share_rate'] = \ user_group_df['sharerate_ad'] * float(ad_all_group_share_rate) / user_group_df['sharerate_all'] # 结果写入redis # key_name = f"{config_.KEY_NAME_PREFIX_AD_GROUP}{datetime.datetime.strftime(now_date, '%Y%m%d')}" # redis_data = {} # for item in user_group_df: # redis_data[item['group']] = item['group_ad_share_rate'] # if len(redis_data) > 0: # redis_helper = RedisHelper() # redis_helper.add_data_with_zset(key_name=key_name, data=redis_data, expire_time=2 * 24 * 3600) return user_group_df def predict_video_share_rate(now_date): """预估视频有广告时分享率""" # 获取视频特征 project = config_.ad_model_data['videos_share_rate'].get('project') table = config_.ad_model_data['videos_share_rate'].get('table') features = [ 'apptype', 'videoid', 'sharerate_all', 'sharerate_ad' ] video_df = get_feature_data(project=project, table=table, features=features, now_date=now_date) video_df['sharerate_all'] = video_df['sharerate_all'].astype(float) video_df['sharerate_ad'] = video_df['sharerate_ad'].astype(float) # 获取有广告时所有视频近30天的分享率 ad_all_videos_share_rate = video_df[video_df['videoid'] == 'allvideos']['sharerate_ad'] video_df = video_df[video_df['videoid'] != 'allvideos'] # 计算视频有广告时分享率 video_df['video_ad_share_rate'] = \ video_df['sharerate_ad'] * float(ad_all_videos_share_rate) / video_df['sharerate_all'] # 结果写入redis # key_name = f"{config_.KEY_NAME_PREFIX_AD_VIDEO}{datetime.datetime.strftime(now_date, '%Y%m%d')}" # redis_data = {} # for item in video_df: # redis_data[item['videoid']] = item['video_ad_share_rate'] # if len(redis_data) > 0: # redis_helper = RedisHelper() # redis_helper.add_data_with_zset(key_name=key_name, data=redis_data, expire_time=2 * 24 * 3600) return video_df def predict_ad_group_video(now_date): user_group_df = predict_user_group_share_rate(now_date) video_df = predict_video_share_rate(now_date) predict_df = video_df for item in user_group_df: predict_df[item['group']] = predict_df['videoid'] * item['group_ad_share_rate'] return predict_df if __name__ == '__main__': now_date = datetime.datetime.today() predict_df = predict_ad_group_video(now_date)