
feat: add data download script

zhaohaipeng 8 months ago
parent
commit
cad7628d39
6 changed files with 425 additions and 1 deletion
  1. .gitignore (+2 -1)
  2. client/ODPSClient.py (+13 -0)
  3. vov/__init__.py (+0 -0)
  4. vov/data_download.py (+67 -0)
  5. vov/vov_h0_train.py (+151 -0)
  6. vov/vov_h24_analyse.py (+192 -0)

+ 2 - 1
.gitignore

@@ -63,4 +63,5 @@ target/
 XGB/new*
 XGB/data
 XGB/file
-logs
+logs
+.DS_Store

+ 13 - 0
client/ODPSClient.py

@@ -4,6 +4,7 @@ import re
 import requests
 from odps import ODPS
 from odps.tunnel import TableTunnel
+
 from util import convert_util, date_util
 
 
@@ -20,6 +21,18 @@ class ODPSClient(object):
             self.endpoint
         )
 
+    def execute_sql(self, sql: str):
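+        # Submit the SQL in script mode and load the full result into a pandas DataFrame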
+        hints = {
+            'odps.sql.submit.mode': 'script'
+        }
+        with self.odps.execute_sql(sql, hints=hints).open_reader(tunnel=True) as reader:
+            pd_df = reader.to_pandas()
+        return pd_df
+
+    def execute_sql_result_save_file(self, sql: str, filepath: str):
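+        # Run the query and save the result as a CSV file (without the index column)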
+        result = self.execute_sql(sql)
+        result.to_csv(filepath, index=False)
+
     def get_all_record(self, table: str, dt: str) -> list:
         tunnel = TableTunnel(self.odps)
 

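A minimal usage sketch for the two new methods (illustrative only, not part of this commit; it assumes ODPSClient() can be constructed with its defaults, as vov/data_download.py below does):

    from client import ODPSClient

    odps_client = ODPSClient.ODPSClient()
    # execute_sql returns the full query result as a pandas DataFrame
    df = odps_client.execute_sql("SELECT 1 AS one;")
    # execute_sql_result_save_file runs the query and writes the result to a CSV
    odps_client.execute_sql_result_save_file("SELECT 1 AS one;", "/tmp/one.csv")
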
+ 0 - 0
vov/__init__.py


+ 67 - 0
vov/data_download.py

@@ -0,0 +1,67 @@
+from concurrent.futures import ThreadPoolExecutor, as_completed
+
+import pandas as pd
+
+from client import ODPSClient
+
+odps_client = ODPSClient.ODPSClient()
+
+dt_list = ["20241028", "20241027", "20241026"]
+hh_list = ["00", "01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12", "13", "14", "15", "16", "17",
+           "18", "19", "20", "21", "22", "23"]
+
+VOV_BASE_PATH = "/Users/zhao/Desktop/tzld/vov"
+
+
+# Query function: fetch one (dt, hh) slice and save it to CSV
+def read_odps(dt: str, hh: str):
+    # Read the SQL template file
+    with open(f"{VOV_BASE_PATH}/sql/vov排序_552_562.sql", "r") as f:
+        sql = f.read()
+
+    real_sql = (sql.replace("${bizdate}", dt)
+                .replace("${hh}", hh))
+    print(f"Executing for dt: {dt}, hh: {hh}")
+    odps_client.execute_sql_result_save_file(real_sql, f"{VOV_BASE_PATH}/csv/{dt}{hh}.csv")
+
+
+# Run all (dt, hh) queries in parallel
+def run_parallel():
+    # Thread pool with a maximum of 24 worker threads
+    with ThreadPoolExecutor(max_workers=24) as executor:
+        # Create the task list: one task per (dt, hh) pair
+        future_to_task = {executor.submit(read_odps, dt, hh): (dt, hh)
+                          for dt in dt_list
+                          for hh in hh_list
+                          }
+
+        # Monitor the completion of each task
+        for future in as_completed(future_to_task):
+            dt, hh = future_to_task[future]
+            try:
+                future.result()  # Retrieve the result (re-raises any worker exception)
+                print(f"Completed for dt: {dt}, hh: {hh}")
+            except Exception as exc:
+                print(f"Error for dt: {dt}, hh: {hh} - {exc}")
+
+
+def download():
+    # Run the download tasks in parallel
+    run_parallel()
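+    # Merge the 24 hourly CSVs of each day into a single daily CSV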
+    for dt in dt_list:
+        csv_list = []
+        for hh in hh_list:
+            csv_list.append(f"{VOV_BASE_PATH}/csv/{dt}{hh}.csv")
+
+        df_list = [pd.read_csv(file) for file in csv_list]
+        df = pd.concat(df_list, ignore_index=True)
+        df.to_csv(f"{VOV_BASE_PATH}/csv/{dt}.csv", index=False)
+
+
+def _main():
+    download()
+
+
+if __name__ == "__main__":
+    _main()
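
For reference, a sketch of how read_odps fills the template placeholders; the query text here is a hypothetical stand-in, not the actual contents of vov排序_552_562.sql:

    sql_template = "SELECT vid, exposure FROM some_table WHERE dt = '${bizdate}' AND hh = '${hh}'"  # hypothetical
    real_sql = sql_template.replace("${bizdate}", "20241028").replace("${hh}", "00")
    # -> SELECT vid, exposure FROM some_table WHERE dt = '20241028' AND hh = '00'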

+ 151 - 0
vov/vov_h0_train.py

@@ -0,0 +1,151 @@
+import numpy as np
+import pandas as pd
+from scipy.optimize import minimize
+from sklearn.metrics import r2_score
+from sklearn.model_selection import train_test_split
+import pickle
+
+
+# 1. Load data
+def load_data(file_path):
+    df = pd.read_csv(file_path, na_values='\\N')
+    return df
+
+# 2. Preprocess data
+def preprocess_data(df, features, target, exposure_col, top_k):
+    # Sort by exposure and keep the top-k rows
+    df_sorted = df.sort_values(by=exposure_col, ascending=False)
+    df_topk = df_sorted.head(top_k)
+
+    X = df_topk[features]
+    y = df_topk[target]
+
+    # Exposure threshold corresponding to the top-k cutoff
+    exposure_threshold = df_topk[exposure_col].min()
+
+    return X, y, exposure_threshold, df_topk
+
+# 3. Compute correlations
+def calculate_correlations(df, features, target):
+    correlations = {}
+    for feature in features:
+        # Drop rows where either the target or the feature is NaN
+        valid_data = df[[target, feature]].dropna()
+
+        # If there is no valid data, set the correlation to 0
+        if len(valid_data) == 0:
+            correlations[feature] = 0
+        else:
+            # Compute the correlation coefficient
+            corr = valid_data[target].corr(valid_data[feature])
+            correlations[feature] = corr if not np.isnan(corr) else 0
+
+    # Convert to a Series and sort by absolute value
+    corr_series = pd.Series(correlations).abs().sort_values(ascending=False)
+    return corr_series
+
+
+# 4. Dynamic weighted sum (ignores NaN features and renormalizes the weights)
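+# Example: for features [0.5, NaN, 0.3] with weights [2, 1, 1], the NaN entry is
+# dropped, the remaining weights renormalize to [2/3, 1/3], and the result is
+# 0.5 * (2/3) + 0.3 * (1/3) ≈ 0.433.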
+def dynamic_weighted_sum(features, weights):
+    valid_features = ~np.isnan(features)
+    if np.sum(valid_features) == 0:
+        return np.nan
+    normalized_weights = weights[valid_features] / np.sum(weights[valid_features])
+    return np.sum(features[valid_features] * normalized_weights)
+
+# 5. Define the loss function
+def mse_loss(y_true, y_pred):
+    valid = ~np.isnan(y_true) & ~np.isnan(y_pred)
+    return np.mean((y_true[valid] - y_pred[valid])**2)
+
+# 6. Define the objective function
+def objective(weights, X, y_true):
+    y_pred = np.array([dynamic_weighted_sum(x, weights) for x in X.values])
+    return mse_loss(y_true, y_pred)
+
+# 7. Search for the best weights
+def find_best_weights(X, y, initial_weights):
+    result = minimize(objective, initial_weights, args=(X, y), method='Nelder-Mead')
+    return result.x
+
+# 8. Evaluate the model
+def evaluate_model(X, y, weights):
+    y_pred = np.array([dynamic_weighted_sum(x, weights) for x in X.values])
+    valid = ~np.isnan(y) & ~np.isnan(y_pred)
+    r2 = r2_score(y[valid], y_pred[valid])
+    mse = mse_loss(y, y_pred)
+    return r2, mse
+
+# 9. Save the model
+def save_model(weights, features, exposure_threshold, top_k, file_path):
+    model = {
+        'weights': weights,
+        'features': features,
+        'exposure_threshold': exposure_threshold,
+        'top_k': top_k
+    }
+    with open(file_path, 'wb') as f:
+        pickle.dump(model, f)
+
+# 10. Load the model
+def load_model(file_path):
+    with open(file_path, 'rb') as f:
+        model = pickle.load(f)
+    return model['weights'], model['features'], model['exposure_threshold'], model['top_k']
+
+
+
+# 11. Main function
+def main():
+    # Load data
+    df = load_data('train_20240921.csv')
+
+    # Define features, target variable, and exposure column
+    features = ['h1_ago_vov', 'h2_ago_vov', 'h3_ago_vov', 'h24_ago_vov', 'h48_ago_vov', 'd1_ago_vov', 'd2_ago_vov']
+    target = 'cur_hour_vov'
+    exposure_col = 'h1_ago_view'  # Make sure this column exists in your data
+    top_k = 1000  # Number of top-exposure rows to use
+
+    # Preprocess the data
+    X, y, exposure_threshold, df_topk = preprocess_data(df, features, target, exposure_col, top_k)
+
+    # Split into training and test sets
+    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
+
+    # Compute correlations
+    correlations = calculate_correlations(df_topk, features, target)
+    print("Feature correlations:")
+    print(correlations)
+
+    # Use the correlations as initial weights
+    initial_weights = correlations[features].values
+
+    # Search for the best weights
+    best_weights = find_best_weights(X_train, y_train, initial_weights)
+
+    # Evaluate the model
+    r2_train, mse_train = evaluate_model(X_train, y_train, best_weights)
+    r2_test, mse_test = evaluate_model(X_test, y_test, best_weights)
+
+    print(f"\nTrain R² Score: {r2_train:.4f}, MSE: {mse_train:.4f}")
+    print(f"Test R² Score: {r2_test:.4f}, MSE: {mse_test:.4f}")
+
+    # Print feature importance
+    print("\nFeature importance:")
+    for feature, weight in zip(features, best_weights):
+        print(f"{feature}: {weight:.4f}")
+
+    # Save the model
+    save_model(pd.Series(best_weights, index=features), features, exposure_threshold, top_k, f'top{top_k}_linear_weighted_model.pkl')
+
+    # Load the model back as a sanity check
+    loaded_weights, loaded_features, loaded_threshold, topk = load_model(f'top{top_k}_linear_weighted_model.pkl')
+    print("\nLoaded model weights:")
+    for feature, weight in loaded_weights.items():
+        print(f"{feature}: {weight:.4f}")
+    print(f"Exposure threshold: {loaded_threshold}")
+    print(f"TopK: {topk}")
+
+
+if __name__ == "__main__":
+    main()
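
A minimal inference sketch built only from the functions above (illustrative, not part of this commit); the feature values are made-up numbers and the .pkl name assumes the default top_k of 1000:

    weights, features, exposure_threshold, top_k = load_model('top1000_linear_weighted_model.pkl')
    # One value per feature, in the same order as `features`; NaNs are allowed
    row = np.array([0.52, 0.48, np.nan, 0.50, 0.47, 0.49, 0.51])
    pred = dynamic_weighted_sum(row, weights.values)  # weights is a pd.Series indexed by feature
    print(f"Predicted cur_hour_vov: {pred:.4f}")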

+ 192 - 0
vov/vov_h24_analyse.py

@@ -0,0 +1,192 @@
+import numpy as np
+import pandas as pd
+from scipy.optimize import minimize
+from sklearn.metrics import r2_score
+from sklearn.model_selection import train_test_split
+import pickle
+
+all_feature_names = ["1_vovh0", "2_vovh0", "2_vovh1", "3_vovh0", "3_vovh1", "3_vovh2", "4_vovh0", "4_vovh1", "4_vovh2",
+                     "4_vovh3", "6_vovh0", "6_vovh1", "6_vovh6", "12_vovh0", "12_vovh1", "12_vovh12", "24_vovh0",
+                     "24_vovh1", "24_vovh2", "24_vovh3", "24_vovh6", "24_vovh12", "24_vovh24", "48_vovh0", "48_vovh1",
+                     "48_vovh2", "48_vovh3", "48_vovh6", "48_vovh12", "48_vovh24", "48_vovh48", "1_vovd0", "2_vovd0",
+                     "3_vovd0", "4_vovd0", "5_vovd0", "2_vovd1", "3_vovd1", "4_vovd1", "5_vovd1", "3_vovd2", "4_vovd2",
+                     "5_vovd2", "1_vovh_分母", "1_vovh0分子", "2_vovh_分母", "2_vovh0分子", "2_vovh1分子",
+                     "3_vovh_分母", "3_vovh0分子", "3_vovh1分子", "3_vovh2分子", "4_vovh_分母", "4_vovh0分子",
+                     "4_vovh1分子", "4_vovh2分子", "4_vovh3分子", "6_vovh_分母", "6_vovh0分子", "6_vovh1分子",
+                     "6_vovh6分子", "12_vovh_分母", "12_vovh0分子", "12_vovh1分子", "12_vovh12分子", "24_vovh_分母",
+                     "24_vovh0分子", "24_vovh1分子", "24_vovh2分子", "24_vovh3分子", "24_vovh6分子", "24_vovh12分子",
+                     "24_vovh24分子", "48_vovh_分母", "48_vovh0分子", "48_vovh1分子", "48_vovh2分子", "48_vovh3分子",
+                     "48_vovh6分子", "48_vovh12分子", "48_vovh24分子", "48_vovh48分子", "1_vovd0_分母", "1_vovd0_分子",
+                     "2_vovd0_分母", "2_vovd0_分子", "3_vovd0_分母", "3_vovd0_分子", "4_vovd0_分母", "4_vovd0_分子",
+                     "5_vovd0_分母", "5_vovd0_分子", "2_vovd1_分母", "2_vovd1_分子", "3_vovd1_分母", "3_vovd1_分子",
+                     "4_vovd1_分母", "4_vovd1_分子", "5_vovd1_分母", "5_vovd1_分子", "3_vovd2_分母", "3_vovd2_分子",
+                     "4_vovd2_分母", "4_vovd2_分子", "5_vovd2_分母", "5_vovd2_分子"]
+
+# feature_names = ["1_vovh0",
+#                  "2_vovh0", "2_vovh1",
+#                  "3_vovh0", "3_vovh1", "3_vovh2",
+#                  "4_vovh0", "4_vovh1", "4_vovh2", "4_vovh3",
+#                  "6_vovh0", "6_vovh1", "6_vovh6",
+#                  "12_vovh0", "12_vovh1", "12_vovh12",
+#                  "24_vovh0", "24_vovh1", "24_vovh2", "24_vovh3", "24_vovh6", "24_vovh12", "24_vovh24",
+#                  "48_vovh0", "48_vovh1", "48_vovh2", "48_vovh3", "48_vovh6", "48_vovh12", "48_vovh24", "48_vovh48",
+#                  "1_vovd0", "2_vovd0", "3_vovd0",
+#                  "2_vovd1", "3_vovd1"
+#                  ]
+
+feature_names = ["1_vovh0",
+                 "2_vovh0", "2_vovh1",
+                 "3_vovh1", "3_vovh2",
+                 "4_vovh1", "4_vovh3",
+                 "6_vovh1", "6_vovh6",
+                 "12_vovh1", "12_vovh12",
+                 "24_vovh1", "24_vovh2", "24_vovh3", "24_vovh6", "24_vovh12", "24_vovh24",
+                 "48_vovh1", "48_vovh2", "48_vovh3", "48_vovh6", "48_vovh12", "48_vovh24", "48_vovh48",
+                 "1_vovd0",
+                 "2_vovd1", "3_vovd1"
+                 ]
+
+dt_list = ['20241014', '20241015', '20241016']
+hh_list = ["00", "01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12",
+           "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23"]
+
+pd.set_option('display.max_rows', None)  # Show all rows
+pd.set_option('display.max_columns', None)  # Show all columns
+
+
+# 1. Load data
+def load_data(filepath: str) -> pd.DataFrame:
+    return pd.read_csv(filepath)
+
+
+# 2. Preprocess data
+def preprocess_data(df, features, target):
+    df_sorted = df.sort_values(by=target, ascending=False)
+    x = df_sorted[features]
+    y = df_sorted[target]
+
+    top_k = df_sorted.head(100)
+
+    return x, y, top_k
+
+
+# 3. Compute correlations
+def calculate_correlations(df, features, target):
+    correlations = {}
+    for feature in features:
+        # Drop rows where either the target or the feature is NaN
+        valid_data = df[[target, feature]].dropna()
+
+        # If there is no valid data, set the correlation to 0
+        if len(valid_data) == 0:
+            correlations[feature] = 0
+        else:
+            # Compute the correlation coefficient
+            corr = valid_data[target].corr(valid_data[feature])
+            correlations[feature] = corr if not np.isnan(corr) else 0
+
+    # Convert to a Series and sort by absolute value
+    corr_series = pd.Series(correlations).abs().sort_values(ascending=False)
+    return corr_series
+
+
+# 4. Dynamic weighted sum (ignores NaN features and renormalizes the weights)
+def dynamic_weighted_sum(features, weights):
+    valid_features = ~np.isnan(features)
+    if np.sum(valid_features) == 0:
+        return np.nan
+    normalized_weights = weights[valid_features] / np.sum(weights[valid_features])
+    return np.sum(features[valid_features] * normalized_weights)
+
+
+# 5. Define the loss function
+def mse_loss(y_true, y_pred):
+    valid = ~np.isnan(y_true) & ~np.isnan(y_pred)
+    return np.mean((y_true[valid] - y_pred[valid]) ** 2)
+
+
+# 6. Define the objective function
+def objective(weights, X, y_true):
+    y_pred = np.array([dynamic_weighted_sum(x, weights) for x in X.values])
+    return mse_loss(y_true, y_pred)
+
+
+# 7. Search for the best weights
+def find_best_weights(X, y, initial_weights):
+    result = minimize(objective, initial_weights, args=(X, y), method='Nelder-Mead')
+    return result.x
+
+
+# 8. Evaluate the model
+def evaluate_model(X, y, weights):
+    y_pred = np.array([dynamic_weighted_sum(x, weights) for x in X.values])
+    valid = ~np.isnan(y) & ~np.isnan(y_pred)
+    r2 = r2_score(y[valid], y_pred[valid])
+    mse = mse_loss(y, y_pred)
+    return r2, mse
+
+
+# 9. Save the model
+def save_model(weights, features, file_path):
+    model = {
+        'weights': weights,
+        'features': features,
+    }
+    with open(file_path, 'wb') as f:
+        pickle.dump(model, f)
+
+
+# 10. Load the model
+def load_model(file_path):
+    with open(file_path, 'rb') as f:
+        model = pickle.load(f)
+    return model['weights'], model['features']
+
+
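+# Fit and evaluate the weighted model on a single hour of data (key is dt + hh)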
+def single_dt_handle(dt, df: pd.DataFrame):
+    x, y, top_key = preprocess_data(df, feature_names, "vovh24")
+    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=0)
+    correl = calculate_correlations(top_key, feature_names, "vovh24")
+    print(f"{dt}   Feature Correlations: ")
+    print(correl.head(5))
+
+    initial_weights = correl[feature_names].values
+    best_weights = find_best_weights(x_train, y_train, initial_weights)
+    # Evaluate the model
+    r2_train, mse_train = evaluate_model(x_train, y_train, best_weights)
+    r2_test, mse_test = evaluate_model(x_test, y_test, best_weights)
+
+    print(f"\nTrain R² Score: {r2_train:.4f}, MSE: {mse_train:.4f}")
+    print(f"Test R² Score: {r2_test:.4f}, MSE: {mse_test:.4f}")
+
+    # Print feature importance
+    print("\nFeature importance:")
+    for feature, weight in zip(feature_names, best_weights):
+        print(f"{feature}: {weight:.4f}")
+
+    # Save the model
+    save_model(pd.Series(best_weights, index=feature_names), feature_names,
+               '/Users/zhao/Desktop/vov/model/vovh24_linear_weighted_model.pkl')
+
+    # Load the model back as a sanity check
+    loaded_weights, loaded_features = load_model('/Users/zhao/Desktop/vov/model/vovh24_linear_weighted_model.pkl')
+    print("\nLoaded model weights:")
+    for feature, weight in loaded_weights.items():
+        print(f"{feature}: {weight:.4f}")
+
+
+def _main():
+    df_dict = {}
+    for dt in dt_list:
+        for hh in hh_list:
+            key = f"{dt}{hh}"
+            df = load_data(f"/Users/zhao/Desktop/vov/{key}.csv")
+            df_dict[key] = df
+
+    for key in df_dict:
+        single_dt_handle(key, df_dict[key])
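+        # NOTE: this early return means only the first key in df_dict is processed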
+        return
+
+
+if __name__ == '__main__':
+    _main()