#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2025 StrayWarrior
#
# Distributed under terms of the MIT license.

"""Send a batch of sampled evaluation rows to a PAI-EAS EasyRec service.

Rows are pulled from ODPS (cached locally as parquet) and their nulls are
filled. The rows are packed into a TFRequest, which is optionally written out
as a warmup file. The request is then sent to the service, and the script
prints the raw and calibrated probabilities plus the request latency.
"""

import hashlib
import os
import pdb  # noqa: F401  (kept for ad-hoc debugging)
import sys  # noqa: F401
import time

import numpy as np  # noqa: F401
import pandas as pd
from eas_prediction import PredictClient
from eas_prediction import StringRequest  # noqa: F401
from eas_prediction import TFRequest
from odps import ODPS
from sklearn.metrics import roc_auc_score  # noqa: F401

# NOTE(review): these credentials were committed in plain text. They can now be
# overridden through environment variables. The literal fallbacks are kept only
# so existing runs keep working; they should be rotated and then deleted.
ODPS_CONFIG = {
    'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
    'ACCESSID': os.environ.get('ODPS_ACCESS_ID', 'LTAIWYUujJAm7CbH'),
    'ACCESSKEY': os.environ.get('ODPS_ACCESS_KEY',
                                'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P'),
}

sparse_features = [
    'cid', 'adid', 'adverid',
    'region', 'city', 'brand',
    'vid', 'cate1', 'cate2',
    "user_vid_return_tags_2h", "user_vid_return_tags_1d",
    "user_vid_return_tags_3d", "user_vid_return_tags_7d",
    "user_vid_return_tags_14d",
    "user_cid_click_list", "user_cid_conver_list",
    'apptype', 'hour', 'hour_quarter', 'root_source_scene',
    'root_source_channel', 'is_first_layer', 'title_split', 'profession',
    "user_vid_share_tags_1d", "user_vid_share_tags_14d",
    "user_vid_return_cate1_14d", "user_vid_return_cate2_14d",
    "user_vid_share_cate1_14d", "user_vid_share_cate2_14d",
    "creative_type", "user_has_conver_1y",
]

# File listing one dense feature name per line.
DENSE_FEATURE_CONFIG = 'features_top300.config'
# Label columns: like dense features, nulls in them are filled with 0.0.
LABEL_COLUMNS = ('has_conversion', 'has_click')


def get_data():
    """Load the evaluation sample from ODPS and fill missing values.

    Dense feature names are read from ``features_top300.config`` (one per
    line, stripped and lower-cased). Together with ``sparse_features`` and the
    ``has_conversion`` label, they form the SELECT list.

    The raw query result is cached in ``ad_data_cache_<sha1(sql)[:8]>.parquet``.
    Any change to the SQL text therefore bypasses an older cache file.

    Returns:
        pandas.DataFrame with nulls replaced by column:
        - int64 columns get 0;
        - float64 columns, dense features and label columns get 0.0;
        - every other column gets ''.
    """
    with open(DENSE_FEATURE_CONFIG, encoding='utf-8') as f:
        # Skip blank lines (e.g. a trailing empty line): an empty name would
        # produce an invalid ",," in the SELECT list.
        dense_features = [line.strip().lower() for line in f if line.strip()]

    feature_names = ','.join(dense_features + sparse_features)
    partitions = "dt in ('20250529')"
    sql = f'''
    SELECT {feature_names},has_conversion
    FROM loghubods.ad_easyrec_eval_data_v3_sampled
    WHERE {partitions} AND cid = 17869
    LIMIT 1000
    '''
    data_query_hash = hashlib.sha1(sql.encode("utf-8")).hexdigest()[0:8]
    cache_path = f'ad_data_cache_{data_query_hash}.parquet'

    try:
        df = pd.read_parquet(cache_path)
    except (OSError, ValueError):
        # Missing or unreadable cache file: fetch from ODPS and refresh the
        # cache. (A bare `except:` here used to swallow Ctrl-C as well.)
        odps_conf = ODPS_CONFIG
        o = ODPS(odps_conf['ACCESSID'], odps_conf['ACCESSKEY'], 'loghubods',
                 endpoint=odps_conf['ENDPOINT'])
        with o.execute_sql(sql).open_reader() as reader:
            df = reader.to_pandas()
        df.to_parquet(cache_path)

    dense_set = set(dense_features)

    def detect_na_return(col):
        """Return the null-fill value for column *col* of ``df``."""
        dtype = df[col].dtype
        if dtype == 'int64':
            return 0
        if dtype == 'float64' or col in dense_set or col in LABEL_COLUMNS:
            return 0.0
        return ''

    # Build the fill dict: numeric columns are filled with 0, non-numeric
    # columns with an empty string.
    fill_dict = {col: detect_na_return(col) for col in df.columns}
    return df.fillna(fill_dict)


ENDPOINT = '1894469520484605.cn-hangzhou.pai-eas.aliyuncs.com'
# Defaults target the test service. The production service is
# 'ad_rank_dnn_v11_easyrec'; set EAS_SERVICE_NAME / EAS_TOKEN to target it.
# NOTE(review): the token fallback below is a committed secret; rotate it.
TOKEN = os.environ.get(
    'EAS_TOKEN', 'ZmUxOWY5OGYwYmFkZmU0ZGEyM2E4NTFkZjAwNGU0YWNmZTFhYTRhZg==')
SERV_NAME = os.environ.get('EAS_SERVICE_NAME', 'ad_rank_dnn_v11_easyrec_test')

DTYPE_TO_TF_TYPE = {
    'float64': TFRequest.DT_DOUBLE,
    'object': TFRequest.DT_STRING,
    'int64': TFRequest.DT_INT64,
}

# Columns converted to Python int before feeding, so pandas stores them as
# int64 rather than float64/object.
INT_ID_COLUMNS = ('vid', 'cid', 'adid', 'adverid', 'user_has_conver_1y')
# Number of rows sent in the single request.
MAX_BATCH_SIZE = 100
# Whether to dump the serialized request (e.g. for service warmup).
WRITE_WARMUP_FILE = True
WARMUP_FILE = 'warmup_widedeep_v12.bin'
# How many times to send the same request (for latency measurement).
NUM_PREDICT_RUNS = 1
# Used in p / (p + (1 - p) / r) to rescale the model's output probability.
# NOTE(review): this looks like a negative down-sampling-rate correction;
# confirm 0.04 against the training data config.
NEG_SAMPLE_RATE = 0.04


def _build_request(df):
    """Build a TFRequest feeding every column of *df* as a [len(df)] tensor.

    Raises:
        TypeError: if a column's dtype has no entry in DTYPE_TO_TF_TYPE.
    """
    req = TFRequest('serving_default')
    batch_size = len(df)
    for name in df.columns:
        dtype = str(df[name].dtype)
        try:
            tf_type = DTYPE_TO_TF_TYPE[dtype]
        except KeyError:
            raise TypeError(
                f'column {name!r} has unsupported dtype {dtype!r}') from None
        values = df[name].tolist()
        if dtype == 'object':
            values = [bytes(v, 'utf-8') for v in values]
        req.add_feed(name, [batch_size], tf_type, values)
    req.add_fetch('probs')
    return req


def main():
    """Fetch data, send one batch to the EAS service, and print the results."""
    client = PredictClient(ENDPOINT, SERV_NAME)
    client.set_token(TOKEN)
    client.init()

    df = get_data()
    for col in INT_ID_COLUMNS:
        df[col] = df[col].apply(int)
    df = df.head(MAX_BATCH_SIZE)

    # NOTE: with the server-side INPUT_TILE=2 optimization enabled, user-level
    # features could be fed with a single value instead of one value per row.
    req = _build_request(df)

    if WRITE_WARMUP_FILE:
        with open(WARMUP_FILE, "wb") as f:
            f.write(req.to_string())

    for _ in range(NUM_PREDICT_RUNS):
        t1 = time.perf_counter()
        resp = client.predict(req)
        t2 = time.perf_counter()
        for prob in resp.response.outputs['probs'].float_val:
            calibrated = prob / (prob + (1 - prob) / NEG_SAMPLE_RATE)
            print((prob, calibrated))
        print(f'time: {(t2 - t1) * 1000} ms')


if __name__ == '__main__':
    main()