zhangbo 1 anno fa
parent
commit
a939be9954
4 ha cambiato i file con 267 aggiunte e 72 eliminazioni
  1. 13 1
      parameter_update.py
  2. 21 27
      recommend.py
  3. 225 22
      video_rank.py
  4. 8 22
      video_recall.py

+ 13 - 1
parameter_update.py

@@ -15,6 +15,8 @@ RISK_SHIELD_FILTER_FLAG_BOOL = "RISK_SHIELD_FILTER_FLAG_BOOL"
 TAGS_FILTER_FLAG_BOOL = "TAGS_FILTER_FLAG_BOOL"
 TAGS_FILTER_FLAG_BOOL = "TAGS_FILTER_FLAG_BOOL"
 TAGS_FILTER_RULE_V1_JSON = "TAGS_FILTER_RULE_V1_JSON"
 TAGS_FILTER_RULE_V1_JSON = "TAGS_FILTER_RULE_V1_JSON"
 
 
+ALG_RECSYS_RANK_DENSITY_RULES_JSON = "ALG_RECSYS_RANK_DENSITY_RULES_JSON"
+
 def param_update_risk_rule() -> dict:
 def param_update_risk_rule() -> dict:
     """
     """
     定时更新风险过滤的规则
     定时更新风险过滤的规则
@@ -104,7 +106,7 @@ def param_update_filter_flags() -> [bool, bool]:
             data2 = False
             data2 = False
     return [data1, data2]
     return [data1, data2]
 
 
-def param_update_rule(redis_helper: RedisHelper):
+def param_update_rule(redis_helper: RedisHelper) -> dict:
     tmp = redis_helper.get_data_from_redis(key_name=TAGS_FILTER_RULE_V1_JSON)
     tmp = redis_helper.get_data_from_redis(key_name=TAGS_FILTER_RULE_V1_JSON)
     if tmp is not None:
     if tmp is not None:
         try:
         try:
@@ -113,6 +115,16 @@ def param_update_rule(redis_helper: RedisHelper):
         except Exception as e:
         except Exception as e:
             log_.error("{}: parse json is wrong with in param_update_rule:{}".format(e, tmp))
             log_.error("{}: parse json is wrong with in param_update_rule:{}".format(e, tmp))
     return {}
     return {}
+
+def param_update_density_rule(redis_helper: RedisHelper):
+    tmp = redis_helper.get_data_from_redis(key_name=ALG_RECSYS_RANK_DENSITY_RULES_JSON)
+    if tmp is not None:
+        try:
+            data = json.loads(tmp)
+            return data
+        except Exception as e:
+            log_.error("{}: parse json is wrong with in param_update_density_rule:{}".format(e, tmp))
+    return {}
 if __name__ == '__main__':
 if __name__ == '__main__':
     pass
     pass
     d1 = param_update_risk_rule()
     d1 = param_update_risk_rule()

+ 21 - 27
recommend.py

@@ -13,7 +13,7 @@ from log import Log
 from config import set_config
 from config import set_config
 from video_recall import PoolRecall
 from video_recall import PoolRecall
 from video_rank import video_new_rank2, video_sank_pos_rank, video_new_rank, video_rank, refactor_video_rank, \
 from video_rank import video_new_rank2, video_sank_pos_rank, video_new_rank, video_rank, refactor_video_rank, \
-    bottom_strategy, video_rank_by_w_h_rate, video_rank_with_old_video, bottom_strategy2, video_new_rank3
+    bottom_strategy, video_rank_by_w_h_rate, video_rank_with_old_video, bottom_strategy2, video_new_rank3, video_new_rank3_4density
 from db_helper import RedisHelper
 from db_helper import RedisHelper
 import gevent
 import gevent
 from utils import FilterVideos, get_user_has30day_return
 from utils import FilterVideos, get_user_has30day_return
@@ -111,7 +111,7 @@ def video_position_recommend(request_id, mid, uid, app_type, videos):
     pos1_vids = filted_list[0]
     pos1_vids = filted_list[0]
     pos2_vids = filted_list[1]
     pos2_vids = filted_list[1]
 
 
-    videos = positon_duplicate(pos1_vids, pos2_vids, videos)    
+    videos = positon_duplicate(pos1_vids, pos2_vids, videos)
 
 
     if pos1_vids is not None and len(pos1_vids) >0 :
     if pos1_vids is not None and len(pos1_vids) >0 :
         videos.insert(0, {'videoId': int(pos1_vids[0]), 'rovScore': 100,
         videos.insert(0, {'videoId': int(pos1_vids[0]), 'rovScore': 100,
@@ -718,10 +718,13 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
     # add_flow_pool_recall_log
     # add_flow_pool_recall_log
     recall_result_list = copy.deepcopy(recall_result_list)
     recall_result_list = copy.deepcopy(recall_result_list)
     flow_pool_recall_process = {}
     flow_pool_recall_process = {}
-    # print("zb:" + str(rov_pool_recall))
-    # ####### 排序
+
+    # ------------------排------------------
+    # ------------------序------------------
+    # ------------------逻------------------
+    # ------------------辑------------------
+
     start_rank = time.time()
     start_rank = time.time()
-    # log_.info('====== rank')
     if app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
     if app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
         if ab_code in [
         if ab_code in [
             config_.AB_CODE['rov_rank_appType_18_19'],
             config_.AB_CODE['rov_rank_appType_18_19'],
@@ -730,9 +733,7 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
         ]:
         ]:
             data = {
             data = {
                 'rov_pool_recall': recall_result_list[0],
                 'rov_pool_recall': recall_result_list[0],
-                # add_flow_pool_recall_log
                 'flow_pool_recall': recall_result_list[1][0]
                 'flow_pool_recall': recall_result_list[1][0]
-                # 'flow_pool_recall': recall_result_list[1]
             }
             }
         else:
         else:
             data = {
             data = {
@@ -740,7 +741,6 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
                 'flow_pool_recall': []
                 'flow_pool_recall': []
             }
             }
     else:
     else:
-        # add_flow_pool_recall_log
         if recall_result_list[1][0]:
         if recall_result_list[1][0]:
             redis_helper = RedisHelper()
             redis_helper = RedisHelper()
             quick_flow_pool_P = redis_helper.get_data_from_redis(
             quick_flow_pool_P = redis_helper.get_data_from_redis(
@@ -750,20 +750,14 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
                 flow_pool_P = quick_flow_pool_P
                 flow_pool_P = quick_flow_pool_P
             data = {
             data = {
                 'rov_pool_recall': recall_result_list[0],
                 'rov_pool_recall': recall_result_list[0],
-                # add_flow_pool_recall_log
                 'flow_pool_recall': recall_result_list[1][0]
                 'flow_pool_recall': recall_result_list[1][0]
-                # 'flow_pool_recall': recall_result_list[1]
             }
             }
-            # add_flow_pool_recall_log
             flow_pool_recall_process = recall_result_list[1][1].copy()
             flow_pool_recall_process = recall_result_list[1][1].copy()
         else:
         else:
             data = {
             data = {
                 'rov_pool_recall': recall_result_list[0],
                 'rov_pool_recall': recall_result_list[0],
-                # add_flow_pool_recall_log
                 'flow_pool_recall': recall_result_list[2][0]
                 'flow_pool_recall': recall_result_list[2][0]
-                # 'flow_pool_recall': recall_result_list[2]
             }
             }
-            # add_flow_pool_recall_log
             flow_pool_recall_process = recall_result_list[2][1]
             flow_pool_recall_process = recall_result_list[2][1]
 
 
     # 3. 特征回流
     # 3. 特征回流
@@ -797,16 +791,16 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
     #4.
     #4.
     # rank_result, flow_num = video_new_rank2(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), ab_code=ab_code, mid=mid, exp_config=exp_config, env_dict=env_dict)
     # rank_result, flow_num = video_new_rank2(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), ab_code=ab_code, mid=mid, exp_config=exp_config, env_dict=env_dict)
 
 
-    # rank_result, flow_num = video_new_rank3(
-    #     data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), rank_key_prefix=rank_key_prefix
-    # )
-
-    # add_flow_pool_recall_log
-
-    rank_result, flow_num, flow_pool_recall_process = video_new_rank3(
-        data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), rank_key_prefix=rank_key_prefix,
-        flow_pool_recall_process=flow_pool_recall_process
-    )
+    if ab_code == 60098:
+        rank_result, flow_num, flow_pool_recall_process = video_new_rank3_4density(
+            data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), rank_key_prefix=rank_key_prefix,
+            flow_pool_recall_process=flow_pool_recall_process, app_type=app_type
+        )
+    else:
+        rank_result, flow_num, flow_pool_recall_process = video_new_rank3(
+            data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), rank_key_prefix=rank_key_prefix,
+            flow_pool_recall_process=flow_pool_recall_process
+        )
 
 
     #print(rank_result)
     #print(rank_result)
     if rank_result:
     if rank_result:
@@ -887,7 +881,7 @@ def new_video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
         region_code = province_code
         region_code = province_code
     if region_code == '':
     if region_code == '':
         region_code = '-1'
         region_code = '-1'
-    
+
     #print("region_code:", region_code)
     #print("region_code:", region_code)
 
 
     #size =1000
     #size =1000
@@ -1013,7 +1007,7 @@ def new_video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
             #print(pushFrom, rank_id)
             #print(pushFrom, rank_id)
             flowPoolFlag = ''
             flowPoolFlag = ''
             if rank_id in add_flow_set:
             if rank_id in add_flow_set:
-                flowPoolFlag = flowFlag_dict.get(rank_id,'') 
+                flowPoolFlag = flowFlag_dict.get(rank_id,'')
             rank_result.append({'videoId': rank_id, 'flowPool': flowPoolFlag,
             rank_result.append({'videoId': rank_id, 'flowPool': flowPoolFlag,
                      'rovScore': rank_score, 'pushFrom': pushFrom,
                      'rovScore': rank_score, 'pushFrom': pushFrom,
                      'abCode': ab_code})
                      'abCode': ab_code})
@@ -1657,7 +1651,7 @@ def get_recommend_params(recommend_type, ab_exp_info, ab_info_data, mid, app_typ
             #         data_key = religion_param.get('data_key')
             #         data_key = religion_param.get('data_key')
             #         rule_key_30day = religion_param.get('30day_rule_key')
             #         rule_key_30day = religion_param.get('30day_rule_key')
 
 
-            
+
             for code, param in config_.AB_EXP_CODE.items():
             for code, param in config_.AB_EXP_CODE.items():
                 if code in ab_exp_code_list:
                 if code in ab_exp_code_list:
                     ab_code = param.get('ab_code')
                     ab_code = param.get('ab_code')

+ 225 - 22
video_rank.py

@@ -2,6 +2,8 @@ import copy
 import json
 import json
 import random
 import random
 import numpy
 import numpy
+from typing import Dict
+from typing import Set
 
 
 from log import Log
 from log import Log
 from config import set_config
 from config import set_config
@@ -9,6 +11,7 @@ from video_recall import PoolRecall
 from db_helper import RedisHelper
 from db_helper import RedisHelper
 from utils import FilterVideos, send_msg_to_feishu
 from utils import FilterVideos, send_msg_to_feishu
 from  rank_service import get_featurs, get_tf_serving_sores
 from  rank_service import get_featurs, get_tf_serving_sores
+from parameter_update import param_update_rule
 
 
 log_ = Log()
 log_ = Log()
 config_ = set_config()
 config_ = set_config()
@@ -809,14 +812,11 @@ def video_new_rank3(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1
     """
     """
     redis_helper = RedisHelper()
     redis_helper = RedisHelper()
 
 
-    # add_flow_pool_recall_log
     if flow_pool_recall_process is None:
     if flow_pool_recall_process is None:
         flow_pool_recall_process = {}
         flow_pool_recall_process = {}
 
 
     if not data['rov_pool_recall'] and not data['flow_pool_recall']:
     if not data['rov_pool_recall'] and not data['flow_pool_recall']:
-        # add_flow_pool_recall_log
         return [], 0, flow_pool_recall_process
         return [], 0, flow_pool_recall_process
-        # return [], 0
 
 
     rov_recall_rank = data['rov_pool_recall']
     rov_recall_rank = data['rov_pool_recall']
     vid_keys = []
     vid_keys = []
@@ -847,7 +847,6 @@ def video_new_rank3(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1
     )
     )
     rank_result = []
     rank_result = []
 
 
-    # add_flow_pool_recall_log
     flow_pool_recall_process['recall_duplicate_res'] = {'rov_recall_rank': rov_recall_rank,
     flow_pool_recall_process['recall_duplicate_res'] = {'rov_recall_rank': rov_recall_rank,
                                                         'flow_recall_rank': copy.deepcopy(flow_recall_rank)}
                                                         'flow_recall_rank': copy.deepcopy(flow_recall_rank)}
 
 
@@ -864,12 +863,8 @@ def video_new_rank3(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1
     while i < size - top_K:
     while i < size - top_K:
         # 随机生成[0, 1)浮点数
         # 随机生成[0, 1)浮点数
         rand = random.random()
         rand = random.random()
-
-        # add_flow_pool_recall_log
         flow_pool_recall_process['flow_pool_P'] = flow_pool_P
         flow_pool_recall_process['flow_pool_P'] = flow_pool_P
         flow_pool_recall_process[f'{i}_rand'] = rand
         flow_pool_recall_process[f'{i}_rand'] = rand
-
-        # log_.info('rand: {}'.format(rand))
         if rand < flow_pool_P:
         if rand < flow_pool_P:
             if flow_recall_rank:
             if flow_recall_rank:
                 rank_result.append(flow_recall_rank[0])
                 rank_result.append(flow_recall_rank[0])
@@ -887,7 +882,121 @@ def video_new_rank3(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1
         i += 1
         i += 1
     return rank_result[:size], flow_num, flow_pool_recall_process
     return rank_result[:size], flow_num, flow_pool_recall_process
 
 
+def video_new_rank3_4density(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1:',
+                             flow_pool_recall_process=None,
+                             app_type=None):
+    """
+        视频分发排序
+        :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
+        :param size: 请求数
+        :param top_K: 保证topK为召回池视频 type-int
+        :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
+        :param rank_key_prefix:
+        :return: rank_result
+    """
+    redis_helper = RedisHelper()
+    density_rules = {}
+    rules_all = param_update_rule(redis_helper)
+    if len(rules_all) != 0:
+        for k, v in rules_all.items():
+            if str(app_type) in v.keys():
+                app_type_rule = v[str(app_type)]
+                if "density" in app_type_rule.keys():
+                    density_rules[k] = app_type_rule["density"]
+
+    if flow_pool_recall_process is None:
+        flow_pool_recall_process = {}
+
+    if not data['rov_pool_recall'] and not data['flow_pool_recall']:
+        return [], 0, flow_pool_recall_process
+
+    rov_recall_rank = data['rov_pool_recall']
+    vid_keys = []
+    rec_recall_item_list = []
+    rec_recall_vid_list = []
+    for recall_item in data['rov_pool_recall']:
+        try:
+            vid = int(recall_item.get("videoId", 0))
+            rec_recall_vid_list.append(vid)
+            rec_recall_item_list.append(recall_item)
+            vid_keys.append(f"{rank_key_prefix}{vid}")
+        except:
+            continue
+    video_scores = redis_helper.get_batch_key(vid_keys)
+    if video_scores and len(rec_recall_item_list) > 0 and len(rec_recall_item_list) == len(video_scores):
+        for i in range(len(video_scores)):
+            try:
+                if video_scores[i] is None:
+                    rec_recall_item_list[i]['sort_score'] = 0.0
+                else:
+                    rec_recall_item_list[i]['sort_score'] = float(video_scores[i])
+            except Exception:
+                rec_recall_item_list[i]['sort_score'] = 0.0
+        rov_recall_rank = sorted(rec_recall_item_list, key=lambda k: k.get('sort_score', 0), reverse=True)
+    flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
+    rov_recall_rank, flow_recall_rank = remove_duplicate(
+        rov_recall=rov_recall_rank, flow_recall=flow_recall_rank, top_K=top_K
+    )
+    rank_result = []
+
+    flow_pool_recall_process['recall_duplicate_res'] = {'rov_recall_rank': rov_recall_rank,
+                                                        'flow_recall_rank': copy.deepcopy(flow_recall_rank)}
+    # 从ROV召回池中获取top k
+    if len(rov_recall_rank) > 0:
+        rank_result.extend(rov_recall_rank[:top_K])
+        rov_recall_rank = rov_recall_rank[top_K:]
+    else:
+        rank_result.extend(flow_recall_rank[:top_K])
+        flow_recall_rank = flow_recall_rank[top_K:]
+        # 按概率 p 及score排序获取 size - k 个视频
+    flow_num = 0
+    i = 0
+    print("zb-rank_result:" + str(rank_result[:size]))
+    print("zb-2:" + str([i for i in rov_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]]))
+    print("zb-3:" + str([i for i in flow_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]]))
 
 
+    while i < size - top_K:
+        # 随机生成[0, 1)浮点数
+        rand = random.random()
+        flow_pool_recall_process['flow_pool_P'] = flow_pool_P
+        flow_pool_recall_process[f'{i}_rand'] = rand
+        if rand < flow_pool_P:
+            if flow_recall_rank:
+                rank_result.append(flow_recall_rank[0])
+                flow_recall_rank.remove(flow_recall_rank[0])
+            else:
+                rank_result.extend(rov_recall_rank[:size - top_K - i])
+                # todo zhangbo rank
+                result = merge_density_control(
+                    rank_result[:size],
+                    [i for i in rov_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]],
+                    [i for i in flow_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]],
+                    density_rules
+                )
+                return result, flow_num, flow_pool_recall_process
+        else:
+            if rov_recall_rank:
+                rank_result.append(rov_recall_rank[0])
+                rov_recall_rank.remove(rov_recall_rank[0])
+            else:
+                rank_result.extend(flow_recall_rank[:size - top_K - i])
+                # todo zhangbo rank
+                result = merge_density_control(
+                    rank_result[:size],
+                    [i for i in rov_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]],
+                    [i for i in flow_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]],
+                    density_rules
+                )
+                return result, flow_num, flow_pool_recall_process
+        i += 1
+    # todo zhangbo rank
+    result = merge_density_control(
+        rank_result[:size],
+        [i for i in rov_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]],
+        [i for i in flow_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]],
+        density_rules
+    )
+    return result, flow_num, flow_pool_recall_process
 # 排序服务兜底
 # 排序服务兜底
 def sup_rank(video_scores, recall_list):
 def sup_rank(video_scores, recall_list):
     if video_scores and len(recall_list) > 0:
     if video_scores and len(recall_list) > 0:
@@ -1306,19 +1415,113 @@ def video_sank_pos_rank(data, size, top_K, flow_pool_P, ab_Code='', exp_config=N
             i += 1
             i += 1
     return rank_result[:size], flow_num
     return rank_result[:size], flow_num
 
 
-
+def merge_density_control(
+        data: list, rov: list, flow: list, rule: dict
+) -> list:
+    # 1 判断是否满足规则
+    status_cur: Dict[str, int] = {}
+    for d in data:
+        if "tags" in d.keys() and len(d["tags"]) != 0:
+            for t in d["tags"]:
+                if t in rule.keys():
+                    status_cur[t] = 1 + status_cur[t] if t in status_cur.keys() else 1
+    status_cur_illegal: Dict[str, int] = {}
+    for k, v in status_cur.items():
+        if k in rule.keys() and rule[k] < v:
+            status_cur_illegal[k] = v - rule[k]
+    if len(status_cur_illegal) == 0:
+        return data
+    # 2 反向遍历,直到status_cur_illegal满足,记录要替换的index和召回池标记。
+    indexes = []
+    pushes = []
+    var1 = len(data)
+    for i in range(var1-1, -1, -1):
+        d = data[i]
+        tags = d["tags"] if "tags" in d.keys() else []
+        inters = set(tags) & set(status_cur_illegal.keys())
+        if len(inters) == 0:
+            continue
+        indexes.append(i)
+        pushes.append(d["flowPool"] if "flowPool" in d.keys() and len(d["flowPool"]) != 0 else "")
+        for inter in inters:
+            status_cur_illegal[inter] = status_cur_illegal[inter] - 1
+            if status_cur_illegal[inter] == 0:
+                status_cur_illegal.pop(inter)
+            status_cur[inter] = status_cur[inter] - 1
+            if status_cur[inter] == 0:
+                status_cur.pop(inter)
+    # 3 反向遍历index,再正向遍历增补列表,取可替换的video
+    for index, push in zip(reversed(indexes), reversed(pushes)):
+        if len(push) > 0:
+            # 5 如果是flow的video 取不到 不做替换
+            candidate = flow
+        else:
+            # 5 如果是rov的video  取不到 不做替换
+            candidate = rov
+        for i, d in enumerate(candidate):
+            judge_rule_set = judge_rule(rule=rule, status=status_cur)
+            tags = set(d["tags"] if "tags" in d.keys() else [])
+            if len(judge_rule_set & tags) != 0:
+                continue
+            # 开始插入
+            data[index] = d
+            candidate.pop(i)
+            # 更新状态
+            for tag in tags:
+                status_cur[tag] = status_cur[tag] + 1 if tag in status_cur.keys() else 1
+            break
+        if len(push) > 0:
+            flow = candidate
+        else:
+            rov = candidate
+    return data
+def judge_rule(rule: dict, status: dict) -> Set:
+    result = set()
+    for k, v in status.items():
+        if k in rule.keys() and rule[k] <= v:
+            result.add(k)
+    return result
 
 
 if __name__ == '__main__':
 if __name__ == '__main__':
-    d_test = [{'videoId': 10028734, 'rovScore': 99.977, 'pushFrom': 'recall_pool', 'abCode': 10000},
-              {'videoId': 1919925, 'rovScore': 99.974, 'pushFrom': 'recall_pool', 'abCode': 10000},
-              {'videoId': 9968118, 'rovScore': 99.972, 'pushFrom': 'recall_pool', 'abCode': 10000},
-              {'videoId': 9934863, 'rovScore': 99.971, 'pushFrom': 'recall_pool', 'abCode': 10000},
-              {'videoId': 10219869, 'flowPool': '1#1#1#1640830818883', 'rovScore': 82.21929728934731, 'pushFrom': 'flow_pool', 'abCode': 10000},
-              {'videoId': 10212814, 'flowPool': '1#1#1#1640759014984', 'rovScore': 81.26694187726412, 'pushFrom': 'flow_pool', 'abCode': 10000},
-              {'videoId': 10219437, 'flowPool': '1#1#1#1640827620520', 'rovScore': 81.21634156641908, 'pushFrom': 'flow_pool', 'abCode': 10000},
-              {'videoId': 1994050, 'rovScore': 99.97, 'pushFrom': 'recall_pool', 'abCode': 10000},
-              {'videoId': 9894474, 'rovScore': 99.969, 'pushFrom': 'recall_pool', 'abCode': 10000},
-              {'videoId': 10028081, 'rovScore': 99.966, 'pushFrom': 'recall_pool', 'abCode': 10000}]
-    res = video_rank_by_w_h_rate(videos=d_test)
-    for tmp in res:
-        print(tmp)
+    data: list = [
+        {'videoId': 1, 'pushFrom': 'rov', 'tags': ['下午好','元旦','祝福']},
+        {'videoId': 2, 'pushFrom': 'rov', 'tags': ['下午好','祝福']},
+        {'videoId': 3, 'pushFrom': 'rov', 'tags': ['早上好']},
+        {'videoId': 4, 'pushFrom': 'flow', 'tags': ['下午好','元旦','祝福']},
+    ]
+    rov = [
+        {'videoId': 10, 'pushFrom': 'rov', 'tags': ['下午好']},
+        {'videoId': 11, 'pushFrom': 'rov', 'tags': ['祝福']},
+        {'videoId': 12, 'pushFrom': 'rov', 'tags': ['下午好','元旦','祝福']},
+        # {'videoId': 13, 'pushFrom': 'rov', 'tags': ['元旦']},
+        # {'videoId': 14, 'pushFrom': 'rov', 'tags': []},
+        # {'videoId': 15, 'pushFrom': 'rov', 'tags': []}
+    ]
+    flow = [
+        {'videoId': 20, 'pushFrom': 'flow', 'tags': ['下午好']},
+        {'videoId': 21, 'pushFrom': 'flow', 'tags': ['下午好']},
+        {'videoId': 22, 'pushFrom': 'flow', 'tags': []},
+        {'videoId': 23, 'pushFrom': 'flow', 'tags': []}
+    ]
+    rule = {
+        '下午好': 2,
+        '早上好': 1,
+        '祝福': 1
+    }
+    result = merge_density_control(data, rov, flow, rule)
+    print(result)
+
+    # d_test = [{'videoId': 10028734, 'rovScore': 99.977, 'pushFrom': 'recall_pool', 'abCode': 10000},
+    #           {'videoId': 1919925, 'rovScore': 99.974, 'pushFrom': 'recall_pool', 'abCode': 10000},
+    #           {'videoId': 9968118, 'rovScore': 99.972, 'pushFrom': 'recall_pool', 'abCode': 10000},
+    #           {'videoId': 9934863, 'rovScore': 99.971, 'pushFrom': 'recall_pool', 'abCode': 10000},
+    #           {'videoId': 10219869, 'flowPool': '1#1#1#1640830818883', 'rovScore': 82.21929728934731, 'pushFrom': 'flow_pool', 'abCode': 10000},
+    #           {'videoId': 10212814, 'flowPool': '1#1#1#1640759014984', 'rovScore': 81.26694187726412, 'pushFrom': 'flow_pool', 'abCode': 10000},
+    #           {'videoId': 10219437, 'flowPool': '1#1#1#1640827620520', 'rovScore': 81.21634156641908, 'pushFrom': 'flow_pool', 'abCode': 10000},
+    #           {'videoId': 1994050, 'rovScore': 99.97, 'pushFrom': 'recall_pool', 'abCode': 10000},
+    #           {'videoId': 9894474, 'rovScore': 99.969, 'pushFrom': 'recall_pool', 'abCode': 10000},
+    #           {'videoId': 10028081, 'rovScore': 99.966, 'pushFrom': 'recall_pool', 'abCode': 10000}]
+    # res = video_rank_by_w_h_rate(videos=d_test)
+    # for tmp in res:
+    #     print(tmp)
+    pass

+ 8 - 22
video_recall.py

@@ -2052,26 +2052,8 @@ class PoolRecall(object):
         if region_code == '':
         if region_code == '':
             region_code = '-1'
             region_code = '-1'
 
 
-        # if self.ab_code == config_.AB_CODE['region_rank_by_h'].get('abtest_139'):
-        #     if region_code == '-1':
-        #         t = [
-        #             gevent.spawn(self.recall_update_by_day, size, '30day'),
-        #             gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
-        #             gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
-        #         ]
-        #     else:
-        #         t = [
-        #             gevent.spawn(self.recall_update_by_day, size, '30day'),
-        #             gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_h', expire_time),
-        #             gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_24h', expire_time),
-        #             gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
-        #             gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
-        #         ]
-        # else:
-
         if region_code == '-1':
         if region_code == '-1':
             t = [
             t = [
-                # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h'),
                 gevent.spawn(self.recall_region_dup_24h, region_code, size, 'h', expire_time),
                 gevent.spawn(self.recall_region_dup_24h, region_code, size, 'h', expire_time),
                 gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
                 gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
                 gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
                 gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
@@ -2087,7 +2069,6 @@ class PoolRecall(object):
                 gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_24h', expire_time),
                 gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_24h', expire_time),
                 gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
                 gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
                 gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
                 gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
-                 #
             ]
             ]
 
 
         gevent.joinall(t)
         gevent.joinall(t)
@@ -2106,6 +2087,11 @@ class PoolRecall(object):
         #             recall_num = size
         #             recall_num = size
         # if recall_num<size:
         # if recall_num<size:
         #     recall_num = size
         #     recall_num = size
+
+        # todo zhangbo rank 放开地域的截断,给后续排序更多供给 做规则调控
+        if self.ab_code == 60098:
+            recall_num = size * 3
+
         for region_result in region_recall_result_list:
         for region_result in region_recall_result_list:
             for video in region_result:
             for video in region_result:
                 video_id = video.get('videoId')
                 video_id = video.get('videoId')
@@ -4004,8 +3990,8 @@ class PoolRecall(object):
             else:
             else:
                 data2_list = []
                 data2_list = []
             data_for_filter = [i for i in data_for_filter if len(i) > 0]
             data_for_filter = [i for i in data_for_filter if len(i) > 0]
-            # data_for_filter.append([17736990, 17734880, 17734759, 17726977])
-            # data1_list.extend([17736990, 17734880, 17734759, 17726977])
+            data_for_filter.append([17736990, 17734880, 17734759, 17726977])
+            data1_list.extend([17736990, 17734880, 17734759, 17726977])
             # 3.9 获取item的tag特征
             # 3.9 获取item的tag特征
             video_tag_dict = self.get_video_tags(list(set(data1_list) | set(data2_list)))
             video_tag_dict = self.get_video_tags(list(set(data1_list) | set(data2_list)))
             # 4 视频过滤
             # 4 视频过滤
@@ -4031,7 +4017,7 @@ class PoolRecall(object):
                         'rovScore': 0.0, 'pushFrom': config_.PUSH_FROM['recall_strategy_trend_v1'],
                         'rovScore': 0.0, 'pushFrom': config_.PUSH_FROM['recall_strategy_trend_v1'],
                         'abCode': self.ab_code
                         'abCode': self.ab_code
                     })
                     })
-            print("results:" + str(results))
+            # print("results-trend:" + str(results))
             return results
             return results
         except Exception as e:
         except Exception as e:
             log_.error("error in recall_strategy_trend_v1:{}".format(e))
             log_.error("error in recall_strategy_trend_v1:{}".format(e))