
add user_v3

yangxiaohui 1 year ago
parent
commit
3faf1756e1

+ 174 - 0
ad_out_v1_get_offline_score_item_v2_debug.py

@@ -0,0 +1,174 @@
+# coding: utf-8
+import sys
+import json
+import datetime
+import traceback
+from threading import Timer
+from tqdm import tqdm
+from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu
+from config import set_config
+from log import Log
+config_, _ = set_config()
+log_ = Log()
+redis_helper = RedisHelper()
+
+from feature import get_item_features
+from lr_model import LrModel
+from utils import exe_sql
+
+def update_offline_score_item(dt):
+    project = 'loghubods'
+    sql = """
+--odps sql
+--********************************************************************--
+--author: R&D
+--create time:2023-12-11 23:54:20
+--********************************************************************--
+with candidate_item as (
+select
+-- base features: video
+videoid AS i_id
+,uid AS i_up_id
+-- ,tags as i_tag
+-- ,title as i_title
+,ceil(log2(length(title) + 1)) as i_title_len
+,ceil(log2(total_time + 1)) as i_play_len
+,ceil(log2(existence_days + 1)) as i_days_since_upload -- days since upload
+-- base features: context
+-- ,apptype AS ctx_apptype
+-- ,ctx_day AS ctx_day
+-- ,ctx_week AS ctx_week
+-- ,ctx_hour AS ctx_hour
+-- ,ctx_region as ctx_region
+-- ,ctx_city as ctx_city
+-- base features: user-item cross
+-- ,ui_is_out as ui_is_out
+-- ,i_play_len as playtime
+-- ,IF(i_play_len > 1,'0','1') AS ui_is_out_new
+-- ,rootmid AS ui_root_id
+-- ,shareid AS ui_share_id
+-- stats features: video
+,ceil(log2(i_1day_exp_cnt + 1)) as i_1day_exp_cnt
+,ceil(log2(i_1day_click_cnt + 1)) as i_1day_click_cnt
+,ceil(log2(i_1day_share_cnt + 1)) as i_1day_share_cnt
+,ceil(log2(i_1day_return_cnt + 1)) as i_1day_return_cnt
+,ceil(log2(i_3day_exp_cnt + 1)) as i_3day_exp_cnt
+,ceil(log2(i_3day_click_cnt + 1)) as i_3day_click_cnt
+,ceil(log2(i_3day_share_cnt + 1)) as i_3day_share_cnt
+,ceil(log2(i_3day_return_cnt + 1)) as i_3day_return_cnt
+,ceil(log2(i_7day_exp_cnt + 1)) as i_7day_exp_cnt
+,ceil(log2(i_7day_click_cnt + 1)) as i_7day_click_cnt
+,ceil(log2(i_7day_share_cnt + 1)) as i_7day_share_cnt
+,ceil(log2(i_7day_return_cnt + 1)) as i_7day_return_cnt
+,ceil(log2(i_3month_exp_cnt + 1)) as i_3month_exp_cnt
+,ceil(log2(i_3month_click_cnt + 1)) as i_3month_click_cnt
+,ceil(log2(i_3month_share_cnt + 1)) as i_3month_share_cnt
+,ceil(log2(i_3month_return_cnt + 1)) as i_3month_return_cnt
+,round(if(i_ctr_1day > 10.0, 10.0, i_ctr_1day) / 10.0, 6) as i_ctr_1day
+,round(if(i_str_1day > 10.0, 10.0, i_str_1day) / 10.0, 6) as i_str_1day
+,round(if(i_rov_1day > 10.0, 10.0, i_rov_1day) / 10.0, 6) as i_rov_1day
+,round(if(i_ros_1day > 10.0, 10.0, i_ros_1day) / 10.0, 6) as i_ros_1day
+,round(if(i_ctr_3day > 10.0, 10.0, i_ctr_3day) / 10.0, 6) as i_ctr_3day
+,round(if(i_str_3day > 10.0, 10.0, i_str_3day) / 10.0, 6) as i_str_3day
+,round(if(i_rov_3day > 10.0, 10.0, i_rov_3day) / 10.0, 6) as i_rov_3day
+,round(if(i_ros_3day > 10.0, 10.0, i_ros_3day) / 10.0, 6) as i_ros_3day
+,round(if(i_ctr_7day > 10.0, 10.0, i_ctr_7day) / 10.0, 6) as i_ctr_7day
+,round(if(i_str_7day > 10.0, 10.0, i_str_7day) / 10.0, 6) as i_str_7day
+,round(if(i_rov_7day > 10.0, 10.0, i_rov_7day) / 10.0, 6) as i_rov_7day
+,round(if(i_ros_7day > 10.0, 10.0, i_ros_7day) / 10.0, 6) as i_ros_7day
+,round(if(i_ctr_3month > 10.0, 10.0, i_ctr_3month) / 10.0, 6) as i_ctr_3month
+,round(if(i_str_3month > 10.0, 10.0, i_str_3month) / 10.0, 6) as i_str_3month
+,round(if(i_rov_3month > 10.0, 10.0, i_rov_3month) / 10.0, 6) as i_rov_3month
+,round(if(i_ros_3month > 10.0, 10.0, i_ros_3month) / 10.0, 6) as i_ros_3month
+from
+loghubods.alg_recsys_video_info
+where dt='{dt}'
+and length(videoid) > 0
+)
+SELECT
+i_id as k,
+*
+from candidate_item
+    """.format(dt=dt)
+    # log_.info(sql)
+    data = exe_sql(project, sql)
+    print('sql done')
+    # data.to_csv('./data/ad_out_sample_v2_item.{datetime}'.format(datetime=datetime), sep='\t')
+    # data = pd.read_csv('./data/ad_out_sample_v2_item.{datetime}'.format(datetime=datetime), sep='\t', dtype=str)
+    model_key = 'ad_out_v1'
+    lr_model = LrModel('model/{}.json'.format(model_key))
+    item_h_dict = {}
+    key_name_prefix = f"{config_.KEY_NAME_PREFIX_AD_OUT_MODEL_SCORE_ITEM}{model_key}"
+    print(key_name_prefix)
+    mean_item_h = 0.0
+    count_item_h = 0
+    # expiry: one week
+    expire_time = 7 * 24 * 3600
+    with data.open_reader() as reader:
+        for row in tqdm(reader):
+            k = str(row['i_id'])
+            item_features = get_item_features(row)
+            item_h = lr_model.predict_h(item_features)
+            #redis_helper.set_data_to_redis(f"{key_name_prefix}:{k}", item_h, expire_time)
+            item_h_dict[k] = item_h
+            mean_item_h += item_h
+            count_item_h += 1
+            # print(item_features)
+            # print(item_h)
+    mean_item_h = mean_item_h / count_item_h if count_item_h > 0 else 0.0  # guard against an empty result set
+    item_h_dict['mean'] = mean_item_h
+    print(mean_item_h)
+    print(count_item_h)
+    k = 'mean'
+    #redis_helper.set_data_to_redis(f"{key_name_prefix}:{k}", mean_item_h, expire_time)
+    with open('{}.{}.v2.json'.format(key_name_prefix, dt), 'w') as fout:
+        json.dump(item_h_dict, fout, indent=2, ensure_ascii=False, sort_keys=True)
+
+
+def timer_check(dt):
+    try:
+        project = config_.ad_model_data['ad_out_v1_item'].get('project')
+        table = config_.ad_model_data['ad_out_v1_item'].get('table')
+        now_date = datetime.datetime.today()
+        yesterday_date = now_date - datetime.timedelta(days=1)
+        now_dt = datetime.datetime.strftime(now_date, '%Y%m%d')
+        yesterday_dt = datetime.datetime.strftime(yesterday_date, '%Y%m%d')
+        log_.info(f"now_dt: {now_dt}")
+        if dt is not None:
+            yesterday_dt = dt
+        log_.info(f"update_dt: {yesterday_dt}")
+        now_min = datetime.datetime.now().minute
+        # check whether the data for this update is ready
+        data_count = data_check(project=project, table=table, dt=yesterday_dt)
+        if data_count > 0:
+            log_.info('update_offline_score_item start! {}'.format(data_count))
+            # data is ready, run the update
+            update_offline_score_item(dt=yesterday_dt)
+            log_.info('update_offline_score_item end!')
+        else:
+            # data not ready, check again in 5 minutes
+            wait_seconds = 5 * 60
+            log_.info('data not ready, wait {}s'.format(wait_seconds))
+            Timer(wait_seconds, timer_check, args=(dt,)).start()
+
+    except Exception as e:
+        log_.error(f"user ad bounce-rate prediction: offline item data update failed, exception: {e}, traceback: {traceback.format_exc()}")
+        send_msg_to_feishu(
+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+            msg_text=f"rov-offline{config_.ENV_TEXT} - user ad bounce-rate prediction: offline item data update failed\n"
+                     f"exception: {e}\n"
+                     f"traceback: {traceback.format_exc()}"
+        )
+
+
+if __name__ == "__main__":
+    dt = None
+    if len(sys.argv) > 1:
+        dt = sys.argv[1]
+        log_.info('## manual update: {}'.format(dt))
+    else:
+        log_.info('## automatic update')
+    timer_check(dt)
+
+

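The SQL above buckets raw counts with ceil(log2(cnt + 1)) and squashes ratio features into [0, 1] by capping at 10 and dividing. For checking feature parity outside ODPS, the same transforms in Python would be (the helper names here are illustrative, not from this repo):

import math

def bucket_count(cnt):
    # mirrors the SQL: ceil(log2(cnt + 1))
    return math.ceil(math.log2(cnt + 1))

def cap_ratio(x, cap=10.0):
    # mirrors the SQL: round(if(x > 10.0, 10.0, x) / 10.0, 6)
    return round(min(x, cap) / cap, 6)

assert bucket_count(7) == 3     # log2(8) == 3
assert cap_ratio(25.0) == 1.0   # capped at 10, then normalized
assert cap_ratio(2.5) == 0.25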
+ 10 - 151
ad_out_v1_get_offline_score_user.py

@@ -19,41 +19,21 @@ from utils import exe_sql
 def update_offline_score_user(dt):
     project = 'loghubods'
     sql = """
---odps sql 
+--odps sql
 --********************************************************************--
 --author: R&D
---create time:2023-12-01 15:48:17
+--create time:2023-12-11 23:54:20
 --********************************************************************--
-with candidate as (
+with candidate_user as (
 select
 -- base features: user
-mid AS u_id
+mids AS u_id
 ,machineinfo_brand AS u_brand
 ,machineinfo_model AS u_device
 ,SPLIT(machineinfo_system,' ')[0] AS u_system
 ,machineinfo_system AS u_system_ver
--- base features: video
-,videoid AS i_id
-,i_up_id AS i_up_id
-,tags as i_tag
-,title as i_title
-,ceil(log2(i_title_len + 1)) as i_title_len
-,ceil(log2(total_time + 1)) as i_play_len
-,ceil(log2(i_days_since_upload + 1)) as i_days_since_upload -- days since upload
--- base features: context
-,apptype AS ctx_apptype
-,ctx_day AS ctx_day
-,ctx_week AS ctx_week
-,ctx_hour AS ctx_hour
-,ctx_region as ctx_region
-,ctx_city as ctx_city
--- base features: user-item cross
-,ui_is_out as ui_is_out
-,i_play_len as playtime
--- ,IF(i_play_len > 1,'0','1') AS ui_is_out_new
-,rootmid AS ui_root_id
-,shareid AS ui_share_id
--- stats features: user
+,province as ctx_region
+,city as ctx_city
 ,u_cycle_bucket_7days
 ,u_cycle_bucket_30days
 ,u_share_bucket_30days
@@ -89,135 +69,14 @@ mid AS u_id
 ,round(if(u_str_3month > 10.0, 10.0, u_str_3month) / 10.0, 6) as u_str_3month
 ,round(if(u_rov_3month > 10.0, 10.0, u_rov_3month) / 10.0, 6) as u_rov_3month
 ,round(if(u_ros_3month > 10.0, 10.0, u_ros_3month) / 10.0, 6) as u_ros_3month
--- stats features: video
-,ceil(log2(i_1day_exp_cnt + 1)) as i_1day_exp_cnt
-,ceil(log2(i_1day_click_cnt + 1)) as i_1day_click_cnt
-,ceil(log2(i_1day_share_cnt + 1)) as i_1day_share_cnt
-,ceil(log2(i_1day_return_cnt + 1)) as i_1day_return_cnt
-,ceil(log2(i_3day_exp_cnt + 1)) as i_3day_exp_cnt
-,ceil(log2(i_3day_click_cnt + 1)) as i_3day_click_cnt
-,ceil(log2(i_3day_share_cnt + 1)) as i_3day_share_cnt
-,ceil(log2(i_3day_return_cnt + 1)) as i_3day_return_cnt
-,ceil(log2(i_7day_exp_cnt + 1)) as i_7day_exp_cnt
-,ceil(log2(i_7day_click_cnt + 1)) as i_7day_click_cnt
-,ceil(log2(i_7day_share_cnt + 1)) as i_7day_share_cnt
-,ceil(log2(i_7day_return_cnt + 1)) as i_7day_return_cnt
-,ceil(log2(i_3month_exp_cnt + 1)) as i_3month_exp_cnt
-,ceil(log2(i_3month_click_cnt + 1)) as i_3month_click_cnt
-,ceil(log2(i_3month_share_cnt + 1)) as i_3month_share_cnt
-,ceil(log2(i_3month_return_cnt + 1)) as i_3month_return_cnt
-,round(if(i_ctr_1day > 10.0, 10.0, i_ctr_1day) / 10.0, 6) as i_ctr_1day
-,round(if(i_str_1day > 10.0, 10.0, i_str_1day) / 10.0, 6) as i_str_1day
-,round(if(i_rov_1day > 10.0, 10.0, i_rov_1day) / 10.0, 6) as i_rov_1day
-,round(if(i_ros_1day > 10.0, 10.0, i_ros_1day) / 10.0, 6) as i_ros_1day
-,round(if(i_ctr_3day > 10.0, 10.0, i_ctr_3day) / 10.0, 6) as i_ctr_3day
-,round(if(i_str_3day > 10.0, 10.0, i_str_3day) / 10.0, 6) as i_str_3day
-,round(if(i_rov_3day > 10.0, 10.0, i_rov_3day) / 10.0, 6) as i_rov_3day
-,round(if(i_ros_3day > 10.0, 10.0, i_ros_3day) / 10.0, 6) as i_ros_3day
-,round(if(i_ctr_7day > 10.0, 10.0, i_ctr_7day) / 10.0, 6) as i_ctr_7day
-,round(if(i_str_7day > 10.0, 10.0, i_str_7day) / 10.0, 6) as i_str_7day
-,round(if(i_rov_7day > 10.0, 10.0, i_rov_7day) / 10.0, 6) as i_rov_7day
-,round(if(i_ros_7day > 10.0, 10.0, i_ros_7day) / 10.0, 6) as i_ros_7day
-,round(if(i_ctr_3month > 10.0, 10.0, i_ctr_3month) / 10.0, 6) as i_ctr_3month
-,round(if(i_str_3month > 10.0, 10.0, i_str_3month) / 10.0, 6) as i_str_3month
-,round(if(i_rov_3month > 10.0, 10.0, i_rov_3month) / 10.0, 6) as i_rov_3month
-,round(if(i_ros_3month > 10.0, 10.0, i_ros_3month) / 10.0, 6) as i_ros_3month
 from
-loghubods.user_video_features_data_final
+loghubods.alg_recsys_user_info
 where dt='{dt}'
-and ad_ornot = '0'
-and apptype != '13'
-), candidate_user as (
-    SELECT 
-    u_id,
-    max(u_brand) as u_brand,
-    max(u_device) as u_device,
-    max(u_system) as u_system,
-    max(u_system_ver) as u_system_ver,
-    max(ctx_region) as ctx_region,
-    max(ctx_city) as ctx_city,
-    max(u_cycle_bucket_7days) as u_cycle_bucket_7days,
-    max(u_cycle_bucket_30days) as u_cycle_bucket_30days,
-    max(u_share_bucket_30days) as u_share_bucket_30days,
-    max(u_1day_exp_cnt) as u_1day_exp_cnt,
-    max(u_1day_click_cnt) as u_1day_click_cnt,
-    max(u_1day_share_cnt) as u_1day_share_cnt,
-    max(u_1day_return_cnt) as u_1day_return_cnt,
-    max(u_3day_exp_cnt) as u_3day_exp_cnt,
-    max(u_3day_click_cnt) as u_3day_click_cnt,
-    max(u_3day_share_cnt) as u_3day_share_cnt,
-    max(u_3day_return_cnt) as u_3day_return_cnt,
-    max(u_7day_exp_cnt) as u_7day_exp_cnt,
-    max(u_7day_click_cnt) as u_7day_click_cnt,
-    max(u_7day_share_cnt) as u_7day_share_cnt,
-    max(u_7day_return_cnt) as u_7day_return_cnt,
-    max(u_3month_exp_cnt) as u_3month_exp_cnt,
-    max(u_3month_click_cnt) as u_3month_click_cnt,
-    max(u_3month_share_cnt) as u_3month_share_cnt,
-    max(u_3month_return_cnt) as u_3month_return_cnt,
-    max(u_ctr_1day) as u_ctr_1day,
-    max(u_str_1day) as u_str_1day,
-    max(u_rov_1day) as u_rov_1day,
-    max(u_ros_1day) as u_ros_1day,
-    max(u_ctr_3day) as u_ctr_3day,
-    max(u_str_3day) as u_str_3day,
-    max(u_rov_3day) as u_rov_3day,
-    max(u_ros_3day) as u_ros_3day,
-    max(u_ctr_7day) as u_ctr_7day,
-    max(u_str_7day) as u_str_7day,
-    max(u_rov_7day) as u_rov_7day,
-    max(u_ros_7day) as u_ros_7day,
-    max(u_ctr_3month) as u_ctr_3month,
-    max(u_str_3month) as u_str_3month,
-    max(u_rov_3month) as u_rov_3month,
-    max(u_ros_3month) as u_ros_3month
-    FROM 
-    candidate
-    group by u_id
-), candidate_item as (
-    select
-    i_id,
-    max(i_up_id) as i_up_id,
-    max(i_title_len) as i_title_len,
-    max(i_play_len) as i_play_len,
-    max(i_days_since_upload) as i_days_since_upload,
-    max(i_1day_exp_cnt) as i_1day_exp_cnt,
-    max(i_1day_click_cnt) as i_1day_click_cnt,
-    max(i_1day_share_cnt) as i_1day_share_cnt,
-    max(i_1day_return_cnt) as i_1day_return_cnt,
-    max(i_3day_exp_cnt) as i_3day_exp_cnt,
-    max(i_3day_click_cnt) as i_3day_click_cnt,
-    max(i_3day_share_cnt) as i_3day_share_cnt,
-    max(i_3day_return_cnt) as i_3day_return_cnt,
-    max(i_7day_exp_cnt) as i_7day_exp_cnt,
-    max(i_7day_click_cnt) as i_7day_click_cnt,
-    max(i_7day_share_cnt) as i_7day_share_cnt,
-    max(i_7day_return_cnt) as i_7day_return_cnt,
-    max(i_3month_exp_cnt) as i_3month_exp_cnt,
-    max(i_3month_click_cnt) as i_3month_click_cnt,
-    max(i_3month_share_cnt) as i_3month_share_cnt,
-    max(i_3month_return_cnt) as i_3month_return_cnt,
-    max(i_ctr_1day) as i_ctr_1day,
-    max(i_str_1day) as i_str_1day,
-    max(i_rov_1day) as i_rov_1day,
-    max(i_ros_1day) as i_ros_1day,
-    max(i_ctr_3day) as i_ctr_3day,
-    max(i_str_3day) as i_str_3day,
-    max(i_rov_3day) as i_rov_3day,
-    max(i_ros_3day) as i_ros_3day,
-    max(i_ctr_7day) as i_ctr_7day,
-    max(i_str_7day) as i_str_7day,
-    max(i_rov_7day) as i_rov_7day,
-    max(i_ros_7day) as i_ros_7day,
-    max(i_ctr_3month) as i_ctr_3month,
-    max(i_str_3month) as i_str_3month,
-    max(i_rov_3month) as i_rov_3month,
-    max(i_ros_3month) as i_ros_3month
-    FROM 
-    candidate
-    group by i_id
+and length(mids) > 0
+and (u_3month_share_cnt > 0 or u_7day_click_cnt > 0 or u_3day_exp_cnt > 0)
 )
 SELECT
+u_id as k,
 *
 from candidate_user
     """.format(dt=dt)

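lr_model.py is not part of this commit, so only its interface is visible here: LrModel('model/ad_out_v1.json') wraps a JSON weight file, and predict_h(features) scores one row's feature dict. A minimal logistic-regression scorer with that interface could look like the following sketch (the weight-file layout is an assumption, not the repo's actual implementation):

import json
import math

class LrModel:
    def __init__(self, weight_path):
        # assumed file layout: {"bias": float, "weights": {feature_name: weight}}
        with open(weight_path) as f:
            model = json.load(f)
        self.bias = model.get('bias', 0.0)
        self.weights = model.get('weights', {})

    def predict_h(self, features):
        # features: {feature_name: numeric value}; returns a sigmoid score
        z = self.bias + sum(self.weights.get(name, 0.0) * value
                            for name, value in features.items())
        return 1.0 / (1.0 + math.exp(-z))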
+ 165 - 0
ad_out_v1_get_offline_score_user_v2_debug.py

@@ -0,0 +1,165 @@
+# coding: utf-8
+import sys
+import json
+import datetime
+import traceback
+from threading import Timer
+from tqdm import tqdm
+from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu
+from config import set_config
+from log import Log
+config_, _ = set_config()
+log_ = Log()
+redis_helper = RedisHelper()
+
+from feature import get_user_features
+from lr_model import LrModel
+from utils import exe_sql
+
+def update_offline_score_user(dt):
+    project = 'loghubods'
+    sql = """
+--odps sql
+--********************************************************************--
+--author: R&D
+--create time:2023-12-11 23:54:20
+--********************************************************************--
+with candidate_user as (
+select
+-- base features: user
+mids AS u_id
+,machineinfo_brand AS u_brand
+,machineinfo_model AS u_device
+,SPLIT(machineinfo_system,' ')[0] AS u_system
+,machineinfo_system AS u_system_ver
+,province as ctx_region
+,city as ctx_city
+,u_cycle_bucket_7days
+,u_cycle_bucket_30days
+,u_share_bucket_30days
+,ceil(log2(u_1day_exp_cnt + 1)) as u_1day_exp_cnt
+,ceil(log2(u_1day_click_cnt + 1)) as u_1day_click_cnt
+,ceil(log2(u_1day_share_cnt + 1)) as u_1day_share_cnt
+,ceil(log2(u_1day_return_cnt + 1)) as u_1day_return_cnt
+,ceil(log2(u_3day_exp_cnt + 1)) as u_3day_exp_cnt
+,ceil(log2(u_3day_click_cnt + 1)) as u_3day_click_cnt
+,ceil(log2(u_3day_share_cnt + 1)) as u_3day_share_cnt
+,ceil(log2(u_3day_return_cnt + 1)) as u_3day_return_cnt
+,ceil(log2(u_7day_exp_cnt + 1)) as u_7day_exp_cnt
+,ceil(log2(u_7day_click_cnt + 1)) as u_7day_click_cnt
+,ceil(log2(u_7day_share_cnt + 1)) as u_7day_share_cnt
+,ceil(log2(u_7day_return_cnt + 1)) as u_7day_return_cnt
+,ceil(log2(u_3month_exp_cnt + 1)) as u_3month_exp_cnt
+,ceil(log2(u_3month_click_cnt + 1)) as u_3month_click_cnt
+,ceil(log2(u_3month_share_cnt + 1)) as u_3month_share_cnt
+,ceil(log2(u_3month_return_cnt + 1)) as u_3month_return_cnt
+,round(if(u_ctr_1day > 10.0, 10.0, u_ctr_1day) / 10.0, 6) as u_ctr_1day
+,round(if(u_str_1day > 10.0, 10.0, u_str_1day) / 10.0, 6) as u_str_1day
+,round(if(u_rov_1day > 10.0, 10.0, u_rov_1day) / 10.0, 6) as u_rov_1day
+,round(if(u_ros_1day > 10.0, 10.0, u_ros_1day) / 10.0, 6) as u_ros_1day
+,round(if(u_ctr_3day > 10.0, 10.0, u_ctr_3day) / 10.0, 6) as u_ctr_3day
+,round(if(u_str_3day > 10.0, 10.0, u_str_3day) / 10.0, 6) as u_str_3day
+,round(if(u_rov_3day > 10.0, 10.0, u_rov_3day) / 10.0, 6) as u_rov_3day
+,round(if(u_ros_3day > 10.0, 10.0, u_ros_3day) / 10.0, 6) as u_ros_3day
+,round(if(u_ctr_7day > 10.0, 10.0, u_ctr_7day) / 10.0, 6) as u_ctr_7day
+,round(if(u_str_7day > 10.0, 10.0, u_str_7day) / 10.0, 6) as u_str_7day
+,round(if(u_rov_7day > 10.0, 10.0, u_rov_7day) / 10.0, 6) as u_rov_7day
+,round(if(u_ros_7day > 10.0, 10.0, u_ros_7day) / 10.0, 6) as u_ros_7day
+,round(if(u_ctr_3month > 10.0, 10.0, u_ctr_3month) / 10.0, 6) as u_ctr_3month
+,round(if(u_str_3month > 10.0, 10.0, u_str_3month) / 10.0, 6) as u_str_3month
+,round(if(u_rov_3month > 10.0, 10.0, u_rov_3month) / 10.0, 6) as u_rov_3month
+,round(if(u_ros_3month > 10.0, 10.0, u_ros_3month) / 10.0, 6) as u_ros_3month
+from
+loghubods.alg_recsys_user_info
+where dt='{dt}'
+and length(mids) > 0
+and (u_3month_share_cnt > 0 or u_7day_click_cnt > 0 or u_3day_exp_cnt > 0)
+)
+SELECT
+u_id as k,
+*
+from candidate_user
+    """.format(dt=dt)
+    # log_.info(sql)
+    data = exe_sql(project, sql)
+    print('sql done')
+    # data.to_csv('./data/ad_out_sample_v2_item.{datetime}'.format(datetime=datetime), sep='\t')
+    # data = pd.read_csv('./data/ad_out_sample_v2_item.{datetime}'.format(datetime=datetime), sep='\t', dtype=str)
+    model_key = 'ad_out_v1'
+    lr_model = LrModel('model/{}.json'.format(model_key))
+    user_h_dict = {}
+    k_col = 'u_id'
+    key_name_prefix = f"{config_.KEY_NAME_PREFIX_AD_OUT_MODEL_SCORE_USER}{model_key}"
+    print(key_name_prefix)
+    mean_user_h = 0.0
+    count_user_h = 0
+    # expiry: one week
+    expire_time = 7 * 24 * 3600
+    with data.open_reader() as reader:
+        for row in tqdm(reader):
+            k = str(row['u_id'])
+            user_features = get_user_features(row)
+            user_h = lr_model.predict_h(user_features)
+            # redis_helper.set_data_to_redis(f"{key_name_prefix}:{k}", user_h, expire_time)
+            user_h_dict[k] = user_h
+            mean_user_h += user_h
+            count_user_h += 1
+            # print(user_features)
+            # print(user_h)
+    mean_user_h = mean_user_h / count_user_h if count_user_h > 0 else 0.0  # guard against an empty result set
+    user_h_dict['mean'] = mean_user_h
+    print(mean_user_h)
+    print(count_user_h)
+    k = 'mean'
+    #redis_helper.set_data_to_redis(f"{key_name_prefix}:{k}", mean_user_h, expire_time)
+    with open('{}.{}.v2.json'.format(key_name_prefix, dt), 'w') as fout:
+        json.dump(user_h_dict, fout, indent=2, ensure_ascii=False, sort_keys=True)
+
+
+def timer_check(dt):
+    try:
+        project = config_.ad_model_data['ad_out_v1_user'].get('project')
+        table = config_.ad_model_data['ad_out_v1_user'].get('table')
+        now_date = datetime.datetime.today()
+        yesterday_date = now_date - datetime.timedelta(days=1)
+        now_dt = datetime.datetime.strftime(now_date, '%Y%m%d')
+        yesterday_dt = datetime.datetime.strftime(yesterday_date, '%Y%m%d')
+        log_.info(f"now_dt: {now_dt}")
+        if dt is not None:
+            yesterday_dt = dt
+        log_.info(f"update_dt: {yesterday_dt}")
+        now_min = datetime.datetime.now().minute
+        # check whether the data for this update is ready
+        data_count = data_check(project=project, table=table, dt=yesterday_dt)
+        if data_count > 0:
+            log_.info('update_offline_score_user start! {}'.format(data_count))
+            # data is ready, run the update
+            update_offline_score_user(dt=yesterday_dt)
+            log_.info('update_offline_score_user end!')
+        else:
+            # data not ready, check again in 5 minutes
+            wait_seconds = 5 * 60
+            log_.info('data not ready, wait {}s'.format(wait_seconds))
+            Timer(wait_seconds, timer_check, args=(dt,)).start()
+
+    except Exception as e:
+        log_.error(f"user ad bounce-rate prediction: offline user data update failed, exception: {e}, traceback: {traceback.format_exc()}")
+        send_msg_to_feishu(
+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+            msg_text=f"rov-offline{config_.ENV_TEXT} - user ad bounce-rate prediction: offline user data update failed\n"
+                     f"exception: {e}\n"
+                     f"traceback: {traceback.format_exc()}"
+        )
+
+
+if __name__ == "__main__":
+    dt = None
+    if len(sys.argv) > 1:
+        dt = sys.argv[1]
+        log_.info('## manual update: {}'.format(dt))
+    else:
+        log_.info('## automatic update')
+    timer_check(dt)
+
+

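The *_debug variants comment out the Redis writes and instead dump every score, plus a 'mean' entry, to a local JSON file. A downstream reader would presumably fall back to 'mean' for users missing from the dump; a hypothetical consumer (the file name below is illustrative):

import json

def get_user_score(scores, u_id):
    # fall back to the global mean for users absent from the dump
    return scores.get(str(u_id), scores['mean'])

with open('ad_out_v1_model_score_user:ad_out_v1.20231211.v2.json') as f:
    scores = json.load(f)
print(get_user_score(scores, '123456'))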
+ 149 - 0
ad_out_v1_get_offline_score_user_v3.py

@@ -0,0 +1,149 @@
+# coding: utf-8
+import sys
+import datetime
+import traceback
+from threading import Timer
+from tqdm import tqdm
+from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu
+from config import set_config
+from log import Log
+from records_process import records_process
+
+config_, _ = set_config()
+log_ = Log()
+redis_helper = RedisHelper()
+
+from feature import get_user_features as get_features
+from lr_model import LrModel
+from utils import exe_sql
+
+model_key = 'ad_out_v1'
+lr_model = LrModel('model/{}.json'.format(model_key))
+item_h_dict = {}
+key_name_prefix = f"{config_.KEY_NAME_PREFIX_AD_OUT_MODEL_SCORE_USER}{model_key}"
+print(key_name_prefix)
+# expiry: one week
+expire_time = 7 * 24 * 3600
+
+def process_and_store(row):
+    k = str(row['k'])
+    features = get_features(row)
+    h = lr_model.predict_h(features)
+    redis_helper.set_data_to_redis(f"{key_name_prefix}:{k}", round(h, 6), expire_time)
+
+def update_offline_score_user(dt):
+    project = 'loghubods'
+    sql = """
+--odps sql
+--********************************************************************--
+--author: R&D
+--create time:2023-12-11 23:54:20
+--********************************************************************--
+with candidate_user as (
+select
+-- base features: user
+mids AS u_id
+,machineinfo_brand AS u_brand
+,machineinfo_model AS u_device
+,SPLIT(machineinfo_system,' ')[0] AS u_system
+,machineinfo_system AS u_system_ver
+,province as ctx_region
+,city as ctx_city
+,u_cycle_bucket_7days
+,u_cycle_bucket_30days
+,u_share_bucket_30days
+,ceil(log2(u_1day_exp_cnt + 1)) as u_1day_exp_cnt
+,ceil(log2(u_1day_click_cnt + 1)) as u_1day_click_cnt
+,ceil(log2(u_1day_share_cnt + 1)) as u_1day_share_cnt
+,ceil(log2(u_1day_return_cnt + 1)) as u_1day_return_cnt
+,ceil(log2(u_3day_exp_cnt + 1)) as u_3day_exp_cnt
+,ceil(log2(u_3day_click_cnt + 1)) as u_3day_click_cnt
+,ceil(log2(u_3day_share_cnt + 1)) as u_3day_share_cnt
+,ceil(log2(u_3day_return_cnt + 1)) as u_3day_return_cnt
+,ceil(log2(u_7day_exp_cnt + 1)) as u_7day_exp_cnt
+,ceil(log2(u_7day_click_cnt + 1)) as u_7day_click_cnt
+,ceil(log2(u_7day_share_cnt + 1)) as u_7day_share_cnt
+,ceil(log2(u_7day_return_cnt + 1)) as u_7day_return_cnt
+,ceil(log2(u_3month_exp_cnt + 1)) as u_3month_exp_cnt
+,ceil(log2(u_3month_click_cnt + 1)) as u_3month_click_cnt
+,ceil(log2(u_3month_share_cnt + 1)) as u_3month_share_cnt
+,ceil(log2(u_3month_return_cnt + 1)) as u_3month_return_cnt
+,round(if(u_ctr_1day > 10.0, 10.0, u_ctr_1day) / 10.0, 6) as u_ctr_1day
+,round(if(u_str_1day > 10.0, 10.0, u_str_1day) / 10.0, 6) as u_str_1day
+,round(if(u_rov_1day > 10.0, 10.0, u_rov_1day) / 10.0, 6) as u_rov_1day
+,round(if(u_ros_1day > 10.0, 10.0, u_ros_1day) / 10.0, 6) as u_ros_1day
+,round(if(u_ctr_3day > 10.0, 10.0, u_ctr_3day) / 10.0, 6) as u_ctr_3day
+,round(if(u_str_3day > 10.0, 10.0, u_str_3day) / 10.0, 6) as u_str_3day
+,round(if(u_rov_3day > 10.0, 10.0, u_rov_3day) / 10.0, 6) as u_rov_3day
+,round(if(u_ros_3day > 10.0, 10.0, u_ros_3day) / 10.0, 6) as u_ros_3day
+,round(if(u_ctr_7day > 10.0, 10.0, u_ctr_7day) / 10.0, 6) as u_ctr_7day
+,round(if(u_str_7day > 10.0, 10.0, u_str_7day) / 10.0, 6) as u_str_7day
+,round(if(u_rov_7day > 10.0, 10.0, u_rov_7day) / 10.0, 6) as u_rov_7day
+,round(if(u_ros_7day > 10.0, 10.0, u_ros_7day) / 10.0, 6) as u_ros_7day
+,round(if(u_ctr_3month > 10.0, 10.0, u_ctr_3month) / 10.0, 6) as u_ctr_3month
+,round(if(u_str_3month > 10.0, 10.0, u_str_3month) / 10.0, 6) as u_str_3month
+,round(if(u_rov_3month > 10.0, 10.0, u_rov_3month) / 10.0, 6) as u_rov_3month
+,round(if(u_ros_3month > 10.0, 10.0, u_ros_3month) / 10.0, 6) as u_ros_3month
+from
+loghubods.alg_recsys_user_info
+where dt='{dt}'
+and length(mids) > 0
+and u_1month_exp_cnt > 0
+)
+SELECT
+u_id as k,
+*
+from candidate_user
+    """.format(dt=dt)
+    # log_.info(sql)
+    records = exe_sql(project, sql)
+    log_.info('sql_done')
+    records_process(records, process_and_store, max_size=50, num_workers=10)
+
+def timer_check(dt):
+    try:
+        project = config_.ad_model_data['ad_out_v1_user'].get('project')
+        table = config_.ad_model_data['ad_out_v1_user'].get('table')
+        now_date = datetime.datetime.today()
+        yesterday_date = now_date - datetime.timedelta(days=1)
+        now_dt = datetime.datetime.strftime(now_date, '%Y%m%d')
+        yesterday_dt = datetime.datetime.strftime(yesterday_date, '%Y%m%d')
+        log_.info(f"now_dt: {now_dt}")
+        if dt is not None:
+            yesterday_dt = dt
+        log_.info(f"update_dt: {yesterday_dt}")
+        now_min = datetime.datetime.now().minute
+        # check whether the data for this update is ready
+        data_count = data_check(project=project, table=table, dt=yesterday_dt)
+        if data_count > 0:
+            log_.info('update_offline_score_user start! {}'.format(data_count))
+            # data is ready, run the update
+            update_offline_score_user(dt=yesterday_dt)
+            log_.info('update_offline_score_user end!')
+        else:
+            # data not ready, check again in 5 minutes
+            wait_seconds = 5 * 60
+            log_.info('data not ready, wait {}s'.format(wait_seconds))
+            Timer(wait_seconds, timer_check, args=(dt,)).start()
+
+    except Exception as e:
+        log_.error(f"user ad bounce-rate prediction: offline user data update failed, exception: {e}, traceback: {traceback.format_exc()}")
+        send_msg_to_feishu(
+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+            msg_text=f"rov-offline{config_.ENV_TEXT} - user ad bounce-rate prediction: offline user data update failed\n"
+                     f"exception: {e}\n"
+                     f"traceback: {traceback.format_exc()}"
+        )
+
+
+if __name__ == "__main__":
+    dt = None
+    if len(sys.argv) > 1:
+        dt = sys.argv[1]
+        log_.info('## manual update: {}'.format(dt))
+    else:
+        log_.info('## automatic update')
+    timer_check(dt)
+
+

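Unlike the debug scripts, v3 replaces the single-threaded reader loop with records_process(records, process_and_store, max_size=50, num_workers=10) from records_process.py, which this commit does not include. A plausible reading of that call is a bounded producer/consumer pool that fans rows out to worker threads so Redis writes overlap with reading; a sketch under that assumption:

from concurrent.futures import ThreadPoolExecutor
from queue import Queue

def records_process(records, func, max_size=50, num_workers=10):
    # bounded queue: at most max_size rows buffered between reader and workers
    q = Queue(maxsize=max_size)

    def worker():
        while True:
            row = q.get()
            if row is None:  # sentinel: no more rows
                break
            func(row)

    with ThreadPoolExecutor(num_workers) as pool:
        for _ in range(num_workers):
            pool.submit(worker)
        with records.open_reader() as reader:
            for row in reader:
                q.put(row)
        for _ in range(num_workers):  # one sentinel per worker
            q.put(None)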
+ 10 - 0
ad_out_v1_get_offline_score_user_v3.sh

@@ -0,0 +1,10 @@
+source /etc/profile
+echo $ROV_OFFLINE_ENV
+if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
+    cd /data2/rov-offline &&
+    /root/anaconda3/bin/python /data2/rov-offline/ad_out_v1_get_offline_score_user_v3.py $*
+elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
+    cd /data/rov-offline &&
+    /root/anaconda3/bin/python /data/rov-offline/ad_out_v1_get_offline_score_user_v3.py $*
+fi
+

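The wrapper resolves the checkout directory from ROV_OFFLINE_ENV and forwards its arguments via $*, so a dt override passes straight through to timer_check: for example, sh ad_out_v1_get_offline_score_user_v3.sh 20231211 backfills one partition, while a no-argument run (e.g. from a daily cron job) defaults to yesterday's partition.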
+ 165 - 0
ad_out_v1_get_offline_score_user_v3_debug.py

@@ -0,0 +1,165 @@
+# coding: utf-8
+import sys
+import json
+import datetime
+import traceback
+from threading import Timer
+from tqdm import tqdm
+from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu
+from config import set_config
+from log import Log
+config_, _ = set_config()
+log_ = Log()
+redis_helper = RedisHelper()
+
+from feature import get_user_features
+from lr_model import LrModel
+from utils import exe_sql
+
+def update_offline_score_user(dt):
+    project = 'loghubods'
+    sql = """
+--odps sql
+--********************************************************************--
+--author: R&D
+--create time:2023-12-11 23:54:20
+--********************************************************************--
+with candidate_user as (
+select
+-- base features: user
+mids AS u_id
+,machineinfo_brand AS u_brand
+,machineinfo_model AS u_device
+,SPLIT(machineinfo_system,' ')[0] AS u_system
+,machineinfo_system AS u_system_ver
+,province as ctx_region
+,city as ctx_city
+,u_cycle_bucket_7days
+,u_cycle_bucket_30days
+,u_share_bucket_30days
+,ceil(log2(u_1day_exp_cnt + 1)) as u_1day_exp_cnt
+,ceil(log2(u_1day_click_cnt + 1)) as u_1day_click_cnt
+,ceil(log2(u_1day_share_cnt + 1)) as u_1day_share_cnt
+,ceil(log2(u_1day_return_cnt + 1)) as u_1day_return_cnt
+,ceil(log2(u_3day_exp_cnt + 1)) as u_3day_exp_cnt
+,ceil(log2(u_3day_click_cnt + 1)) as u_3day_click_cnt
+,ceil(log2(u_3day_share_cnt + 1)) as u_3day_share_cnt
+,ceil(log2(u_3day_return_cnt + 1)) as u_3day_return_cnt
+,ceil(log2(u_7day_exp_cnt + 1)) as u_7day_exp_cnt
+,ceil(log2(u_7day_click_cnt + 1)) as u_7day_click_cnt
+,ceil(log2(u_7day_share_cnt + 1)) as u_7day_share_cnt
+,ceil(log2(u_7day_return_cnt + 1)) as u_7day_return_cnt
+,ceil(log2(u_3month_exp_cnt + 1)) as u_3month_exp_cnt
+,ceil(log2(u_3month_click_cnt + 1)) as u_3month_click_cnt
+,ceil(log2(u_3month_share_cnt + 1)) as u_3month_share_cnt
+,ceil(log2(u_3month_return_cnt + 1)) as u_3month_return_cnt
+,round(if(u_ctr_1day > 10.0, 10.0, u_ctr_1day) / 10.0, 6) as u_ctr_1day
+,round(if(u_str_1day > 10.0, 10.0, u_str_1day) / 10.0, 6) as u_str_1day
+,round(if(u_rov_1day > 10.0, 10.0, u_rov_1day) / 10.0, 6) as u_rov_1day
+,round(if(u_ros_1day > 10.0, 10.0, u_ros_1day) / 10.0, 6) as u_ros_1day
+,round(if(u_ctr_3day > 10.0, 10.0, u_ctr_3day) / 10.0, 6) as u_ctr_3day
+,round(if(u_str_3day > 10.0, 10.0, u_str_3day) / 10.0, 6) as u_str_3day
+,round(if(u_rov_3day > 10.0, 10.0, u_rov_3day) / 10.0, 6) as u_rov_3day
+,round(if(u_ros_3day > 10.0, 10.0, u_ros_3day) / 10.0, 6) as u_ros_3day
+,round(if(u_ctr_7day > 10.0, 10.0, u_ctr_7day) / 10.0, 6) as u_ctr_7day
+,round(if(u_str_7day > 10.0, 10.0, u_str_7day) / 10.0, 6) as u_str_7day
+,round(if(u_rov_7day > 10.0, 10.0, u_rov_7day) / 10.0, 6) as u_rov_7day
+,round(if(u_ros_7day > 10.0, 10.0, u_ros_7day) / 10.0, 6) as u_ros_7day
+,round(if(u_ctr_3month > 10.0, 10.0, u_ctr_3month) / 10.0, 6) as u_ctr_3month
+,round(if(u_str_3month > 10.0, 10.0, u_str_3month) / 10.0, 6) as u_str_3month
+,round(if(u_rov_3month > 10.0, 10.0, u_rov_3month) / 10.0, 6) as u_rov_3month
+,round(if(u_ros_3month > 10.0, 10.0, u_ros_3month) / 10.0, 6) as u_ros_3month
+from
+loghubods.alg_recsys_user_info
+where dt='{dt}'
+and length(mids) > 0
+and u_1month_exp_cnt > 0
+)
+SELECT
+u_id as k,
+*
+from candidate_user
+    """.format(dt=dt)
+    # log_.info(sql)
+    data = exe_sql(project, sql)
+    print('sql done')
+    # data.to_csv('./data/ad_out_sample_v2_item.{datetime}'.format(datetime=datetime), sep='\t')
+    # data = pd.read_csv('./data/ad_out_sample_v2_item.{datetime}'.format(datetime=datetime), sep='\t', dtype=str)
+    model_key = 'ad_out_v1'
+    lr_model = LrModel('model/{}.json'.format(model_key))
+    user_h_dict = {}
+    k_col = 'u_id'
+    key_name_prefix = f"{config_.KEY_NAME_PREFIX_AD_OUT_MODEL_SCORE_USER}{model_key}"
+    print(key_name_prefix)
+    mean_user_h = 0.0
+    count_user_h = 0
+    # expiry: one week
+    expire_time = 7 * 24 * 3600
+    with data.open_reader() as reader:
+        for row in tqdm(reader):
+            k = str(row['u_id'])
+            user_features = get_user_features(row)
+            user_h = lr_model.predict_h(user_features)
+            # redis_helper.set_data_to_redis(f"{key_name_prefix}:{k}", user_h, expire_time)
+            user_h_dict[k] = user_h
+            mean_user_h += user_h
+            count_user_h += 1
+            # print(user_features)
+            # print(user_h)
+    mean_user_h = mean_user_h / count_user_h if count_user_h > 0 else 0.0  # guard against an empty result set
+    user_h_dict['mean'] = mean_user_h
+    print(mean_user_h)
+    print(count_user_h)
+    k = 'mean'
+    #redis_helper.set_data_to_redis(f"{key_name_prefix}:{k}", mean_user_h, expire_time)
+    with open('{}.{}.v3.json'.format(key_name_prefix, dt), 'w') as fout:
+        json.dump(user_h_dict, fout, indent=2, ensure_ascii=False, sort_keys=True)
+
+
+def timer_check(dt):
+    try:
+        project = config_.ad_model_data['ad_out_v1_user'].get('project')
+        table = config_.ad_model_data['ad_out_v1_user'].get('table')
+        now_date = datetime.datetime.today()
+        yesterday_date = now_date - datetime.timedelta(days=1)
+        now_dt = datetime.datetime.strftime(now_date, '%Y%m%d')
+        yesterday_dt = datetime.datetime.strftime(yesterday_date, '%Y%m%d')
+        log_.info(f"now_dt: {now_dt}")
+        if dt is not None:
+            yesterday_dt = dt
+        log_.info(f"update_dt: {yesterday_dt}")
+        now_min = datetime.datetime.now().minute
+        # check whether the data for this update is ready
+        data_count = data_check(project=project, table=table, dt=yesterday_dt)
+        if data_count > 0:
+            log_.info('update_offline_score_user start! {}'.format(data_count))
+            # data is ready, run the update
+            update_offline_score_user(dt=yesterday_dt)
+            log_.info('update_offline_score_user end!')
+        else:
+            # data not ready, check again in 5 minutes
+            wait_seconds = 5 * 60
+            log_.info('data not ready, wait {}s'.format(wait_seconds))
+            Timer(wait_seconds, timer_check, args=(dt,)).start()
+
+    except Exception as e:
+        log_.error(f"user ad bounce-rate prediction: offline user data update failed, exception: {e}, traceback: {traceback.format_exc()}")
+        send_msg_to_feishu(
+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+            msg_text=f"rov-offline{config_.ENV_TEXT} - user ad bounce-rate prediction: offline user data update failed\n"
+                     f"exception: {e}\n"
+                     f"traceback: {traceback.format_exc()}"
+        )
+
+
+if __name__ == "__main__":
+    dt = None
+    if len(sys.argv) > 1:
+        dt = sys.argv[1]
+        log_.info('## manual update: {}'.format(dt))
+    else:
+        log_.info('## automatic update')
+    timer_check(dt)
+
+