@@ -0,0 +1,118 @@
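+# Pull 30-day user (mid) behaviour features from ODPS, append a mean-based
+# default row for new users (mid = '-1'), and write the result to
+# ./data/predict_data/user_feature.csv.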
+import os
+import time
+import datetime
+import pandas as pd
+from odps import ODPS
+
+# ODPS service configuration
+odps_config = {
+    'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
+    'ACCESSID': 'LTAIWYUujJAm7CbH',
+    'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
+}
+
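+# 30-day user-level (mid) interaction counts and rates fetched for each user.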
+features = [
+    'apptype',
+    'mid',
+    'mid_preview_count_30day',
+    'mid_view_count_30day',
+    'mid_view_count_pv_30day',
+    'mid_play_count_30day',
+    'mid_play_count_pv_30day',
+    'mid_share_count_30day',
+    'mid_share_count_pv_30day',
+    'mid_return_count_30day',
+    'mid_share_rate_30day',
+    'mid_return_rate_30day',
+]
+
+
+def get_feature_data(project, table, dt, app_type):
+    """Fetch feature data from ODPS."""
+    odps = ODPS(
+        access_id=odps_config['ACCESSID'],
+        secret_access_key=odps_config['ACCESSKEY'],
+        project=project,
+        endpoint=odps_config['ENDPOINT'],
+    )
+    feature_data = []
+    sql = f"select * from {project}.{table} where dt={dt} and apptype={app_type}"
+    with odps.execute_sql(sql).open_reader() as reader:
+        for record in reader:
+            # print(record)
+            item = {}
+            for feature_name in features:
+                item[feature_name] = record[feature_name]
+            feature_data.append(item)
+    feature_df = pd.DataFrame(feature_data)
+    return feature_df
+
+
+def user_data_process(project, table, dt, app_type):
+    """Daily feature processing: fetch, clean, add a default new-user row, and export to CSV."""
+    print('step 1: get user feature data')
+    feature_initial_df = get_feature_data(project=project, table=table, dt=dt, app_type=app_type)
+    print(f"feature_initial_df shape: {feature_initial_df.shape}")
+    print('step 2: process')
+    feature_initial_df['apptype'] = feature_initial_df['apptype'].astype(int)
+    feature_df = feature_initial_df.copy()
+    # Fill missing values
+    feature_df.fillna(0, inplace=True)
+    # Correct data types
+    type_int_columns = [
+        'mid_preview_count_30day',
+        'mid_view_count_30day',
+        'mid_view_count_pv_30day',
+        'mid_play_count_30day',
+        'mid_play_count_pv_30day',
+        'mid_share_count_30day',
+        'mid_share_count_pv_30day',
+        'mid_return_count_30day',
+    ]
+    for column_name in type_int_columns:
+        feature_df[column_name] = feature_df[column_name].astype(int)
+    type_float_columns = [
+        'mid_share_rate_30day',
+        'mid_return_rate_30day',
+    ]
+    for column_name in type_float_columns:
+        feature_df[column_name] = feature_df[column_name].astype(float)
+    print(f"feature_df shape: {feature_df.shape}")
+    print('step 3: add new user feature')
+    # Default row for new users (column means)
+    new_user_feature = {
+        'apptype': app_type,
+        'mid': '-1',
+        'mid_preview_count_30day': int(feature_df['mid_preview_count_30day'].mean()),
+        'mid_view_count_30day': int(feature_df['mid_view_count_30day'].mean()),
+        'mid_view_count_pv_30day': int(feature_df['mid_view_count_pv_30day'].mean()),
+        'mid_play_count_30day': int(feature_df['mid_play_count_30day'].mean()),
+        'mid_play_count_pv_30day': int(feature_df['mid_play_count_pv_30day'].mean()),
+        'mid_share_count_30day': int(feature_df['mid_share_count_30day'].mean()),
+        'mid_share_count_pv_30day': int(feature_df['mid_share_count_pv_30day'].mean()),
+        'mid_return_count_30day': int(feature_df['mid_return_count_30day'].mean()),
+    }
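+    # Derived rates for the default row: shares per play PV and returns per view PV.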
+    new_user_feature['mid_share_rate_30day'] = float(
+        new_user_feature['mid_share_count_pv_30day'] / new_user_feature['mid_play_count_pv_30day'])
+    new_user_feature['mid_return_rate_30day'] = float(
+        new_user_feature['mid_return_count_30day'] / new_user_feature['mid_view_count_pv_30day'])
+    new_user_feature_df = pd.DataFrame([new_user_feature])
+    user_df = pd.concat([feature_df, new_user_feature_df])
+    print(f"user_df shape: {user_df.shape}")
+    print('step 4: to csv')
+    # Write to CSV
+    predict_data_dir = './data/predict_data'
+    if not os.path.exists(predict_data_dir):
+        os.makedirs(predict_data_dir)
+    user_df.to_csv(f"{predict_data_dir}/user_feature.csv", index=False)
+
+
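+# Entry point: process yesterday's partition (dt) for app_type 0.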
+if __name__ == '__main__':
+    st_time = time.time()
+    project = 'loghubods'
+    table = 'admodel_testset_mid'
+    # dt = '20230725'
+    now_date = datetime.datetime.today()
+    dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
+    user_data_process(project=project, table=table, dt=dt, app_type=0)
+    print(time.time() - st_time)