Browse Source

更新参数

zhangbo 1 year ago
parent
commit
bf8650b584

+ 314 - 0
alg_recsys_recall02_1h_region.py

@@ -0,0 +1,314 @@
+# -*- coding: utf-8 -*-
+import multiprocessing
+import traceback
+import gevent
+import datetime
+import pandas as pd
+import math
+from functools import reduce
+from odps import ODPS
+from threading import Timer
+from utils import MysqlHelper, RedisHelper, get_data_from_odps, filter_video_status, filter_shield_video, \
+    check_table_partition_exits, filter_video_status_app, send_msg_to_feishu, filter_political_videos
+from config import set_config
+from log import Log
+from check_video_limit_distribute import update_limit_video_score
+
+config_, _ = set_config()
+log_ = Log()
+
+region_code = config_.REGION_CODE
+
+
# Recall rule configuration.
#   'rule_params'  — each 'ruleNN' entry parameterises one recall variant.
#   'data_params'  — apptype groupings, taken from project config.
#   'params_list'  — which (data, rule) combinations actually run; currently
#                    all entries are commented out (nothing is scheduled).
RULE_PARAMS = {
    'rule_params': {
        'rule66': {
            'view_type': 'video-show-region', 'platform_return_rate': 0.001,
            'region_24h_rule_key': 'rule66', '24h_rule_key': 'rule66'
        },
        'rule67': {
            'view_type': 'video-show-region', 'platform_return_rate': 0.001,
            'region_24h_rule_key': 'rule66', '24h_rule_key': 'rule66', 'h_rule_key': 'rule66'
        },
        'rule68': {
            'view_type': 'video-show-region', 'platform_return_rate': 0.001,
            'region_24h_rule_key': 'rule66', '24h_rule_key': 'rule66',
            'score_func': 'back_rate_exponential_weighting1'
        },
    },
    'data_params': config_.DATA_PARAMS,
    'params_list': [
        # 532
        # {'data': 'data66', 'rule': 'rule66'},  # 523-> 523 & 518
        # {'data': 'data66', 'rule': 'rule67'},  # 523->510
        # {'data': 'data66', 'rule': 'rule68'},  # 523->514
        # {'data': 'data66', 'rule': 'rule69'},  # 523->518
    ],
}
+
# Column names expected in each hourly ODPS feature record
# (read by get_feature_data, summed by merge_df).
features = [
    'apptype',
    'code',
    'videoid',
    'lastonehour_preview',  # pre-exposure user count, past 1h - per region
    'lastonehour_view',  # exposure user count, past 1h - per region
    'lastonehour_play',  # play user count, past 1h - per region
    'lastonehour_share',  # share user count, past 1h - per region
    'lastonehour_return',  # shared in past 1h, return user count in past 1h - per region
    'lastonehour_preview_total',  # pre-exposure count (events), past 1h - per region
    'lastonehour_view_total',  # exposure count (events), past 1h - per region
    'lastonehour_play_total',  # play count (events), past 1h - per region
    'lastonehour_share_total',  # share count (events), past 1h - per region
    'platform_return',
    'lastonehour_show',  # not split by region
    'lastonehour_show_region',  # grouped by region
    'lasttwohour_share',  # share user count at hour h-2
    'lasttwohour_return_now',  # shared at h-2, return user count in past 1h
    'lasttwohour_return',  # shared at h-2, return user count at h-2
    'lastthreehour_share',  # share user count at hour h-3
    'lastthreehour_return_now',  # shared at h-3, return user count in past 1h
    'lastthreehour_return',  # shared at h-3, return user count at h-3

    'lastonehour_return_new',  # shared in past 1h, returns in past 1h (returns attributed to this region's shares; share region-restricted, return unrestricted)
    'lasttwohour_return_now_new',  # shared at h-2, returns in past 1h (returns attributed to this region's shares; share region-restricted, return unrestricted)
    'lasttwohour_return_new',  # shared at h-2, returns at h-2 (returns attributed to this region's shares; share region-restricted, return unrestricted)
    'lastthreehour_return_now_new',  # shared at h-3, returns in past 1h (returns attributed to this region's shares; share region-restricted, return unrestricted)
    'lastthreehour_return_new',  # shared at h-3, returns at h-3 (returns attributed to this region's shares; share region-restricted, return unrestricted)
    'platform_return_new',  # platform-distributed returns (attributed to this region's shares; share region-restricted, return unrestricted)
]
+
+
+
def merge_df(df_left, df_right):
    """
    Merge two feature DataFrames on (videoid, code) and sum the shared features.

    An outer join is used so rows present on only one side are kept; the
    missing side contributes 0 via fillna.

    :param df_left: left feature DataFrame
    :param df_right: right feature DataFrame
    :return: DataFrame with columns ['videoid', 'code'] + summed feature columns
    """
    merged = pd.merge(df_left, df_right, on=['videoid', 'code'], how='outer', suffixes=['_x', '_y'])
    merged.fillna(0, inplace=True)
    # every feature except the key/grouping columns gets summed across both sides
    summed = [name for name in features if name not in ('apptype', 'videoid', 'code')]
    for name in summed:
        merged[name] = merged[f'{name}_x'] + merged[f'{name}_y']
    return merged[['videoid', 'code'] + summed]
+
def video_rank(df, now_date, now_h, rule_key, param, region, data_key):
    """
    Select videos that qualify for the hourly recall pool and write their
    scores into a Redis zset (merged downstream with the daily rov model list).

    :param df: scored feature DataFrame for one region (needs columns
        'videoid', 'score', 'lastonehour_return', 'platform_return_rate')
    :param now_date: datetime whose day part goes into the Redis key
    :param now_h: hour that goes into the Redis key
    :param rule_key: hourly rule name (part of the Redis key)
    :param param: rule parameters (entry thresholds and optional filters)
    :param region: region code this df belongs to
    :param data_key: data-group name (part of the Redis key)
    :return: None; side effect is the Redis zset write + limit-score adjustment
    """
    redis_helper = RedisHelper()

    # Entry conditions: hourly returns >= return_count && score >= score_value
    # && platform return rate >= platform_return_rate
    return_count = param.get('return_count', 1)
    score_value = param.get('score_rule', 0)
    platform_return_rate = param.get('platform_return_rate', 0)
    h_recall_df = df[(df['lastonehour_return'] >= return_count) & (df['score'] >= score_value)
                     & (df['platform_return_rate'] >= platform_return_rate)]

    # when a videoid appears more than once, keep the highest-scoring row
    h_recall_df = h_recall_df.sort_values(by=['score'], ascending=False)
    h_recall_df = h_recall_df.drop_duplicates(subset=['videoid'], keep='first')
    h_recall_df['videoid'] = h_recall_df['videoid'].astype(int)

    log_.info(f"各种规则过滤后,一共有多少个视频 = {len(h_recall_df)}")

    h_recall_videos = h_recall_df['videoid'].to_list()
    log_.info(f"各种规则增加后,一共有多少个视频 = {len(h_recall_videos)}")
    # video status filter
    filtered_videos = filter_video_status(h_recall_videos)

    # shielded-video filter (per-region shield key lists)
    shield_config = param.get('shield_config', config_.SHIELD_CONFIG)
    shield_key_name_list = shield_config.get(region, None)
    if shield_key_name_list is not None:
        filtered_videos = filter_shield_video(video_ids=filtered_videos, shield_key_name_list=shield_key_name_list)

    # political-video filter (opt-in per rule)
    political_filter = param.get('political_filter', None)
    if political_filter is True:
        filtered_videos = filter_political_videos(video_ids=filtered_videos)
    log_.info(f"视频状态-涉政等-过滤后,一共有多少个视频 = {len(filtered_videos)}")

    # BUGFIX: build the videoid -> score map once (O(n)) instead of scanning
    # the DataFrame for every video (O(n^2)); this also avoids float(Series),
    # which is deprecated and removed in pandas >= 2.
    score_map = h_recall_df.set_index('videoid')['score'].to_dict()
    h_video_ids = []
    # assemble the zset payload: {videoid: score}
    h_recall_result = {}
    for video_id in filtered_videos:
        h_recall_result[int(video_id)] = float(score_map[int(video_id)])
        h_video_ids.append(int(video_id))
    h_recall_key_name = \
        f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region}:{data_key}:{rule_key}:" \
        f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
    log_.info("打印地域1小时的某个地域{},redis key:{}".format(region, h_recall_key_name))
    if len(h_recall_result) > 0:
        log_.info(f"开始写入头部数据:count = {len(h_recall_result)}, key = {h_recall_key_name}")
        # keep the recall pool for 2 days
        redis_helper.add_data_with_zset(key_name=h_recall_key_name, data=h_recall_result, expire_time=2 * 24 * 3600)
        # adjust scores of rate-limited videos
        tmp = update_limit_video_score(initial_videos=h_recall_result, key_name=h_recall_key_name)
        if tmp:
            log_.info(f"走了限流逻辑后:count = {len(h_recall_result)}, key = {h_recall_key_name}")
        else:
            log_.info("走了限流逻辑,但没更改redis,未生效。")
        # clearing the online filter list is intentionally disabled
        # redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER}{region}.{app_type}.{data_key}.{rule_key}")
    else:
        log_.info(f"无数据,不写入。")
def cal_score_initial(df, param):
    """
    Compute the ranking score for every video row.

    Base formula: score1 = sharerate * backrate * log(return+1) * K2, where
      sharerate = lastonehour_share / (lastonehour_play + 1000)
      backrate  = lastonehour_return / (lastonehour_share + 10)
      ctr       = lastonehour_play / (<exposure column> + 1000)
      K2        = min(ctr, 0.6)
    The exposure column depends on param['view_type'].

    :param df: feature DataFrame (NaNs treated as 0)
    :param param: rule parameters; recognised keys: 'view_type',
        'click_score_rate', 'back_score_rate'
    :return: df with 'score' (and intermediate) columns added, sorted by score desc
    """
    df = df.fillna(0)
    df['share_rate'] = df['lastonehour_share'] / (df['lastonehour_play'] + 1000)
    df['back_rate'] = df['lastonehour_return'] / (df['lastonehour_share'] + 10)
    df['log_back'] = (df['lastonehour_return'] + 1).apply(math.log)
    # CTR denominator depends on the exposure metric configured for this rule.
    view_type = param.get('view_type', None)
    if view_type == 'video-show':
        df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show'] + 1000)
    elif view_type == 'video-show-region':
        df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show_region'] + 1000)
    else:
        df['ctr'] = df['lastonehour_play'] / (df['lastonehour_preview'] + 1000)
    df['K2'] = df['ctr'].apply(lambda x: 0.6 if x > 0.6 else x)

    # NOTE(review): yields inf/NaN when lastonehour_return == 0; such rows are
    # expected to be dropped by the platform_return_rate filter downstream — confirm.
    df['platform_return_rate'] = df['platform_return'] / df['lastonehour_return']

    df['score1'] = df['share_rate'] * df['back_rate'] * df['log_back'] * df['K2']

    click_score_rate = param.get('click_score_rate', None)
    # BUGFIX: this previously read 'click_score_rate' again, so a configured
    # 'back_score_rate' was silently ignored and the elif branch was unreachable.
    back_score_rate = param.get('back_score_rate', None)
    if click_score_rate is not None:
        # blend the base score with CTR (K2)
        df['score'] = (1 - click_score_rate) * df['score1'] + click_score_rate * df['K2']
    elif back_score_rate is not None:
        # blend the base score with the back rate
        df['score'] = (1 - back_score_rate) * df['score1'] + back_score_rate * df['back_rate']
    else:
        df['score'] = df['score1']

    df = df.sort_values(by=['score'], ascending=False)
    return df
def cal_score(df, param):
    """Score the rows of *df*; thin wrapper over cal_score_initial (kept for call-site symmetry)."""
    return cal_score_initial(df=df, param=param)
+
def process_with_region(region, df_merged, data_key, rule_key, rule_param, now_date, now_h):
    """Score and rank the slice of df_merged belonging to one region (runs inside a gevent greenlet)."""
    log_.info(f"多协程的region = {region} 开始执行")
    df_of_region = df_merged[df_merged['code'] == region]
    log_.info(f'该区域region = {region}, 下有多少数据量 = {len(df_of_region)}')
    scored_df = cal_score(df=df_of_region, param=rule_param)
    video_rank(df=scored_df, now_date=now_date, now_h=now_h, rule_key=rule_key,
               param=rule_param, region=region, data_key=data_key)
    log_.info(f"多协程的region = {region} 完成执行")
+
def process_with_param(param, data_params_item, rule_params_item, region_code_list,
                       feature_df,
                       now_date, now_h):
    """
    Run one (data, rule) combination: fold the per-apptype frames together,
    then process every region concurrently with gevent.
    """
    data_key = param.get('data')
    rule_key = param.get('rule')
    data_param = data_params_item.get(data_key)
    rule_param = rule_params_item.get(rule_key)
    log_.info("数据采用:{},统计采用{}.".format(data_key, rule_key))
    log_.info("具体的规则是:{}.".format(rule_param))

    # one sub-frame per apptype, folded pairwise through merge_df
    per_apptype_frames = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
    df_merged = reduce(merge_df, per_apptype_frames)
    greenlets = []
    for region in region_code_list:
        greenlets.append(gevent.spawn(process_with_region,
                                      region, df_merged, data_key, rule_key, rule_param, now_date, now_h))
    gevent.joinall(greenlets)
    log_.info(f"多进程的 param = {param} 完成执行!")
+
def get_feature_data(project, table, time_dt_h):
    """Fetch one hourly partition from ODPS and return it as a DataFrame with the `features` columns."""
    records = get_data_from_odps(date=time_dt_h, project=project, table=table)
    rows = [{name: record[name] for name in features} for record in records]
    return pd.DataFrame(rows)
+
def rank_by_h(project, table, time_dt_h, time_hour, rule_params, region_code_list):
    """
    Load the hourly feature data and fan out one worker process per
    (data, rule) combination in rule_params['params_list'].

    :param project: ODPS project name
    :param table: ODPS table name
    :param time_dt_h: partition string '%Y%m%d%H'
    :param time_hour: current hour (passed through to the Redis key builder)
    :param rule_params: dict with 'data_params', 'rule_params', 'params_list'
    :param region_code_list: region codes to process
    :return: None
    """
    feature_df = get_feature_data(project=project, table=table, time_dt_h=time_dt_h)
    feature_df['apptype'] = feature_df['apptype'].astype(int)
    data_params_item = rule_params.get('data_params')
    rule_params_item = rule_params.get('rule_params')
    params_list = rule_params.get('params_list')
    # BUGFIX: multiprocessing.Pool(processes=0) raises ValueError, and the
    # shipped RULE_PARAMS currently has an empty params_list (all entries
    # commented out) — bail out early instead of crashing.
    if not params_list:
        log_.info("params_list 为空,无需执行。")
        return
    pool = multiprocessing.Pool(processes=len(params_list))
    for param in params_list:
        pool.apply_async(
            func=process_with_param,
            args=(param, data_params_item, rule_params_item, region_code_list, feature_df, time_dt_h, time_hour)
        )
    pool.close()
    pool.join()
+
def h_timer_check():
    """
    Entry point: if the upstream hourly partition is ready, run the ranking;
    after minute 40 fall back to bottom data; otherwise retry in 60 seconds.
    Any exception is logged and reported to Feishu.
    """
    try:
        # 1. configuration
        rule_params = RULE_PARAMS
        project = config_.PROJECT_REGION_APP_TYPE
        table = config_.TABLE_REGION_APP_TYPE
        region_code_list = [code for region, code in region_code.items()]

        # 2. timing info
        time_now = datetime.datetime.today()
        time_dt_h = datetime.datetime.strftime(time_now, '%Y%m%d%H')
        time_hour = datetime.datetime.now().hour
        time_minute = datetime.datetime.now().minute
        log_.info(f"开始执行: {time_dt_h}")

        # Is the current-hour partition ready?
        # NOTE(review): h_data_check and h_rank_bottom are not defined in this
        # module — they appear to live in the sibling alg_recsys_recall_1h_region.py
        # and must be imported for this script to run; confirm.
        h_data_count = h_data_check(project=project, table=table, now_date=time_now)
        if h_data_count > 0:
            log_.info('上游数据表查询数据条数 h_data_count = {}, 开始进行更新。'.format(h_data_count))
            # data ready — run the update
            rank_by_h(time_dt_h=time_dt_h, time_hour=time_hour, rule_params=rule_params,
                      project=project, table=table, region_code_list=region_code_list)
            log_.info("数据1----------正常完成----------")
        elif time_minute > 40:
            log_.info('当前分钟超过40,预计执行无法完成,使用 bottom data!')
            # BUGFIX: the original referenced undefined names now_date / now_h /
            # rule_rank_h_flag (NameError in this branch); use the locals
            # computed above and the "24h" flag used by the sibling module.
            h_rank_bottom(now_date=time_now, now_h=time_hour, rule_params=rule_params,
                          region_code_list=region_code_list, rule_rank_h_flag="24h")
            log_.info('----------当前分钟超过40,使用bottom的data,完成----------')
        else:
            # data not ready yet — re-check in one minute
            log_.info("上游数据未就绪,等待...")
            Timer(60, h_timer_check).start()
    except Exception as e:
        log_.error(f"地域分组小时级数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text=f"rov-offline{config_.ENV_TEXT} - 地域分组小时级数据更新失败\n"
                     f"exception: {e}\n"
                     f"traceback: {traceback.format_exc()}"
        )
+
+
if __name__ == '__main__':
    # Script entry: kick off the hourly regional recall job.
    # BUGFIX: the startup log said "文件01_1h_region.py" but this file is
    # alg_recsys_recall02_1h_region.py — corrected the self-identification.
    log_.info("文件02_1h_region.py:「1小时地域」 开始执行")
    h_timer_check()

+ 1 - 8
alg_recsys_recall_1h_region.py

@@ -1,6 +1,5 @@
 # -*- coding: utf-8 -*-
 import multiprocessing
-import sys
 import traceback
 import gevent
 import datetime
@@ -8,15 +7,13 @@ import pandas as pd
 import math
 from functools import reduce
 from odps import ODPS
-from threading import Timer, Thread
+from threading import Timer
 from utils import MysqlHelper, RedisHelper, get_data_from_odps, filter_video_status, filter_shield_video, \
     check_table_partition_exits, filter_video_status_app, send_msg_to_feishu, filter_political_videos
 from config import set_config
 from log import Log
 from check_video_limit_distribute import update_limit_video_score
 
-# os.environ['NUMEXPR_MAX_THREADS'] = '16'
-
 config_, _ = set_config()
 log_ = Log()
 
@@ -1107,10 +1104,6 @@ def h_rank_bottom(now_date, now_h, rule_params, region_code_list, rule_rank_h_fl
 
 def h_timer_check():
     try:
-        # rule_rank_h_flag = sys.argv[1]
-        # if rule_rank_h_flag == '48h':
-        #     rule_params = config_.RULE_PARAMS_REGION_APP_TYPE_48H
-        # else:
         rule_rank_h_flag = "24h"
         rule_params = RULE_PARAMS
         project = config_.PROJECT_REGION_APP_TYPE

+ 2 - 0
alg_recsys_recall_hour_region_task.sh

@@ -31,3 +31,5 @@ elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
   echo "结束执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
   echo "all done"
 fi
+
+#10 * * * * cd /zhangbo/rov-offline && /bin/sh alg_recsys_recall_hour_region_task.sh > my_logs/task_$(date +\%Y-\%m-\%d_\%H).log 2>&1