Преглед на файлове

Merge branch 'fea_20230808' of algorithm/rov-server into master

linfan преди 1 година
родител
ревизия
a9e880fc85
променени са 4 файла, в които са добавени 97 реда и са изтрити 60 реда
  1. rank_service.py (+7 -12)
  2. recommend.py (+41 -30)
  3. video_rank.py (+48 -17)
  4. video_recall.py (+1 -1)

+ 7 - 12
rank_service.py

@@ -8,7 +8,7 @@ log_ = Log()
 
 
 config_ = set_config()
-def get_featurs(mid, data, size, top_K, flow_pool_P, env_dict=None):
+def get_featurs(mid, data, size, top_K, flow_pool_P, rec_recall_vid_list, env_dict=None, video_static_info=None, video_hour_static_info=None):
     feature_dict = {}
     # defult value
     apptype = 4
@@ -73,13 +73,12 @@ def get_featurs(mid, data, size, top_K, flow_pool_P, env_dict=None):
     day_share_users = []
     recommendsource_list = []
     relevant_video_list = []
-    recall_list = env_dict.get('recall_list', [])
     city_list = []
     province_list = []
-    if recall_list and len(recall_list)>0:
-        for i in range(len(recall_list)):
+    if rec_recall_vid_list and len(rec_recall_vid_list)>0:
+        for i in range(len(rec_recall_vid_list)):
             mid_list.append(mid)
-            videoid_list.append(int(recall_list[i]))
+            videoid_list.append(rec_recall_vid_list[i])
             apptype_list.append(apptype)
             pagesource_list.append(pagesource)
             versioncode_list.append(versioncode)
@@ -91,9 +90,6 @@ def get_featurs(mid, data, size, top_K, flow_pool_P, env_dict=None):
             city_list.append(city_code)
             province_list.append(province_code)
             relevant_video_list.append(relevant_video_id)
-    video_static_info = env_dict.get('vid_day_fea_list', [])
-    video_hour_static_info = env_dict.get('vid_hour_fea_list', [])
-    #print("video_static_info:",video_static_info)
     if video_static_info and len(video_static_info)>0:
         for i in range(len(video_static_info)):
             try:
@@ -122,7 +118,7 @@ def get_featurs(mid, data, size, top_K, flow_pool_P, env_dict=None):
                                           day_share_pv_list, day_share_return_score_list, day_share_score_list,
                                           day_share_users, day_view_pv_list, day_view_users_list)
     else:
-        for i in range(len(recall_list)):
+        for i in range(len(rec_recall_vid_list)):
             insert_static_default_fea(day_ctr_score_list, day_play_pv_list, day_play_users_list,
                                       day_return_rate_list, day_return_users_list, day_rov_list,
                                       day_share_pv_list, day_share_return_score_list, day_share_score_list,
@@ -156,7 +152,7 @@ def get_featurs(mid, data, size, top_K, flow_pool_P, env_dict=None):
                     hour_return_rate_list.append(0.0)
                     hour_ctr_score_list.append(0.0)
     else:
-        for i in range(len(recall_list)):
+        for i in range(len(rec_recall_vid_list)):
             hour_rov_list.append(0.0)
             hour_share_return_score_list.append(0.0)
             hour_share_score_list.append(0.0)
@@ -254,8 +250,7 @@ def get_tf_serving_sores(feature_dict):
     #print(request_data)
     # 调用http接口
     result = request_post_data(config_.TF_SERVING_URL,request_data, timeout=(0.1, 1))
-
-    # print("result:", result)
+    #print("result:", result)
     if result is None:
         print("result is None")
         log_.info('call tf serving error,types: {}')

+ 41 - 30
recommend.py

@@ -624,29 +624,22 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
                 'flow_pool_recall': recall_result_list[2]
             }
     # 3. 特征回流
-    rec_recall_list = []
-    vidKeys = []
-    hour_vidKeys = []
-    pre_str = "v_ctr:"
-    pre_hour_str = "v_hour_ctr:"
-    rec_recall_item_list = []
-    for recall_item in data['rov_pool_recall']:
-        if len(recall_item) <= 0:
-            continue
-        vid = recall_item.get("videoId", 0)
-        rec_recall_list.append(vid)
-        vidKeys.append(pre_str + str(vid))
-        hour_vidKeys.append(pre_hour_str + str(vid))
-        rec_recall_item_list.append(recall_item)
-    redisObj = RedisHelper()
-    video_static_info = redisObj.get_batch_key(vidKeys)
-    video_hour_static_info = redisObj.get_batch_key(hour_vidKeys)
-    vid_day_fea_list = []
-    vid_hour_fea_list = []
-    if video_static_info:
-        vid_day_fea_list = video_static_info
-    if video_hour_static_info:
-        vid_hour_fea_list = video_hour_static_info
+    #
+    # for recall_item in data['rov_pool_recall']:
+    #     if len(recall_item) <= 0:
+    #         continue
+    #     vid = recall_item.get("videoId", 0)
+    #     rec_recall_list.append(vid)
+    #     rec_recall_item_list.append(recall_item)
+    # redisObj = RedisHelper()
+    # video_static_info = redisObj.get_batch_key(vidKeys)
+    # video_hour_static_info = redisObj.get_batch_key(hour_vidKeys)
+    # vid_day_fea_list = []
+    # vid_hour_fea_list = []
+    # if video_static_info:
+    #     vid_day_fea_list = video_static_info
+    # if video_hour_static_info:
+    #     vid_hour_fea_list = video_hour_static_info
     if env_dict:
         province_code = client_info.get('provinceCode', -1)
         if province_code and province_code == "":
@@ -657,21 +650,39 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
         env_dict['mid'] = mid
         env_dict['province_code'] = province_code
         env_dict['city_code'] = city_code
-
-        env_dict['recall_list'] = rec_recall_list
-        env_dict['vid_day_fea_list'] = vid_day_fea_list
-        env_dict['vid_hour_fea_list'] = vid_hour_fea_list
         env_json = env_dict
     #4.
-    rank_result, flow_num  = video_new_rank2(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), ab_code=ab_code, mid=mid, exp_config=exp_config, env_dict=env_dict, rec_recall_item_list=rec_recall_item_list)
+    rank_result, flow_num  = video_new_rank2(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), ab_code=ab_code, mid=mid, exp_config=exp_config, env_dict=env_dict)
     #print(rank_result)
     if rank_result:
         result['rank_num'] = len(rank_result)
+        day_vidKeys = []
+        hour_vidKeys = []
+        rec_recall_list = []
+        pre_str = "v_ctr:"
+        pre_hour_str = "v_hour_ctr:"
+        if env_dict and len(rank_result)>0:
+            for rec_item in rank_result:
+                vid = rec_item.get("videoId", 0)
+                rec_recall_list.append(vid)
+                day_vidKeys.append(pre_str+str(vid))
+                hour_vidKeys.append(pre_hour_str+str(vid))
+            redisObj = RedisHelper()
+            video_static_info = redisObj.get_batch_key(day_vidKeys)
+            video_hour_static_info = redisObj.get_batch_key(hour_vidKeys)
+            vid_day_fea_list = []
+            vid_hour_fea_list = []
+            if video_static_info:
+                 vid_day_fea_list = video_static_info
+            if video_hour_static_info:
+                 vid_hour_fea_list = video_hour_static_info
+            env_dict['recall_list'] = rec_recall_list
+            env_dict['vid_day_fea_list'] = vid_day_fea_list
+            env_dict['vid_hour_fea_list'] = vid_hour_fea_list
+            env_json = env_dict
     result['rankResult'] = rank_result
     result['flow_num'] = flow_num
     result['rankTime'] = (time.time() - start_rank) * 1000
-
-
     return result, env_json
     # return rank_result, last_rov_recall_key
 

+ 48 - 17
video_rank.py

@@ -593,7 +593,7 @@ def video_rank_with_old_video(rank_result, old_video_recall, size, top_K, old_vi
     return new_rank_result[:size]
 
 
-def video_new_rank2(data, size, top_K, flow_pool_P, ab_code, mid, exp_config=None, env_dict=None, rec_recall_item_list=None):
+def video_new_rank2(data, size, top_K, flow_pool_P, ab_code, mid, exp_config=None, env_dict=None):
     """
         视频分发排序
         :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
@@ -602,14 +602,14 @@ def video_new_rank2(data, size, top_K, flow_pool_P, ab_code, mid, exp_config=Non
         :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
         :return: rank_result
         """
-    if not rec_recall_item_list and not data['flow_pool_recall']:
+    if not data['rov_pool_recall'] and not data['flow_pool_recall']:
         return [], 0
 
     #全量的是vlog,票圈精选, 334,60057,
     # 60054: simrecall,
     pre_str = "k_p2:"
-    #print("pre_str:", pre_str)
-    rov_recall_rank = rec_recall_item_list
+    rov_recall_rank = data['rov_pool_recall']
+    #print(rov_recall_rank)
     #call rank service
     #flag_call_service = 0
     sort_index = 0
@@ -618,32 +618,63 @@ def video_new_rank2(data, size, top_K, flow_pool_P, ab_code, mid, exp_config=Non
     #print("sort_index:", sort_index)
     redisObj = RedisHelper()
     vidKeys = []
-    for recall_item in rec_recall_item_list:
-        vid = recall_item.get("videoId", 0)
-        vidKeys.append(pre_str + str(vid))
+    rec_recall_item_list = []
+    rec_recall_vid_list = []
+    day_vidKeys = []
+    hour_vidKeys = []
+    pre_day_str = "v_ctr:"
+    pre_hour_str = "v_hour_ctr:"
+    for recall_item in data['rov_pool_recall']:
+        try:
+            vid = int(recall_item.get("videoId", 0))
+            rec_recall_vid_list.append(vid)
+            rec_recall_item_list.append(recall_item)
+            vidKeys.append(pre_str + str(vid))
+            day_vidKeys.append(pre_day_str+str(vid))
+            hour_vidKeys.append(pre_hour_str+str(vid))
+        except:
+            continue
     video_scores = redisObj.get_batch_key(vidKeys)
-    if ab_code == 60066 or ab_code == 60069 or ab_code == 60070 or ab_code == 60071:
-        feature_dict = get_featurs(mid, data, size, top_K, flow_pool_P, env_dict)
+    #print("video_scores:", video_scores)
+    if (ab_code == 60066 or ab_code == 60069 or ab_code == 60070 or ab_code == 60071) and len(rec_recall_vid_list)>0:
+        video_static_info = redisObj.get_batch_key(day_vidKeys)
+        video_hour_static_info = redisObj.get_batch_key(hour_vidKeys)
+        #print("env_dict:", env_dict)
+        feature_dict = get_featurs(mid, data, size, top_K, flow_pool_P, rec_recall_vid_list,env_dict, video_static_info, video_hour_static_info)
         score_result = get_tf_serving_sores(feature_dict)
+        #print("score_result:", score_result)
         if video_scores and len(video_scores)>0  and rec_recall_item_list and score_result and len(score_result) > 0\
                 and len(score_result) == len(rec_recall_item_list) and len(video_scores)== len(score_result):
             for i in range(len(score_result)):
                 try:
                     if video_scores[i] is None and len(score_result[i])>0:
                         return_score = 0.000000001
-                        total_score = return_score * score_result[i][0]
+                        # sore_index :10 = model score
+                        if sort_index == 10:
+                            total_score = score_result[i][0]
+                        else:
+                            total_score = return_score * score_result[i][0]
                         rec_recall_item_list[i]['sort_score'] = total_score
                     else:
                         video_score_str = json.loads(video_scores[i])
-                        if len(video_score_str)>= sort_index and  len(video_score_str)>0:
-                            return_score = video_score_str[sort_index]
+                        # sore_index :10 = model score
+                        if sort_index == 10:
+                            total_score = score_result[i][0]
                         else:
-                            return_score = 0.000000001
-                        total_score = return_score * score_result[i][0]
+                            return_score  = 0.000000001
+                            if len(video_score_str)>= sort_index and  len(video_score_str)>0:
+                                return_score = video_score_str[sort_index]
+                            total_score = return_score * score_result[i][0]
+                            #print("total_score:", total_score, " model score :", score_result[i][0], "return_score:",
+                             #     return_score)
                         rec_recall_item_list[i]['sort_score'] = total_score
-                except Exception:
-                    return_score = 0.000000001
-                    total_score = return_score * 0.00000001
+                except Exception as e:
+                    #print('exception: {}:', e)
+                    if sort_index == 10:
+                        total_score = 0.00000001
+                    else:
+                        return_score = 0.000000001
+                        total_score = return_score * 0.00000001
                     rec_recall_item_list[i]['sort_score'] = total_score
                 rec_recall_item_list[i]['flag_call_service'] = 1
             rov_recall_rank = sorted(rec_recall_item_list, key=lambda k: k.get('sort_score', 0), reverse=True)

+ 1 - 1
video_recall.py

@@ -2694,7 +2694,7 @@ class PoolRecall(object):
             return None
 
     def get_sort_ab_codel_config(self):
-        ab_key = "sort_ab_config"
+        ab_key = "sort_ab_config2"
         data = self.redis_helper.get_data_from_redis(key_name=ab_key)
         if data is not None:
             try: