#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2025 StrayWarrior

"""Feature-ablation probe for an ad ranking model served through PAI-EAS.

The script pulls one partition of samples from MaxCompute (cached locally as
parquet), scores them with the remote model, then neutralises features (one
by one, or grouped by name prefix) and prints how the average score and the
AUC move relative to the untouched baseline.
"""

from eas_prediction import PredictClient
from eas_prediction import StringRequest
from eas_prediction import TFRequest
from odps import ODPS
import pandas as pd
import numpy as np
from sklearn.metrics import roc_auc_score
import functools
import time
import hashlib
import pdb
import sys
from q_plot_tool import draw_figures

# NOTE(review): credentials are committed in source. They are kept verbatim so
# the script keeps working, but they should be rotated and loaded from the
# environment or a secrets store instead.
ODPS_CONFIG = {
    'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
    'ACCESSID': 'LTAIWYUujJAm7CbH',
    'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
}

sparse_features = [
    'cid', 'adid', 'adverid',
    'region', 'city', 'brand',
    'vid', 'cate1', 'cate2',
    "user_vid_return_tags_2h", "user_vid_return_tags_1d",
    "user_vid_return_tags_3d", "user_vid_return_tags_7d",
    "user_vid_return_tags_14d",
    "user_cid_click_list", "user_cid_conver_list",
    'apptype', 'hour', 'hour_quarter', 'root_source_scene',
    'root_source_channel', 'is_first_layer', 'title_split', 'profession',
    "user_vid_share_tags_1d", "user_vid_share_tags_14d",
    "user_vid_return_cate1_14d", "user_vid_return_cate2_14d",
    "user_vid_share_cate1_14d", "user_vid_share_cate2_14d",
    "creative_type",
    "user_has_conver_1y",
    "user_adverid_view_3d", "user_adverid_view_7d", "user_adverid_view_30d",
    "user_adverid_click_3d", "user_adverid_click_7d",
    "user_adverid_click_30d",
    "user_adverid_conver_3d", "user_adverid_conver_7d",
    "user_adverid_conver_30d",
    "user_skuid_view_3d", "user_skuid_view_7d", "user_skuid_view_30d",
    "user_skuid_click_3d", "user_skuid_click_7d", "user_skuid_click_30d",
    "user_skuid_conver_3d", "user_skuid_conver_7d", "user_skuid_conver_30d",
]

# Sparse features that are cast to int before being sent to the model.
int_features = [
    "user_has_conver_1y",
    "user_adverid_view_3d", "user_adverid_view_7d", "user_adverid_view_30d",
    "user_adverid_click_3d", "user_adverid_click_7d",
    "user_adverid_click_30d",
    "user_adverid_conver_3d", "user_adverid_conver_7d",
    "user_adverid_conver_30d",
    "user_skuid_view_3d", "user_skuid_view_7d", "user_skuid_view_30d",
    "user_skuid_click_3d", "user_skuid_click_7d", "user_skuid_click_30d",
    "user_skuid_conver_3d", "user_skuid_conver_7d", "user_skuid_conver_30d",
]

# One dense feature name per line.
DENSE_FEATURE_CONFIG = "features_top300.config"
# One feature name per line; the candidates of clear_feature_and_predict.
TOP_FEATURE_CONFIG = 'features_top50.config'
LABEL_COLUMNS = ('has_conversion', 'has_click')
# Constant of the score rescaling p / (p + (1 - p) / rate).
# NOTE(review): presumably the negative down-sampling rate used at training
# time -- confirm with the training pipeline.
CALIBRATION_RATE = 0.04


@functools.lru_cache(maxsize=None)
def _load_feature_names(path):
    """Read a one-name-per-line feature list, lower-cased and stripped.

    The result is cached per path so the file is read (and closed) once
    rather than on every call. Blank lines are skipped: an empty name would
    otherwise corrupt the generated SQL column list.

    Args:
        path: Path of the config file.

    Returns:
        A tuple of feature names in file order.
    """
    with open(path) as config_file:
        names = (line.strip().lower() for line in config_file)
        return tuple(name for name in names if name)


def _neutral_value(df, col):
    """Return the "empty" value for a column: 0, 0.0 or ''.

    Used both to fill nulls and to blank out a feature. Integer columns and
    declared int features get 0, float columns and dense features get 0.0,
    label columns get 0, and everything else gets the empty string.
    """
    dtype = str(df[col].dtype)
    if dtype == 'int64':
        return 0
    if dtype == 'float64':
        return 0.0
    if col in int_features:
        return 0
    if col in _load_feature_names(DENSE_FEATURE_CONFIG):
        return 0.0
    if col in LABEL_COLUMNS:
        return 0
    return ''


def _read_cached_frame(path):
    """Return the parquet cache at *path*, or None when it cannot be used."""
    try:
        return pd.read_parquet(path)
    except Exception:
        # Best-effort cache: a missing or unreadable file just means the data
        # is fetched again. Unlike a bare ``except`` this lets
        # KeyboardInterrupt and SystemExit propagate.
        return None


def get_data():
    """Load the evaluation samples, using a local parquet cache when present.

    The cache file name is derived from a hash of the SQL text, so changing
    the query (features, partition, filter) fetches fresh data.

    Returns:
        A DataFrame of dense features, sparse features and ``has_conversion``
        with nulls replaced by each column's neutral value.
    """
    dense_features = _load_feature_names(DENSE_FEATURE_CONFIG)
    feature_names = ','.join(list(dense_features) + sparse_features)
    partitions = "dt in ('20250620')"
    sql = f'''
    SELECT {feature_names},has_conversion
    FROM loghubods.ad_easyrec_train_realtime_data_v3_sampled_temp
    WHERE {partitions} AND adverid = '598'
    '''
    # AND ts BETWEEN unix_timestamp('2025-05-14 17:40:00') AND unix_timestamp('2025-05-14 18:00:00')
    data_query_hash = hashlib.sha1(sql.encode("utf-8")).hexdigest()[0:8]
    cache_path = f'ad_data_cache_{data_query_hash}.parquet'

    df = _read_cached_frame(cache_path)
    if df is None:
        # The ODPS client is only needed on a cache miss.
        o = ODPS(ODPS_CONFIG['ACCESSID'], ODPS_CONFIG['ACCESSKEY'],
                 'loghubods', endpoint=ODPS_CONFIG['ENDPOINT'])
        with o.execute_sql(sql).open_reader() as reader:
            df = reader.to_pandas()
        df.to_parquet(cache_path)

    def handle_nulls(frame):
        # Build the fill dictionary: numeric columns get 0, non-numeric
        # columns get an empty string.
        fill_dict = {col: _neutral_value(frame, col) for col in frame.columns}
        return frame.fillna(fill_dict)

    return handle_nulls(df)


ENDPOINT = '1894469520484605.cn-hangzhou.pai-eas.aliyuncs.com'
# NOTE(review): TOKEN and SERV_NAME are assigned twice; the second pair (the
# "_test" service) is the one in effect. The first pair is kept as found.
# The tokens are committed secrets and should be moved out of source.
TOKEN = 'ODI1MmUxODgzZDc3ODM0ZmQwZWU0YTVjZjdlOWVlMGFlZGJjNTlkYQ=='
SERV_NAME = 'ad_rank_dnn_v11_easyrec'
TOKEN = 'ZmUxOWY5OGYwYmFkZmU0ZGEyM2E4NTFkZjAwNGU0YWNmZTFhYTRhZg=='
SERV_NAME = 'ad_rank_dnn_v11_easyrec_test'

# pandas dtype name -> TFRequest tensor type used when feeding a column.
DTYPE_TO_TF_TYPE = {
    'float64': TFRequest.DT_DOUBLE,
    'object': TFRequest.DT_STRING,
    'int64': TFRequest.DT_INT64
}


def permutate_feature(df, column):
    """Return a copy of *df* with the values of *column* randomly shuffled.

    The shuffled values are assigned back explicitly instead of shuffling
    ``df[column].values`` in place, which only works while ``.values`` is a
    writable view of the frame's data.
    """
    df = df.copy()
    df[column] = np.random.permutation(df[column].to_numpy())
    return df


def _clear_features(df, columns):
    """Return one copy of *df* with every column in *columns* neutralised."""
    cleared = df.copy()
    for column in columns:
        cleared[column] = _neutral_value(cleared, column)
    return cleared


def clear_feature(df, column):
    """Return a copy of *df* with *column* set to its neutral value.

    Args:
        df: Source frame; it is not modified.
        column: Name of the column to blank out.

    Returns:
        A new DataFrame in which every row of *column* is 0, 0.0 or ''.
    """
    return _clear_features(df, [column])


def build_req(df):
    """Build a TFRequest feeding every column of *df* and fetching 'probs'.

    Args:
        df: Batch to score; each column becomes one feed of shape
            ``[len(df)]``.

    Returns:
        The populated ``TFRequest`` for signature 'serving_default'.

    Raises:
        KeyError: If a column's dtype is not in ``DTYPE_TO_TF_TYPE``.
    """
    batch_size = len(df)
    req = TFRequest('serving_default')
    for name in df.columns.tolist():
        dtype = str(df[name].dtype)
        try:
            tf_type = DTYPE_TO_TF_TYPE[dtype]
        except KeyError:
            raise KeyError(
                f'unsupported dtype {dtype!r} for feature {name!r}'
            ) from None
        values = df[name].tolist()
        if dtype == 'object':
            # String tensors are fed as UTF-8 encoded bytes.
            values = [bytes(x, 'utf-8') for x in values]
        req.add_feed(name, [batch_size], tf_type, values)
    req.add_fetch('probs')
    return req


def predict_by_batches(df, batch_size = 512):
    """Score *df* in consecutive chunks and return the 'probs' outputs.

    Relies on the module-level ``client`` created in the ``__main__`` block.

    Args:
        df: Samples to score.
        batch_size: Maximum number of rows per request.

    Returns:
        A list with one score per row, in row order.
    """
    scores = []
    for start in range(0, len(df), batch_size):
        # iloc keeps the slicing positional whatever the index looks like.
        sub_df = df.iloc[start:start + batch_size]
        resp = client.predict(build_req(sub_df))
        scores.extend(resp.response.outputs['probs'].float_val)
    return scores


def _calibrate(scores, rate=CALIBRATION_RATE):
    """Rescale scores with ``p / (p + (1 - p) / rate)``; returns an ndarray."""
    scores = np.asarray(scores, dtype=float)
    return scores / (scores + (1 - scores) / rate)


def _safe_auc(label, scores):
    """Return the ROC AUC, or 0 when scikit-learn rejects the input.

    ``roc_auc_score`` raises ValueError when the AUC is undefined, e.g. when
    only one class is present in *label*.
    """
    try:
        return roc_auc_score(y_true=label, y_score=scores)
    except ValueError:
        return 0


def _print_base_summary(base_scores, label):
    """Print the baseline summary line and return the baseline AUC."""
    base_auc = _safe_auc(label, base_scores)
    ctcvr = np.sum(label) / len(label)
    print(f'avg base score: {np.average(base_scores):.6f}, auc: {base_auc:.6f}, ctcvr: {ctcvr:.6f}')
    return base_auc


def _print_ablation_row(tag, scores, base_scores, label, base_auc):
    """Print one tab-separated row comparing ablated and baseline scores.

    Columns: tag, mean score, mean signed difference, mean absolute
    difference and AUC difference, all relative to the baseline.
    """
    delta = scores - base_scores
    avg_score = np.average(scores)
    avg_diff = np.average(delta)
    avg_abs_diff = np.average(np.abs(delta))
    auc_diff = _safe_auc(label, scores) - base_auc
    print(f'{tag}\t{avg_score:.6f}\t{avg_diff:.6f}\t{avg_abs_diff:.6f}\t{auc_diff:.6f}')


def permutate_feature_and_predict(df):
    """Report the effect of neutralising selected features on the scores.

    NOTE(review): despite the name, this clears features with
    ``clear_feature`` rather than shuffling them with ``permutate_feature``,
    and the candidate list is hard-coded to 'profession'. Both are kept as
    found.
    """
    # The baseline is scored in batches like every other request, so a large
    # frame is never sent as one oversized call.
    base_scores = _calibrate(predict_by_batches(df))
    label = df['has_conversion']
    base_auc = _print_base_summary(base_scores, label)

    # The original first assigned df.columns and then immediately narrowed
    # the list to this single feature.
    feature_to_test = ['profession',]
    for column in feature_to_test:
        scores = _calibrate(predict_by_batches(clear_feature(df, column)))
        _print_ablation_row(f'{column}', scores, base_scores, label, base_auc)


def clear_feature_by_prefix_and_predict(df):
    """Neutralise features group by group and report the score shift.

    A group is every column whose name starts with one of the prefixes
    below. One report row is printed per prefix, even when no column
    matches it.
    """
    feature_prefix_list = [
        "actionstatic", "adid", "adverid", "apptype",
        "b2", "b3", "b4", "b5", "b6", "b7", "b8",
        "brand", "cate1", "cate2", "cid", "city",
        "clickall", "converall", "cpa", "creative",
        "ctcvr", "ctr", "cvr", "d1", "e1", "e2", "ecpm",
        "has", "hour", "incomeall", "is", "profession",
        "region", "root", "timediff", "title", "user", "vid", "viewall",
    ]

    base_scores = _calibrate(predict_by_batches(df))
    label = df['has_conversion']
    base_auc = _print_base_summary(base_scores, label)

    for prefix in feature_prefix_list:
        columns_to_clear = [col for col in df.columns if col.startswith(prefix)]
        # One copy per prefix instead of one full copy per cleared column.
        new_df = _clear_features(df, columns_to_clear)
        scores = _calibrate(predict_by_batches(new_df))
        _print_ablation_row(f'{prefix}', scores, base_scores, label, base_auc)


def clear_feature_and_predict(df):
    """Print the baseline summary for *df*.

    NOTE(review): the early ``return`` below makes the per-feature loop
    unreachable, so only the summary line is produced (after the feature
    list file has been read). It looks like a debugging leftover and is kept
    so behaviour does not change. Before removing it, decide whether the
    baseline should be calibrated: as written, raw baseline scores would be
    compared against calibrated ablated scores.
    """
    base_scores = np.array(predict_by_batches(df))
    label = df['has_conversion']
    base_auc = _print_base_summary(base_scores, label)

    feature_to_test = _load_feature_names(TOP_FEATURE_CONFIG)
    return

    # feature_to_test = sparse_features
    # Score with every candidate feature cleared at once; the result is
    # computed but not reported.
    score = _calibrate(
        predict_by_batches(_clear_features(df, feature_to_test)))

    for column in feature_to_test:
        scores = _calibrate(predict_by_batches(clear_feature(df, column)))
        _print_ablation_row(f'{column:20}', scores, base_scores, label,
                            base_auc)
        # df_to_draw = pd.DataFrame({
        #     'score': scores,
        #     'label': label
        # })
        # draw_figures(df_to_draw, column, 0.04,
        #              filename=f'plots/feature_q_plot_{column}.png')


if __name__ == '__main__':
    # ``client`` is a module-level global read by predict_by_batches.
    client = PredictClient(ENDPOINT, SERV_NAME)
    client.set_token(TOKEN)
    client.init()

    df = get_data()
    if len(df) == 0:
        print("empty df")
        sys.exit(0)

    # df = df.query('user_vid_return_tags_3d.str.len() > 1')
    # df['user_vid_return_tags_3d'] = ''
    # pd.set_option('display.max_rows', None)

    # These columns are cast to int so build_req feeds them as DT_INT64.
    for id_column in ('vid', 'cid', 'adid', 'adverid'):
        df[id_column] = df[id_column].apply(int)
    for feature in int_features:
        df[feature] = df[feature].apply(int)

    print(f'df size: {len(df)}')

    # Alternative entry points:
    # clear_feature_and_predict(df)
    # permutate_feature_and_predict(df)
    clear_feature_by_prefix_and_predict(df)