#coding utf-8 from odps import ODPS import pandas as pd from collections import defaultdict from tqdm import tqdm import sys import requests import json import datetime import traceback from threading import Timer from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu from config import set_config from log import Log config_, _ = set_config() log_ = Log() redis_helper = RedisHelper() from feature import get_item_features from lr_model import LrModel odps = ODPS( access_id='LTAIWYUujJAm7CbH', secret_access_key='RfSjdiWwED1sGFlsjXv0DlfTnZTG1P', project="loghubods", endpoint='http://service.cn.maxcompute.aliyun.com/api') def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000, pool_maxsize=1000, pool_connections=1000): """ :param pool_connections: :return: records """ records = odps.read_table(name=table, partition='dt=%s' % date) return records def exe_sql(sql): data = [] with odps.execute_sql(sql).open_reader() as reader: d = defaultdict(list) # for record in reader: for res in record: d[res[0]].append(res[1]) # #data = pd.DataFrame.from_dict(d, orient='index', dtype=str).T # data = pd.DataFrame.from_dict(d, orient='columns', dtype=str) # return data if __name__=="__main__": project = 'loghubods' datetime = sys.argv[1] sql = """ --odps sql --********************************************************************-- --author:研发 --create time:2023-12-01 15:48:17 --********************************************************************-- with candidate as ( select -- 基础特征_用户 mid AS u_id ,machineinfo_brand AS u_brand ,machineinfo_model AS u_device ,SPLIT(machineinfo_system,' ')[0] AS u_system ,machineinfo_system AS u_system_ver -- 基础特征_视频 ,videoid AS i_id ,i_up_id AS i_up_id ,tags as i_tag ,title as i_title ,ceil(log2(i_title_len + 1)) as i_title_len ,ceil(log2(total_time + 1)) as i_play_len ,ceil(log2(i_days_since_upload + 1)) as i_days_since_upload -- 发布时间(距离现在天数) -- 基础特征_场景 ,ctx_day AS ctx_day ,ctx_week AS ctx_week ,ctx_hour AS ctx_hour ,ctx_region as ctx_region ,ctx_city as ctx_city -- 基础特征_交叉 ,ui_is_out as ui_is_out ,i_play_len as playtime -- ,IF(i_play_len > 1,'0','1') AS ui_is_out_new ,rootmid AS ui_root_id ,shareid AS ui_share_id -- 统计特征_用户 ,ceil(log2(u_1day_exp_cnt + 1)) as u_1day_exp_cnt ,ceil(log2(u_1day_click_cnt + 1)) as u_1day_click_cnt ,ceil(log2(u_1day_share_cnt + 1)) as u_1day_share_cnt ,ceil(log2(u_1day_return_cnt + 1)) as u_1day_return_cnt ,ceil(log2(u_3day_exp_cnt + 1)) as u_3day_exp_cnt ,ceil(log2(u_3day_click_cnt + 1)) as u_3day_click_cnt ,ceil(log2(u_3day_share_cnt + 1)) as u_3day_share_cnt ,ceil(log2(u_3day_return_cnt + 1)) as u_3day_return_cnt ,ceil(log2(u_7day_exp_cnt + 1)) as u_7day_exp_cnt ,ceil(log2(u_7day_click_cnt + 1)) as u_7day_click_cnt ,ceil(log2(u_7day_share_cnt + 1)) as u_7day_share_cnt ,ceil(log2(u_7day_return_cnt + 1)) as u_7day_return_cnt ,ceil(log2(u_3month_exp_cnt + 1)) as u_3month_exp_cnt ,ceil(log2(u_3month_click_cnt + 1)) as u_3month_click_cnt ,ceil(log2(u_3month_share_cnt + 1)) as u_3month_share_cnt ,ceil(log2(u_3month_return_cnt + 1)) as u_3month_return_cnt ,round(if(u_ctr_1day > 10.0, 10.0, u_ctr_1day) / 10.0, 6) as u_ctr_1day ,round(if(u_str_1day > 10.0, 10.0, u_str_1day) / 10.0, 6) as u_str_1day ,round(if(u_rov_1day > 10.0, 10.0, u_rov_1day) / 10.0, 6) as u_rov_1day ,round(if(u_ros_1day > 10.0, 10.0, u_ros_1day) / 10.0, 6) as u_ros_1day ,round(if(u_ctr_3day > 10.0, 10.0, u_ctr_3day) / 10.0, 6) as u_ctr_3day ,round(if(u_str_3day > 10.0, 10.0, u_str_3day) / 10.0, 6) as u_str_3day ,round(if(u_rov_3day > 10.0, 10.0, u_rov_3day) / 10.0, 6) as u_rov_3day ,round(if(u_ros_3day > 10.0, 10.0, u_ros_3day) / 10.0, 6) as u_ros_3day ,round(if(u_ctr_7day > 10.0, 10.0, u_ctr_7day) / 10.0, 6) as u_ctr_7day ,round(if(u_str_7day > 10.0, 10.0, u_str_7day) / 10.0, 6) as u_str_7day ,round(if(u_rov_7day > 10.0, 10.0, u_rov_7day) / 10.0, 6) as u_rov_7day ,round(if(u_ros_7day > 10.0, 10.0, u_ros_7day) / 10.0, 6) as u_ros_7day ,round(if(u_ctr_3month > 10.0, 10.0, u_ctr_3month) / 10.0, 6) as u_ctr_3month ,round(if(u_str_3month > 10.0, 10.0, u_str_3month) / 10.0, 6) as u_str_3month ,round(if(u_rov_3month > 10.0, 10.0, u_rov_3month) / 10.0, 6) as u_rov_3month ,round(if(u_ros_3month > 10.0, 10.0, u_ros_3month) / 10.0, 6) as u_ros_3month -- 统计特征_视频 ,ceil(log2(i_1day_exp_cnt + 1)) as i_1day_exp_cnt ,ceil(log2(i_1day_click_cnt + 1)) as i_1day_click_cnt ,ceil(log2(i_1day_share_cnt + 1)) as i_1day_share_cnt ,ceil(log2(i_1day_return_cnt + 1)) as i_1day_return_cnt ,ceil(log2(i_3day_exp_cnt + 1)) as i_3day_exp_cnt ,ceil(log2(i_3day_click_cnt + 1)) as i_3day_click_cnt ,ceil(log2(i_3day_share_cnt + 1)) as i_3day_share_cnt ,ceil(log2(i_3day_return_cnt + 1)) as i_3day_return_cnt ,ceil(log2(i_7day_exp_cnt + 1)) as i_7day_exp_cnt ,ceil(log2(i_7day_click_cnt + 1)) as i_7day_click_cnt ,ceil(log2(i_7day_share_cnt + 1)) as i_7day_share_cnt ,ceil(log2(i_7day_return_cnt + 1)) as i_7day_return_cnt ,ceil(log2(i_3month_exp_cnt + 1)) as i_3month_exp_cnt ,ceil(log2(i_3month_click_cnt + 1)) as i_3month_click_cnt ,ceil(log2(i_3month_share_cnt + 1)) as i_3month_share_cnt ,ceil(log2(i_3month_return_cnt + 1)) as i_3month_return_cnt ,round(if(i_ctr_1day > 10.0, 10.0, i_ctr_1day) / 10.0, 6) as i_ctr_1day ,round(if(i_str_1day > 10.0, 10.0, i_str_1day) / 10.0, 6) as i_str_1day ,round(if(i_rov_1day > 10.0, 10.0, i_rov_1day) / 10.0, 6) as i_rov_1day ,round(if(i_ros_1day > 10.0, 10.0, i_ros_1day) / 10.0, 6) as i_ros_1day ,round(if(i_ctr_3day > 10.0, 10.0, i_ctr_3day) / 10.0, 6) as i_ctr_3day ,round(if(i_str_3day > 10.0, 10.0, i_str_3day) / 10.0, 6) as i_str_3day ,round(if(i_rov_3day > 10.0, 10.0, i_rov_3day) / 10.0, 6) as i_rov_3day ,round(if(i_ros_3day > 10.0, 10.0, i_ros_3day) / 10.0, 6) as i_ros_3day ,round(if(i_ctr_7day > 10.0, 10.0, i_ctr_7day) / 10.0, 6) as i_ctr_7day ,round(if(i_str_7day > 10.0, 10.0, i_str_7day) / 10.0, 6) as i_str_7day ,round(if(i_rov_7day > 10.0, 10.0, i_rov_7day) / 10.0, 6) as i_rov_7day ,round(if(i_ros_7day > 10.0, 10.0, i_ros_7day) / 10.0, 6) as i_ros_7day ,round(if(i_ctr_3month > 10.0, 10.0, i_ctr_3month) / 10.0, 6) as i_ctr_3month ,round(if(i_str_3month > 10.0, 10.0, i_str_3month) / 10.0, 6) as i_str_3month ,round(if(i_rov_3month > 10.0, 10.0, i_rov_3month) / 10.0, 6) as i_rov_3month ,round(if(i_ros_3month > 10.0, 10.0, i_ros_3month) / 10.0, 6) as i_ros_3month from user_video_features_data_final where dt='{datetime}' and ad_ornot = '0' and apptype != '13' ), candidate_user as ( SELECT u_id, max(u_1day_exp_cnt) as u_1day_exp_cnt, max(u_1day_click_cnt) as u_1day_click_cnt, max(u_1day_share_cnt) as u_1day_share_cnt, max(u_1day_return_cnt) as u_1day_return_cnt, max(u_3day_exp_cnt) as u_3day_exp_cnt, max(u_3day_click_cnt) as u_3day_click_cnt, max(u_3day_share_cnt) as u_3day_share_cnt, max(u_3day_return_cnt) as u_3day_return_cnt, max(u_7day_exp_cnt) as u_7day_exp_cnt, max(u_7day_click_cnt) as u_7day_click_cnt, max(u_7day_share_cnt) as u_7day_share_cnt, max(u_7day_return_cnt) as u_7day_return_cnt, max(u_3month_exp_cnt) as u_3month_exp_cnt, max(u_3month_click_cnt) as u_3month_click_cnt, max(u_3month_share_cnt) as u_3month_share_cnt, max(u_3month_return_cnt) as u_3month_return_cnt, max(u_ctr_1day) as u_ctr_1day, max(u_str_1day) as u_str_1day, max(u_rov_1day) as u_rov_1day, max(u_ros_1day) as u_ros_1day, max(u_ctr_3day) as u_ctr_3day, max(u_str_3day) as u_str_3day, max(u_rov_3day) as u_rov_3day, max(u_ros_3day) as u_ros_3day, max(u_ctr_7day) as u_ctr_7day, max(u_str_7day) as u_str_7day, max(u_rov_7day) as u_rov_7day, max(u_ros_7day) as u_ros_7day, max(u_ctr_3month) as u_ctr_3month, max(u_str_3month) as u_str_3month, max(u_rov_3month) as u_rov_3month, max(u_ros_3month) as u_ros_3month FROM candidate group by u_id ), candidate_item as ( select i_id, max(i_up_id) as i_up_id, max(i_title_len) as i_title_len, max(i_play_len) as i_play_len, max(i_days_since_upload) as i_days_since_upload, max(i_1day_exp_cnt) as i_1day_exp_cnt, max(i_1day_click_cnt) as i_1day_click_cnt, max(i_1day_share_cnt) as i_1day_share_cnt, max(i_1day_return_cnt) as i_1day_return_cnt, max(i_3day_exp_cnt) as i_3day_exp_cnt, max(i_3day_click_cnt) as i_3day_click_cnt, max(i_3day_share_cnt) as i_3day_share_cnt, max(i_3day_return_cnt) as i_3day_return_cnt, max(i_7day_exp_cnt) as i_7day_exp_cnt, max(i_7day_click_cnt) as i_7day_click_cnt, max(i_7day_share_cnt) as i_7day_share_cnt, max(i_7day_return_cnt) as i_7day_return_cnt, max(i_3month_exp_cnt) as i_3month_exp_cnt, max(i_3month_click_cnt) as i_3month_click_cnt, max(i_3month_share_cnt) as i_3month_share_cnt, max(i_3month_return_cnt) as i_3month_return_cnt, max(i_ctr_1day) as i_ctr_1day, max(i_str_1day) as i_str_1day, max(i_rov_1day) as i_rov_1day, max(i_ros_1day) as i_ros_1day, max(i_ctr_3day) as i_ctr_3day, max(i_str_3day) as i_str_3day, max(i_rov_3day) as i_rov_3day, max(i_ros_3day) as i_ros_3day, max(i_ctr_7day) as i_ctr_7day, max(i_str_7day) as i_str_7day, max(i_rov_7day) as i_rov_7day, max(i_ros_7day) as i_ros_7day, max(i_ctr_3month) as i_ctr_3month, max(i_str_3month) as i_str_3month, max(i_rov_3month) as i_rov_3month, max(i_ros_3month) as i_ros_3month FROM candidate group by i_id ) SELECT * from candidate_item """.format(datetime=datetime) print(sql) data = exe_sql(sql) print('sql done') #data.to_csv('./data/ad_out_sample_v2_item.{datetime}'.format(datetime=datetime), sep='\t') #data = pd.read_csv('./data/ad_out_sample_v2_item.{datetime}'.format(datetime=datetime), sep='\t', dtype=str) data.fillna('', inplace=True) lr_model = LrModel('model/ad_out_v2_model_v1.day.json') item_h_dict = {} k_col = 'i_id' for index, row in tqdm(data.iterrows()): k = row['i_id'] item_features = get_item_features(row) item_h = lr_model.predict_h(item_features) item_h_dict[k] = item_h #print(item_features) #print(item_h) dt = datetime data_key = 'test_lr_v1' key_name = f"{config_.KEY_NAME_PREFIX_AD_GROUP}{data_key}:{dt}" print(key_name) redis_helper.add_data_with_zset(key_name=key_name, data=item_h_dict, expire_time=2 * 24 * 3600) with open('test_item.json', 'w') as fout: json.dump(item_h_dict, fout, indent=2, ensure_ascii=False, sort_keys=True)