inspect_features.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2025 StrayWarrior <i@straywarrior.com>
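
"""Feature-importance probe for a deployed EasyRec ad-ranking model.

Pulls one day of sampled ad training data from MaxCompute (ODPS), replays it
against a PAI-EAS TensorFlow service, and reports how the average score and
AUC move when individual features (or whole feature-name prefixes) are
cleared or permuted.
"""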
from eas_prediction import PredictClient
from eas_prediction import StringRequest
from eas_prediction import TFRequest
from odps import ODPS
import pandas as pd
import numpy as np
from sklearn.metrics import roc_auc_score
import time
import hashlib
import pdb
import sys

from q_plot_tool import draw_figures

ODPS_CONFIG = {
    'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
    'ACCESSID': 'LTAIWYUujJAm7CbH',
    'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
}
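
# Categorical / sequence features sent to the model alongside the dense
# features listed in features_top300.config (read at query time).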
sparse_features = [
    'cid', 'adid', 'adverid',
    'region', 'city', 'brand',
    'vid', 'cate1', 'cate2',
    'user_vid_return_tags_2h', 'user_vid_return_tags_1d', 'user_vid_return_tags_3d',
    'user_vid_return_tags_7d', 'user_vid_return_tags_14d',
    'user_cid_click_list', 'user_cid_conver_list',
    'apptype', 'hour', 'hour_quarter', 'root_source_scene',
    'root_source_channel', 'is_first_layer', 'title_split', 'profession',
    'user_vid_share_tags_1d', 'user_vid_share_tags_14d',
    'user_vid_return_cate1_14d', 'user_vid_return_cate2_14d',
    'user_vid_share_cate1_14d', 'user_vid_share_cate2_14d',
    'creative_type', 'user_has_conver_1y',
    'user_adverid_view_3d', 'user_adverid_view_7d', 'user_adverid_view_30d',
    'user_adverid_click_3d', 'user_adverid_click_7d', 'user_adverid_click_30d',
    'user_adverid_conver_3d', 'user_adverid_conver_7d', 'user_adverid_conver_30d',
    'user_skuid_view_3d', 'user_skuid_view_7d', 'user_skuid_view_30d',
    'user_skuid_click_3d', 'user_skuid_click_7d', 'user_skuid_click_30d',
    'user_skuid_conver_3d', 'user_skuid_conver_7d', 'user_skuid_conver_30d',
    'user_conver_ad_class',
]
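
# Count features; these are explicitly cast to int in __main__ so build_req
# serializes them as DT_INT64 rather than strings.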
int_features = [
    'user_has_conver_1y',
    'user_adverid_view_3d', 'user_adverid_view_7d', 'user_adverid_view_30d',
    'user_adverid_click_3d', 'user_adverid_click_7d', 'user_adverid_click_30d',
    'user_adverid_conver_3d', 'user_adverid_conver_7d', 'user_adverid_conver_30d',
    'user_skuid_view_3d', 'user_skuid_view_7d', 'user_skuid_view_30d',
    'user_skuid_click_3d', 'user_skuid_click_7d', 'user_skuid_click_30d',
    'user_skuid_conver_3d', 'user_skuid_conver_7d', 'user_skuid_conver_30d',
]

def get_data():
    odps_conf = ODPS_CONFIG
    o = ODPS(odps_conf['ACCESSID'], odps_conf['ACCESSKEY'], 'loghubods',
             endpoint=odps_conf['ENDPOINT'])
    dense_features = open('features_top300.config').readlines()
    dense_features = [name.strip().lower() for name in dense_features]
    feature_names = ','.join(dense_features + sparse_features)
    partitions = "dt in ('20250709')"
    sql = f''' SELECT {feature_names},has_conversion
    FROM loghubods.ad_easyrec_train_realtime_data_v3_sampled_temp
    WHERE {partitions} AND adverid = '523'
    '''
    # AND ts BETWEEN unix_timestamp('2025-05-14 17:40:00') AND unix_timestamp('2025-05-14 18:00:00')
    data_query_hash = hashlib.sha1(sql.encode('utf-8')).hexdigest()[0:8]
    cache_path = f'ad_data_cache_{data_query_hash}.parquet'
    try:
        df = pd.read_parquet(cache_path)
    except Exception:
        with o.execute_sql(sql).open_reader() as reader:
            df = reader.to_pandas()
        df.to_parquet(cache_path)

    def detect_na_return(col):
        if str(df[col].dtype) in ('int64', 'float64') or col in int_features:
            return 0
        elif col in dense_features:
            return 0.0
        elif col in ('has_conversion', 'has_click'):
            return 0
        else:
            return ''

    def handle_nulls(df):
        # Build the fill dict: numeric columns get 0, non-numeric ones get ''.
        fill_dict = {col: detect_na_return(col) for col in df.columns}
        return df.fillna(fill_dict)

    df = handle_nulls(df)
    return df
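
# get_data() caches results in ad_data_cache_<sha1-prefix>.parquet; the key is
# derived from the SQL text, so changing the query (partition, adverid filter,
# feature list) automatically misses the cache and re-runs the query.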

ENDPOINT = '1894469520484605.cn-hangzhou.pai-eas.aliyuncs.com'
# Production service:
# TOKEN = 'ODI1MmUxODgzZDc3ODM0ZmQwZWU0YTVjZjdlOWVlMGFlZGJjNTlkYQ=='
# SERV_NAME = 'ad_rank_dnn_v11_easyrec'
# Test service:
TOKEN = 'ZmUxOWY5OGYwYmFkZmU0ZGEyM2E4NTFkZjAwNGU0YWNmZTFhYTRhZg=='
SERV_NAME = 'ad_rank_dnn_v11_easyrec_test'

# Map pandas dtypes to TFRequest tensor dtypes.
DTYPE_TO_TF_TYPE = {
    'float64': TFRequest.DT_DOUBLE,
    'object': TFRequest.DT_STRING,
    'int64': TFRequest.DT_INT64,
}
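
# Two ablation strategies: permute a column across rows (keeps its marginal
# distribution but breaks its relation to the label), or clear it to a
# type-appropriate zero value.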
def permutate_feature(df, column):
    df = df.copy()
    np.random.shuffle(df[column].values)
    return df

def clear_feature(df, column):
    df = df.copy()
    dense_features = open('features_top300.config').readlines()
    dense_features = [name.strip().lower() for name in dense_features]

    def detect_na_return(col):
        if df[col].dtype == 'int64':
            return 0
        elif df[col].dtype == 'float64':
            return 0.0
        elif col in dense_features:
            return 0.0
        elif col in ('has_conversion', 'has_click'):
            return 0
        else:
            return ''

    zero_value = detect_na_return(column)
    df[column] = zero_value
    return df

def build_req(df, save_req=None):
    feature_names = df.columns.tolist()
    batch_size = len(df)
    req = TFRequest('serving_default')
    for name in feature_names:
        dtype = str(df[name].dtype)
        tf_type = DTYPE_TO_TF_TYPE[dtype]
        values = df[name].tolist()
        if dtype == 'object':
            values = [bytes(x, 'utf-8') for x in values]
        req.add_feed(name, [batch_size], tf_type, values)
    req.add_fetch('probs')
    if save_req:
        with open(save_req, 'wb') as f:
            f.write(req.to_string())
    return req
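
# The request serialized via save_req doubles as an EAS warmup payload; see
# the commented-out build_req(df, 'warmup_widedeep_v12.bin') call at the
# bottom of the script.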

def predict_by_batches(df, batch_size=512):
    n_samples = len(df)
    batch_num = (n_samples + batch_size - 1) // batch_size
    scores = []
    for i in range(batch_num):
        sub_df = df[i * batch_size : min(n_samples, (i + 1) * batch_size)]
        req = build_req(sub_df)
        resp = client.predict(req)
        scores.extend(resp.response.outputs['probs'].float_val)
    return scores
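
# NOTE: predict_by_batches uses the module-level `client` created in
# __main__; batching presumably keeps each EAS request payload small enough
# for the service to accept.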

def permutate_feature_and_predict(df):
    base_scores = client.predict(build_req(df)).response.outputs['probs'].float_val
    base_scores = np.array(base_scores)
    # Undo negative downsampling: with sampling rate r = 0.04, the calibrated
    # probability is p / (p + (1 - p) / r).
    base_scores = base_scores / (base_scores + (1 - base_scores) / 0.04)
    label = df['has_conversion']
    base_auc = roc_auc_score(y_true=label, y_score=base_scores)
    ctcvr = np.sum(label) / len(label)
    print(f'avg base score: {np.average(base_scores):.6f}, auc: {base_auc:.6f}, ctcvr: {ctcvr:.6f}')
    # feature_to_test = df.columns  # full sweep
    feature_to_test = ['profession']
    for column in feature_to_test:
        new_df = permutate_feature(df, column)
        scores = predict_by_batches(new_df)
        scores = [x / (x + (1 - x) / 0.04) for x in scores]
        scores = np.array(scores)
        avg_score = np.average(scores)
        avg_abs_diff = np.average(np.abs(scores - base_scores))
        avg_diff = np.average(scores - base_scores)
        new_auc = roc_auc_score(y_true=label, y_score=scores)
        auc_diff = new_auc - base_auc
        print(f'{column}\t{avg_score:.6f}\t{avg_diff:.6f}\t{avg_abs_diff:.6f}\t{auc_diff:.6f}')

def clear_feature_by_prefix_and_predict(df):
    feature_prefix_list = [
        # "actionstatic", "adid", "adverid", "apptype", "b2", "b3", "b4", "b5",
        # "b6", "b7", "b8", "brand", "cate1", "cate2", "cid", "city", "clickall",
        # "converall", "cpa", "creative", "ctcvr", "ctr", "cvr", "d1", "e1",
        # "e2", "ecpm", "has", "hour", "incomeall", "is", "profession",
        # "region", "root", "timediff", "title", "user", "vid", "viewall",
        "user_conver_ad_class"
    ]
    base_scores = client.predict(build_req(df)).response.outputs['probs'].float_val
    base_scores = np.array(base_scores)
    base_scores = base_scores / (base_scores + (1 - base_scores) / 0.04)
    label = df['has_conversion']
    try:
        base_auc = roc_auc_score(y_true=label, y_score=base_scores)
    except ValueError:
        # roc_auc_score raises ValueError when only one class is present.
        base_auc = 0
    ctcvr = np.sum(label) / len(label)
    print(f'avg base score: {np.average(base_scores):.6f}, auc: {base_auc:.6f}, ctcvr: {ctcvr:.6f}')
    for prefix in feature_prefix_list:
        new_df = df
        columns_to_clear = [col for col in df.columns if col.startswith(prefix)]
        for column in columns_to_clear:
            new_df = clear_feature(new_df, column)
        scores = predict_by_batches(new_df)
        scores = [x / (x + (1 - x) / 0.04) for x in scores]
        scores = np.array(scores)
        avg_score = np.average(scores)
        avg_abs_diff = np.average(np.abs(scores - base_scores))
        avg_diff = np.average(scores - base_scores)
        try:
            new_auc = roc_auc_score(y_true=label, y_score=scores)
        except ValueError:
            new_auc = 0
        auc_diff = new_auc - base_auc
        print(f'{prefix}\t{avg_score:.6f}\t{avg_diff:.6f}\t{avg_abs_diff:.6f}\t{auc_diff:.6f}')

def clear_feature_and_predict(df):
    base_scores = predict_by_batches(df)
    base_scores = np.array(base_scores)
    # Calibrate the base scores the same way as the per-feature scores below
    # so the diffs are comparable.
    base_scores = base_scores / (base_scores + (1 - base_scores) / 0.04)
    label = df['has_conversion']
    ctcvr = np.sum(label) / len(label)
    try:
        base_auc = roc_auc_score(y_true=label, y_score=base_scores)
    except ValueError:
        base_auc = 0
    print(f'avg base score: {np.average(base_scores):.6f}, auc: {base_auc:.6f}, ctcvr: {ctcvr:.6f}')
    feature_to_test = [x.lower().strip() for x in open('features_top50.config').readlines()]
    # feature_to_test = sparse_features

    # Reference run with every tested feature cleared at once.
    all_clean_df = df.copy()
    for column in feature_to_test:
        all_clean_df = clear_feature(all_clean_df, column)
    score = predict_by_batches(all_clean_df)
    score = np.array(score)
    score = score / (score + (1 - score) / 0.04)

    for column in feature_to_test:
        new_df = clear_feature(df, column)
        scores = predict_by_batches(new_df)
        scores = [x / (x + (1 - x) / 0.04) for x in scores]
        scores = np.array(scores)
        avg_score = np.average(scores)
        avg_abs_diff = np.average(np.abs(scores - base_scores))
        avg_diff = np.average(scores - base_scores)
        try:
            new_auc = roc_auc_score(y_true=label, y_score=scores)
        except ValueError:
            new_auc = 0
        auc_diff = new_auc - base_auc
        print(f'{column:20}\t{avg_score:.6f}\t{avg_diff:.6f}\t{avg_abs_diff:.6f}\t{auc_diff:.6f}')
        # df_to_draw = pd.DataFrame({'score': scores, 'label': label})
        # draw_figures(df_to_draw, column, 0.04,
        #              filename=f'plots/feature_q_plot_{column}.png')
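
# Each driver above prints one TSV row per feature / prefix:
# name, avg calibrated score, avg diff vs. base, avg |diff|, AUC diff.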

if __name__ == '__main__':
    client = PredictClient(ENDPOINT, SERV_NAME)
    client.set_token(TOKEN)
    client.init()

    df = get_data()
    # df = df.query('user_vid_return_tags_3d.str.len() > 1')
    # df['user_vid_return_tags_3d'] = ''
    # pd.set_option('display.max_rows', None)

    # Cast ID and count columns so build_req feeds them as DT_INT64.
    df['vid'] = df['vid'].astype(int)
    df['cid'] = df['cid'].astype(int)
    df['adid'] = df['adid'].astype(int)
    df['adverid'] = df['adverid'].astype(int)
    for feature in int_features:
        df[feature] = df[feature].astype(int)

    if len(df) == 0:
        print('empty df')
        sys.exit(0)
    print(f'df size: {len(df)}')
    # print(df)
    # print(df[['vid', 'cid', 'adid', 'adverid', 'apptype', 'hour', 'hour_quarter', 'is_first_layer']])

    # clear_feature_and_predict(df)
    # permutate_feature_and_predict(df)
    # clear_feature_by_prefix_and_predict(df)
    # scores = client.predict(build_req(df, 'warmup_widedeep_v12.bin')).response.outputs['probs'].float_val
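
# Expected local inputs: features_top300.config (dense feature list) and,
# for clear_feature_and_predict, features_top50.config.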