zhangbo před 1 rokem
rodič
revize
3084694b01
2 změnil soubory, kde provedl 61 přidání a 99 odebrání
  1. 50 26
      video_rank.py
  2. 11 73
      video_recall.py

+ 50 - 26
video_rank.py

@@ -894,16 +894,20 @@ def video_new_rank3_4density(data, size, top_K, flow_pool_P, rank_key_prefix='ra
         :param rank_key_prefix:
         :return: rank_result
     """
-    # 1 读取多样性密度控制规则
+    # 1 读取多样性密度控制规则 todo 没有判断非字典数据,有报错风险,先用try兜底。
     redis_helper = RedisHelper()
     density_rules = {}
     rules_all = param_update_rule(redis_helper)
-    if len(rules_all) != 0:
-        for k, v in rules_all.items():
-            if str(app_type) in v.keys():
-                app_type_rule = v[str(app_type)]
-                if "density" in app_type_rule.keys():
-                    density_rules[k] = app_type_rule["density"]
+    try:
+        if len(rules_all) != 0:
+            for k, v in rules_all.items():
+                if str(app_type) in v.keys():
+                    app_type_rule = v[str(app_type)]
+                    if "density" in app_type_rule.keys():
+                        density_rules[k] = app_type_rule["density"]
+    except Exception as e:
+        log_.error("something is wrong in parsing density_rules:{}".format(e))
+        density_rules = {}
 
     if flow_pool_recall_process is None:
         flow_pool_recall_process = {}
@@ -938,8 +942,18 @@ def video_new_rank3_4density(data, size, top_K, flow_pool_P, rank_key_prefix='ra
     rov_recall_rank, flow_recall_rank = remove_duplicate(
         rov_recall=rov_recall_rank, flow_recall=flow_recall_rank, top_K=top_K
     )
-    rank_result = []
+    # 2 多样性需求,给video添加tag todo
+    video_ids = []
+    video_ids.extend([v["videoId"] for v in rov_recall_rank])
+    video_ids.extend([v["videoId"] for v in flow_recall_rank])
+    video_ids = list(set(video_ids))
+    video_tag_dict = get_video_tags(redis_helper, video_ids)
+    for v in rov_recall_rank:
+        v["tags"] = video_tag_dict.get(v, [])
+    for v in flow_recall_rank:
+        v["tags"] = video_tag_dict.get(v, [])
 
+    rank_result = []
     flow_pool_recall_process['recall_duplicate_res'] = {'rov_recall_rank': rov_recall_rank,
                                                         'flow_recall_rank': copy.deepcopy(flow_recall_rank)}
     # 从ROV召回池中获取top k
@@ -956,9 +970,6 @@ def video_new_rank3_4density(data, size, top_K, flow_pool_P, rank_key_prefix='ra
     # print("zb-2:" + str([i for i in rov_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]]))
     # print("zb-3:" + str([i for i in flow_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]]))
 
-
-
-
     while i < size - top_K:
         # 随机生成[0, 1)浮点数
         rand = random.random()
@@ -1468,8 +1479,9 @@ def merge_density_control(
             if len(judge_rule_set & tags) != 0:
                 continue
             # 开始插入
+            tmp = copy.deepcopy(data[index])
             data[index] = d
-            candidate.pop(i)
+            candidate[i] = tmp
             # 更新状态
             for tag in tags:
                 status_cur[tag] = status_cur[tag] + 1 if tag in status_cur.keys() else 1
@@ -1485,27 +1497,39 @@ def judge_rule(rule: dict, status: dict) -> Set:
         if k in rule.keys() and rule[k] <= v:
             result.add(k)
     return result
+def get_video_tags(redis_helper, video_ids) -> dict:
+    REDIS_PREFIX = "alg_recsys_video_tags_"
+    redis_keys = [REDIS_PREFIX + str(i) for i in video_ids]
+    video_tags = redis_helper.get_batch_key(redis_keys)
+    video_tag_dict = {}
+    if video_tags is not None:
+        for i, tags_str in enumerate(video_tags):
+            tags = []
+            if tags_str is not None and len(tags_str) != 0:
+                tags = str(tags_str).split(",")
+            video_tag_dict[video_ids[i]] = tags
+    return video_tag_dict
 
 if __name__ == '__main__':
     data: list = [
-        {'videoId': 1, 'pushFrom': 'rov', 'tags': ['下午好','元旦','祝福']},
-        {'videoId': 2, 'pushFrom': 'rov', 'tags': ['下午好','祝福']},
-        {'videoId': 3, 'pushFrom': 'rov', 'tags': ['早上好']},
-        {'videoId': 4, 'pushFrom': 'flow', 'tags': ['下午好','元旦','祝福']},
+        {'videoId': 1, 'flowPool': '', 'tags': ['下午好','元旦','祝福']},
+        {'videoId': 2, 'flowPool': '', 'tags': ['下午好','祝福']},
+        {'videoId': 3, 'flowPool': '', 'tags': ['早上好']},
+        {'videoId': 4, 'flowPool': 'flow', 'tags': ['下午好','元旦','祝福']},
     ]
     rov = [
-        {'videoId': 10, 'pushFrom': 'rov', 'tags': ['下午好']},
-        {'videoId': 11, 'pushFrom': 'rov', 'tags': ['祝福']},
-        {'videoId': 12, 'pushFrom': 'rov', 'tags': ['下午好','元旦','祝福']},
-        # {'videoId': 13, 'pushFrom': 'rov', 'tags': ['元旦']},
-        # {'videoId': 14, 'pushFrom': 'rov', 'tags': []},
-        # {'videoId': 15, 'pushFrom': 'rov', 'tags': []}
+        {'videoId': 10, 'flowPool': '', 'tags': ['下午好']},
+        {'videoId': 11, 'flowPool': '', 'tags': ['祝福']},
+        {'videoId': 12, 'flowPool': '', 'tags': ['下午好','元旦','祝福']},
+        {'videoId': 13, 'flowPool': '', 'tags': ['元旦', "下午好"]},
+        # {'videoId': 14, 'flowPool': '', 'tags': []},
+        # {'videoId': 15, 'flowPool': '', 'tags': []}
     ]
     flow = [
-        {'videoId': 20, 'pushFrom': 'flow', 'tags': ['下午好']},
-        {'videoId': 21, 'pushFrom': 'flow', 'tags': ['下午好']},
-        {'videoId': 22, 'pushFrom': 'flow', 'tags': []},
-        {'videoId': 23, 'pushFrom': 'flow', 'tags': []}
+        {'videoId': 20, 'flowPool': 'flow', 'tags': ['下午好']},
+        {'videoId': 21, 'flowPool': 'flow', 'tags': ['下午好']},
+        {'videoId': 22, 'flowPool': 'flow', 'tags': []},
+        {'videoId': 23, 'flowPool': 'flow', 'tags': []}
     ]
     rule = {
         '下午好': 2,

+ 11 - 73
video_recall.py

@@ -11,10 +11,9 @@ import gevent
 import json
 import sys
 from parameter_update import param_update_expansion_factor
-from parameter_update import param_update_filter_flags
+from parameter_update import param_update_risk_filter_flag
 from parameter_update import param_update_risk_rule
 from parameter_update import param_update_risk_videos
-from parameter_update import param_update_rule
 
 log_ = Log()
 config_ = set_config()
@@ -53,17 +52,14 @@ class PoolRecall(object):
         self.h_rule_key = h_rule_key
 
         self.expansion_factor = param_update_expansion_factor()
-        [self.risk_filter_flag, self.tags_filter_flag] = param_update_filter_flags()
+        self.risk_filter_flag = param_update_risk_filter_flag()
         if self.risk_filter_flag:
             self.app_region_filtered = param_update_risk_rule()
             self.videos_with_risk = param_update_risk_videos()
         else:
             self.app_region_filtered = {}
             self.videos_with_risk = []
-        if self.tags_filter_flag:
-            self.tags_filter_rule = param_update_rule(self.redis_helper)
-        else:
-            self.tags_filter_rule = {}
+
 
 
     def copy_redis_zset_data(self, from_key_name, to_key_name):
@@ -534,8 +530,6 @@ class PoolRecall(object):
                     video_mapping[video_id] = [flow_pool]
                 else:
                     video_mapping[video_id].append(flow_pool)
-            # todo zhangbo 获取tags
-            video_tag_dict = self.get_video_tags(video_ids)
             # 过滤
             filter_ = FilterVideos(request_id=self.request_id,
                                    app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids,
@@ -581,8 +575,7 @@ class PoolRecall(object):
                         flow_pool_recall_result.append(
                             {'videoId': video_id, 'flowPool': flow_pool,
                              'rovScore': video_score[video_id], 'pushFrom': config_.PUSH_FROM['flow_recall'],
-                             'abCode': self.ab_code, 'flow_pool_abtest_group': flow_pool_abtest_group,
-                             "tags": video_tag_dict[video_id]}
+                             'abCode': self.ab_code, 'flow_pool_abtest_group': flow_pool_abtest_group}
                         )
 
                         flow_pool_recall_videos.append(video_id)
@@ -798,8 +791,6 @@ class PoolRecall(object):
                     video_mapping[video_id] = [flow_pool]
                 else:
                     video_mapping[video_id].append(flow_pool)
-            # todo zhangbo 获取tags
-            video_tag_dict = self.get_video_tags(video_ids)
             # 过滤
             filter_ = FilterVideos(request_id=self.request_id,
                                    app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids,
@@ -845,8 +836,7 @@ class PoolRecall(object):
                         flow_pool_recall_result.append(
                             {'videoId': video_id, 'flowPool': flow_pool, 'level': level,
                              'rovScore': random.uniform(0, 100), 'pushFrom': config_.PUSH_FROM['flow_recall'],
-                             'abCode': self.ab_code, 'flow_pool_abtest_group': flow_pool_abtest_group,
-                             "tags": video_tag_dict[video_id]}
+                             'abCode': self.ab_code, 'flow_pool_abtest_group': flow_pool_abtest_group}
                         )
 
                         flow_pool_recall_videos.append(video_id)
@@ -1096,8 +1086,6 @@ class PoolRecall(object):
                     check_result_items.append([video_id, flow_pool, score])
             check_result_items = sorted(check_result_items, key=lambda x: x[2], reverse=True)
             to_filter_videos = [item[0] for item in check_result_items[:get_size]]
-            # todo zhangbo 获取tags
-            video_tag_dict = self.get_video_tags(to_filter_videos)
             # 过滤
             filter_ = FilterVideos(request_id=self.request_id,
                                    app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=to_filter_videos,
@@ -1121,8 +1109,7 @@ class PoolRecall(object):
                         {'videoId': video_id, 'flowPool': check_result_mapping[video_id][0], 'level': level,
                          'rovScore': check_result_mapping[video_id][1],
                          'pushFrom': config_.PUSH_FROM['flow_recall'],
-                         'abCode': self.ab_code, 'flow_pool_abtest_group': flow_pool_abtest_group,
-                         "tags": video_tag_dict[video_id]}
+                         'abCode': self.ab_code, 'flow_pool_abtest_group': flow_pool_abtest_group}
                     )
 
         return flow_pool_recall_result[:size], flow_pool_recall_process
@@ -2060,11 +2047,7 @@ class PoolRecall(object):
             ]
         else:
             t = [
-                #add recall video
-                # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size),
-                 # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'region_24h'),
-                 # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h'),
-                gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_h', expire_time),
+               gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_h', expire_time),
                 gevent.spawn(self.recall_region_dup_24h, region_code, size, 'h', expire_time),
                 gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_24h', expire_time),
                 gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
@@ -2856,10 +2839,6 @@ class PoolRecall(object):
                 video_id = int(value[0])
                 video_ids.append(video_id)
                 video_score[video_id] = value[1]
-
-            # todo zhangbo 增加tags特征
-            video_tag_dict = self.get_video_tags(video_ids)
-
             # 过滤
             filter_ = FilterVideos(request_id=self.request_id,
                                    app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids,
@@ -2876,8 +2855,7 @@ class PoolRecall(object):
             if filtered_result:
                 # 添加视频源参数 pushFrom, abCode
                 temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
-                                'pushFrom': push_from, 'abCode': self.ab_code,
-                                'tags': video_tag_dict[item]}
+                                'pushFrom': push_from, 'abCode': self.ab_code}
                                for item in filtered_result if video_score.get(int(item)) is not None]
                 pool_recall_result.extend(temp_result)
             # else:
@@ -3152,18 +3130,6 @@ class PoolRecall(object):
         #print("recall_key:", recall_key)
         data = self.redis_helper.get_data_from_redis(key_name=recall_key)
 
-        # todo zhangbo 获取tags
-        video_ids_4tags = []
-        if data is not None:
-            json_result = json.loads(data)
-            for per_item in json_result:
-                try:
-                    vid = int(per_item[0])
-                    video_ids_4tags.append(vid)
-                except Exception as e:
-                    continue
-        video_tag_dict = self.get_video_tags(video_ids_4tags)
-
         #print(data)
         recall_result = []
         recall_dict  = {}
@@ -3177,7 +3143,7 @@ class PoolRecall(object):
                     video_ids.append(vid)
                     recall_dict[vid] = {'videoId': vid, 'flowPool': '',
                          'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['sim_hot_vid_recall'],
-                         'abCode': self.ab_code, "tags": video_tag_dict[vid]}
+                         'abCode': self.ab_code}
                 except Exception as e:
                     continue
         if len(video_ids)<=0:
@@ -3849,18 +3815,6 @@ class PoolRecall(object):
         #print("recall_key:", recall_key)
         data = self.redis_helper.get_data_from_redis(key_name=recall_key)
 
-        # todo zhangbo 获取tags
-        video_ids_4tag = []
-        if data is not None and data!="" :
-            try:
-                json_result = json.loads(data)
-                for per_item in json_result:
-                    vid = int(per_item[0])
-                    video_ids_4tag.append(vid)
-            except Exception as e:
-                video_ids_4tag = []
-        video_tag_dict = self.get_video_tags(video_ids_4tag)
-
         #print(data)
         recall_result = []
         recall_dict  = {}
@@ -3873,7 +3827,7 @@ class PoolRecall(object):
                     video_ids.append(vid)
                     recall_dict[vid] = {'videoId': vid, 'flowPool': '',
                          'rovScore': float(per_item[1]), 'pushFrom': config_.PUSH_FROM['return_video_recall'],
-                         'abCode': self.ab_code, "tags": video_tag_dict[vid]}
+                         'abCode': self.ab_code}
             except Exception as e:
                 return recall_result
         if len(video_ids)<=0:
@@ -3992,8 +3946,6 @@ class PoolRecall(object):
             data_for_filter = [i for i in data_for_filter if len(i) > 0]
             data_for_filter.append([17736990, 17734880, 17734759, 17726977])
             data1_list.extend([17736990, 17734880, 17734759, 17726977])
-            # 3.9 获取item的tag特征
-            video_tag_dict = self.get_video_tags(list(set(data1_list) | set(data2_list)))
             # 4 视频过滤
             filter_ = FilterVideos(request_id=self.request_id,
                                    app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=None,
@@ -4003,8 +3955,7 @@ class PoolRecall(object):
                                    videos_with_risk=self.videos_with_risk
                                    )
             region_code = self.get_region_code()
-            t = [gevent.spawn(filter_.filter_videos_for_group, region_code, videos, video_tag_dict,
-                              self.tags_filter_rule, self.tags_filter_flag) for videos in data_for_filter]
+            t = [gevent.spawn(filter_.filter_videos_for_group, region_code, videos) for videos in data_for_filter]
             gevent.joinall(t)
             result_list = [i.get() for i in t if i.get() is not None and len(i.get()) > 0]
             # 5 返回结果
@@ -4012,25 +3963,12 @@ class PoolRecall(object):
             for g in result_list:
                 for v in g:
                     results.append({
-                        'tags': video_tag_dict[v],
                         'videoId': v, 'flowPool': '',
                         'rovScore': 0.0, 'pushFrom': config_.PUSH_FROM['recall_strategy_trend_v1'],
                         'abCode': self.ab_code
                     })
-            # print("results-trend:" + str(results))
             return results
         except Exception as e:
             log_.error("error in recall_strategy_trend_v1:{}".format(e))
         return []
-    def get_video_tags(self, video_ids) -> dict:
-        REDIS_PREFIX = "alg_recsys_video_tags_"
-        redis_keys = [REDIS_PREFIX + str(i) for i in video_ids]
-        video_tags = self.redis_helper.get_batch_key(redis_keys)
-        video_tag_dict = {}
-        for i, tags_str in enumerate(video_tags):
-            tags = []
-            if tags_str is not None and len(tags_str) != 0:
-                tags = str(tags_str).split(",")
-            video_tag_dict[video_ids[i]] = tags
-        return video_tag_dict