Explore the code

feat: 修改模型评估结果分析脚本

Author: zhaohaipeng — 7 months ago
parent
commit
d657042af4

+ 20 - 0
client/ODPSClient.py

@@ -61,6 +61,26 @@ class ODPSClient(object):
                 result.extend(batch)
         return result
 
+    def process_all_records(self, table: str, partition_spec: str, func: callable) -> None:
+
+        """
+        处理指定表和分区中的所有记录,对每条记录应用指定函数。
+
+        :param table: 表名
+        :param partition_spec: 分区规范
+        :param func: 用于处理每条记录的函数
+        """
+        tunnel = TableTunnel(self.odps)
+
+        download_session = tunnel.create_download_session(f"{table}", partition_spec=partition_spec)
+        count = download_session.count
+
+        print(f"表: {table} 中的分区 {partition_spec}, 共有 {count} 条数据")
+        # 使用批量读取,并对每条记录应用 func
+        with download_session.open_record_reader(0, count) as reader:
+            for record in reader:
+                func(record)
+
     def get_table(self, table: str):
         return self.odps.get_table(table)
 

+ 5 - 0
config/config.ini

@@ -8,4 +8,9 @@ config.file = /Users/zhao/.kube/config_prod
 [algorithm.redis]
 host = r-bp1fogs2mflr1ybfot.redis.rds.aliyuncs.com
 port = 6379
+password = Wqsd@2019
+
+[feature.redis]
+host = r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com
+port = 6379
 password = Wqsd@2019

+ 6 - 1
config/config_test.ini

@@ -1,6 +1,6 @@
 [feishu]
 model.webhook = https://open.feishu.cn/open-apis/bot/v2/hook/540d4098-367a-4068-9a44-b8109652f07c
-vov.model.webhook=https://open.feishu.cn/open-apis/bot/v2/hook/cbf0bc0f-994a-489b-9f77-de0c5be107cd
+vov.model.webhook = https://open.feishu.cn/open-apis/bot/v2/hook/cbf0bc0f-994a-489b-9f77-de0c5be107cd
 
 [k8s]
 config.file = /Users/zhao/.kube/config
@@ -9,3 +9,8 @@ config.file = /Users/zhao/.kube/config
 host = r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com
 port = 6379
 password = Wqsd@2019
+
+[feature.redis]
+host = r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com
+port = 6479
+password = Wqsd@2019

+ 6 - 1
helper/RedisHelper.py

@@ -11,7 +11,7 @@ class RedisHelper(object):
         self.host = host
         self.port = port
         self.password = password
-        self.redis_conn = redis.Redis(host=self.host, port=self.port, password=password)
+        self.redis_conn = redis.Redis(host=host, port=port, password=password)
 
     def add_number_to_set(self, key: str, number):
         logger.info(f"Redis Set写入: {key} ---> {number}")
@@ -22,12 +22,17 @@ class RedisHelper(object):
         self.redis_conn.expire(key, expire)
 
     def m_get_value(self, keys):
+        logger.info(f"Redis mGet: {keys}")
         self.redis_conn.mget(keys)
 
     def get_value(self, key: str):
         # logger.info(f"Redis Get Value: {key}")
         return self.redis_conn.get(key)
 
+    def delete(self, key: str):
+        logger.info(f"Redis Delete: {key}")
+        self.redis_conn.delete(key)
+
     def m_get_pipeline(self, keys):
         pipeline = self.redis_conn.pipeline()
         for key in keys:

+ 43 - 34
model/model_predict_analyse_20241115.py

@@ -1,3 +1,4 @@
+import argparse
 import gzip
 import os.path
 
@@ -6,7 +7,7 @@ from hdfs import InsecureClient
 
 client = InsecureClient("http://master-1-1.c-7f31a3eea195cb73.cn-hangzhou.emr.aliyuncs.com:9870", user="spark")
 
-SEGMENT_BASE_PATH = os.environ.get("SEGMENT_BASE_PATH", "/dw/recommend/model/36_score_calibration_file")
+SEGMENT_BASE_PATH = os.environ.get("SEGMENT_BASE_PATH", "/dw/recommend/model/36_model_attachment/score_calibration_file")
 PREDICT_CACHE_PATH = os.environ.get("PREDICT_CACHE_PATH", "/root/zhaohp/XGB/predict_cache")
 
 
@@ -58,10 +59,13 @@ def predict_local_save_for_auc(old_df: pd.DataFrame, new_df: pd.DataFrame):
     """
     d = {"old": old_df, "new": new_df}
     for key in d:
-        df = d[key][['label', "score"]]
-        df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_1.txt", sep="\t", index=False, header=False)
-        df = d[key][['label', "score_2"]]
-        df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_2.txt", sep="\t", index=False, header=False)
+        df = d[key]
+        if 'score' in df.columns:
+            score_df = df[['label', "score"]]
+            score_df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_1.txt", sep="\t", index=False, header=False)
+        if 'score_2' in df.columns:
+            score_2_df = d[key][['label', "score_2"]]
+            score_2_df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_2.txt", sep="\t", index=False, header=False)
 
 
 def save_full_calibration_file(df: pd.DataFrame, segment_file_path: str):
@@ -70,16 +74,17 @@ def save_full_calibration_file(df: pd.DataFrame, segment_file_path: str):
         with client.write(segment_file_path, encoding='utf-8', overwrite=True) as writer:
             writer.write(df.to_csv(sep="\t", index=False))
     else:
-        df.tocsv(segment_file_path, sep="\t", index=False)
+        df.to_csv(segment_file_path, sep="\t", index=False)
 
 
-def calc_calibration_diff_rate(df: pd.DataFrame, predict_basename: str) -> [pd.DataFrame]:
+def get_predict_calibration_file(df: pd.DataFrame, predict_basename: str) -> [pd.DataFrame]:
     """
     计算模型分的diff_rate
     """
     agg_df = predict_df_agg(df)
-    condition = 'view > 1000 and diff_rate >= 0.1'
-    save_full_calibration_file(agg_df, f"{PREDICT_CACHE_PATH}/{predict_basename}.txt")
+    agg_df['diff_rate'] = (agg_df['p_score_avg'] / agg_df['t_ctcvr'] - 1).mask(agg_df['t_ctcvr'] == 0, 0)
+    condition = 'view > 1000 and diff_rate >= 0.2'
+    save_full_calibration_file(agg_df, f"{SEGMENT_BASE_PATH}/{predict_basename}.txt")
     calibration = agg_df.query(condition)
     return calibration
 
@@ -97,8 +102,7 @@ def get_predict_basename(predict_path) -> [str]:
 
 def calc_calibration_score2(df: pd.DataFrame, calibration_df: pd.DataFrame) -> [pd.DataFrame]:
     calibration_df = calibration_df[['cid', 'diff_rate']]
-    df = pd.merge(df, calibration_df, on='cid', how='left')
-    df.fullna(0)
+    df = pd.merge(df, calibration_df, on='cid', how='left').fillna(0)
     df['score_2'] = df['score'] / (1 + df['diff_rate'])
     return df
 
@@ -125,34 +129,40 @@ def _main(old_predict_path: str, new_predict_path: str, calibration_file: str, a
     old_df = read_predict_file(old_predict_path)
     new_df = read_predict_file(new_predict_path)
 
-    old_calibration_df = calc_calibration_diff_rate(old_df, get_predict_basename(old_predict_path))
+    old_calibration_df = get_predict_calibration_file(old_df, get_predict_basename(old_predict_path))
     old_df = calc_calibration_score2(old_df, old_calibration_df)
 
-    new_calibration_df = calc_calibration_diff_rate(new_df, get_predict_basename(new_predict_path))
+    new_calibration_df = get_predict_calibration_file(new_df, get_predict_basename(new_predict_path))
     new_df = calc_calibration_score2(new_df, new_calibration_df)
 
+    # 本地保存label、score以及校准后的score,用于计算AUC等信息
+    predict_local_save_for_auc(old_df, new_df)
+
+    # 新模型校准文件保存本地,用于同步OSS
+    new_calibration_df.to_csv(calibration_file, sep="\t", index=False)
+
     old_agg_df = predict_df_agg(old_df)
     new_agg_df = predict_df_agg(new_df)
 
     # 字段重命名,和列过滤
-    old_agg_df.rename(columns={'score_avg': 'old_score_avg', 'score_2_avg': 'old_score_2_avg'}, inplace=True)
-    new_agg_df.rename(columns={'score_avg': 'new_score_avg', 'score_2_avg': 'new_score_2_avg'}, inplace=True)
+    old_agg_df.rename(columns={'p_score_avg': 'old_score_avg', 'p_score_2_avg': 'old_score_2_avg'}, inplace=True)
+    new_agg_df.rename(columns={'p_score_avg': 'new_score_avg', 'p_score_2_avg': 'new_score_2_avg'}, inplace=True)
     old_group_df = old_agg_df[['cid', 'view', 'conv', 't_ctcvr', 'old_score_avg', 'old_score_2_avg']]
     new_group_df = new_agg_df[['cid', 'new_score_avg', 'new_score_2_avg']]
     merged = pd.merge(old_group_df, new_group_df, on='cid', how='left')
 
     # 计算与真实ctcvr的差异值
-    merged["(new-true)/true"] = (merged['new_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
-    merged["(old-true)/true"] = (merged['old_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
+    merged["(new-true)/true"] = (merged['new_score_avg'] / merged['t_ctcvr'] - 1).mask(merged['t_ctcvr'] == 0, 0)
+    merged["(old-true)/true"] = (merged['old_score_avg'] / merged['t_ctcvr'] - 1).mask(merged['t_ctcvr'] == 0, 0)
 
     # 计算校准后的模型分与ctcvr的差异值
-    merged["(new2-true)/true"] = (merged['new_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
-    merged["(old2-true)/true"] = (merged['old_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
+    merged["(new2-true)/true"] = (merged['new_score_2_avg'] / merged['t_ctcvr'] - 1).mask(merged['t_ctcvr'] == 0, 0)
+    merged["(old2-true)/true"] = (merged['old_score_2_avg'] / merged['t_ctcvr'] - 1).mask(merged['t_ctcvr'] == 0, 0)
 
     # 按照曝光排序,写入本地文件
     merged = merged.sort_values(by=['view'], ascending=False)
     merged = merged[[
-        'cid', 'view', "conv", "true_ctcvr",
+        'cid', 'view', "conv", "t_ctcvr",
         "old_score_avg", "new_score_avg", "(old-true)/true", "(new-true)/true",
         "old_score_2_avg", "new_score_2_avg", "(old2-true)/true", "(new2-true)/true",
     ]]
@@ -167,17 +177,16 @@ def _main(old_predict_path: str, new_predict_path: str, calibration_file: str, a
 
 
 if __name__ == '__main__':
-    _main("", "", "", "")
-    # parser = argparse.ArgumentParser(description="model_predict_analyse_20241101.py")
-    # parser.add_argument("-op", "--old_predict_path", required=True, help="老模型评估结果")
-    # parser.add_argument("-np", "--new_predict_path", required=True, help="新模型评估结果")
-    # parser.add_argument("-af", "--analyse_file", required=True, help="最后计算结果的保存路径")
-    # parser.add_argument("-cf", "--calibration_file", required=True, help="线上使用的segment文件保存路径")
-    # args = parser.parse_args()
-    #
-    # _main(
-    #     old_predict_path=args.old_predict_path,
-    #     new_predict_path=args.new_predict_path,
-    #     calibration_file=args.calibration_file,
-    #     analyse_file=args.analyse_file
-    # )
+    parser = argparse.ArgumentParser(description="model_predict_analyse_20241101.py")
+    parser.add_argument("-op", "--old_predict_path", required=True, help="老模型评估结果")
+    parser.add_argument("-np", "--new_predict_path", required=True, help="新模型评估结果")
+    parser.add_argument("-af", "--analyse_file", required=True, help="最后计算结果的保存路径")
+    parser.add_argument("-cf", "--calibration_file", required=True, help="线上使用的segment文件保存路径")
+    args = parser.parse_args()
+
+    _main(
+        old_predict_path=args.old_predict_path,
+        new_predict_path=args.new_predict_path,
+        calibration_file=args.calibration_file,
+        analyse_file=args.analyse_file
+    )

+ 2 - 2
vov/data_download.py

@@ -48,8 +48,8 @@ def process_tasks(tasks: Sequence[Callable[[], None]], max_workers: int) -> None
 
 def ad_download() -> None:
     batch_size = 10000
-    total_records = 7633541
-    max_workers = 24
+    total_records = 6731154
+    max_workers = 100
     sql_file_path = "/Users/zhao/Desktop/tzld/ad/人群选择/v1.sql"
 
     def create_task(task_index: int) -> Callable[[], None]: