Browse Source

add class Params

liqian 2 years ago
parent
commit
19e84e2892
6 changed files with 126 additions and 30 deletions
  1. 7 2
      app.py
  2. 83 6
      db_helper.py
  3. 4 0
      params_helper.py
  4. 15 8
      recommend.py
  5. 6 4
      video_rank.py
  6. 11 10
      video_recall.py

+ 7 - 2
app.py

@@ -17,6 +17,7 @@ from gevent.pywsgi import WSGIServer
 from multiprocessing import cpu_count, Process
 from utils import update_video_w_h_rate
 from user2new import user2new
+from params_helper import Params
 # from werkzeug.middleware.profiler import ProfilerMiddleware
 
 
@@ -48,6 +49,7 @@ def homepage_recommend():
         ab_exp_info = request_data.get('abExpInfo', None)
         log_.info({'requestUri': '/applet/video/homepage/recommend', 'requestData': request_data})
         # log_.info('homepage_recommend request data: {}'.format(request_data))
+        params = Params(request_id=request_id)
         # size默认为10
         if not size:
             size = 10
@@ -56,7 +58,7 @@ def homepage_recommend():
             videos = video_homepage_recommend(request_id=request_id,
                                               mid=mid, uid=uid, size=size, app_type=app_type,
                                               algo_type=algo_type, client_info=client_info,
-                                              ab_exp_info=ab_exp_info)
+                                              ab_exp_info=ab_exp_info, params=params)
             result = {'code': 200, 'message': 'success', 'data': {'videos': videos}}
             log_.info({'requestUri': '/applet/video/homepage/recommend',
                        'request_id': request_id,
@@ -110,9 +112,12 @@ def relevant_recommend():
         # log_.info('requestUri = "{}", requestData = "{}"'.format('/applet/video/relevant/recommend', request_data))
         # log_.info('relevant_recommend request data: {}'.format(request_data))
 
+        params = Params(request_id=request_id)
+
         videos = video_relevant_recommend(request_id=request_id,
                                           video_id=video_id, mid=mid, uid=uid, size=page_size, app_type=app_type,
-                                          ab_exp_info=ab_exp_info, client_info=client_info, page_type=page_type)
+                                          ab_exp_info=ab_exp_info, client_info=client_info, page_type=page_type,
+                                          params=params)
 
         result = {'code': 200, 'message': 'success', 'data': {'videos': videos}}
         log_.info({'requestUri': '/applet/video/relevant/recommend',

+ 83 - 6
db_helper.py

@@ -14,7 +14,7 @@ conn_redis = None
 
 
 class RedisHelper(object):
-    def __init__(self):
+    def __init__(self, params=None):
         """
         初始化redis连接信息
         redis_info: redis连接信息, 格式:dict, {'host': '', 'port': '', 'password': ''}
@@ -23,6 +23,7 @@ class RedisHelper(object):
         self.host = redis_info['host']
         self.port = redis_info['port']
         self.password = redis_info['password']
+        self.params = params
 
     def connect(self):
         """
@@ -45,8 +46,14 @@ class RedisHelper(object):
         :param key_name: key
         :return: 存在-True, 不存在-False
         """
+        start_time = time.time()
         conn = self.connect()
-        return conn.exists(key_name)
+        res = conn.exists(key_name)
+        if self.params is not None:
+            log_.info({'request_id': self.params.get('request_id'),
+                       'operation': 'get_data_from_redis',
+                       'executeTime': (time.time() - start_time) * 1000})
+        return res
 
     def del_keys(self, key_name):
         """
@@ -63,11 +70,16 @@ class RedisHelper(object):
         :param key_name: key
         :return: data
         """
+        start_time = time.time()
         conn = self.connect()
         if not conn.exists(key_name):
             # key不存在
             return None
         data = conn.get(key_name)
+        if self.params is not None:
+            log_.info({'request_id': self.params.get('request_id'),
+                       'operation': 'get_data_from_redis',
+                       'executeTime': (time.time() - start_time) * 1000})
         return data
 
     def set_data_to_redis(self, key_name, value, expire_time=24*3600):
@@ -78,8 +90,13 @@ class RedisHelper(object):
         :param expire_time: 过期时间,单位:s,默认1天
         :return: None
         """
+        start_time = time.time()
         conn = self.connect()
         conn.set(key_name, value, ex=int(expire_time))
+        if self.params is not None:
+            log_.info({'request_id': self.params.get('request_id'),
+                       'operation': 'set_data_to_redis',
+                       'executeTime': (time.time() - start_time) * 1000})
 
     def add_data_with_zset(self, key_name, data, expire_time=7*24*3600):
         """
@@ -89,10 +106,15 @@ class RedisHelper(object):
         :param expire_time: 过期时间,单位:s,默认7天,type-int
         :return: None
         """
+        start_time = time.time()
         conn = self.connect()
         conn.zadd(key_name, data)
         # 设置过期时间
         conn.expire(key_name, int(expire_time))
+        if self.params is not None:
+            log_.info({'request_id': self.params.get('request_id'),
+                       'operation': 'add_data_with_zset',
+                       'executeTime': (time.time() - start_time) * 1000})
 
     def get_data_zset_with_index(self, key_name, start, end, desc=True, with_scores=False):
         """
@@ -104,14 +126,21 @@ class RedisHelper(object):
         :param with_scores: 是否获取元素的分数,默认 False,只获取元素的值
         :return: data 元素值列表(不包含分数),value(videoId)类型转换为int, 包含分数时不进行类型转换
         """
+        start_time = time.time()
         conn = self.connect()
         if not conn.exists(key_name):
             return None
         data = conn.zrange(key_name, start, end, desc, with_scores)
         if with_scores:
-            return data
+            data = data
         else:
-            return [eval(value) for value in data]
+            data = [eval(value) for value in data]
+
+        if self.params is not None:
+            log_.info({'request_id': self.params.get('request_id'),
+                       'operation': 'get_data_zset_with_index',
+                       'executeTime': (time.time() - start_time) * 1000})
+        return data
 
     def get_score_with_value(self, key_name, value):
         """
@@ -148,8 +177,13 @@ class RedisHelper(object):
         :param value: 元素的值
         :return: None
         """
+        start_time = time.time()
         conn = self.connect()
         conn.zrem(key_name, value)
+        if self.params is not None:
+            log_.info({'request_id': self.params.get('request_id'),
+                       'operation': 'remove_value_from_zset',
+                       'executeTime': (time.time() - start_time) * 1000})
 
     def get_index_with_data(self, key_name, value):
         """
@@ -158,8 +192,14 @@ class RedisHelper(object):
         :param value: 元素的值
         :return: idx 位置索引
         """
+        start_time = time.time()
         conn = self.connect()
-        return conn.zrevrank(key_name, value)
+        res = conn.zrevrank(key_name, value)
+        if self.params is not None:
+            log_.info({'request_id': self.params.get('request_id'),
+                       'operation': 'get_index_with_data',
+                       'executeTime': (time.time() - start_time) * 1000})
+        return res
 
     def get_data_from_set(self, key_name):
         """
@@ -167,6 +207,7 @@ class RedisHelper(object):
         :param key_name: key
         :return: data
         """
+        start_time = time.time()
         conn = self.connect()
         if not conn.exists(key_name):
             # key不存在
@@ -179,6 +220,11 @@ class RedisHelper(object):
             if cur == 0:
                 break
             cursor = cur
+
+        if self.params is not None:
+            log_.info({'request_id': self.params.get('request_id'),
+                       'operation': 'get_data_from_set',
+                       'executeTime': (time.time() - start_time) * 1000})
         return list(set(data))
 
     def add_data_with_set(self, key_name, values, expire_time=30*60):
@@ -189,10 +235,15 @@ class RedisHelper(object):
         :param expire_time: 过期时间,单位:s,默认0.5小时 type-int
         :return: None
         """
+        start_time = time.time()
         conn = self.connect()
         conn.sadd(key_name, *values)
         # 设置过期时间
         conn.expire(key_name, int(expire_time))
+        if self.params is not None:
+            log_.info({'request_id': self.params.get('request_id'),
+                       'operation': 'add_data_with_set',
+                       'executeTime': (time.time() - start_time) * 1000})
 
     def data_exists_with_set(self, key_name, value):
         """
@@ -201,8 +252,14 @@ class RedisHelper(object):
         :param value: 需判断的元素
         :return: 存在-True, 不存在-False
         """
+        start_time = time.time()
         conn = self.connect()
-        return conn.sismember(key_name, value)
+        res = conn.sismember(key_name, value)
+        if self.params is not None:
+            log_.info({'request_id': self.params.get('request_id'),
+                       'operation': 'data_exists_with_set',
+                       'executeTime': (time.time() - start_time) * 1000})
+        return res
 
     def remove_value_from_set(self, key_name, values):
         """
@@ -211,8 +268,13 @@ class RedisHelper(object):
         :param values: 元素的值, 类型-tuple
         :return: None
         """
+        start_time = time.time()
         conn = self.connect()
         conn.srem(key_name, *values)
+        if self.params is not None:
+            log_.info({'request_id': self.params.get('request_id'),
+                       'operation': 'remove_value_from_set',
+                       'executeTime': (time.time() - start_time) * 1000})
 
     def decr_key(self, key_name, amount=1, expire_time=30*60):
         """
@@ -222,9 +284,14 @@ class RedisHelper(object):
         :param expire_time: 过期时间,单位:s,默认0.5小时 type-int
         :return: None
         """
+        start_time = time.time()
         conn = self.connect()
         conn.decr(name=key_name, amount=amount)
         conn.expire(key_name, int(expire_time))
+        if self.params is not None:
+            log_.info({'request_id': self.params.get('request_id'),
+                       'operation': 'decr_key',
+                       'executeTime': (time.time() - start_time) * 1000})
 
     def incr_key(self, key_name, amount=1, expire_time=30*60):
         """
@@ -234,9 +301,14 @@ class RedisHelper(object):
         :param expire_time: 过期时间,单位:s,默认0.5小时 type-int
         :return: None
         """
+        start_time = time.time()
         conn = self.connect()
         conn.incr(name=key_name, amount=amount)
         conn.expire(key_name, int(expire_time))
+        if self.params is not None:
+            log_.info({'request_id': self.params.get('request_id'),
+                       'operation': 'incr_key',
+                       'executeTime': (time.time() - start_time) * 1000})
 
     def setnx_key(self, key_name, value, expire_time=5*60):
         """
@@ -245,9 +317,14 @@ class RedisHelper(object):
         :param value: value
         :return: 过期时间,单位:s,默认5分钟 type-int
         """
+        start_time = time.time()
         conn = self.connect()
         conn.setnx(name=key_name, value=value)
         conn.expire(name=key_name, time=int(expire_time))
+        if self.params is not None:
+            log_.info({'request_id': self.params.get('request_id'),
+                       'operation': 'setnx_key',
+                       'executeTime': (time.time() - start_time) * 1000})
 
 
 #hologres_info = config_.HOLOGRES_INFO

+ 4 - 0
params_helper.py

@@ -0,0 +1,4 @@
+# 需多次使用参数类
+class Params(object):
+    def __init__(self, request_id):
+        self.request_id = request_id

+ 15 - 8
recommend.py

@@ -137,7 +137,8 @@ def positon_duplicate(pos1_vids, pos2_vids, videos):
 
 
 def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, algo_type, client_info, expire_time=24*3600,
-                    ab_code=config_.AB_CODE['initial'], rule_key='', no_op_flag=False, old_video_index=-1, video_id=None):
+                    ab_code=config_.AB_CODE['initial'], rule_key='', no_op_flag=False, old_video_index=-1, video_id=None,
+                    params=None):
     """
     首页线上推荐逻辑
     :param request_id: request_id
@@ -152,6 +153,7 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
     :param expire_time: 末位视频记录redis过期时间
     :param ab_code: AB实验code
     :param video_id: 相关推荐头部视频id
+    :param params:
     :return:
     """
     # ####### 多进程召回
@@ -175,7 +177,8 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
     recall_result_list = []
     pool_recall = PoolRecall(request_id=request_id,
                              app_type=app_type, mid=mid, uid=uid, ab_code=ab_code,
-                             client_info=client_info, rule_key=rule_key, no_op_flag=no_op_flag)
+                             client_info=client_info, rule_key=rule_key, no_op_flag=no_op_flag,
+                             params=params)
     _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
     # 小时级实验
     if ab_code in [code for _, code in config_.AB_CODE['rank_by_h'].items()]:
@@ -266,7 +269,7 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
         # 兜底策略
         # log_.info('====== bottom strategy')
         start_bottom = time.time()
-        rank_result = bottom_strategy(request_id=request_id, size=size, app_type=app_type, ab_code=ab_code)
+        rank_result = bottom_strategy(request_id=request_id, size=size, app_type=app_type, ab_code=ab_code, params=params)
         end_bottom = time.time()
         log_.info({'request_id': request_id,
                    'mid': mid,
@@ -565,7 +568,7 @@ def get_recommend_params(ab_exp_info, page_type=0):
     return top_K, flow_pool_P, ab_code, rule_key, expire_time, no_op_flag, old_video_index
 
 
-def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type, client_info, ab_exp_info):
+def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type, client_info, ab_exp_info, params):
     """
     首页线上推荐逻辑
     :param request_id: request_id
@@ -576,6 +579,7 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type, cl
     :param algo_type: 算法类型  type-string
     :param client_info: 用户位置信息 {"country": "国家",  "province": "省份",  "city": "城市"}
     :param ab_exp_info: ab实验分组参数 [{"expItemId":1, "configValue":{"size":4, "K":3, ...}}, ...]
+    :param params:
     :return:
     """
 
@@ -605,7 +609,7 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type, cl
                                                            mid=mid, uid=uid, app_type=app_type,
                                                            size=size, top_K=top_K, flow_pool_P=flow_pool_P,
                                                            algo_type=algo_type, client_info=client_info,
-                                                           expire_time=12 * 3600)
+                                                           expire_time=12 * 3600, params=params)
         # ab-test
         # result = ab_test_op(rank_result=rank_result,
         #                     ab_code_list=[config_.AB_CODE['position_insert']],
@@ -633,7 +637,8 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type, cl
                                                            algo_type=algo_type, client_info=client_info,
                                                            ab_code=ab_code, expire_time=expire_time,
                                                            rule_key=rule_key, no_op_flag=no_op_flag,
-                                                           old_video_index=old_video_index)
+                                                           old_video_index=old_video_index,
+                                                           params=params)
         log_.info({'request_id': request_id,
                    'app_type': app_type,
                    'mid': mid,
@@ -659,7 +664,7 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type, cl
     return rank_result
 
 
-def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_exp_info, client_info, page_type):
+def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_exp_info, client_info, page_type, params):
     """
     相关推荐逻辑
     :param request_id: request_id
@@ -671,6 +676,7 @@ def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_
     :param ab_exp_info: ab实验分组参数 [{"expItemId":1, "configValue":{"size":4, "K":3, ...}}, ...]
     :param client_info: 地域参数
     :param page_type: 页面区分参数  1:详情页;2:分享页
+    :param params:
     :return: videos type-list
     """
     param_st = time.time()
@@ -691,7 +697,8 @@ def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_
                                                        algo_type='', client_info=client_info,
                                                        ab_code=ab_code, expire_time=expire_time,
                                                        rule_key=rule_key, no_op_flag=no_op_flag,
-                                                       old_video_index=old_video_index, video_id=video_id)
+                                                       old_video_index=old_video_index, video_id=video_id,
+                                                       params=params)
     log_.info({'request_id': request_id,
                'app_type': app_type,
                'mid': mid,

+ 6 - 4
video_rank.py

@@ -152,18 +152,19 @@ def remove_duplicate(rov_recall, flow_recall, top_K):
     return rov_recall, flow_recall_result
 
 
-def bottom_strategy(request_id, size, app_type, ab_code):
+def bottom_strategy(request_id, size, app_type, ab_code, params):
     """
     兜底策略: 从ROV召回池中获取top1000,进行状态过滤后的视频
     :param request_id: request_id
     :param size: 需要获取的视频数
     :param app_type: 产品标识 type-int
     :param ab_code: abCode
+    :param params:
     :return:
     """
     pool_recall = PoolRecall(request_id=request_id, app_type=app_type, ab_code=ab_code)
     key_name, _ = pool_recall.get_pool_redis_key(pool_type='rov')
-    redis_helper = RedisHelper()
+    redis_helper = RedisHelper(params=params)
     data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=1000)
     if not data:
         log_.info('{} —— ROV推荐进入了二次兜底, data = {}'.format(config_.ENV_TEXT, data))
@@ -185,15 +186,16 @@ def bottom_strategy(request_id, size, app_type, ab_code):
     return bottom_data
 
 
-def bottom_strategy_last(size, app_type, ab_code):
+def bottom_strategy_last(size, app_type, ab_code, params):
     """
     兜底策略: 从兜底视频中随机获取视频,进行状态过滤后的视频
     :param size: 需要获取的视频数
     :param app_type: 产品标识 type-int
     :param ab_code: abCode
+    :param params:
     :return:
     """
-    redis_helper = RedisHelper()
+    redis_helper = RedisHelper(params=params)
     bottom_data = redis_helper.get_data_zset_with_index(key_name=config_.BOTTOM_KEY_NAME, start=0, end=-1)
     random_data = numpy.random.choice(bottom_data, size * 30, False)
     # 视频状态过滤采用离线定时过滤方案

+ 11 - 10
video_recall.py

@@ -16,7 +16,7 @@ config_ = set_config()
 class PoolRecall(object):
     """召回"""
     def __init__(self, request_id, app_type, client_info=None, mid='', uid='', ab_code='',
-                 rule_key='', no_op_flag=False):
+                 rule_key='', no_op_flag=False, params=None):
         """
         初始化
         :param request_id: request_id
@@ -25,6 +25,7 @@ class PoolRecall(object):
         :param mid: mid type-string
         :param uid: uid type-string
         :param ab_code: ab_code type-int
+        :param params:
         """
         self.request_id = request_id
         self.app_type = app_type
@@ -34,7 +35,7 @@ class PoolRecall(object):
         self.client_info = client_info
         self.rule_key = rule_key
         self.no_op_flag = no_op_flag
-        self.redis_helper = RedisHelper()
+        self.redis_helper = RedisHelper(params=params)
 
     def copy_redis_zset_data(self, from_key_name, to_key_name):
         # 获取from_key_name中的数据
@@ -536,7 +537,7 @@ class PoolRecall(object):
             #     value = '{}-{}'.format(item['videoId'], item['flowPool'])
             #     self.redis_helper.remove_value_from_zset(key_name=flow_pool_key, value=value)
 
-        redis_helper = RedisHelper()
+        # redis_helper = RedisHelper()
         for item in view_count_result:
             try:
                 # 接口超时,item[2]可能为None
@@ -550,7 +551,7 @@ class PoolRecall(object):
                 check_result.append(item)
                 # 将分发数更新到本地记录
                 key_name = '{}{}.{}'.format(config_.LOCAL_DISTRIBUTE_COUNT_PREFIX, item[0], item[1])
-                redis_helper.setnx_key(key_name=key_name, value=remain_count, expire_time=5 * 60)
+                self.redis_helper.setnx_key(key_name=key_name, value=remain_count, expire_time=5 * 60)
             else:
                 # viewCount <= 0
                 # 从流量召回池移除
@@ -660,8 +661,8 @@ class PoolRecall(object):
                 key_name = config_.UPDATE_ROV_KEY_NAME_APP
             else:
                 key_name = config_.UPDATE_ROV_KEY_NAME
-            redis_helper = RedisHelper()
-            data = redis_helper.get_data_zset_with_index(key_name=key_name,
+            # redis_helper = RedisHelper()
+            data = self.redis_helper.get_data_zset_with_index(key_name=key_name,
                                                          start=0, end=-1, with_scores=True)
             # 获取视频id,并转换类型为int,将videoId和score做mapping,并存储为key-value{videoId: score}
             if data is None:
@@ -704,12 +705,12 @@ class PoolRecall(object):
         """
         try:
             # 获取生效中的置顶视频列表
-            redis_helper = RedisHelper()
+            # redis_helper = RedisHelper()
             if self.app_type == config_.APP_TYPE['APP']:
                 key_name = config_.TOP_VIDEO_LIST_KEY_NAME_APP
             else:
                 key_name = config_.TOP_VIDEO_LIST_KEY_NAME
-            data = redis_helper.get_data_from_redis(key_name=key_name)
+            data = self.redis_helper.get_data_from_redis(key_name=key_name)
             # log_.info('===1===  {}'.format(data))
             if data is None:
                 return [], []
@@ -1392,8 +1393,8 @@ class PoolRecall(object):
         push_from = config_.PUSH_FROM['top_video_relevant_appType_19']
         relevant_result = []
         relevant_videos_key_name = f"{config_.MOVIE_RELEVANT_LIST_KEY_NAME_PREFIX}{video_id}"
-        redis_helper = RedisHelper()
-        if not redis_helper.key_exists(key_name=relevant_videos_key_name):
+        # redis_helper = RedisHelper()
+        if not self.redis_helper.key_exists(key_name=relevant_videos_key_name):
             return relevant_result
         # 获取数据
         data = self.redis_helper.get_data_zset_with_index(key_name=relevant_videos_key_name, start=0, end=-1,