@@ -0,0 +1,293 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+# vim:fenc=utf-8
+#
+# Copyright © 2025 StrayWarrior <i@straywarrior.com>
+
+from eas_prediction import PredictClient
+from eas_prediction import TFRequest
+from odps import ODPS
+import pandas as pd
+import numpy as np
+from sklearn.metrics import roc_auc_score
+import hashlib
+import sys
+from q_plot_tool import draw_figures
+
+ODPS_CONFIG = {
+    'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
+    'ACCESSID': 'LTAIWYUujJAm7CbH',
+    'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
+}
+
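+# The keys above are committed in plain text; a minimal sketch for sourcing
+# them from the environment instead (the ODPS_ACCESS_ID / ODPS_ACCESS_KEY
+# variable names are assumptions, not an existing convention in this repo):
+#
+#     import os
+#     ODPS_CONFIG['ACCESSID'] = os.environ.get('ODPS_ACCESS_ID', ODPS_CONFIG['ACCESSID'])
+#     ODPS_CONFIG['ACCESSKEY'] = os.environ.get('ODPS_ACCESS_KEY', ODPS_CONFIG['ACCESSKEY'])
+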
+sparse_features = [
+    'cid', 'adid', 'adverid',
+    'region', 'city', 'brand',
+    'vid', 'cate1', 'cate2',
+    'user_vid_return_tags_2h', 'user_vid_return_tags_1d', 'user_vid_return_tags_3d',
+    'user_vid_return_tags_7d', 'user_vid_return_tags_14d',
+    'user_cid_click_list', 'user_cid_conver_list',
+    'apptype', 'hour', 'hour_quarter', 'root_source_scene',
+    'root_source_channel', 'is_first_layer', 'title_split', 'profession',
+    'user_vid_share_tags_1d', 'user_vid_share_tags_14d',
+    'user_vid_return_cate1_14d', 'user_vid_return_cate2_14d',
+    'user_vid_share_cate1_14d', 'user_vid_share_cate2_14d',
+    'creative_type', 'user_has_conver_1y',
+    'user_adverid_view_3d', 'user_adverid_view_7d', 'user_adverid_view_30d',
+    'user_adverid_click_3d', 'user_adverid_click_7d', 'user_adverid_click_30d',
+    'user_adverid_conver_3d', 'user_adverid_conver_7d', 'user_adverid_conver_30d',
+    'user_skuid_view_3d', 'user_skuid_view_7d', 'user_skuid_view_30d',
+    'user_skuid_click_3d', 'user_skuid_click_7d', 'user_skuid_click_30d',
+    'user_skuid_conver_3d', 'user_skuid_conver_7d', 'user_skuid_conver_30d',
+]
+
+# Count/flag features that must be fed to the model as int64 (cast in __main__).
+int_features = [
+    'user_has_conver_1y',
+    'user_adverid_view_3d', 'user_adverid_view_7d', 'user_adverid_view_30d',
+    'user_adverid_click_3d', 'user_adverid_click_7d', 'user_adverid_click_30d',
+    'user_adverid_conver_3d', 'user_adverid_conver_7d', 'user_adverid_conver_30d',
+    'user_skuid_view_3d', 'user_skuid_view_7d', 'user_skuid_view_30d',
+    'user_skuid_click_3d', 'user_skuid_click_7d', 'user_skuid_click_30d',
+    'user_skuid_conver_3d', 'user_skuid_conver_7d', 'user_skuid_conver_30d',
+]
+
+def get_data():
+    """Pull the training features and conversion label from MaxCompute,
+    caching the result locally as parquet keyed by a hash of the query."""
+    odps_conf = ODPS_CONFIG
+    o = ODPS(odps_conf['ACCESSID'], odps_conf['ACCESSKEY'], 'loghubods',
+             endpoint=odps_conf['ENDPOINT'])
+    with open("features_top300.config") as f:
+        dense_features = [name.strip().lower() for name in f.readlines()]
+    feature_names = ','.join(dense_features + sparse_features)
+
+    partitions = "dt in ('20250620')"
+    sql = f''' SELECT {feature_names},has_conversion
+        FROM loghubods.ad_easyrec_train_realtime_data_v3_sampled_temp
+        WHERE {partitions} AND adverid = '598'
+    '''
+    # AND ts BETWEEN unix_timestamp('2025-05-14 17:40:00') AND unix_timestamp('2025-05-14 18:00:00')
+    data_query_hash = hashlib.sha1(sql.encode("utf-8")).hexdigest()[0:8]
+    cache_path = f'ad_data_cache_{data_query_hash}.parquet'
+
+    try:
+        df = pd.read_parquet(cache_path)
+    except Exception:
+        # Cache miss or unreadable cache: fall back to querying ODPS.
+        with o.execute_sql(sql).open_reader() as reader:
+            df = reader.to_pandas()
+        df.to_parquet(cache_path)
+
+    def detect_na_return(col):
+        # Numeric and integer-typed columns fill with 0, dense features with
+        # 0.0, labels with 0, and everything else with an empty string.
+        if str(df[col].dtype) in ('int64', 'float64') or col in int_features:
+            return 0
+        elif col in dense_features:
+            return 0.0
+        elif col in ('has_conversion', 'has_click'):
+            return 0
+        else:
+            return ''
+
+    def handle_nulls(df):
+        # Build the fill dict: 0 for numeric columns, '' for non-numeric ones.
+        fill_dict = {
+            col: detect_na_return(col) for col in df.columns
+        }
+        return df.fillna(fill_dict)
+
+    df = handle_nulls(df)
+
+    return df
+
+
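+# Usage sketch: the first call queries ODPS over the network; subsequent runs
+# with the same SQL hit the local parquet cache.
+#     df = get_data()
+#     print(df.shape)
+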
+ENDPOINT = '1894469520484605.cn-hangzhou.pai-eas.aliyuncs.com'
+
+# Production service (kept for reference; the test service below is active):
+# TOKEN = 'ODI1MmUxODgzZDc3ODM0ZmQwZWU0YTVjZjdlOWVlMGFlZGJjNTlkYQ=='
+# SERV_NAME = 'ad_rank_dnn_v11_easyrec'
+TOKEN = 'ZmUxOWY5OGYwYmFkZmU0ZGEyM2E4NTFkZjAwNGU0YWNmZTFhYTRhZg=='
+SERV_NAME = 'ad_rank_dnn_v11_easyrec_test'
+
+# Map pandas dtypes to the TFRequest dtype constants used when feeding tensors.
+DTYPE_TO_TF_TYPE = {
+    'float64': TFRequest.DT_DOUBLE,
+    'object': TFRequest.DT_STRING,
+    'int64': TFRequest.DT_INT64,
+}
+
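+# The expression p / (p + (1 - p) / 0.04), repeated below, undoes negative
+# downsampling: if negatives were kept at rate s during training, the
+# calibrated probability is p_cal = p / (p + (1 - p) / s). A small helper one
+# could use instead (a sketch; s = 0.04 is read off the code below, not from
+# any config in this repo):
+def calibrate(p, sample_rate=0.04):
+    # Map raw model scores back to pre-downsampling probabilities.
+    p = np.asarray(p, dtype=np.float64)
+    return p / (p + (1.0 - p) / sample_rate)
+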
+def permutate_feature(df, column):
+    """Return a copy of df with one column's values shuffled across rows."""
+    df = df.copy()
+    df[column] = np.random.permutation(df[column].values)
+    return df
+
+def clear_feature(df, column):
+    """Return a copy of df with one column reset to its type's default value."""
+    df = df.copy()
+    with open("features_top300.config") as f:
+        dense_features = [name.strip().lower() for name in f.readlines()]
+
+    def detect_na_return(col):
+        if df[col].dtype == 'int64':
+            return 0
+        elif df[col].dtype == 'float64':
+            return 0.0
+        elif col in dense_features:
+            return 0.0
+        elif col in ('has_conversion', 'has_click'):
+            return 0
+        else:
+            return ''
+
+    zero_value = detect_na_return(column)
+    df[column] = zero_value
+    return df
+
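+# Clearing a feature (resetting it to its type's default) and permuting it
+# (shuffling values across rows) probe different things: permutation preserves
+# the feature's marginal distribution while breaking its link to each row,
+# whereas clearing simulates the feature being missing entirely.
+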
+def build_req(df):
+    """Build a TFRequest feeding every dataframe column as a 1-D tensor."""
+    feature_names = df.columns.tolist()
+    batch_size = len(df)
+    req = TFRequest('serving_default')
+    for name in feature_names:
+        dtype = str(df[name].dtype)
+        tf_type = DTYPE_TO_TF_TYPE[dtype]
+        values = df[name].tolist()
+        if dtype == 'object':
+            # String tensors must be fed as bytes.
+            values = [bytes(x, 'utf-8') for x in values]
+        req.add_feed(name, [batch_size], tf_type, values)
+    req.add_fetch('probs')
+    return req
+
+def predict_by_batches(df, batch_size=512):
+    """Score df against the EAS service in fixed-size batches and return the
+    flat list of raw model probabilities."""
+    n_samples = len(df)
+    batch_num = (n_samples + batch_size - 1) // batch_size
+    scores = []
+    for i in range(batch_num):
+        sub_df = df[i * batch_size : min(n_samples, (i + 1) * batch_size)]
+        req = build_req(sub_df)
+        resp = client.predict(req)
+        scores.extend(resp.response.outputs['probs'].float_val)
+    return scores
+
+def permutate_feature_and_predict(df):
+    """Permutation-importance probe: shuffle one feature at a time and report
+    how the calibrated scores and AUC move against the baseline."""
+    # Baseline in a single request; use predict_by_batches for large frames.
+    base_scores = client.predict(build_req(df)).response.outputs['probs'].float_val
+    base_scores = np.array(base_scores)
+    # Undo the 0.04 negative-downsampling used in training (see calibrate above).
+    base_scores = base_scores / (base_scores + (1 - base_scores) / 0.04)
+    label = df['has_conversion']
+    base_auc = roc_auc_score(y_true=label, y_score=base_scores)
+    ctcvr = np.sum(label) / len(label)
+    print(f'avg base score: {np.average(base_scores):.6f}, auc: {base_auc:.6f}, ctcvr: {ctcvr:.6f}')
+
+    # feature_to_test = df.columns  # full sweep; currently narrowed to one feature
+    feature_to_test = ['profession']
+
+    for column in feature_to_test:
+        # Shuffle the column; the clearing variant lives in clear_feature_and_predict.
+        new_df = permutate_feature(df, column)
+        scores = predict_by_batches(new_df)
+        scores = [x / (x + (1 - x) / 0.04) for x in scores]
+        scores = np.array(scores)
+        avg_score = np.average(scores)
+        avg_abs_diff = np.average(np.abs(scores - base_scores))
+        avg_diff = np.average(scores - base_scores)
+        new_auc = roc_auc_score(y_true=label, y_score=scores)
+        auc_diff = new_auc - base_auc
+        print(f'{column}\t{avg_score:.6f}\t{avg_diff:.6f}\t{avg_abs_diff:.6f}\t{auc_diff:.6f}')
+
+
+def clear_feature_by_prefix_and_predict(df):
+    """Group features by name prefix, clear each group in turn, and report the
+    score and AUC movement against the baseline."""
+    feature_prefix_list = [
+        'actionstatic', 'adid', 'adverid', 'apptype', 'b2', 'b3', 'b4', 'b5',
+        'b6', 'b7', 'b8', 'brand', 'cate1', 'cate2', 'cid', 'city', 'clickall',
+        'converall', 'cpa', 'creative', 'ctcvr', 'ctr', 'cvr', 'd1', 'e1', 'e2',
+        'ecpm', 'has', 'hour', 'incomeall', 'is', 'profession', 'region',
+        'root', 'timediff', 'title', 'user', 'vid', 'viewall',
+    ]
+    base_scores = client.predict(build_req(df)).response.outputs['probs'].float_val
+    base_scores = np.array(base_scores)
+    base_scores = base_scores / (base_scores + (1 - base_scores) / 0.04)
+    label = df['has_conversion']
+    try:
+        base_auc = roc_auc_score(y_true=label, y_score=base_scores)
+    except ValueError:
+        # roc_auc_score raises when only one class is present in the labels.
+        base_auc = 0
+    ctcvr = np.sum(label) / len(label)
+    print(f'avg base score: {np.average(base_scores):.6f}, auc: {base_auc:.6f}, ctcvr: {ctcvr:.6f}')
+
+    for prefix in feature_prefix_list:
+        new_df = df
+        columns_to_clear = [col for col in df.columns if col.startswith(prefix)]
+        for column in columns_to_clear:
+            new_df = clear_feature(new_df, column)
+        scores = predict_by_batches(new_df)
+        scores = [x / (x + (1 - x) / 0.04) for x in scores]
+        scores = np.array(scores)
+        avg_score = np.average(scores)
+        avg_abs_diff = np.average(np.abs(scores - base_scores))
+        avg_diff = np.average(scores - base_scores)
+        try:
+            new_auc = roc_auc_score(y_true=label, y_score=scores)
+        except ValueError:
+            new_auc = 0
+        auc_diff = new_auc - base_auc
+        print(f'{prefix}\t{avg_score:.6f}\t{avg_diff:.6f}\t{avg_abs_diff:.6f}\t{auc_diff:.6f}')
+
+def clear_feature_and_predict(df):
+    """Clear one feature at a time and report the score and AUC movement
+    against the baseline."""
+    base_scores = predict_by_batches(df)
+    base_scores = np.array(base_scores)
+    # base_scores = base_scores / (base_scores + (1 - base_scores) / 0.04)
+    # print(base_scores)
+    label = df['has_conversion']
+    ctcvr = np.sum(label) / len(label)
+    try:
+        base_auc = roc_auc_score(y_true=label, y_score=base_scores)
+    except ValueError:
+        base_auc = 0
+    print(f'avg base score: {np.average(base_scores):.6f}, auc: {base_auc:.6f}, ctcvr: {ctcvr:.6f}')
+
+    with open('features_top50.config') as f:
+        feature_to_test = [x.lower().strip() for x in f.readlines()]
+    # feature_to_test = sparse_features
+
+    # Reference run with every tested feature cleared at once.
+    # NOTE: this all-cleared score is computed but not reported below.
+    all_clean_df = df.copy()
+    for column in feature_to_test:
+        all_clean_df = clear_feature(all_clean_df, column)
+    # score = client.predict(build_req(all_clean_df)).response.outputs['probs'].float_val
+    score = predict_by_batches(all_clean_df)
+    score = np.array(score)
+    score = score / (score + (1 - score) / 0.04)
+
+    for column in feature_to_test:
+        new_df = clear_feature(df, column)
+        scores = predict_by_batches(new_df)
+        scores = [x / (x + (1 - x) / 0.04) for x in scores]
+        scores = np.array(scores)
+        avg_score = np.average(scores)
+        avg_abs_diff = np.average(np.abs(scores - base_scores))
+        avg_diff = np.average(scores - base_scores)
+        try:
+            new_auc = roc_auc_score(y_true=label, y_score=scores)
+        except ValueError:
+            new_auc = 0
+        auc_diff = new_auc - base_auc
+        print(f'{column:20}\t{avg_score:.6f}\t{avg_diff:.6f}\t{avg_abs_diff:.6f}\t{auc_diff:.6f}')
+        # df_to_draw = pd.DataFrame({
+        #     'score': scores,
+        #     'label': label
+        # })
+        # draw_figures(df_to_draw, column, 0.04,
+        #              filename=f'plots/feature_q_plot_{column}.png')
+
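+# Entry point: build the EAS client, pull and normalize the data, then run the
+# prefix-group ablation (the per-feature and permutation analyses above are
+# invoked from the commented-out lines below).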
+if __name__ == '__main__':
+    client = PredictClient(ENDPOINT, SERV_NAME)
+    client.set_token(TOKEN)
+    client.init()
+
+    df = get_data()
+    # df = df.query('user_vid_return_tags_3d.str.len() > 1')
+    # df['user_vid_return_tags_3d'] = ''
+    # pd.set_option('display.max_rows', None)
+
+    # Cast ID and count features so build_req feeds them as DT_INT64.
+    for feature in ['vid', 'cid', 'adid', 'adverid'] + int_features:
+        df[feature] = df[feature].astype('int64')
+    if len(df) == 0:
+        print("empty df")
+        sys.exit(0)
+    print(f'df size: {len(df)}')
+
+    # print(df)
+    # print(df[['vid', 'cid', 'adid', 'adverid', 'apptype', 'hour', 'hour_quarter', 'is_first_layer']])
+    # clear_feature_and_predict(df)
+    # permutate_feature_and_predict(df)
+    clear_feature_by_prefix_and_predict(df)
+