liqian 1 year ago
parent
commit
78ea82c632
3 changed files with 75 additions and 28 deletions
  1. 27 4
      app.py
  2. 29 18
      recommend.py
  3. 19 6
      video_recall.py

+ 27 - 4
app.py

@@ -24,10 +24,29 @@ from manager_op import get_video_list, search_video
 from ad_recommend import ad_recommend_predict
 # from werkzeug.middleware.profiler import ProfilerMiddleware
 # from geventwebsocket.handler import WebSocketHandler
-
-app = Flask(__name__)
+from apscheduler.schedulers.background import BackgroundScheduler
 log_ = Log()
 config_ = set_config()
+level_weight = {'1': 1, '2': 1, '3': 1, '4': 1, '5': 1, '6': 1}
+
+
+def update_level_weight():
+    """Periodically refresh the flow-pool level weights into the in-memory variable level_weight."""
+    redis_helper = RedisHelper()
+    global level_weight  # rebinds the module-level dict that the request handlers pass on as level_weight
+    level_weight_initial = redis_helper.get_data_from_redis(key_name=config_.FLOWPOOL_LEVEL_WEIGHT_KEY_NAME)
+    if level_weight_initial is not None:  # a None result leaves the current weights untouched
+        level_weight = json.loads(level_weight_initial)
+    # print(level_weight)
+
+
+sched = BackgroundScheduler(daemon=True, timezone='Asia/Shanghai')  # set the scheduler timezone explicitly
+sched.add_job(func=update_level_weight, trigger="interval", seconds=10*60)  # interval job: first run after 10 min
+update_level_weight()  # load the weights once at import time, before the scheduler is started
+sched.start()
+
+app = Flask(__name__)
+
 
 
 @app.route('/healthcheck')
@@ -42,6 +61,7 @@ def homepage_recommend():
     # in_homepage = start_time * 1000 + random.randint(0, 100)
     # log_.info({'type': 'homepage', 'in_homepage': in_homepage})
     try:
+        global level_weight
         # log_.info({'request_headers': request.headers})
         request_data = json.loads(request.get_data())
         request_id = request_data.get('requestId')
@@ -105,7 +125,8 @@ def homepage_recommend():
                 params=params,
                 ab_info_data=ab_info_data,
                 version_audit_status=version_audit_status,
-                env_dict = env_dict
+                env_dict=env_dict,
+                level_weight=level_weight
             )
 
             result = {'code': 200, 'message': 'success', 'data': {'videos': recommend_result['videos']}}
@@ -158,6 +179,7 @@ def relevant_recommend():
     # in_relevant = start_time * 1000 + random.randint(0, 100)
     # log_.info({"type": "relevant", "in_relevant": in_relevant})
     try:
+        global level_weight
         request_data = json.loads(request.get_data())
         request_id = request_data.get('requestId')
         # log_.info({
@@ -230,7 +252,8 @@ def relevant_recommend():
             params=params,
             ab_info_data=ab_info_data,
             version_audit_status=version_audit_status,
-            env_dict = env_dict
+            env_dict=env_dict,
+            level_weight=level_weight
         )
 
         result = {'code': 200, 'message': 'success', 'data': {'videos': recommend_result['videos']}}

+ 29 - 18
recommend.py

@@ -139,7 +139,7 @@ def positon_duplicate(pos1_vids, pos2_vids, videos):
 def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, algo_type, client_info,
                     expire_time=24*3600, ab_code=config_.AB_CODE['initial'], rule_key='', data_key='',
                     no_op_flag=False, old_video_index=-1, video_id=None, params=None, rule_key_30day=None,
-                    shield_config=None):
+                    shield_config=None, level_weight=None):
     """
     首页线上推荐逻辑
     :param request_id: request_id
@@ -181,7 +181,8 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
     pool_recall = PoolRecall(request_id=request_id,
                              app_type=app_type, mid=mid, uid=uid, ab_code=ab_code,
                              client_info=client_info, rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
-                             params=params, rule_key_30day=rule_key_30day, shield_config=shield_config, video_id= video_id)
+                             params=params, rule_key_30day=rule_key_30day, shield_config=shield_config,
+                             video_id= video_id, level_weight=level_weight)
     # _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
     # # 小时级实验
     # if ab_code in [code for _, code in config_.AB_CODE['rank_by_h'].items()]:
@@ -411,7 +412,7 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
 def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, algo_type, client_info,
                     expire_time=24*3600, ab_code=config_.AB_CODE['initial'], rule_key='', data_key='',
                     no_op_flag=False, old_video_index=-1, video_id=None, params=None, rule_key_30day=None,
-                    shield_config=None, env_dict = None):
+                    shield_config=None, env_dict=None, level_weight=None):
     """
     首页线上推荐逻辑
     :param request_id: request_id
@@ -439,7 +440,8 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
     pool_recall = PoolRecall(request_id=request_id,
                              app_type=app_type, mid=mid, uid=uid, ab_code=ab_code,
                              client_info=client_info, rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
-                             params=params, rule_key_30day=rule_key_30day, shield_config=shield_config, video_id= video_id)
+                             params=params, rule_key_30day=rule_key_30day, shield_config=shield_config,
+                             video_id=video_id, level_weight=level_weight)
 
     exp_config = pool_recall.get_sort_ab_codel_config()
     # 60054 全量: simrecall+融合排序
@@ -910,7 +912,7 @@ def ab_test_op(rank_result, ab_code_list, app_type, mid, uid, **kwargs):
     return rank_result
 
 
-def update_redis_data(result, app_type, mid, top_K, expire_time=24*3600):
+def update_redis_data(result, app_type, mid, top_K, expire_time=24*3600, level_weight=None):
     """
     根据最终的排序结果更新相关redis数据
     :param result: 排序结果
@@ -1010,7 +1012,7 @@ def update_redis_data(result, app_type, mid, top_K, expire_time=24*3600):
             if flow_recall_video:
                 # update_local_distribute_count(flow_recall_video)
                 # update_local_distribute_count_new(flow_recall_video)
-                update_local_distribute_count_new_with_level(flow_recall_video)
+                update_local_distribute_count_new_with_level(flow_recall_video, level_weight)
                 # log_.info('update local distribute count success!')
 
         # 限流视频分发数记录
@@ -1203,7 +1205,7 @@ def update_local_distribute_count_new(videos):
         log_.error(traceback.format_exc())
 
 
-def update_local_distribute_count_new_with_level(videos):
+def update_local_distribute_count_new_with_level(videos, level_weight):
     """
     更新本地分发数
     :param videos: 视频列表 type-list [{'videoId':'', 'flowPool':'', 'distributeCount': '',
@@ -1212,8 +1214,12 @@ def update_local_distribute_count_new_with_level(videos):
     """
     try:
         redis_helper = RedisHelper()
-        level_weight = redis_helper.get_data_from_redis(key_name=config_.FLOWPOOL_LEVEL_WEIGHT_KEY_NAME)
-        level_list = [level for level in json.loads(level_weight)]
+        # level_weight = redis_helper.get_data_from_redis(key_name=config_.FLOWPOOL_LEVEL_WEIGHT_KEY_NAME)
+        # level_list = [level for level in json.loads(level_weight)]
+        if level_weight is None:
+            level_weight = {'1': 1, '2': 1, '3': 1, '4': 1, '5': 1, '6': 1}
+        level_list = [level for level in level_weight]
+
         for item in videos:
             video_id, flow_pool = item['videoId'], item['flowPool']
             key_name = f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{video_id}:{flow_pool}"
@@ -1713,7 +1719,8 @@ def get_recommend_params(recommend_type, ab_exp_info, ab_info_data, mid, app_typ
 
 
 def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
-                             client_info, ab_exp_info, params, ab_info_data, version_audit_status, env_dict):
+                             client_info, ab_exp_info, params, ab_info_data, version_audit_status, env_dict,
+                             level_weight):
     """
     首页线上推荐逻辑
     :param request_id: request_id
@@ -1816,8 +1823,9 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
                                      algo_type='', client_info=client_info,
                                      ab_code=ab_code, expire_time=expire_time,
                                      rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
-                                     old_video_index=old_video_index, video_id= None,
-                                     params=params, rule_key_30day=rule_key_30day, shield_config=shield_config, env_dict=env_dict)
+                                     old_video_index=old_video_index, video_id=None,
+                                     params=params, rule_key_30day=rule_key_30day, shield_config=shield_config,
+                                               env_dict=env_dict, level_weight=level_weight)
         recommend_result['fea_info'] = fea_info
     else:
         result = video_recommend(request_id=request_id,
@@ -1827,7 +1835,8 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
                              ab_code=ab_code, expire_time=expire_time,
                              rule_key=rule_key, data_key=data_key,
                              no_op_flag=no_op_flag, old_video_index=old_video_index,
-                             params=params, rule_key_30day=rule_key_30day, shield_config=shield_config)
+                             params=params, rule_key_30day=rule_key_30day, shield_config=shield_config,
+                                 level_weight=level_weight)
     # log_.info({
     #     'logTimestamp': int(time.time() * 1000),
     #     'request_id': request_id,
@@ -1855,7 +1864,7 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
     # else:
     #     update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
 
-    update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
+    update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K, level_weight=level_weight)
 
     # log_.info({
     #     'logTimestamp': int(time.time() * 1000),
@@ -1873,7 +1882,7 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
 
 
 def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_exp_info, client_info,
-                             page_type, params, ab_info_data, version_audit_status, env_dict):
+                             page_type, params, ab_info_data, version_audit_status, env_dict, level_weight):
     """
     相关推荐逻辑
     :param request_id: request_id
@@ -1937,7 +1946,8 @@ def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_
                                  ab_code=ab_code, expire_time=expire_time,
                                  rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
                                  old_video_index=old_video_index, video_id=video_id,
-                                 params=params, rule_key_30day=rule_key_30day, shield_config=shield_config, env_dict = env_dict)
+                                 params=params, rule_key_30day=rule_key_30day, shield_config=shield_config,
+                                               env_dict=env_dict, level_weight=level_weight)
         recommend_result['fea_info'] = fea_info
     else:
         result = video_recommend(request_id=request_id,
@@ -1947,7 +1957,8 @@ def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_
                              ab_code=ab_code, expire_time=expire_time,
                              rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
                              old_video_index=old_video_index, video_id=video_id,
-                             params=params, rule_key_30day=rule_key_30day, shield_config=shield_config)
+                             params=params, rule_key_30day=rule_key_30day, shield_config=shield_config,
+                                 level_weight=level_weight)
     # log_.info({
     #     'logTimestamp': int(time.time() * 1000),
     #     'request_id': request_id,
@@ -1976,7 +1987,7 @@ def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_
     # else:
     #      update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
 
-    update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
+    update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K, level_weight=level_weight)
 
     # log_.info({
     #     'logTimestamp': int(time.time() * 1000),

+ 19 - 6
video_recall.py

@@ -17,7 +17,8 @@ config_ = set_config()
 class PoolRecall(object):
     """召回"""
     def __init__(self, request_id, app_type, client_info=None, mid='', uid='', ab_code='',
-                 rule_key='', data_key='', no_op_flag=False, params=None, rule_key_30day=None, shield_config=None, video_id=None):
+                 rule_key='', data_key='', no_op_flag=False, params=None, rule_key_30day=None, shield_config=None,
+                 video_id=None, level_weight=None):
         """
         初始化
         :param request_id: request_id
@@ -40,6 +41,7 @@ class PoolRecall(object):
         self.no_op_flag = no_op_flag
         self.rule_key_30day = rule_key_30day
         self.shield_config = shield_config
+        self.level_weight = level_weight
         self.redis_helper = RedisHelper(params=params)
 
     def copy_redis_zset_data(self, from_key_name, to_key_name):
@@ -885,8 +887,14 @@ class PoolRecall(object):
         """
         # flow_pool_key = self.get_pool_redis_key('flow')
         # videos = []
-        level_weight = self.redis_helper.get_data_from_redis(key_name=config_.FLOWPOOL_LEVEL_WEIGHT_KEY_NAME)
-        level_list = [level for level in json.loads(level_weight)]
+        # level_weight = self.redis_helper.get_data_from_redis(key_name=config_.FLOWPOOL_LEVEL_WEIGHT_KEY_NAME)
+        # level_list = [level for level in json.loads(level_weight)]
+        if self.level_weight is None:
+            level_weight = {'1': 1, '2': 1, '3': 1, '4': 1, '5': 1, '6': 1}
+        else:
+            level_weight = self.level_weight
+        level_list = [level for level in level_weight]
+
         check_result = []
         for video_id in video_ids:
             video_id = int(video_id)
@@ -1030,11 +1038,16 @@ class PoolRecall(object):
                 return f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX_SET}{self.app_type}:{config_.QUICK_FLOW_POOL_ID}", None
             else:
                 # 1. 获取流量池各层级分发概率权重
-                level_weight = self.redis_helper.get_data_from_redis(key_name=config_.FLOWPOOL_LEVEL_WEIGHT_KEY_NAME)
-                if level_weight is None:
+                # level_weight = self.redis_helper.get_data_from_redis(key_name=config_.FLOWPOOL_LEVEL_WEIGHT_KEY_NAME)
+                # if level_weight is None:
+                #     level_weight = {'1': 1, '2': 1, '3': 1, '4': 1, '5': 1, '6': 1}
+                # else:
+                #     level_weight = json.loads(level_weight)
+                if self.level_weight is None:
                     level_weight = {'1': 1, '2': 1, '3': 1, '4': 1, '5': 1, '6': 1}
                 else:
-                    level_weight = json.loads(level_weight)
+                    level_weight = self.level_weight
+                # print(level_weight)
                 # 2. 判断各层级是否有视频需分发
                 available_level = []
                 for level, weight in level_weight.items():