# test_eas_request.py
  1. #! /usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # vim:fenc=utf-8
  4. #
  5. # Copyright © 2025 StrayWarrior <i@straywarrior.com>
  6. #
  7. # Distributed under terms of the MIT license.
  8. from eas_prediction import PredictClient
  9. from eas_prediction import StringRequest
  10. from eas_prediction import TFRequest
  11. from odps import ODPS
  12. import pandas as pd
  13. import numpy as np
  14. from sklearn.metrics import roc_auc_score
  15. import time
  16. import hashlib
  17. import pdb
  18. import sys
  19. ODPS_CONFIG = {
  20. 'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
  21. 'ACCESSID': 'LTAIWYUujJAm7CbH',
  22. 'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
  23. }
  24. sparse_features = [
  25. 'cid', 'adid', 'adverid',
  26. 'region', 'city', 'brand',
  27. 'vid', 'cate1', 'cate2',
  28. "user_vid_return_tags_2h", "user_vid_return_tags_1d", "user_vid_return_tags_3d",
  29. "user_vid_return_tags_7d", "user_vid_return_tags_14d",
  30. "user_cid_click_list", "user_cid_conver_list",
  31. 'apptype' ,'hour' ,'hour_quarter' ,'root_source_scene',
  32. 'root_source_channel' ,'is_first_layer' ,'title_split' ,'profession',
  33. "user_vid_share_tags_1d", "user_vid_share_tags_14d", "user_vid_return_cate1_14d", "user_vid_return_cate2_14d", "user_vid_share_cate1_14d", "user_vid_share_cate2_14d",
  34. "creative_type", "user_has_conver_1y"
  35. ]
  36. def get_data():
  37. odps_conf = ODPS_CONFIG
  38. o = ODPS(odps_conf['ACCESSID'], odps_conf['ACCESSKEY'], 'loghubods',
  39. endpoint=odps_conf['ENDPOINT'])
  40. dense_features = open("features_top300.config").readlines()
  41. dense_features = [name.strip().lower() for name in dense_features]
  42. feature_names = ','.join(dense_features + sparse_features)
  43. partitions = "dt in ('20250529')"
  44. sql = f''' SELECT {feature_names},has_conversion
  45. FROM loghubods.ad_easyrec_eval_data_v3_sampled
  46. WHERE {partitions} AND cid = 17869
  47. LIMIT 1000
  48. '''
  49. data_query_hash = hashlib.sha1(sql.encode("utf-8")).hexdigest()[0:8]
  50. cache_path = f'ad_data_cache_{data_query_hash}.parquet'
  51. try:
  52. df = pd.read_parquet(cache_path)
  53. except:
  54. with o.execute_sql(sql).open_reader() as reader:
  55. df = reader.to_pandas()
  56. df.to_parquet(cache_path)
  57. def detect_na_return(col):
  58. if df[col].dtype == 'int64':
  59. return 0
  60. elif df[col].dtype == 'float64':
  61. return 0.0
  62. elif col in dense_features:
  63. return 0.0
  64. elif col in ('has_conversion', 'has_click'):
  65. return 0.0
  66. else:
  67. return ''
  68. def handle_nulls(df):
  69. # 构建填充字典:数值列填0,非数值列填空字符串
  70. fill_dict = {
  71. col: detect_na_return(col) for col in df.columns
  72. }
  73. return df.fillna(fill_dict)
  74. df = handle_nulls(df)
  75. return df
  76. ENDPOINT = '1894469520484605.cn-hangzhou.pai-eas.aliyuncs.com'
  77. TOKEN = 'ODI1MmUxODgzZDc3ODM0ZmQwZWU0YTVjZjdlOWVlMGFlZGJjNTlkYQ=='
  78. SERV_NAME = 'ad_rank_dnn_v11_easyrec'
  79. TOKEN = 'ZmUxOWY5OGYwYmFkZmU0ZGEyM2E4NTFkZjAwNGU0YWNmZTFhYTRhZg=='
  80. SERV_NAME = 'ad_rank_dnn_v11_easyrec_test'
  81. DTYPE_TO_TF_TYPE = {
  82. 'float64': TFRequest.DT_DOUBLE,
  83. 'object': TFRequest.DT_STRING,
  84. 'int64': TFRequest.DT_INT64
  85. }
  86. if __name__ == '__main__':
  87. client = PredictClient(ENDPOINT, SERV_NAME)
  88. client.set_token(TOKEN)
  89. client.init()
  90. df = get_data()
  91. # df = df.query('user_vid_return_tags_3d.str.len() > 1')
  92. # df['user_vid_return_tags_3d'] = ''
  93. # pd.set_option('display.max_rows', None)
  94. df['vid'] = df['vid'].apply(lambda x: int(x))
  95. df['cid'] = df['cid'].apply(lambda x: int(x))
  96. df['adid'] = df['adid'].apply(lambda x: int(x))
  97. df['adverid'] = df['adverid'].apply(lambda x: int(x))
  98. df['user_has_conver_1y'] = df['user_has_conver_1y'].apply(lambda x: int(x))
  99. # print(df[['vid', 'cid', 'adid', 'adverid', 'apptype', 'hour', 'hour_quarter', 'is_first_layer']])
  100. feature_names = df.columns.tolist()
  101. user_features = ['viewall', 'ctr_all', 'ecpm_all', 'ctcvr_all', 'clickall', 'converall', 'region', 'city', 'brand',
  102. "user_vid_return_tags_2h",
  103. "user_vid_return_tags_1d", "user_vid_return_tags_3d",
  104. "user_vid_return_tags_7d", "user_vid_return_tags_14d",
  105. "user_cid_click_list", "user_cid_conver_list"]
  106. req = TFRequest('serving_default')
  107. df = df[:100]
  108. batch_size = len(df)
  109. for name in feature_names:
  110. dtype = str(df[name].dtype)
  111. tf_type = DTYPE_TO_TF_TYPE[dtype]
  112. values = df[name].tolist()
  113. if dtype == 'object':
  114. values = [bytes(x, 'utf-8') for x in values]
  115. req.add_feed(name, [batch_size], tf_type, values)
  116. # for name in feature_names:
  117. # if name in user_features:
  118. # req.add_feed(name, [1], TFRequest.DT_DOUBLE, [0.80])
  119. # else:
  120. # req.add_feed(name, [10], TFRequest.DT_DOUBLE, [0.80] * 10)
  121. req.add_fetch('probs')
  122. if 1:
  123. with open("warmup_widedeep_v12.bin", "wb") as f:
  124. f.write(req.to_string())
  125. # 注意: 开启INPUT_TILE=2的优化之后, 上述特征可以只传一个值
  126. # req.add_feed('user_id', [1], TFRequest.DT_STRING, ['u0001'])
  127. # req.add_feed('age', [1], TFRequest.DT_FLOAT, [18.0])
  128. # req.add_feed('item_id', [3], TFRequest.DT_STRING,
  129. # ['i0001', 'i0002', 'i0003'])
  130. for x in range(0, 1):
  131. t1 = time.time()
  132. resp = client.predict(req)
  133. t2 = time.time()
  134. # pdb.set_trace()
  135. for x in resp.response.outputs['probs'].float_val:
  136. y = x / (x + (1 - x) / 0.04)
  137. print((x, y))
  138. # print(resp.response.outputs['probs'])
  139. print(f'time: {(t2 - t1) * 1000} ms')