liqian 1 year ago
parent
commit
035cdd3d00
1 changed files with 50 additions and 12 deletions
  1. 50 12
      ad_feature_process.py

+ 50 - 12
ad_feature_process.py

@@ -1,5 +1,13 @@
+import os.path
+import time
+
 import pandas as pd
 from utils import get_data_from_odps
+from odps.df import DataFrame
+from odps import ODPS
+from config import set_config
+
+config_, env = set_config()
 
 features = [
     'apptype',
@@ -66,15 +74,37 @@ train_feature = [
 
 def get_feature_data(project, table, features, dt):
     """获取特征数据"""
-    records = get_data_from_odps(date=dt, project=project, table=table)
+    # records = get_data_from_odps(date=dt, project=project, table=table)
+    # feature_data = []
+    # i = 0
+    # for record in records:
+    #     if i > 300000:
+    #         break
+    #     item = {}
+    #     for feature_name in features:
+    #         item[feature_name] = record[feature_name]
+    #     feature_data.append(item)
+    #     i += 1
+    # feature_df = pd.DataFrame(feature_data)
+    # return feature_df
+
+    odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project=project,
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
+    )
     feature_data = []
-    for record in records:
-        item = {}
-        for feature_name in features:
-            item[feature_name] = record[feature_name]
-        feature_data.append(item)
-    feature_df = pd.DataFrame(feature_data)
-    return feature_df
+    sql = f"select * from {project}.{table} where dt={dt} and apptype=0"
+    with odps.execute_sql(sql).open_reader() as reader:
+        for record in reader:
+            # print(record)
+            item = {}
+            for feature_name in features:
+                item[feature_name] = record[feature_name]
+            feature_data.append(item)
+        feature_df = pd.DataFrame(feature_data)
+        return feature_df
 
 
 def daily_data_process(project, table, features, dt, app_type):
@@ -84,14 +114,14 @@ def daily_data_process(project, table, features, dt, app_type):
     print(f"feature_initial_df shape: {feature_initial_df.shape}")
     print('step 2: process')
     feature_initial_df['apptype'] = feature_initial_df['apptype'].astype(int)
-    feature_df = feature_initial_df[feature_initial_df['apptype'] == app_type]
+    feature_df = feature_initial_df[feature_initial_df['apptype'] == app_type].copy()
     # 增加此次是否有广告字段 'ad_status' 1: 有广告, 0: 无广告
-    feature_df['ad_status'] = feature_df.apply(func=lambda x: 1 if x['ad_mid'] == x['mid'] else 0)
+    feature_df['ad_status'] = feature_df.apply(func=lambda x: 1 if x['ad_mid'] == x['mid'] else 0, axis=1)
     feature_df['share_videoid'].fillna(0, inplace=True)
     feature_df['share_videoid'] = feature_df['share_videoid'].astype(int)
     feature_df['videoid'] = feature_df['videoid'].astype(int)
     # 增加此次是否分享了该视频 'share_status' 1: 分享, 0: 未分享
-    feature_df['share_status'] = feature_df.apply(func=lambda x: 1 if x['share_videoid'] == x['videoid'] else 0)
+    feature_df['share_status'] = feature_df.apply(func=lambda x: 1 if x['share_videoid'] == x['videoid'] else 0, axis=1)
     # 缺失值填充
     feature_df.fillna(0, inplace=True)
     # 数据类型校正
@@ -132,17 +162,25 @@ def daily_data_process(project, table, features, dt, app_type):
     print('step 3: get train_df')
     train_df = feature_df[train_feature]
     print(f"train_df shape: {train_df.shape}")
+    # 写入csv
+    train_data_dir = './data/train_data'
+    if not os.path.exists(train_data_dir):
+        os.makedirs(train_data_dir)
+    train_df.to_csv(f"{train_data_dir}/{dt}.csv")
     return train_df
 
 
 if __name__ == '__main__':
+    st_time = time.time()
     project = 'loghubods'
     table = 'admodel_data_train'
     dt = '20230725'
     df = daily_data_process(project=project, table=table, features=features, dt=dt, app_type=0)
     print(df.shape)
     print(df.columns)
-    df.to_csv(f'./data/{dt}.csv')
+    # df.to_csv(f'./data/{dt}.csv', index=False)
+    # get_feature_data(project=project, table=table, features=features, dt=dt)
+    print(time.time() - st_time)