
feat: add model weight analysis script

zhaohaipeng 7 months ago
parent
commit
0210b84cf3

+ 20 - 5
client/ODPSClient.py

@@ -29,21 +29,36 @@ class ODPSClient(object):
             pd_df = reader.to_pandas()
         return pd_df
 
+    def execute_sql_file_result_save_file(self, sql_file: str, param: dict, filepath: str):
+        with open(sql_file, "r") as f:
+            sql = f.read()
+            for key in param:
+                sql = sql.replace(f"${{{key}}}", param[key])
+            self.execute_sql_result_save_file(sql, filepath)
+
     def execute_sql_result_save_file(self, sql: str, filepath: str):
         result = self.execute_sql(sql)
         result.to_csv(filepath, index=False)
 
-    def get_all_record(self, table: str, dt: str) -> list:
+    def get_all_record(self, table: str, partition_spec: str) -> list:
         tunnel = TableTunnel(self.odps)
 
-        download_session = tunnel.create_download_session(f"{table}", partition_spec=f"dt={dt}")
+        download_session = tunnel.create_download_session(f"{table}", partition_spec=partition_spec)
         count = download_session.count
 
-    print(f"Table: {table}, partition {dt}: {count} records in total")
+    print(f"Table: {table}, partition {partition_spec}: {count} records in total")
         result = []
-        with download_session.open_record_reader(0, download_session.count) as reader:
+    # Read records in batches of a fixed size
+        with download_session.open_record_reader(0, count) as reader:
+            batch = []
             for record in reader:
-                result.append(record)
+                batch.append(record)
+                if len(batch) >= 1000:
+                    result.extend(batch)
+                    batch.clear()  # clear the batch so new records can keep being collected
+            # append the remaining records from the last batch
+            if batch:
+                result.extend(batch)
         return result
 
     def get_table(self, table: str):
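A minimal usage sketch of the new parametrised execution path in ODPSClient (the SQL file path, placeholder keys and output path below are illustrative, not taken from the repository): placeholders written as ${key} in the SQL file are substituted from the param dict before the query result is saved as CSV.

from client import ODPSClient

odps_client = ODPSClient.ODPSClient()

# The SQL file is assumed to contain ${bizdate} / ${apptype} placeholders.
params = {
    "bizdate": "20241124",
    "apptype": "4",
}
odps_client.execute_sql_file_result_save_file(
    "/path/to/query.sql",    # illustrative SQL file
    params,
    "/path/to/result.csv",   # query result written as CSV
)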

+ 35 - 0
model/XGBModel.py

@@ -0,0 +1,35 @@
+import numpy as np
+import xgboost as xgb
+
+
+class XGBModel(object):
+    def __init__(self, model_file, features: list):
+        self.model_file = model_file
+        self.model = xgb.Booster(model_file=model_file)
+        self.features = features
+
+    def predict(self, feature_map: dict) -> float:
+        values = np.array([
+            float(feature_map.get(feature, 0.0))
+            for feature in self.features
+        ])
+
+        dm = xgb.DMatrix(values.reshape(1, -1), missing=0.0)
+        return float(self.model.predict(dm, output_margin=False)[0])
+
+    def feature_weight_importance(self):
+        return self.feature_importance("weight")
+
+    def feature_cover_importance(self):
+        return self.feature_importance("cover")
+
+    def feature_gain_importance(self):
+        return self.feature_importance("gain")
+
+    def feature_importance(self, importance_type: str):
+        importance_map = {}
+        score_map = self.model.get_score(importance_type=importance_type)
+        for key in score_map:
+            k = self.features[int(key[1:])]
+            importance_map[k] = score_map[key]
+        return importance_map
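A brief usage sketch of XGBModel with an illustrative feature subset and model path (the full feature list lives in model/feature.py). xgboost's Booster.get_score() returns keys such as "f0", "f1", ... when the model stores no feature names, which is why feature_importance() strips the leading "f" and indexes back into the features list.

from model.XGBModel import XGBModel

features = ["cpa", "b2_3h_ctr", "b2_3h_ctcvr"]   # illustrative subset
model = XGBModel(model_file="/path/to/XGBoostClassificationModel", features=features)

# Features absent from the map default to 0.0, which is also the DMatrix missing value.
score = model.predict({"cpa": 12.5, "b2_3h_ctr": 0.031})

# Importance maps are keyed by the original feature names, e.g. {"cpa": 18.0, ...}
gain = model.feature_gain_importance()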

+ 420 - 0
model/feature.py

@@ -0,0 +1,420 @@
+import glob
+import os.path
+
+import numpy as np
+import pandas as pd
+import xgboost as xgb
+
+from model.XGBModel import XGBModel
+
+features = [
+    "cpa",
+    "b2_3h_ctr",
+    "b2_3h_ctcvr",
+    "b2_3h_cvr",
+    "b2_3h_conver",
+    "b2_3h_ecpm",
+    "b2_3h_click",
+    "b2_3h_conver*log(view)",
+    "b2_3h_conver*ctcvr",
+    "b2_6h_ctr",
+    "b2_6h_ctcvr",
+    "b2_6h_cvr",
+    "b2_6h_conver",
+    "b2_6h_ecpm",
+    "b2_6h_click",
+    "b2_6h_conver*log(view)",
+    "b2_6h_conver*ctcvr",
+    "b2_12h_ctr",
+    "b2_12h_ctcvr",
+    "b2_12h_cvr",
+    "b2_12h_conver",
+    "b2_12h_ecpm",
+    "b2_12h_click",
+    "b2_12h_conver*log(view)",
+    "b2_12h_conver*ctcvr",
+    "b2_1d_ctr",
+    "b2_1d_ctcvr",
+    "b2_1d_cvr",
+    "b2_1d_conver",
+    "b2_1d_ecpm",
+    "b2_1d_click",
+    "b2_1d_conver*log(view)",
+    "b2_1d_conver*ctcvr",
+    "b2_3d_ctr",
+    "b2_3d_ctcvr",
+    "b2_3d_cvr",
+    "b2_3d_conver",
+    "b2_3d_ecpm",
+    "b2_3d_click",
+    "b2_3d_conver*log(view)",
+    "b2_3d_conver*ctcvr",
+    "b2_7d_ctr",
+    "b2_7d_ctcvr",
+    "b2_7d_cvr",
+    "b2_7d_conver",
+    "b2_7d_ecpm",
+    "b2_7d_click",
+    "b2_7d_conver*log(view)",
+    "b2_7d_conver*ctcvr",
+    "b3_3h_ctr",
+    "b3_3h_ctcvr",
+    "b3_3h_cvr",
+    "b3_3h_conver",
+    "b3_3h_ecpm",
+    "b3_3h_click",
+    "b3_3h_conver*log(view)",
+    "b3_3h_conver*ctcvr",
+    "b3_6h_ctr",
+    "b3_6h_ctcvr",
+    "b3_6h_cvr",
+    "b3_6h_conver",
+    "b3_6h_ecpm",
+    "b3_6h_click",
+    "b3_6h_conver*log(view)",
+    "b3_6h_conver*ctcvr",
+    "b3_12h_ctr",
+    "b3_12h_ctcvr",
+    "b3_12h_cvr",
+    "b3_12h_conver",
+    "b3_12h_ecpm",
+    "b3_12h_click",
+    "b3_12h_conver*log(view)",
+    "b3_12h_conver*ctcvr",
+    "b3_1d_ctr",
+    "b3_1d_ctcvr",
+    "b3_1d_cvr",
+    "b3_1d_conver",
+    "b3_1d_ecpm",
+    "b3_1d_click",
+    "b3_1d_conver*log(view)",
+    "b3_1d_conver*ctcvr",
+    "b3_3d_ctr",
+    "b3_3d_ctcvr",
+    "b3_3d_cvr",
+    "b3_3d_conver",
+    "b3_3d_ecpm",
+    "b3_3d_click",
+    "b3_3d_conver*log(view)",
+    "b3_3d_conver*ctcvr",
+    "b3_7d_ctr",
+    "b3_7d_ctcvr",
+    "b3_7d_cvr",
+    "b3_7d_conver",
+    "b3_7d_ecpm",
+    "b3_7d_click",
+    "b3_7d_conver*log(view)",
+    "b3_7d_conver*ctcvr",
+    "b4_3h_ctr",
+    "b4_3h_ctcvr",
+    "b4_3h_cvr",
+    "b4_3h_conver",
+    "b4_3h_ecpm",
+    "b4_3h_click",
+    "b4_3h_conver*log(view)",
+    "b4_3h_conver*ctcvr",
+    "b4_6h_ctr",
+    "b4_6h_ctcvr",
+    "b4_6h_cvr",
+    "b4_6h_conver",
+    "b4_6h_ecpm",
+    "b4_6h_click",
+    "b4_6h_conver*log(view)",
+    "b4_6h_conver*ctcvr",
+    "b4_12h_ctr",
+    "b4_12h_ctcvr",
+    "b4_12h_cvr",
+    "b4_12h_conver",
+    "b4_12h_ecpm",
+    "b4_12h_click",
+    "b4_12h_conver*log(view)",
+    "b4_12h_conver*ctcvr",
+    "b4_1d_ctr",
+    "b4_1d_ctcvr",
+    "b4_1d_cvr",
+    "b4_1d_conver",
+    "b4_1d_ecpm",
+    "b4_1d_click",
+    "b4_1d_conver*log(view)",
+    "b4_1d_conver*ctcvr",
+    "b4_3d_ctr",
+    "b4_3d_ctcvr",
+    "b4_3d_cvr",
+    "b4_3d_conver",
+    "b4_3d_ecpm",
+    "b4_3d_click",
+    "b4_3d_conver*log(view)",
+    "b4_3d_conver*ctcvr",
+    "b4_7d_ctr",
+    "b4_7d_ctcvr",
+    "b4_7d_cvr",
+    "b4_7d_conver",
+    "b4_7d_ecpm",
+    "b4_7d_click",
+    "b4_7d_conver*log(view)",
+    "b4_7d_conver*ctcvr",
+    "b5_3h_ctr",
+    "b5_3h_ctcvr",
+    "b5_3h_cvr",
+    "b5_3h_conver",
+    "b5_3h_ecpm",
+    "b5_3h_click",
+    "b5_3h_conver*log(view)",
+    "b5_3h_conver*ctcvr",
+    "b5_6h_ctr",
+    "b5_6h_ctcvr",
+    "b5_6h_cvr",
+    "b5_6h_conver",
+    "b5_6h_ecpm",
+    "b5_6h_click",
+    "b5_6h_conver*log(view)",
+    "b5_6h_conver*ctcvr",
+    "b5_12h_ctr",
+    "b5_12h_ctcvr",
+    "b5_12h_cvr",
+    "b5_12h_conver",
+    "b5_12h_ecpm",
+    "b5_12h_click",
+    "b5_12h_conver*log(view)",
+    "b5_12h_conver*ctcvr",
+    "b5_1d_ctr",
+    "b5_1d_ctcvr",
+    "b5_1d_cvr",
+    "b5_1d_conver",
+    "b5_1d_ecpm",
+    "b5_1d_click",
+    "b5_1d_conver*log(view)",
+    "b5_1d_conver*ctcvr",
+    "b5_3d_ctr",
+    "b5_3d_ctcvr",
+    "b5_3d_cvr",
+    "b5_3d_conver",
+    "b5_3d_ecpm",
+    "b5_3d_click",
+    "b5_3d_conver*log(view)",
+    "b5_3d_conver*ctcvr",
+    "b5_7d_ctr",
+    "b5_7d_ctcvr",
+    "b5_7d_cvr",
+    "b5_7d_conver",
+    "b5_7d_ecpm",
+    "b5_7d_click",
+    "b5_7d_conver*log(view)",
+    "b5_7d_conver*ctcvr",
+    "b8_3h_ctr",
+    "b8_3h_ctcvr",
+    "b8_3h_cvr",
+    "b8_3h_conver",
+    "b8_3h_ecpm",
+    "b8_3h_click",
+    "b8_3h_conver*log(view)",
+    "b8_3h_conver*ctcvr",
+    "b8_6h_ctr",
+    "b8_6h_ctcvr",
+    "b8_6h_cvr",
+    "b8_6h_conver",
+    "b8_6h_ecpm",
+    "b8_6h_click",
+    "b8_6h_conver*log(view)",
+    "b8_6h_conver*ctcvr",
+    "b8_12h_ctr",
+    "b8_12h_ctcvr",
+    "b8_12h_cvr",
+    "b8_12h_conver",
+    "b8_12h_ecpm",
+    "b8_12h_click",
+    "b8_12h_conver*log(view)",
+    "b8_12h_conver*ctcvr",
+    "b8_1d_ctr",
+    "b8_1d_ctcvr",
+    "b8_1d_cvr",
+    "b8_1d_conver",
+    "b8_1d_ecpm",
+    "b8_1d_click",
+    "b8_1d_conver*log(view)",
+    "b8_1d_conver*ctcvr",
+    "b8_3d_ctr",
+    "b8_3d_ctcvr",
+    "b8_3d_cvr",
+    "b8_3d_conver",
+    "b8_3d_ecpm",
+    "b8_3d_click",
+    "b8_3d_conver*log(view)",
+    "b8_3d_conver*ctcvr",
+    "b8_7d_ctr",
+    "b8_7d_ctcvr",
+    "b8_7d_cvr",
+    "b8_7d_conver",
+    "b8_7d_ecpm",
+    "b8_7d_click",
+    "b8_7d_conver*log(view)",
+    "b8_7d_conver*ctcvr",
+    "b6_7d_ctr",
+    "b6_7d_ctcvr",
+    "b6_7d_cvr",
+    "b6_7d_conver",
+    "b6_7d_ecpm",
+    "b6_7d_click",
+    "b6_7d_conver*log(view)",
+    "b6_7d_conver*ctcvr",
+    "b6_14d_ctr",
+    "b6_14d_ctcvr",
+    "b6_14d_cvr",
+    "b6_14d_conver",
+    "b6_14d_ecpm",
+    "b6_14d_click",
+    "b6_14d_conver*log(view)",
+    "b6_14d_conver*ctcvr",
+    "b7_7d_ctr",
+    "b7_7d_ctcvr",
+    "b7_7d_cvr",
+    "b7_7d_conver",
+    "b7_7d_ecpm",
+    "b7_7d_click",
+    "b7_7d_conver*log(view)",
+    "b7_7d_conver*ctcvr",
+    "b7_14d_ctr",
+    "b7_14d_ctcvr",
+    "b7_14d_cvr",
+    "b7_14d_conver",
+    "b7_14d_ecpm",
+    "b7_14d_click",
+    "b7_14d_conver*log(view)",
+    "b7_14d_conver*ctcvr",
+    "viewAll",
+    "clickAll",
+    "converAll",
+    "incomeAll",
+    "ctr_all",
+    "ctcvr_all",
+    "cvr_all",
+    "ecpm_all",
+    "timediff_view",
+    "timediff_click",
+    "timediff_conver",
+    "actionstatic_view",
+    "actionstatic_click",
+    "actionstatic_conver",
+    "actionstatic_income",
+    "actionstatic_ctr",
+    "actionstatic_ctcvr",
+    "actionstatic_cvr",
+    "e1_tags_3d_matchnum",
+    "e1_tags_3d_maxscore",
+    "e1_tags_3d_avgscore",
+    "e1_tags_7d_matchnum",
+    "e1_tags_7d_maxscore",
+    "e1_tags_7d_avgscore",
+    "e1_tags_14d_matchnum",
+    "e1_tags_14d_maxscore",
+    "e1_tags_14d_avgscore",
+    "e2_tags_3d_matchnum",
+    "e2_tags_3d_maxscore",
+    "e2_tags_3d_avgscore",
+    "e2_tags_7d_matchnum",
+    "e2_tags_7d_maxscore",
+    "e2_tags_7d_avgscore",
+    "e2_tags_14d_matchnum",
+    "e2_tags_14d_maxscore",
+    "e2_tags_14d_avgscore",
+    "d1_feature_3h_ctr",
+    "d1_feature_3h_ctcvr",
+    "d1_feature_3h_cvr",
+    "d1_feature_3h_conver",
+    "d1_feature_3h_ecpm",
+    "d1_feature_6h_ctr",
+    "d1_feature_6h_ctcvr",
+    "d1_feature_6h_cvr",
+    "d1_feature_6h_conver",
+    "d1_feature_6h_ecpm",
+    "d1_feature_12h_ctr",
+    "d1_feature_12h_ctcvr",
+    "d1_feature_12h_cvr",
+    "d1_feature_12h_conver",
+    "d1_feature_12h_ecpm",
+    "d1_feature_1d_ctr",
+    "d1_feature_1d_ctcvr",
+    "d1_feature_1d_cvr",
+    "d1_feature_1d_conver",
+    "d1_feature_1d_ecpm",
+    "d1_feature_3d_ctr",
+    "d1_feature_3d_ctcvr",
+    "d1_feature_3d_cvr",
+    "d1_feature_3d_conver",
+    "d1_feature_3d_ecpm",
+    "d1_feature_7d_ctr",
+    "d1_feature_7d_ctcvr",
+    "d1_feature_7d_cvr",
+    "d1_feature_7d_conver",
+    "d1_feature_7d_ecpm",
+    "vid_rank_ctr_1d",
+    "vid_rank_ctr_3d",
+    "vid_rank_ctr_7d",
+    "vid_rank_ctr_14d",
+    "vid_rank_ctcvr_1d",
+    "vid_rank_ctcvr_3d",
+    "vid_rank_ctcvr_7d",
+    "vid_rank_ctcvr_14d",
+    "vid_rank_ecpm_1d",
+    "vid_rank_ecpm_3d",
+    "vid_rank_ecpm_7d",
+    "vid_rank_ecpm_14d"
+]
+
+
+def load_model_and_score(model_path, feature_map):
+    model = xgb.Booster()
+    model.load_model(f"{model_path}/data/XGBoostClassificationModel")
+    model.set_param({"missing": 0.0})
+
+    values = np.array([
+        float(feature_map.get(feature, 0.0))
+        for feature in features
+    ], dtype=np.float32)
+
+    dm = xgb.DMatrix(values.reshape(1, -1), missing=0.0)
+    return float(model.predict(dm, output_margin=False)[0])
+
+
+def _multi_importance_flat_map(importance_map: dict):
+    result = []
+    all_features = set(key for inner_dict in importance_map.values() for key in inner_dict.keys())
+    for feature in all_features:
+        item = {
+            "feature": feature,
+        }
+        for key in importance_map:
+            if feature in importance_map[key]:
+                item[key] = importance_map[key][feature]
+        result.append(item)
+    return result
+
+
+def _main():
+    model_path = "/Users/zhao/Desktop/tzld/XGB/35_ad_model"
+    all_model = glob.glob(f"{model_path}/*")
+    model_dict = {}
+    for e in all_model:
+        if "model_xgb_351_1000_v2" in e:
+            model_dict[e] = XGBModel(model_file=f"{e}/data/XGBoostClassificationModel", features=features)
+
+    weight_dict = {}
+    cover_dict = {}
+    gain_dict = {}
+    for key in model_dict:
+        dt = os.path.basename(key)[-9:]
+        weight_dict[dt] = model_dict[key].feature_weight_importance()
+        cover_dict[dt] = model_dict[key].feature_cover_importance()
+        gain_dict[dt] = model_dict[key].feature_gain_importance()
+
+    weight = _multi_importance_flat_map(dict(sorted(weight_dict.items())))
+    cover = _multi_importance_flat_map(dict(sorted(cover_dict.items())))
+    gain = _multi_importance_flat_map(dict(sorted(gain_dict.items())))
+
+    pd.DataFrame(weight).to_csv("/Users/zhao/Desktop/weight.csv", index=False)
+    pd.DataFrame(cover).to_csv("/Users/zhao/Desktop/cover.csv", index=False)
+    pd.DataFrame(gain).to_csv("/Users/zhao/Desktop/gain.csv", index=False)
+
+
+if __name__ == '__main__':
+    _main()
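For clarity, a small made-up example of what _multi_importance_flat_map produces: one row per feature with one column per model date, which is what makes weight.csv / cover.csv / gain.csv comparable across training days. The date keys below are illustrative, and row order may vary because a set is used internally.

importance_by_date = {
    "1101_1000": {"cpa": 18.0, "b2_3h_ctr": 7.0},
    "1102_1000": {"cpa": 21.0},
}
rows = _multi_importance_flat_map(importance_by_date)
# rows == [
#     {"feature": "cpa", "1101_1000": 18.0, "1102_1000": 21.0},
#     {"feature": "b2_3h_ctr", "1101_1000": 7.0},
# ]  (in some order)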

+ 0 - 198
model/model_predict_analyse.py

@@ -1,198 +0,0 @@
-import argparse
-import gzip
-import os.path
-from collections import OrderedDict
-
-import pandas as pd
-from hdfs import InsecureClient
-
-client = InsecureClient("http://master-1-1.c-7f31a3eea195cb73.cn-hangzhou.emr.aliyuncs.com:9870", user="spark")
-
-SEGMENT_BASE_PATH = os.environ.get("SEGMENT_BASE_PATH", "/dw/recommend/model/36_score_calibration_file")
-PREDICT_CACHE_PATH = os.environ.get("PREDICT_CACHE_PATH", "/root/zhaohp/XGB/predict_cache")
-
-
-def read_predict_from_local_txt(txt_file) -> list:
-    result = []
-    with open(txt_file, "r") as f:
-        for line in f.readlines():
-            sp = line.replace("\n", "").split("\t")
-            if len(sp) == 4:
-                label = int(sp[0])
-                cid = sp[3].split("_")[0]
-                score = float(sp[2].replace("[", "").replace("]", "").split(",")[1])
-                result.append({
-                    "label": label,
-                    "cid": cid,
-                    "score": score
-                })
-    return result
-
-
-def read_predict_from_hdfs(hdfs_path: str) -> list:
-    if not hdfs_path.endswith("/"):
-        hdfs_path += "/"
-    result = []
-    for file in client.list(hdfs_path):
-        with client.read(hdfs_path + file) as reader:
-            with gzip.GzipFile(fileobj=reader, mode="rb") as gz_file:
-                for line in gz_file.read().decode("utf-8").split("\n"):
-                    split = line.split("\t")
-                    if len(split) == 4:
-                        cid = split[3].split("_")[0]
-                        label = int(split[0])
-                        score = float(split[2].replace("[", "").replace("]", "").split(",")[1])
-                        result.append({
-                            "cid": cid,
-                            "label": label,
-                            "score": score
-                        })
-
-    return result
-
-
-def _segment_v1(scores, step):
-    bins = []
-    for i in range(0, len(scores), int((len(scores) / step))):
-        if i == 0:
-            bins.append(0)
-        else:
-            bins.append(scores[i])
-    bins.append(1)
-    return list(OrderedDict.fromkeys(bins))
-
-
-def segment_calc_diff_rate_by_score(df: pd.DataFrame, segment_file_path: str, step=100) -> [pd.DataFrame, pd.DataFrame]:
-    sored_df = df.sort_values(by=['score'])
-    # 评估分数分段
-    scores = sored_df['score'].values
-
-    bins = _segment_v1(scores, step)
-
-    # 等分分桶
-    # split_indices = np.array_split(np.arange(len(scores)), step)
-    # bins = [scores[index[0]] for index in split_indices] + [scores[split_indices[-1][-1]]]
-
-    sored_df['score_segment'] = pd.cut(sored_df['score'], bins=bins)
-
-    # 计算分段内分数的差异
-    group_df = sored_df.groupby("score_segment", observed=True).agg(
-        segment_label_sum=('label', 'sum'),
-        segment_label_cnt=('label', 'count'),
-        segment_score_avg=('score', 'mean'),
-    ).reset_index()
-    group_df['segment_true_score'] = group_df['segment_label_sum'] / group_df['segment_label_cnt']
-    group_df['segment_diff_rate'] = (group_df['segment_score_avg'] / group_df['segment_true_score'] - 1).mask(group_df['segment_true_score'] == 0, 0)
-
-    # 完整的分段文件保存
-    csv_data = group_df.to_csv(sep="\t", index=False)
-    with client.write(segment_file_path, encoding='utf-8', overwrite=True) as writer:
-        writer.write(csv_data)
-
-    filtered_df = group_df[(abs(group_df['segment_diff_rate']) >= 0.2) & (group_df['segment_label_cnt'] >= 1000)]
-    filtered_df = filtered_df[['score_segment', 'segment_diff_rate']]
-    # 每条曝光数据添加对应分数的diff
-    merged_df = pd.merge(sored_df, filtered_df, on="score_segment", how="left")
-
-    merged_df['segment_diff_rate'] = merged_df['segment_diff_rate'].fillna(0)
-    return merged_df, filtered_df
-
-
-def read_and_calibration_predict(predict_path: str, step=100) -> [pd.DataFrame, pd.DataFrame, pd.DataFrame]:
-    """
-    读取评估结果,并进行校准
-    """
-    # 本地调试使用
-    # predicts = read_predict_from_local_txt(predict_path)
-    predicts = read_predict_from_hdfs(predict_path)
-    df = pd.DataFrame(predicts)
-
-    # 模型分分段计算与真实ctcvr的dff_rate
-    predict_basename = os.path.basename(predict_path)
-    if predict_basename.endswith("/"):
-        predict_basename = predict_basename[:-1]
-    df, segment_df = segment_calc_diff_rate_by_score(df, segment_file_path=f"{SEGMENT_BASE_PATH}/{predict_basename}.txt", step=100)
-
-    # 生成校准后的分数
-    df['score_2'] = df['score'] / (1 + df['segment_diff_rate'])
-
-    # 按CID统计真实ctcvr和校准前后的平均模型分
-    grouped_df = df.groupby("cid").agg(
-        view=('cid', 'size'),
-        conv=('label', 'sum'),
-        score_avg=('score', lambda x: round(x.mean(), 6)),
-        score_2_avg=('score_2', lambda x: round(x.mean(), 6)),
-    ).reset_index()
-    grouped_df['true_ctcvr'] = grouped_df['conv'] / grouped_df['view']
-
-    return df, grouped_df, segment_df
-
-
-def predict_local_save_for_auc(old_df: pd.DataFrame, new_df: pd.DataFrame):
-    """
-    本地保存一份评估结果, 计算AUC使用
-    """
-    d = {"old": old_df, "new": new_df}
-    for key in d:
-        df = d[key][['label', "score"]]
-        df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_1.csv", sep="\t", index=False, header=False)
-        df = d[key][['label', "score_2"]]
-        df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_2.csv", sep="\t", index=False, header=False)
-
-
-def _main(old_predict_path: str, new_predict_path: str, calibration_file: str, analyse_file: str):
-    old_df, old_group_df, old_segment_df = read_and_calibration_predict(old_predict_path)
-    new_df, new_group_df, new_segment_df = read_and_calibration_predict(new_predict_path)
-
-    # predict_local_save_for_auc(old_df, new_df)
-
-    # 分段文件保存, 此处保留的最后使用的分段文件,不是所有的分段
-    new_segment_df.to_csv(calibration_file, sep='\t', index=False, header=False)
-
-    # 字段重命名,和列过滤
-    old_group_df.rename(columns={'score_avg': 'old_score_avg', 'score_2_avg': 'old_score_2_avg'}, inplace=True)
-    new_group_df.rename(columns={'score_avg': 'new_score_avg', 'score_2_avg': 'new_score_2_avg'}, inplace=True)
-    old_group_df = old_group_df[['cid', 'view', 'conv', 'true_ctcvr', 'old_score_avg', 'old_score_2_avg']]
-    new_group_df = new_group_df[['cid', 'new_score_avg', 'new_score_2_avg']]
-
-    merged = pd.merge(old_group_df, new_group_df, on='cid', how='left')
-
-    # 计算与真实ctcvr的差异值
-    merged["(new-true)/true"] = (merged['new_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
-    merged["(old-true)/true"] = (merged['old_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
-
-    # 计算校准后的模型分与ctcvr的差异值
-    merged["(new2-true)/true"] = (merged['new_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
-    merged["(old2-true)/true"] = (merged['old_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
-
-    # 按照曝光排序,写入本地文件
-    merged = merged.sort_values(by=['view'], ascending=False)
-    merged = merged[[
-        'cid', 'view', "conv", "true_ctcvr",
-        "old_score_avg", "new_score_avg", "(old-true)/true", "(new-true)/true",
-        "old_score_2_avg", "new_score_2_avg", "(old2-true)/true", "(new2-true)/true",
-    ]]
-
-    # 根据文件名保存不同的格式
-    if analyse_file.endswith(".csv"):
-        merged.to_csv(analyse_file, index=False)
-    else:
-        with open(analyse_file, "w") as writer:
-            writer.write(merged.to_string(index=False))
-    print("0")
-
-
-if __name__ == '__main__':
-    parser = argparse.ArgumentParser(description="model_predict_analyse.py")
-    parser.add_argument("-op", "--old_predict_path", required=True, help="老模型评估结果")
-    parser.add_argument("-np", "--new_predict_path", required=True, help="新模型评估结果")
-    parser.add_argument("-af", "--analyse_file", required=True, help="最后计算结果的保存路径")
-    parser.add_argument("-cf", "--calibration_file", required=True, help="线上使用的segment文件保存路径")
-    args = parser.parse_args()
-
-    _main(
-        old_predict_path=args.old_predict_path,
-        new_predict_path=args.new_predict_path,
-        calibration_file=args.calibration_file,
-        analyse_file=args.analyse_file
-    )

+ 198 - 0
model/model_predict_analyse_20241101.py

@@ -0,0 +1,198 @@
+# import argparse
+# import gzip
+# import os.path
+# from collections import OrderedDict
+#
+# import pandas as pd
+# from hdfs import InsecureClient
+#
+# client = InsecureClient("http://master-1-1.c-7f31a3eea195cb73.cn-hangzhou.emr.aliyuncs.com:9870", user="spark")
+#
+# SEGMENT_BASE_PATH = os.environ.get("SEGMENT_BASE_PATH", "/dw/recommend/model/36_score_calibration_file")
+# PREDICT_CACHE_PATH = os.environ.get("PREDICT_CACHE_PATH", "/root/zhaohp/XGB/predict_cache")
+#
+#
+# def read_predict_from_local_txt(txt_file) -> list:
+#     result = []
+#     with open(txt_file, "r") as f:
+#         for line in f.readlines():
+#             sp = line.replace("\n", "").split("\t")
+#             if len(sp) == 4:
+#                 label = int(sp[0])
+#                 cid = sp[3].split("_")[0]
+#                 score = float(sp[2].replace("[", "").replace("]", "").split(",")[1])
+#                 result.append({
+#                     "label": label,
+#                     "cid": cid,
+#                     "score": score
+#                 })
+#     return result
+#
+#
+# def read_predict_from_hdfs(hdfs_path: str) -> list:
+#     if not hdfs_path.endswith("/"):
+#         hdfs_path += "/"
+#     result = []
+#     for file in client.list(hdfs_path):
+#         with client.read(hdfs_path + file) as reader:
+#             with gzip.GzipFile(fileobj=reader, mode="rb") as gz_file:
+#                 for line in gz_file.read().decode("utf-8").split("\n"):
+#                     split = line.split("\t")
+#                     if len(split) == 4:
+#                         cid = split[3].split("_")[0]
+#                         label = int(split[0])
+#                         score = float(split[2].replace("[", "").replace("]", "").split(",")[1])
+#                         result.append({
+#                             "cid": cid,
+#                             "label": label,
+#                             "score": score
+#                         })
+#
+#     return result
+#
+#
+# def _segment_v1(scores, step):
+#     bins = []
+#     for i in range(0, len(scores), int((len(scores) / step))):
+#         if i == 0:
+#             bins.append(0)
+#         else:
+#             bins.append(scores[i])
+#     bins.append(1)
+#     return list(OrderedDict.fromkeys(bins))
+#
+#
+# def segment_calc_diff_rate_by_score(df: pd.DataFrame, segment_file_path: str, step=100) -> [pd.DataFrame, pd.DataFrame]:
+#     sored_df = df.sort_values(by=['score'])
+#     # 评估分数分段
+#     scores = sored_df['score'].values
+#
+#     bins = _segment_v1(scores, step)
+#
+#     # 等分分桶
+#     # split_indices = np.array_split(np.arange(len(scores)), step)
+#     # bins = [scores[index[0]] for index in split_indices] + [scores[split_indices[-1][-1]]]
+#
+#     sored_df['score_segment'] = pd.cut(sored_df['score'], bins=bins)
+#
+#     # 计算分段内分数的差异
+#     group_df = sored_df.groupby("score_segment", observed=True).agg(
+#         segment_label_sum=('label', 'sum'),
+#         segment_label_cnt=('label', 'count'),
+#         segment_score_avg=('score', 'mean'),
+#     ).reset_index()
+#     group_df['segment_true_score'] = group_df['segment_label_sum'] / group_df['segment_label_cnt']
+#     group_df['segment_diff_rate'] = (group_df['segment_score_avg'] / group_df['segment_true_score'] - 1).mask(group_df['segment_true_score'] == 0, 0)
+#
+#     # 完整的分段文件保存
+#     csv_data = group_df.to_csv(sep="\t", index=False)
+#     with client.write(segment_file_path, encoding='utf-8', overwrite=True) as writer:
+#         writer.write(csv_data)
+#
+#     filtered_df = group_df[(abs(group_df['segment_diff_rate']) >= 0.2) & (group_df['segment_label_cnt'] >= 1000)]
+#     filtered_df = filtered_df[['score_segment', 'segment_diff_rate']]
+#     # 每条曝光数据添加对应分数的diff
+#     merged_df = pd.merge(sored_df, filtered_df, on="score_segment", how="left")
+#
+#     merged_df['segment_diff_rate'] = merged_df['segment_diff_rate'].fillna(0)
+#     return merged_df, filtered_df
+#
+#
+# def read_and_calibration_predict(predict_path: str) -> [pd.DataFrame, pd.DataFrame, pd.DataFrame]:
+#     """
+#     读取评估结果,并进行校准
+#     """
+#     # 本地调试使用
+#     # predicts = read_predict_from_local_txt(predict_path)
+#     predicts = read_predict_from_hdfs(predict_path)
+#     df = pd.DataFrame(predicts)
+#
+#     # 模型分分段计算与真实ctcvr的dff_rate
+#     predict_basename = os.path.basename(predict_path)
+#     if predict_basename.endswith("/"):
+#         predict_basename = predict_basename[:-1]
+#     df, segment_df = segment_calc_diff_rate_by_score(df, segment_file_path=f"{SEGMENT_BASE_PATH}/{predict_basename}.txt", step=100)
+#
+#     # 生成校准后的分数
+#     df['score_2'] = df['score'] / (1 + df['segment_diff_rate'])
+#
+#     # 按CID统计真实ctcvr和校准前后的平均模型分
+#     grouped_df = df.groupby("cid").agg(
+#         view=('cid', 'size'),
+#         conv=('label', 'sum'),
+#         score_avg=('score', lambda x: round(x.mean(), 6)),
+#         score_2_avg=('score_2', lambda x: round(x.mean(), 6)),
+#     ).reset_index()
+#     grouped_df['true_ctcvr'] = grouped_df['conv'] / grouped_df['view']
+#
+#     return df, grouped_df, segment_df
+#
+#
+# def predict_local_save_for_auc(old_df: pd.DataFrame, new_df: pd.DataFrame):
+#     """
+#     本地保存一份评估结果, 计算AUC使用
+#     """
+#     d = {"old": old_df, "new": new_df}
+#     for key in d:
+#         df = d[key][['label', "score"]]
+#         df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_1.txt", sep="\t", index=False, header=False)
+#         df = d[key][['label', "score_2"]]
+#         df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_2.txt", sep="\t", index=False, header=False)
+#
+#
+# def _main(old_predict_path: str, new_predict_path: str, calibration_file: str, analyse_file: str):
+#     old_df, old_group_df, old_segment_df = read_and_calibration_predict(old_predict_path)
+#     new_df, new_group_df, new_segment_df = read_and_calibration_predict(new_predict_path)
+#
+#     predict_local_save_for_auc(old_df, new_df)
+#
+#     # 分段文件保存, 此处保留的最后使用的分段文件,不是所有的分段
+#     new_segment_df.to_csv(calibration_file, sep='\t', index=False, header=False)
+#
+#     # 字段重命名,和列过滤
+#     old_group_df.rename(columns={'score_avg': 'old_score_avg', 'score_2_avg': 'old_score_2_avg'}, inplace=True)
+#     new_group_df.rename(columns={'score_avg': 'new_score_avg', 'score_2_avg': 'new_score_2_avg'}, inplace=True)
+#     old_group_df = old_group_df[['cid', 'view', 'conv', 'true_ctcvr', 'old_score_avg', 'old_score_2_avg']]
+#     new_group_df = new_group_df[['cid', 'new_score_avg', 'new_score_2_avg']]
+#
+#     merged = pd.merge(old_group_df, new_group_df, on='cid', how='left')
+#
+#     # 计算与真实ctcvr的差异值
+#     merged["(new-true)/true"] = (merged['new_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
+#     merged["(old-true)/true"] = (merged['old_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
+#
+#     # 计算校准后的模型分与ctcvr的差异值
+#     merged["(new2-true)/true"] = (merged['new_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
+#     merged["(old2-true)/true"] = (merged['old_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
+#
+#     # 按照曝光排序,写入本地文件
+#     merged = merged.sort_values(by=['view'], ascending=False)
+#     merged = merged[[
+#         'cid', 'view', "conv", "true_ctcvr",
+#         "old_score_avg", "new_score_avg", "(old-true)/true", "(new-true)/true",
+#         "old_score_2_avg", "new_score_2_avg", "(old2-true)/true", "(new2-true)/true",
+#     ]]
+#
+#     # 根据文件名保存不同的格式
+#     if analyse_file.endswith(".csv"):
+#         merged.to_csv(analyse_file, index=False)
+#     else:
+#         with open(analyse_file, "w") as writer:
+#             writer.write(merged.to_string(index=False))
+#     print("0")
+#
+#
+# if __name__ == '__main__':
+#     parser = argparse.ArgumentParser(description="model_predict_analyse_20241101.py")
+#     parser.add_argument("-op", "--old_predict_path", required=True, help="老模型评估结果")
+#     parser.add_argument("-np", "--new_predict_path", required=True, help="新模型评估结果")
+#     parser.add_argument("-af", "--analyse_file", required=True, help="最后计算结果的保存路径")
+#     parser.add_argument("-cf", "--calibration_file", required=True, help="线上使用的segment文件保存路径")
+#     args = parser.parse_args()
+#
+#     _main(
+#         old_predict_path=args.old_predict_path,
+#         new_predict_path=args.new_predict_path,
+#         calibration_file=args.calibration_file,
+#         analyse_file=args.analyse_file
+#     )

+ 183 - 0
model/model_predict_analyse_20241115.py

@@ -0,0 +1,183 @@
+import gzip
+import os.path
+
+import pandas as pd
+from hdfs import InsecureClient
+
+client = InsecureClient("http://master-1-1.c-7f31a3eea195cb73.cn-hangzhou.emr.aliyuncs.com:9870", user="spark")
+
+SEGMENT_BASE_PATH = os.environ.get("SEGMENT_BASE_PATH", "/dw/recommend/model/36_score_calibration_file")
+PREDICT_CACHE_PATH = os.environ.get("PREDICT_CACHE_PATH", "/root/zhaohp/XGB/predict_cache")
+
+
+def parse_predict_line(line: str) -> [bool, dict]:
+    sp = line.replace("\n", "").split("\t")
+    if len(sp) == 4:
+        label = int(sp[0])
+        cid = sp[3].split("_")[0]
+        score = float(sp[2].replace("[", "").replace("]", "").split(",")[1])
+        return True, {
+            "label": label,
+            "cid": cid,
+            "score": score
+        }
+    return False, {}
+
+
+def read_predict_file(file_path: str) -> pd.DataFrame:
+    result = []
+    if file_path.startswith("/dw"):
+        if not file_path.endswith("/"):
+            file_path += "/"
+        for file in client.list(file_path):
+            with client.read(file_path + file) as reader:
+                with gzip.GzipFile(fileobj=reader, mode="rb") as gz_file:
+                    for line in gz_file.read().decode("utf-8").split("\n"):
+                        b, d = parse_predict_line(line)
+                        if b: result.append(d)
+    else:
+        with open(file_path, "r") as f:
+            for line in f.readlines():
+                b, d = parse_predict_line(line)
+                if b: result.append(d)
+    return pd.DataFrame(result)
+
+
+def calibration_file_save(df: pd.DataFrame, file_path: str):
+    if file_path.startswith("/dw"):
+        # Save the complete segment file
+        with client.write(file_path, encoding='utf-8', overwrite=True) as writer:
+            writer.write(df.to_csv(sep="\t", index=False))
+    else:
+        df.to_csv(file_path, sep="\t", index=False)
+
+
+def predict_local_save_for_auc(old_df: pd.DataFrame, new_df: pd.DataFrame):
+    """
+    Save a local copy of the predictions for AUC calculation
+    """
+    d = {"old": old_df, "new": new_df}
+    for key in d:
+        df = d[key][['label', "score"]]
+        df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_1.txt", sep="\t", index=False, header=False)
+        df = d[key][['label', "score_2"]]
+        df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_2.txt", sep="\t", index=False, header=False)
+
+
+def save_full_calibration_file(df: pd.DataFrame, segment_file_path: str):
+    if segment_file_path.startswith("/dw"):
+        # Save the complete segment file
+        with client.write(segment_file_path, encoding='utf-8', overwrite=True) as writer:
+            writer.write(df.to_csv(sep="\t", index=False))
+    else:
+        df.to_csv(segment_file_path, sep="\t", index=False)
+
+
+def calc_calibration_diff_rate(df: pd.DataFrame, predict_basename: str) -> [pd.DataFrame]:
+    """
+    Compute the diff_rate of the model score
+    """
+    agg_df = predict_df_agg(df)
+    # diff_rate: relative gap between the average predicted score and the observed ctcvr per cid
+    agg_df['diff_rate'] = (agg_df['p_score_avg'] / agg_df['t_ctcvr'] - 1).mask(agg_df['t_ctcvr'] == 0, 0)
+    condition = 'view > 1000 and diff_rate >= 0.1'
+    save_full_calibration_file(agg_df, f"{PREDICT_CACHE_PATH}/{predict_basename}.txt")
+    calibration = agg_df.query(condition)
+    return calibration
+
+
+def get_predict_basename(predict_path) -> [str]:
+    """
+    Take the last part of the file path as the file name associated with the model
+    """
+    predict_basename = os.path.basename(predict_path)
+    if predict_basename.endswith("/"):
+        predict_basename = predict_basename[:-1]
+
+    return predict_basename
+
+
+def calc_calibration_score2(df: pd.DataFrame, calibration_df: pd.DataFrame) -> [pd.DataFrame]:
+    calibration_df = calibration_df[['cid', 'diff_rate']]
+    df = pd.merge(df, calibration_df, on='cid', how='left')
+    df['diff_rate'] = df['diff_rate'].fillna(0)
+    df['score_2'] = df['score'] / (1 + df['diff_rate'])
+    return df
+
+
+def predict_df_agg(df: pd.DataFrame) -> [pd.DataFrame]:
+    # Base aggregation operations
+    agg_operations = {
+        'view': ('cid', 'size'),
+        'conv': ('label', 'sum'),
+        'p_score_avg': ('score', lambda x: round(x.mean(), 6)),
+    }
+
+    # If a score_2 column exists, also aggregate the calibrated score
+    if "score_2" in df.columns:
+        agg_operations['p_score_2_avg'] = ('score_2', lambda x: round(x.mean(), 6))
+
+    grouped_df = df.groupby("cid").agg(**agg_operations).reset_index()
+    grouped_df['t_ctcvr'] = grouped_df['conv'] / grouped_df['view']
+
+    return grouped_df
+
+
+def _main(old_predict_path: str, new_predict_path: str, calibration_file: str, analyse_file: str):
+    old_df = read_predict_file(old_predict_path)
+    new_df = read_predict_file(new_predict_path)
+
+    old_calibration_df = calc_calibration_diff_rate(old_df, get_predict_basename(old_predict_path))
+    old_df = calc_calibration_score2(old_df, old_calibration_df)
+
+    new_calibration_df = calc_calibration_diff_rate(new_df, get_predict_basename(new_predict_path))
+    new_df = calc_calibration_score2(new_df, new_calibration_df)
+    # Save the calibration table used online (calibration_file is not written anywhere else)
+    calibration_file_save(new_calibration_df, calibration_file)
+
+    old_agg_df = predict_df_agg(old_df)
+    new_agg_df = predict_df_agg(new_df)
+
+    # Rename columns (the aggregation produces p_score_avg / p_score_2_avg / t_ctcvr) and select the ones needed
+    old_agg_df.rename(columns={'p_score_avg': 'old_score_avg', 'p_score_2_avg': 'old_score_2_avg', 't_ctcvr': 'true_ctcvr'}, inplace=True)
+    new_agg_df.rename(columns={'p_score_avg': 'new_score_avg', 'p_score_2_avg': 'new_score_2_avg'}, inplace=True)
+    old_group_df = old_agg_df[['cid', 'view', 'conv', 'true_ctcvr', 'old_score_avg', 'old_score_2_avg']]
+    new_group_df = new_agg_df[['cid', 'new_score_avg', 'new_score_2_avg']]
+    merged = pd.merge(old_group_df, new_group_df, on='cid', how='left')
+
+    # Compute the difference from the true ctcvr
+    merged["(new-true)/true"] = (merged['new_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
+    merged["(old-true)/true"] = (merged['old_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
+
+    # Compute the difference between the calibrated score and the true ctcvr
+    merged["(new2-true)/true"] = (merged['new_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
+    merged["(old2-true)/true"] = (merged['old_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
+
+    # Sort by impressions and write to a local file
+    merged = merged.sort_values(by=['view'], ascending=False)
+    merged = merged[[
+        'cid', 'view', "conv", "true_ctcvr",
+        "old_score_avg", "new_score_avg", "(old-true)/true", "(new-true)/true",
+        "old_score_2_avg", "new_score_2_avg", "(old2-true)/true", "(new2-true)/true",
+    ]]
+
+    # Choose the output format based on the file extension
+    if analyse_file.endswith(".csv"):
+        merged.to_csv(analyse_file, index=False)
+    else:
+        with open(analyse_file, "w") as writer:
+            writer.write(merged.to_string(index=False))
+    print("0")
+
+
+if __name__ == '__main__':
+    _main("", "", "", "")
+    # parser = argparse.ArgumentParser(description="model_predict_analyse_20241101.py")
+    # parser.add_argument("-op", "--old_predict_path", required=True, help="evaluation results of the old model")
+    # parser.add_argument("-np", "--new_predict_path", required=True, help="evaluation results of the new model")
+    # parser.add_argument("-af", "--analyse_file", required=True, help="path to save the final analysis result")
+    # parser.add_argument("-cf", "--calibration_file", required=True, help="path to save the segment file used online")
+    # args = parser.parse_args()
+    #
+    # _main(
+    #     old_predict_path=args.old_predict_path,
+    #     new_predict_path=args.new_predict_path,
+    #     calibration_file=args.calibration_file,
+    #     analyse_file=args.analyse_file
+    # )
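As a quick sanity check on the per-cid calibration above (all numbers made up, and assuming diff_rate is the relative gap between the average predicted score and the observed ctcvr, as computed in calc_calibration_diff_rate):

# One cid with 2000 impressions and 10 conversions:
view, conv = 2000, 10
t_ctcvr = conv / view                  # 0.005 observed ctcvr
p_score_avg = 0.006                    # average model score for this cid
diff_rate = p_score_avg / t_ctcvr - 1  # 0.2 -> the model over-predicts by 20%
score_2 = 0.006 / (1 + diff_rate)      # 0.005, pulled back to the observed rate

Only cids passing the query condition (view > 1000 and diff_rate >= 0.1) receive a correction; for all other cids diff_rate is filled with 0 after the merge, so score_2 stays equal to the raw score.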

+ 0 - 172
model/segment_calibration_check.py

@@ -1,172 +0,0 @@
-import logging
-
-import pandas as pd
-
-# client = InsecureClient("http://master-1-1.c-7f31a3eea195cb73.cn-hangzhou.emr.aliyuncs.com:9870", user="spark")
-
-SEGMENT_BASE_PATH = "/Users/zhao/Desktop/tzld/XGB/segment_csv"
-logger = logging.getLogger(__name__)
-
-
-def read_predict_from_local_txt(txt_file) -> list:
-    result = []
-    with open(txt_file, "r") as f:
-        for line in f.readlines():
-            sp = line.replace("\n", "").split("\t")
-            if len(sp) == 4:
-                label = int(sp[0])
-                cid = sp[3].split("_")[0]
-                score = float(sp[2].replace("[", "").replace("]", "").split(",")[1])
-                result.append({
-                    "label": label,
-                    "cid": cid,
-                    "score": score
-                })
-    return result
-
-
-def read_predict_from_hdfs(hdfs_path: str) -> list:
-    if not hdfs_path.endswith("/"):
-        hdfs_path += "/"
-    result = []
-    # for file in client.list(hdfs_path):
-    #     with client.read(hdfs_path + file) as reader:
-    #         with gzip.GzipFile(fileobj=reader, mode="rb") as gz_file:
-    #             for line in gz_file.read().decode("utf-8").split("\n"):
-    #                 split = line.split("\t")
-    #                 if len(split) == 4:
-    #                     continue
-    #                 cid = split[3].split("_")[0]
-    #                 label = int(split[0])
-    #                 score = float(split[2].replace("[", "").replace("]", "").split(",")[1])
-    #
-    #                 result.append({
-    #                     "cid": cid,
-    #                     "label": label,
-    #                     "score": score
-    #                 })
-    #
-    return result
-
-
-def read_cpa() -> pd.DataFrame:
-    # 添加CPA
-    cpa_target_df = pd.read_csv("/Users/zhao/Desktop/tzld/XGB/creative_package_target.csv")
-    cpa_target_df = cpa_target_df[['creative_id', 'cpa_target']]
-    cpa_target_df.rename(columns={'creative_id': 'cid'}, inplace=True)
-    cpa_target_df['cid'] = cpa_target_df['cid'].astype(str)
-
-    return cpa_target_df
-
-
-def read_calibration_file(filepath: str) -> list:
-    segment = []
-    with open(filepath, "r") as f:
-        for line in f.readlines():
-            s = line.split("\t")
-            if len(s) != 2:
-                continue
-
-            score = s[0].split(",")
-            min_score = float(score[0].replace("(", ""))
-            max_score = float(score[1].replace("]", ""))
-            alpha = float(s[1])
-            segment.append((min_score, max_score, alpha))
-    return segment
-
-
-def score_calibration(score, segment: list):
-    for e in segment:
-        if (score >= e[0]) & (score <= e[1]):
-            return score * (1 + e[2])
-    return score
-
-
-def read_and_calibration_predict(predict_path: str, is_hdfs=True) -> [pd.DataFrame]:
-    """
-    读取评估结果,并进行校准
-    """
-    if is_hdfs:
-        # 文件路径处理
-        predicts = read_predict_from_hdfs(predict_path)
-    else:
-        predicts = read_predict_from_local_txt(predict_path)
-    df = pd.DataFrame(predicts)
-
-    # 每条曝光添加对应创意的CPA,并计算CPM
-    cpa_target_df = read_cpa()
-    df = pd.merge(df, cpa_target_df, on='cid', how='left')
-    df['p_cpm'] = df['score'] * df['cpa_target'] * 1000
-    df['t_cpm'] = df['score'] * df['cpa_target'] * 1000 * df['label']
-
-    segment = read_calibration_file("/Users/zhao/Desktop/tzld/XGB/model_xgb_351_1000_v2_calibration.txt")
-    # 生成校准后的分数并计算对应的p_cpm和t_cpm
-    df['score_2'] = df['score'].apply(lambda x: score_calibration(x, segment))
-    df['p_cpm_2'] = df['score_2'] * df['cpa_target'] * 1000
-    df['t_cpm_2'] = df['score_2'] * df['cpa_target'] * 1000 * df['label']
-
-    # 按CID统计真实ctcvr和校准前后的平均模型分
-    grouped_df = df.groupby("cid").agg(
-        view=('cid', 'size'),
-        conv=('label', 'sum'),
-        score_avg=('score', lambda x: round(x.mean(), 6)),
-        score_2_avg=('score_2', lambda x: round(x.mean(), 6)),
-    ).reset_index()
-    grouped_df['true_ctcvr'] = grouped_df['conv'] / grouped_df['view']
-
-    return grouped_df
-
-
-def _main(old_predict_path: str, new_predict_path: str, calibration_file: str, analyse_file: str):
-    old_group_df = read_and_calibration_predict(old_predict_path, is_hdfs=False)
-    new_group_df = read_and_calibration_predict(new_predict_path, is_hdfs=False)
-
-    # 字段重命名,和列过滤
-    old_group_df.rename(columns={'score_avg': 'old_score_avg', 'score_2_avg': 'old_score_2_avg'}, inplace=True)
-    new_group_df.rename(columns={'score_avg': 'new_score_avg', 'score_2_avg': 'new_score_2_avg'}, inplace=True)
-    old_group_df = old_group_df[['cid', 'view', 'conv', 'true_ctcvr', 'old_score_avg', 'old_score_2_avg']]
-    new_group_df = new_group_df[['cid', 'new_score_avg', 'new_score_2_avg']]
-
-    # 拼接CPA
-    cpa_target_df = read_cpa()
-    merged = (pd.merge(old_group_df, new_group_df, on='cid', how='left')
-              .merge(cpa_target_df, on='cid', how='left'))
-
-    # 计算与真实ctcvr的差异值
-    merged["(new-true)/true"] = (merged['new_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
-    merged["(old-true)/true"] = (merged['old_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
-    merged['new_cpm'] = merged['new_score_avg'] * merged['cpa_target'] * 1000
-    merged['old_cpm'] = merged['old_score_avg'] * merged['cpa_target'] * 1000
-
-    # 计算校准后的模型分与ctcvr的差异值
-    merged["(new2-true)/true"] = (merged['new_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
-    merged["(old2-true)/true"] = (merged['old_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
-    merged['new2_cpm'] = merged['new_score_2_avg'] * merged['cpa_target'] * 1000
-    merged['old2_cpm'] = merged['old_score_2_avg'] * merged['cpa_target'] * 1000
-
-    # 计算真实的CPM
-    merged['true_cpm'] = merged['true_ctcvr'] * merged['cpa_target'] * 1000
-    # 按照曝光排序,写入本地文件
-    merged = merged.sort_values(by=['view'], ascending=False)
-    merged = merged[[
-        'cid', 'view', "conv", "true_ctcvr", 'true_cpm',
-        "old_score_avg", "new_score_avg", "(old-true)/true", "(new-true)/true", "old_cpm", "new_cpm",
-        "old_score_2_avg", "new_score_2_avg", "(old2-true)/true", "(new2-true)/true", "old2_cpm", "new2_cpm",
-    ]]
-
-    # 根据文件名保存不同的格式
-    if analyse_file.endswith(".csv"):
-        merged.to_csv(analyse_file, index=False)
-    else:
-        with open(analyse_file, "w") as writer:
-            writer.write(merged.to_string(index=False))
-    print("0")
-
-
-if __name__ == '__main__':
-    _main(
-        f"/Users/zhao/Desktop/tzld/XGB/predict_file/20241103_351_1000_1028_1102.txt",
-        f"/Users/zhao/Desktop/tzld/XGB/predict_file/20241104_351_1000_1028_1102.txt",
-        "/Users/zhao/Desktop/tzld/XGB/model_xgb_351_1000_v2_calibration.txt",
-        f"/Users/zhao/Desktop/tzld/XGB/20241104.csv"
-    )

+ 6 - 51
script/t.py

@@ -1,54 +1,9 @@
 import json
-import pandas as pd
 
-feature = ["1_vovh0分子",
-           "2_vovh0分子",
-           "2_vovh1分子",
-           "3_vovh0分子",
-           "3_vovh1分子",
-           "3_vovh2分子",
-           "4_vovh0分子",
-           "4_vovh1分子",
-           "4_vovh3分子",
-           "7_vovh0分子",
-           "7_vovh1分子",
-           "7_vovh6分子",
-           "13_vovh0分子",
-           "13_vovh1分子",
-           "13_vovh12分子",
-           "25_vovh0分子",
-           "25_vovh1分子",
-           "25_vovh24分子",
-           "1_vovd0分子",
-           "2_vovd0分子",
-           "2_vovd1分子",
-           "3_vovd0分子",
-           "3_vovd1分子",
-           "3_vovd2分子",
-           "1_vovh分母",
-           "2_vovh分母",
-           "3_vovh分母",
-           "4_vovh分母",
-           "7_vovh分母",
-           "13_vovh分母",
-           "25_vovh分母",
-           "1_vovd分母",
-           "2_vovd分母",
-           "3_vovd分母", ]
+with open("/Users/zhao/Desktop/1.json", 'r') as reader:
+    j = reader.read()
 
-with open("/Users/zhao/Desktop/1.json", "r") as file:
-    result = json.loads(file.read())
-
-data = []
-for item in result:
-    d = {
-        'score': item['score'],
-        "vovScore": item["scoresMap"]['vovScore'],
-        '小时': item['allFeatureMap']['weightKey']
-    }
-    for f in feature:
-        d[f] = item['allFeatureMap'][f]
-
-    data.append(d)
-df = pd.DataFrame.from_records(data)
-df.to_csv("/Users/zhao/Desktop/1.csv", index=False)
+creative_list = json.loads(j)
+for c in creative_list:
+    for item in c['creativeConfigList']:
+        print(item['creativeCode'])

+ 77 - 0
t.py

@@ -0,0 +1,77 @@
+import numpy as np
+import pandas as pd
+
+from client import ODPSClient
+
+recall_result = "/Users/zhao/Desktop/20241124_recall.csv"
+day_oss = "/Users/zhao/Desktop/20241124_day_oss.csv"
+
+odps = ODPSClient.ODPSClient()
+
+
+def read_day_recall_v2() -> pd.DataFrame:
+    df = pd.read_csv(day_oss)
+    df['cpm_rank'] = df.groupby("type")['score'].rank(method='first', ascending=False).astype(int)
+    df['cpm_rank_2'] = df.groupby("type")['cpm'].rank(method='first', ascending=False).astype(int)
+
+    df['view_rank'] = df.groupby("type")['view_rate'].rank(method='first', ascending=False).astype(int)
+
+    df['day_recall_v2'] = np.where(
+        ((df['type'] == '14d') & ((df['cpm_rank'] <= 30) | (df['cpm_rank_2'] <= 20) | (df['view_rank'] <= 30))) |
+        ((df['type'] == '3d') & ((df['cpm_rank'] <= 50) | (df['cpm_rank_2'] <= 30) | (df['view_rank'] <= 50))) |
+        ((df['type'] == '1d') & ((df['cpm_rank'] <= 80) | (df['cpm_rank_2'] <= 50) | (df['view_rank'] <= 100))),
+        True,
+        False
+    )
+    df.to_csv("/Users/zhao/Desktop/3.csv", index=False)
+
+    grouped_df = (
+        df.groupby('cid', as_index=False)  # group by cid
+        .agg(day_recall_v2=('day_recall_v2', 'any'))  # True if any row in the group is True
+    )
+
+    return grouped_df
+
+
+def read_day_recall() -> pd.DataFrame:
+    df = pd.read_csv(day_oss)
+    df['cpm_rank'] = df.groupby("type")['score'].rank(method='first', ascending=False).astype(int)
+
+    df['view_rank'] = df.groupby("type")['view_rate'].rank(method='first', ascending=False).astype(int)
+
+    df['day_recall_v1'] = np.where(
+        ((df['type'] == '14d') & ((df['cpm_rank'] <= 30) | (df['view_rank'] <= 20))) |
+        ((df['type'] == '3d') & ((df['cpm_rank'] <= 50) | (df['view_rank'] <= 30))) |
+        ((df['type'] == '1d') & ((df['cpm_rank'] <= 80) | (df['view_rank'] <= 50))),
+        True,
+        False
+    )
+
+    df.to_csv("/Users/zhao/Desktop/2.csv", index=False)
+
+    grouped_df = (
+        df.groupby('cid', as_index=False)  # group by cid
+        .agg(day_recall_v1=('day_recall_v1', 'any'))  # True if any row in the group is True
+    )
+
+    return grouped_df
+
+
+def _main():
+    day_recall = read_day_recall()
+
+    day_recall_v2 = read_day_recall_v2()
+
+    recall = pd.read_csv(recall_result)
+    recall['base_diff'] = recall['产品-17-前五组'] - recall['产品-35-前五组']
+
+    recall = (pd.merge(recall, day_recall, on='cid', how='left')
+              .merge(day_recall_v2, on='cid', how='left'))
+
+    recall.to_csv("/Users/zhao/Desktop/1.csv", index=False)
+
+    print(recall)
+
+
+if __name__ == '__main__':
+    _main()
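A self-contained illustration of the per-type ranking used in read_day_recall / read_day_recall_v2 (example data is made up): rank(method='first', ascending=False) gives rank 1 to the highest value inside each type group, and the recall flags then compare those ranks against per-window thresholds.

import pandas as pd

df = pd.DataFrame({
    "type":  ["1d", "1d", "14d", "14d"],
    "cid":   [101, 102, 101, 103],
    "score": [0.9, 0.4, 0.7, 0.2],
})
df["cpm_rank"] = df.groupby("type")["score"].rank(method="first", ascending=False).astype(int)
# type 1d : cid 101 -> 1, cid 102 -> 2
# type 14d: cid 101 -> 1, cid 103 -> 2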

+ 54 - 11
vov/data_download.py

@@ -1,34 +1,32 @@
-from concurrent.futures import ThreadPoolExecutor, as_completed
-from datetime import timedelta
-
 import pandas as pd
-from odps.src.types_c import datetime
 
 from client import ODPSClient
 
 odps_client = ODPSClient.ODPSClient()
 
-dt_list = ["20241108", "20241109", "20241110"]
+dt_list = ["20241115", "20241116"]
 hh_list = ["00", "01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12", "13", "14", "15", "16", "17",
            "18", "19", "20", "21", "22", "23"]
 
 VOV_BASE_PATH = "/Users/zhao/Desktop/tzld/vov"
 
+APP_TYPE = "4"
+
 
# Define the query function
 def read_odps(dt: str, hh: str):
    # Read the SQL file
     sql = ""
-    with open(f"{VOV_BASE_PATH}/sql/vovh24_feature.sql", "r") as f:
+    with open(f"{VOV_BASE_PATH}/sql/vovh24_小时_top1000.sql", "r") as f:
         sql = f.read()
 
     real_sql = (
         sql.replace("${bizdate}", dt)
         .replace("${hh}", hh)
-        .replace("${apptype}", "4")
+        .replace("${apptype}", APP_TYPE)
     )
     print(f"Executing for dt: {dt}, hh: {hh}")
-    odps_client.execute_sql_result_save_file(real_sql, f"{VOV_BASE_PATH}/csv/{dt}{hh}_feature.csv")
+    odps_client.execute_sql_result_save_file(real_sql, f"{VOV_BASE_PATH}/csv/{dt}{hh}.csv")
 
 
# Parallel execution function
@@ -57,15 +55,60 @@ def download():
     for dt in dt_list:
         csv_list = []
         for hh in hh_list:
-            csv_list.append(f"{VOV_BASE_PATH}/csv/{dt}{hh}_feature.csv")
+            csv_list.append(f"{VOV_BASE_PATH}/csv/{dt}{hh}.csv")
 
         df_list = [pd.read_csv(file) for file in csv_list]
         df = pd.concat(df_list, ignore_index=True)
-        df.to_csv(f"{VOV_BASE_PATH}/csv/{dt}_feature.csv", index=False)
+        df.to_csv(f"{VOV_BASE_PATH}/csv/{dt}.csv", index=False)
+
+
+from concurrent.futures import ThreadPoolExecutor, as_completed
+
+
+def ad_download():
+    # Date list
+    date_list = ["20241123", "20241124", "20241125", "20241126", "20241127", "20241128"]
+    # Maximum number of worker threads
+    max_workers = 24
+    # SQL file path
+    sql_file_path = "/Users/zhao/Desktop/广告分析.sql"
+    # Thread pool
+    with ThreadPoolExecutor(max_workers=max_workers) as executor:
+        # Store the submitted tasks
+        future_tasks = {}
+
+        for date in date_list:
+            params = {
+                "bizdate": date,
+                "hh": "23",
+                "hhl": "00",
+                "filter_id": "XXXXXXXX",
+                "apptype": "3,36,6,17"
+            }
+            result_file_path = f"/Users/zhao/Desktop/{date}.csv"
+
+            # Submit the task
+            future = executor.submit(
+                odps_client.execute_sql_file_result_save_file,
+                sql_file_path,
+                params,
+                result_file_path
+            )
+            future_tasks[future] = (params, result_file_path)
+
+        # Monitor task completion
+        for future in as_completed(future_tasks):
+            params, result_file_path = future_tasks[future]
+            try:
+                # Get the task execution result
+                future.result()
+                print(f"Completed: {result_file_path} for date {params['bizdate']}")
+            except Exception as exc:
+                print(f"Error: {exc} for date {params['bizdate']}")
 
 
 def _main():
-    download()
+    ad_download()
 
 
 if __name__ == "__main__":