Ver código fonte

update flowpool recall: add abtest

liqian 1 ano atrás
pai
commit
e1ec50ae71
3 arquivos alterados com 116 adições e 81 exclusões
  1. 19 6
      app.py
  2. 4 0
      config.py
  3. 93 75
      recommend.py

+ 19 - 6
app.py

@@ -29,10 +29,17 @@ from apscheduler.schedulers.background import BackgroundScheduler
 log_ = Log()
 config_ = set_config()
 level_weight = {'1': 1, '2': 1, '3': 1, '4': 1, '5': 1, '6': 1}
+flow_pool_abtest_config = {'control_group': [7, 8, 9, 10, 11, 12, 13, 14, 15, 16],
+                           'experimental_flow_set_level': []}
 
 
-def update_level_weight():
-    """定时更新流量池层级权重到内存变量level_weight中"""
+def update_flow_pool_config():
+    """
+    定时更新流量池相关预设配置到内存变量中
+    1. level_weight: 流量池层级权重
+    2. flow_pool_abtest_config: 流量池ab实验配置
+    :return: None
+    """
     redis_helper = RedisHelper()
     global level_weight
     level_weight_initial = redis_helper.get_data_from_redis(key_name=config_.FLOWPOOL_LEVEL_WEIGHT_KEY_NAME)
@@ -43,11 +50,15 @@ def update_level_weight():
     #     "level_weight": level_weight
     # })
     # print(level_weight)
+    global flow_pool_abtest_config
+    flow_pool_abtest_config = redis_helper.get_data_from_redis(key_name=config_.FLOWPOOL_ABTEST_KEY_NAME)
+    if flow_pool_abtest_config is not None:
+        flow_pool_abtest_config = json.loads(flow_pool_abtest_config)
 
 
 sched = BackgroundScheduler(daemon=True, timezone='Asia/Shanghai')  # 指定时区
-sched.add_job(func=update_level_weight, trigger="interval", seconds=10*60)  # 间隔10min后启动
-update_level_weight()
+sched.add_job(func=update_flow_pool_config, trigger="interval", seconds=10*60)  # 间隔10min后启动
+update_flow_pool_config()
 sched.start()
 
 app = Flask(__name__)
@@ -131,7 +142,8 @@ def homepage_recommend():
                 ab_info_data=ab_info_data,
                 version_audit_status=version_audit_status,
                 env_dict=env_dict,
-                level_weight=level_weight
+                level_weight=level_weight,
+                flow_pool_abtest_config=flow_pool_abtest_config
             )
 
             result = {'code': 200, 'message': 'success', 'data': {'videos': recommend_result['videos']}}
@@ -258,7 +270,8 @@ def relevant_recommend():
             ab_info_data=ab_info_data,
             version_audit_status=version_audit_status,
             env_dict=env_dict,
-            level_weight=level_weight
+            level_weight=level_weight,
+            flow_pool_abtest_config=flow_pool_abtest_config
         )
 
         result = {'code': 200, 'message': 'success', 'data': {'videos': recommend_result['videos']}}

+ 4 - 0
config.py

@@ -703,6 +703,10 @@ class BaseConfig(object):
     # appType = 6, ROV召回池redis key前缀,完整格式:com.weiqu.video.recall.hot.apptype.h.item.score.6.{h}
     # RECALL_KEY_NAME_PREFIX_APP_TYPE = 'com.weiqu.video.recall.hot.apptype.h.item.score.6.'
 
+    # 流量池ID列表: 尾号0-9对应流量池ID:7-16
+    FLOWPOOL_ID_LIST = [7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
+    # 流量池分发实验配置(对照组与实验组划分)存放 redis key
+    FLOWPOOL_ABTEST_KEY_NAME = 'flow:pool:abtest:config'
     # 流量池离线模型结果存放 redis key前缀,完整格式 flow:pool:item:score:{appType}
     FLOWPOOL_KEY_NAME_PREFIX = 'flow:pool:item:score:'
     # 快速曝光流量池数据存放 redis key前缀,完整格式 flow:pool:quick:item:score:{appType}:{flowPool_id}

+ 93 - 75
recommend.py

@@ -1,4 +1,5 @@
 import json
+import random
 import time
 import multiprocessing
 import traceback
@@ -139,7 +140,7 @@ def positon_duplicate(pos1_vids, pos2_vids, videos):
 def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, algo_type, client_info,
                     expire_time=24*3600, ab_code=config_.AB_CODE['initial'], rule_key='', data_key='',
                     no_op_flag=False, old_video_index=-1, video_id=None, params=None, rule_key_30day=None,
-                    shield_config=None, level_weight=None):
+                    shield_config=None, level_weight=None, flow_pool_abtest_group=None):
     """
     首页线上推荐逻辑
     :param request_id: request_id
@@ -217,32 +218,36 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
     #print("exp_config:", exp_config)
     if app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
         t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time)]
-        if ab_code==60058:
-            t.append(gevent.spawn(pool_recall.get_U2I_reall, mid))
-            t.append(gevent.spawn(pool_recall.get_play_reall, mid))
-        elif  ab_code==60059:
-            t.append(gevent.spawn(pool_recall.get_word2vec_item_reall))
-        elif  ab_code==60061 or ab_code==60063:
-            t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter))
-        elif  ab_code==60062:
-            t.append(gevent.spawn(pool_recall.get_U2U2I_reall, mid))
-        elif  ab_code==60064:
-            t.append(gevent.spawn(pool_recall.get_return_video_reall))
-    else:
+        # if ab_code == 60058:
+        #     t.append(gevent.spawn(pool_recall.get_U2I_reall, mid))
+        #     t.append(gevent.spawn(pool_recall.get_play_reall, mid))
+        # elif ab_code == 60059:
+        #     t.append(gevent.spawn(pool_recall.get_word2vec_item_reall))
+        # elif ab_code == 60061 or ab_code == 60063:
+        #     t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter))
+        # elif ab_code == 60062:
+        #     t.append(gevent.spawn(pool_recall.get_U2U2I_reall, mid))
+        # elif ab_code == 60064:
+        #     t.append(gevent.spawn(pool_recall.get_return_video_reall))
+    elif flow_pool_abtest_group == 'experimental_flow_set_level':
         t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
              gevent.spawn(pool_recall.flow_pool_recall_new_with_level, size, config_.QUICK_FLOW_POOL_ID),
              gevent.spawn(pool_recall.flow_pool_recall_new_with_level, size)]
-        if ab_code==60058:
-            t.append(gevent.spawn(pool_recall.get_U2I_reall, mid))
-            t.append(gevent.spawn(pool_recall.get_play_reall, mid))
-        elif ab_code == 60059:
-            t.append(gevent.spawn(pool_recall.get_word2vec_item_reall))
-        elif ab_code == 60061 or ab_code==60063:
-            t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter))
-        elif ab_code == 60062:
-            t.append(gevent.spawn(pool_recall.get_U2U2I_reall, mid))
-        elif  ab_code==60064:                                                                         
-            t.append(gevent.spawn(pool_recall.get_return_video_reall))
+    else:
+        t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
+             gevent.spawn(pool_recall.flow_pool_recall, size, config_.QUICK_FLOW_POOL_ID),
+             gevent.spawn(pool_recall.flow_pool_recall, size)]
+    if ab_code == 60058:
+        t.append(gevent.spawn(pool_recall.get_U2I_reall, mid))
+        t.append(gevent.spawn(pool_recall.get_play_reall, mid))
+    elif ab_code == 60059:
+        t.append(gevent.spawn(pool_recall.get_word2vec_item_reall))
+    elif ab_code == 60061 or ab_code == 60063:
+        t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter))
+    elif ab_code == 60062:
+        t.append(gevent.spawn(pool_recall.get_U2U2I_reall, mid))
+    elif ab_code == 60064:
+        t.append(gevent.spawn(pool_recall.get_return_video_reall))
     # 最惊奇相关推荐实验
     # elif ab_code == config_.AB_CODE['top_video_relevant_appType_19']:
     #     t = [gevent.spawn(pool_recall.relevant_recall_19, video_id, size, expire_time),
@@ -412,7 +417,7 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
 def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, algo_type, client_info,
                     expire_time=24*3600, ab_code=config_.AB_CODE['initial'], rule_key='', data_key='',
                     no_op_flag=False, old_video_index=-1, video_id=None, params=None, rule_key_30day=None,
-                    shield_config=None, env_dict=None, level_weight=None):
+                    shield_config=None, env_dict=None, level_weight=None, flow_pool_abtest_group=None):
     """
     首页线上推荐逻辑
     :param request_id: request_id
@@ -458,10 +463,14 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
         # if ab_code == 60068 or ab_code == 60070:
         #     t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter))
         #     t.append(gevent.spawn(pool_recall.get_return_video_reall, 'rv2:'))
-    else:
+    elif flow_pool_abtest_group == 'experimental_flow_set_level':
         t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
              gevent.spawn(pool_recall.flow_pool_recall_new_with_level, size, config_.QUICK_FLOW_POOL_ID),
              gevent.spawn(pool_recall.flow_pool_recall_new_with_level, size)]
+    else:
+        t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
+             gevent.spawn(pool_recall.flow_pool_recall, size, config_.QUICK_FLOW_POOL_ID),
+             gevent.spawn(pool_recall.flow_pool_recall, size)]
     if ab_code == 60054 or ab_code == 60066 or ab_code == 60072 or ab_code == 60073 or ab_code == 60074:
         t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter))
     elif ab_code == 60056 or ab_code == 60071:
@@ -912,7 +921,7 @@ def ab_test_op(rank_result, ab_code_list, app_type, mid, uid, **kwargs):
     return rank_result
 
 
-def update_redis_data(result, app_type, mid, top_K, expire_time=24*3600, level_weight=None):
+def update_redis_data(result, app_type, mid, top_K, expire_time=24*3600, level_weight=None, flow_pool_abtest_group=None):
     """
     根据最终的排序结果更新相关redis数据
     :param result: 排序结果
@@ -1010,9 +1019,11 @@ def update_redis_data(result, app_type, mid, top_K, expire_time=24*3600, level_w
             else:
                 flow_recall_video = [item for item in result if item['pushFrom'] == config_.PUSH_FROM['flow_recall']]
             if flow_recall_video:
-                # update_local_distribute_count(flow_recall_video)
+                if flow_pool_abtest_group == 'experimental_flow_set_level':
+                    update_local_distribute_count_new_with_level(flow_recall_video, level_weight)
+                else:
+                    update_local_distribute_count(flow_recall_video)
                 # update_local_distribute_count_new(flow_recall_video)
-                update_local_distribute_count_new_with_level(flow_recall_video, level_weight)
                 # log_.info('update local distribute count success!')
 
         # 限流视频分发数记录
@@ -1285,7 +1296,8 @@ def get_religion_class_with_mid(mid, religion_class_name):
     return religion_class_flag
 
 
-def get_recommend_params(recommend_type, ab_exp_info, ab_info_data, mid, app_type, page_type=0):
+def get_recommend_params(recommend_type, ab_exp_info, ab_info_data, mid, app_type, page_type=0,
+                         flow_pool_abtest_config=None):
     """
     根据实验分组给定对应的推荐参数
     :param recommend_type: 首页推荐和相关推荐区分参数(0-首页推荐,1-相关推荐)
@@ -1294,6 +1306,7 @@ def get_recommend_params(recommend_type, ab_exp_info, ab_info_data, mid, app_typ
     :param mid: mid
     :param app_type: app_type, type-int
     :param page_type: 页面区分参数,默认:0(首页)
+    :param flow_pool_abtest_config: 流量池abtest配置
     :return:
     """
     top_K = config_.K
@@ -1715,12 +1728,22 @@ def get_recommend_params(recommend_type, ab_exp_info, ab_info_data, mid, app_typ
     #             data_key = 'data1'
     #             no_op_flag = True
 
-    return top_K, flow_pool_P, ab_code, rule_key, data_key, expire_time, no_op_flag, old_video_index, rule_key_30day, shield_config
+    # 流量池分发实验组划分
+    # 1. 随机选取流量池id
+    flow_pool_id_choice = random.choice(config_.FLOWPOOL_ID_LIST)
+    # 2. 判断流量池id所属实验配置分组
+    flow_pool_abtest_group = 'control_group'
+    for key, items in flow_pool_abtest_config.items():
+        if int(flow_pool_id_choice) in items:
+            flow_pool_abtest_group = key
+
+    return top_K, flow_pool_P, ab_code, rule_key, data_key, expire_time, no_op_flag, old_video_index, rule_key_30day, \
+           shield_config, flow_pool_abtest_group
 
 
 def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
                              client_info, ab_exp_info, params, ab_info_data, version_audit_status, env_dict,
-                             level_weight):
+                             level_weight, flow_pool_abtest_config):
     """
     首页线上推荐逻辑
     :param request_id: request_id
@@ -1784,9 +1807,9 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
 
     # 普通mid推荐处理
     top_K, flow_pool_P, ab_code, rule_key, data_key, expire_time, \
-    no_op_flag, old_video_index, rule_key_30day, shield_config = \
+    no_op_flag, old_video_index, rule_key_30day, shield_config, flow_pool_abtest_group = \
         get_recommend_params(recommend_type=0, ab_exp_info=ab_exp_info, ab_info_data=ab_info_data, mid=mid,
-                             app_type=app_type)
+                             app_type=app_type, flow_pool_abtest_config=flow_pool_abtest_config)
     # log_.info({
     #     'logTimestamp': int(time.time() * 1000),
     #     'request_id': request_id,
@@ -1817,26 +1840,22 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
     if ab_code == 60054 or ab_code == 60056 or ab_code == 60067 or ab_code == 60068 or ab_code == 60066 \
             or ab_code == 60069 or ab_code == 60070 or ab_code == 60071 or ab_code == 60072 or ab_code == 60073 \
             or ab_code == 60074:
-        result, fea_info = video_old_recommend(request_id=request_id,
-                                     mid=mid, uid=uid, app_type=app_type,
-                                     size=size, top_K=top_K, flow_pool_P=flow_pool_P,
-                                     algo_type='', client_info=client_info,
-                                     ab_code=ab_code, expire_time=expire_time,
-                                     rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
-                                     old_video_index=old_video_index, video_id=None,
-                                     params=params, rule_key_30day=rule_key_30day, shield_config=shield_config,
-                                               env_dict=env_dict, level_weight=level_weight)
+        result, fea_info = video_old_recommend(request_id=request_id, mid=mid, uid=uid, app_type=app_type, size=size,
+                                               top_K=top_K, flow_pool_P=flow_pool_P, algo_type='',
+                                               client_info=client_info, ab_code=ab_code, expire_time=expire_time,
+                                               rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
+                                               old_video_index=old_video_index, video_id=None, params=params,
+                                               rule_key_30day=rule_key_30day, shield_config=shield_config,
+                                               env_dict=env_dict, level_weight=level_weight,
+                                               flow_pool_abtest_group=flow_pool_abtest_group)
         recommend_result['fea_info'] = fea_info
     else:
-        result = video_recommend(request_id=request_id,
-                             mid=mid, uid=uid, app_type=app_type,
-                             size=size, top_K=top_K, flow_pool_P=flow_pool_P,
-                             algo_type=algo_type, client_info=client_info,
-                             ab_code=ab_code, expire_time=expire_time,
-                             rule_key=rule_key, data_key=data_key,
-                             no_op_flag=no_op_flag, old_video_index=old_video_index,
-                             params=params, rule_key_30day=rule_key_30day, shield_config=shield_config,
-                                 level_weight=level_weight)
+        result = video_recommend(request_id=request_id, mid=mid, uid=uid, app_type=app_type, size=size, top_K=top_K,
+                                 flow_pool_P=flow_pool_P, algo_type=algo_type, client_info=client_info,
+                                 ab_code=ab_code, expire_time=expire_time, rule_key=rule_key, data_key=data_key,
+                                 no_op_flag=no_op_flag, old_video_index=old_video_index, params=params,
+                                 rule_key_30day=rule_key_30day, shield_config=shield_config, level_weight=level_weight,
+                                 flow_pool_abtest_group=flow_pool_abtest_group)
     # log_.info({
     #     'logTimestamp': int(time.time() * 1000),
     #     'request_id': request_id,
@@ -1864,7 +1883,8 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
     # else:
     #     update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
 
-    update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K, level_weight=level_weight)
+    update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K,
+                      level_weight=level_weight, flow_pool_abtest_group=flow_pool_abtest_group)
 
     # log_.info({
     #     'logTimestamp': int(time.time() * 1000),
@@ -1882,7 +1902,8 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
 
 
 def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_exp_info, client_info,
-                             page_type, params, ab_info_data, version_audit_status, env_dict, level_weight):
+                             page_type, params, ab_info_data, version_audit_status, env_dict,
+                             level_weight, flow_pool_abtest_config):
     """
     相关推荐逻辑
     :param request_id: request_id
@@ -1911,9 +1932,9 @@ def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_
 
     # 普通mid推荐处理
     top_K, flow_pool_P, ab_code, rule_key, data_key, expire_time, \
-    no_op_flag, old_video_index, rule_key_30day, shield_config = \
+    no_op_flag, old_video_index, rule_key_30day, shield_config, flow_pool_abtest_group = \
         get_recommend_params(recommend_type=1, ab_exp_info=ab_exp_info, ab_info_data=ab_info_data, page_type=page_type,
-                             mid=mid, app_type=app_type)
+                             mid=mid, app_type=app_type, flow_pool_abtest_config=flow_pool_abtest_config)
     # log_.info({
     #     'logTimestamp': int(time.time() * 1000),
     #     'request_id': request_id,
@@ -1939,26 +1960,22 @@ def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_
     if ab_code == 60054 or ab_code == 60056 or ab_code == 60067 or ab_code == 60068 or ab_code == 60066 \
             or ab_code == 60069 or ab_code == 60070 or ab_code == 60071 or ab_code == 60072 or ab_code == 60073 \
             or ab_code == 60074:
-        result, fea_info = video_old_recommend(request_id=request_id,
-                                 mid=mid, uid=uid, app_type=app_type,
-                                 size=size, top_K=top_K, flow_pool_P=flow_pool_P,
-                                 algo_type='', client_info=client_info,
-                                 ab_code=ab_code, expire_time=expire_time,
-                                 rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
-                                 old_video_index=old_video_index, video_id=video_id,
-                                 params=params, rule_key_30day=rule_key_30day, shield_config=shield_config,
-                                               env_dict=env_dict, level_weight=level_weight)
+        result, fea_info = video_old_recommend(request_id=request_id, mid=mid, uid=uid, app_type=app_type, size=size,
+                                               top_K=top_K, flow_pool_P=flow_pool_P, algo_type='',
+                                               client_info=client_info, ab_code=ab_code, expire_time=expire_time,
+                                               rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
+                                               old_video_index=old_video_index, video_id=video_id, params=params,
+                                               rule_key_30day=rule_key_30day, shield_config=shield_config,
+                                               env_dict=env_dict, level_weight=level_weight,
+                                               flow_pool_abtest_group=flow_pool_abtest_group)
         recommend_result['fea_info'] = fea_info
     else:
-        result = video_recommend(request_id=request_id,
-                             mid=mid, uid=uid, app_type=app_type,
-                             size=size, top_K=top_K, flow_pool_P=flow_pool_P,
-                             algo_type='', client_info=client_info,
-                             ab_code=ab_code, expire_time=expire_time,
-                             rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
-                             old_video_index=old_video_index, video_id=video_id,
-                             params=params, rule_key_30day=rule_key_30day, shield_config=shield_config,
-                                 level_weight=level_weight)
+        result = video_recommend(request_id=request_id, mid=mid, uid=uid, app_type=app_type, size=size, top_K=top_K,
+                                 flow_pool_P=flow_pool_P, algo_type='', client_info=client_info, ab_code=ab_code,
+                                 expire_time=expire_time, rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
+                                 old_video_index=old_video_index, video_id=video_id, params=params,
+                                 rule_key_30day=rule_key_30day, shield_config=shield_config, level_weight=level_weight,
+                                 flow_pool_abtest_group=flow_pool_abtest_group)
     # log_.info({
     #     'logTimestamp': int(time.time() * 1000),
     #     'request_id': request_id,
@@ -1987,7 +2004,8 @@ def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_
     # else:
     #      update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
 
-    update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K, level_weight=level_weight)
+    update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K,
+                      level_weight=level_weight, flow_pool_abtest_group=flow_pool_abtest_group)
 
     # log_.info({
     #     'logTimestamp': int(time.time() * 1000),