import os.path
import time
import pandas as pd
from utils import get_data_from_odps
from odps.df import DataFrame
from odps import ODPS
from config import set_config

config_, env = set_config()

# Columns read from each record of the ODPS source table.
features = [
    'apptype',
    'subsessionid',
    'mid',
    'videoid',
    'ad_mid',
    'share_videoid',
    'mid_preview_count_30day',
    'mid_view_count_30day',
    'mid_view_count_pv_30day',
    'mid_play_count_30day',
    'mid_play_count_pv_30day',
    'mid_share_count_30day',
    'mid_share_count_pv_30day',
    'mid_return_count_30day',
    'mid_share_rate_30day',
    'mid_return_rate_30day',
    'video_preview_count_uv_30day',
    'video_preview_count_pv_30day',
    'video_view_count_uv_30day',
    'video_view_count_pv_30day',
    'video_play_count_uv_30day',
    'video_play_count_pv_30day',
    'video_share_count_uv_30day',
    'video_share_count_pv_30day',
    'video_return_count_30day',
    'video_ctr_uv_30day',
    'video_ctr_pv_30day',
    'video_share_rate_uv_30day',
    'video_share_rate_pv_30day',
    'video_return_rate_30day',
]

# Columns written to the training CSV: the 30-day statistics plus the two
# flags ('ad_status', 'share_status') derived in daily_data_process.
train_feature = [
    'mid_preview_count_30day',
    'mid_view_count_30day',
    'mid_view_count_pv_30day',
    'mid_play_count_30day',
    'mid_play_count_pv_30day',
    'mid_share_count_30day',
    'mid_share_count_pv_30day',
    'mid_return_count_30day',
    'mid_share_rate_30day',
    'mid_return_rate_30day',
    'video_preview_count_uv_30day',
    'video_preview_count_pv_30day',
    'video_view_count_uv_30day',
    'video_view_count_pv_30day',
    'video_play_count_uv_30day',
    'video_play_count_pv_30day',
    'video_share_count_uv_30day',
    'video_share_count_pv_30day',
    'video_return_count_30day',
    'video_ctr_uv_30day',
    'video_ctr_pv_30day',
    'video_share_rate_uv_30day',
    'video_share_rate_pv_30day',
    'video_return_rate_30day',
    'ad_status',
    'share_status',
]


def get_feature_data(project, table, features, dt, app_type=0):
    """Fetch one day's raw feature rows from ODPS.

    Args:
        project: ODPS project name; also used to qualify the table name.
        table: source table name.
        features: column names to copy out of each record.
        dt: value of the `dt` column to select, e.g. '20230725'.
        app_type: value of the `apptype` column to select. Defaults to 0,
            the value that was previously hard-coded in the query.

    Returns:
        A DataFrame whose columns are exactly `features`, in that order.
        It is empty, but still has those columns, when no rows match.
    """
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
    )
    # dt is quoted because the partition column is presumably a string
    # (confirm against the table schema). app_type is forced to int so only
    # a number can ever be interpolated into the SQL text.
    sql = (
        f"select * from {project}.{table} "
        f"where dt='{dt}' and apptype={int(app_type)}"
    )
    with odps.execute_sql(sql).open_reader() as reader:
        feature_data = [
            {feature_name: record[feature_name] for feature_name in features}
            for record in reader
        ]
    # Passing `columns` keeps the schema when the query returns no rows, so
    # the caller's column lookups do not raise KeyError on an empty day.
    return pd.DataFrame(feature_data, columns=features)


def daily_data_process(project, table, features, dt, app_type):
    """Build and persist the training set for one day.

    Steps: fetch raw rows, derive 'ad_status' and 'share_status', fill
    missing values with 0, cast count columns to int and rate columns to
    float, keep only `train_feature`, then write
    ./data/train_data/{dt}.csv (the index is included).

    Args:
        project: ODPS project name.
        table: source table name.
        features: raw columns to fetch (must include the columns used below).
        dt: partition date string, e.g. '20230725'.
        app_type: `apptype` value to keep.

    Returns:
        The training DataFrame that was written to CSV.
    """
    print('step 1: get feature data')
    feature_initial_df = get_feature_data(
        project=project, table=table, features=features, dt=dt, app_type=app_type
    )
    print(f"feature_initial_df shape: {feature_initial_df.shape}")

    print('step 2: process')
    feature_initial_df['apptype'] = feature_initial_df['apptype'].astype(int)
    # Redundant with the SQL filter; kept as a guard on the returned rows.
    feature_df = feature_initial_df[feature_initial_df['apptype'] == app_type].copy()

    # 'ad_status': whether this row had an ad, 1 = ad, 0 = no ad
    # (ad_mid equals mid). A vectorized comparison gives the same per-row
    # result as the former row-wise apply. It also still produces a Series
    # on an empty frame, where apply(axis=1) does not.
    feature_df['ad_status'] = (feature_df['ad_mid'] == feature_df['mid']).astype(int)

    # Reassign instead of inplace on a column selection. Chained inplace
    # fillna does not update feature_df under pandas copy-on-write.
    feature_df['share_videoid'] = feature_df['share_videoid'].fillna(0).astype(int)
    feature_df['videoid'] = feature_df['videoid'].astype(int)
    # 'share_status': whether this video was shared, 1 = shared,
    # 0 = not shared (share_videoid equals videoid).
    feature_df['share_status'] = (
        feature_df['share_videoid'] == feature_df['videoid']
    ).astype(int)

    # Fill remaining missing values.
    feature_df = feature_df.fillna(0)

    # Data type correction. `astype` returns a new object, so the result
    # must be assigned back; the previous per-column loop discarded it and
    # never changed any dtype.
    type_int_columns = [
        'mid_preview_count_30day',
        'mid_view_count_30day',
        'mid_view_count_pv_30day',
        'mid_play_count_30day',
        'mid_play_count_pv_30day',
        'mid_share_count_30day',
        'mid_share_count_pv_30day',
        'mid_return_count_30day',
        'video_preview_count_uv_30day',
        'video_preview_count_pv_30day',
        'video_view_count_uv_30day',
        'video_view_count_pv_30day',
        'video_play_count_uv_30day',
        'video_play_count_pv_30day',
        'video_share_count_uv_30day',
        'video_share_count_pv_30day',
        'video_return_count_30day',
    ]
    feature_df[type_int_columns] = feature_df[type_int_columns].astype(int)

    type_float_columns = [
        'mid_share_rate_30day',
        'mid_return_rate_30day',
        'video_ctr_uv_30day',
        'video_ctr_pv_30day',
        'video_share_rate_uv_30day',
        'video_share_rate_pv_30day',
        'video_return_rate_30day',
    ]
    feature_df[type_float_columns] = feature_df[type_float_columns].astype(float)
    print(f"feature_df shape: {feature_df.shape}")

    # Keep only the training columns.
    print('step 3: get train_df')
    train_df = feature_df[train_feature]
    print(f"train_df shape: {train_df.shape}")

    # Write the CSV; exist_ok avoids the check-then-create race.
    train_data_dir = './data/train_data'
    os.makedirs(train_data_dir, exist_ok=True)
    train_df.to_csv(f"{train_data_dir}/{dt}.csv")
    return train_df


if __name__ == '__main__':
    st_time = time.time()
    project = 'loghubods'
    table = 'admodel_data_train'
    dt = '20230725'
    df = daily_data_process(project=project, table=table, features=features, dt=dt, app_type=0)
    print(df.shape)
    print(df.columns)
    print(time.time() - st_time)