import os
import time
import datetime
from threading import Timer

import pandas as pd
from odps import ODPS

from config import set_config
from utils import data_check, RedisHelper

config_, _ = set_config()
redis_helper = RedisHelper()

# ODPS service configuration
odps_config = {
    'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
    'ACCESSID': 'LTAIWYUujJAm7CbH',
    'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
}

features = [
    'apptype',
    'videoid',
    'video_preview_count_uv_30day',
    'video_preview_count_pv_30day',
    'video_view_count_uv_30day',
    'video_view_count_pv_30day',
    'video_play_count_uv_30day',
    'video_play_count_pv_30day',
    'video_share_count_uv_30day',
    'video_share_count_pv_30day',
    'video_return_count_30day',
    'video_ctr_uv_30day',
    'video_ctr_pv_30day',
    'video_share_rate_uv_30day',
    'video_share_rate_pv_30day',
    'video_return_rate_30day',
]


def get_feature_data(project, table, dt, app_type):
    """Fetch feature data from ODPS."""
    odps = ODPS(
        access_id=odps_config['ACCESSID'],
        secret_access_key=odps_config['ACCESSKEY'],
        project=project,
        endpoint=odps_config['ENDPOINT'],
    )
    feature_data = []
    sql = f"select * from {project}.{table} where dt={dt} and apptype={app_type} limit 1000"
    with odps.execute_sql(sql).open_reader() as reader:
        for record in reader:
            item = {}
            for feature_name in features:
                item[feature_name] = record[feature_name]
            feature_data.append(item)
    feature_df = pd.DataFrame(feature_data)
    return feature_df


def video_data_process(project, table, dt, app_type):
    """Daily feature processing."""
    print('step 1: get video feature data')
    feature_initial_df = get_feature_data(project=project, table=table, dt=dt, app_type=app_type)
    print(f"feature_initial_df shape: {feature_initial_df.shape}")

    print('step 2: process')
    feature_initial_df['apptype'] = feature_initial_df['apptype'].astype(int)
    feature_df = feature_initial_df.copy()
    # Fill missing values
    feature_df.fillna(0, inplace=True)
    # Correct column data types
    type_int_columns = [
        'video_preview_count_uv_30day',
        'video_preview_count_pv_30day',
        'video_view_count_uv_30day',
        'video_view_count_pv_30day',
        'video_play_count_uv_30day',
        'video_play_count_pv_30day',
        'video_share_count_uv_30day',
        'video_share_count_pv_30day',
        'video_return_count_30day',
    ]
    for column_name in type_int_columns:
        feature_df[column_name] = feature_df[column_name].astype(int)
    type_float_columns = [
        'video_ctr_uv_30day',
        'video_ctr_pv_30day',
        'video_share_rate_uv_30day',
        'video_share_rate_pv_30day',
        'video_return_rate_30day',
    ]
    for column_name in type_float_columns:
        feature_df[column_name] = feature_df[column_name].astype(float)
    print(f"feature_df shape: {feature_df.shape}")

    print('step 3: add new video feature')
    # Default row for new videos (videoid = -1), filled with column means
    new_video_feature = {
        'apptype': app_type,
        'videoid': '-1',
        'video_preview_count_uv_30day': int(feature_df['video_preview_count_uv_30day'].mean()),
        'video_preview_count_pv_30day': int(feature_df['video_preview_count_pv_30day'].mean()),
        'video_view_count_uv_30day': int(feature_df['video_view_count_uv_30day'].mean()),
        'video_view_count_pv_30day': int(feature_df['video_view_count_pv_30day'].mean()),
        'video_play_count_uv_30day': int(feature_df['video_play_count_uv_30day'].mean()),
        'video_play_count_pv_30day': int(feature_df['video_play_count_pv_30day'].mean()),
        'video_share_count_uv_30day': int(feature_df['video_share_count_uv_30day'].mean()),
        'video_share_count_pv_30day': int(feature_df['video_share_count_pv_30day'].mean()),
        'video_return_count_30day': int(feature_df['video_return_count_30day'].mean()),
    }
    # Derived rate features; the +1 in each denominator guards against division by zero
    new_video_feature['video_ctr_uv_30day'] = float(
        new_video_feature['video_play_count_uv_30day'] / (new_video_feature['video_view_count_uv_30day'] + 1))
    new_video_feature['video_ctr_pv_30day'] = float(
        new_video_feature['video_play_count_pv_30day'] / (new_video_feature['video_view_count_pv_30day'] + 1))
    new_video_feature['video_share_rate_uv_30day'] = float(
        new_video_feature['video_share_count_uv_30day'] / (new_video_feature['video_play_count_uv_30day'] + 1))
    new_video_feature['video_share_rate_pv_30day'] = float(
        new_video_feature['video_share_count_pv_30day'] / (new_video_feature['video_play_count_pv_30day'] + 1))
    new_video_feature['video_return_rate_30day'] = float(
        new_video_feature['video_return_count_30day'] / (new_video_feature['video_view_count_pv_30day'] + 1))
    new_video_feature_df = pd.DataFrame([new_video_feature])
    video_df = pd.concat([feature_df, new_video_feature_df], ignore_index=True)
    print(f"video_df shape: {video_df.shape}")

    print('step 4: to csv')
    # Write the predict set to CSV
    predict_data_dir = './data/predict_data'
    if not os.path.exists(predict_data_dir):
        os.makedirs(predict_data_dir)
    video_df.to_csv(f"{predict_data_dir}/video_feature.csv", index=False)

    # Write per-video features to Redis
    xgb_config = config_.AD_MODEL_ABTEST_CONFIG['xgb']
    for ind, row in video_df.iterrows():
        app_type = row['apptype']
        video_id = row['videoid']
        # All 30-day count and rate features, i.e. everything except apptype / videoid
        value = {feature_name: row[feature_name] for feature_name in features[2:]}
        key = f"{xgb_config['predict_video_feature_key_prefix']}{app_type}:{video_id}"
        redis_helper.set_data_to_redis(key_name=key, value=str(value), expire_time=48 * 3600)


def timer_check():
    project = 'loghubods'
    table = 'admodel_testset_video'
    # dt = '20230725'
    now_date = datetime.datetime.today()
    dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
    # Check whether yesterday's partition is ready
    data_count = data_check(project=project, table=table, dt=dt)
    if data_count > 0:
        print(f"ad predict video data count = {data_count}")
        # Data is ready, run the update
        video_data_process(project=project, table=table, dt=dt, app_type=5)
        print("ad predict video data update end!")
    else:
        # Data is not ready yet, check again in 1 minute
        Timer(60, timer_check).start()


if __name__ == '__main__':
    st_time = time.time()
    timer_check()
    print(f"{time.time() - st_time}s")