瀏覽代碼

Merge branch 'master' into feature/zhangbo_5

zhangbo 1 年之前
父節點
當前提交
43f3f77cc4
共有 5 個文件被更改,包括 525 次插入102 次删除
  1. 57 2
      parameter_update.py
  2. 25 31
      recommend.py
  3. 179 23
      utils.py
  4. 254 22
      video_rank.py
  5. 10 24
      video_recall.py

+ 57 - 2
parameter_update.py

@@ -2,15 +2,21 @@ from my_utils import parse_json_for_risk_rule
 from my_utils import parse_json_for_risk_videos
 from db_helper import RedisHelper
 from config import set_config
-
-
+import json
+from log import Log
 config_ = set_config()
+log_ = Log()
 
 RISK_SHIELD_FILTER_RULE_V1_JSON = "RISK_SHIELD_FILTER_RULE_V1_JSON"
 RISK_SHIELD_FILTER_VIDEO_V1_STR = "RISK_SHIELD_FILTER_VIDEO_V1_STR"
 RISK_SHIELD_FILTER_EXPANSION_FACTOR_INT = "RISK_SHIELD_FILTER_EXPANSION_FACTOR_INT"
 RISK_SHIELD_FILTER_FLAG_BOOL = "RISK_SHIELD_FILTER_FLAG_BOOL"
 
+TAGS_FILTER_FLAG_BOOL = "TAGS_FILTER_FLAG_BOOL"
+TAGS_FILTER_RULE_V1_JSON = "TAGS_FILTER_RULE_V1_JSON"
+
+ALG_RECSYS_RANK_DENSITY_RULES_JSON = "ALG_RECSYS_RANK_DENSITY_RULES_JSON"
+
 def param_update_risk_rule() -> dict:
     """
     定时更新风险过滤的规则
@@ -73,6 +79,52 @@ def param_update_risk_filter_flag() -> bool:
             data = False
     return data
 
+def param_update_filter_flags() -> [bool, bool]:
+    """
+    用getbatch的方式从redis中取所有filter的flag
+    key1 = "RISK_SHIELD_FILTER_FLAG_BOOL"
+    key2 = "TAGS_FILTER_FLAG_BOOL"
+    value = "False"
+    """
+    redis_helper = RedisHelper()
+    key_list = [
+        RISK_SHIELD_FILTER_FLAG_BOOL,
+        TAGS_FILTER_FLAG_BOOL
+    ]
+    tmp = redis_helper.get_batch_key(name_list=key_list)
+    data1 = False
+    data2 = False
+    if tmp is not None:
+        try:
+            data1 = True if tmp[0].lower() == "true" else False
+        except Exception as e:
+            data1 = False
+    if tmp is not None:
+        try:
+            data2 = True if tmp[1].lower() == "true" else False
+        except Exception as e:
+            data2 = False
+    return [data1, data2]
+
+def param_update_rule(redis_helper: RedisHelper) -> dict:
+    tmp = redis_helper.get_data_from_redis(key_name=TAGS_FILTER_RULE_V1_JSON)
+    if tmp is not None:
+        try:
+            data = json.loads(tmp)
+            return data
+        except Exception as e:
+            log_.error("{}: parse json is wrong with in param_update_rule:{}".format(e, tmp))
+    return {}
+
+def param_update_density_rule(redis_helper: RedisHelper):
+    tmp = redis_helper.get_data_from_redis(key_name=ALG_RECSYS_RANK_DENSITY_RULES_JSON)
+    if tmp is not None:
+        try:
+            data = json.loads(tmp)
+            return data
+        except Exception as e:
+            log_.error("{}: parse json is wrong with in param_update_density_rule:{}".format(e, tmp))
+    return {}
 if __name__ == '__main__':
     pass
     d1 = param_update_risk_rule()
@@ -83,5 +135,8 @@ if __name__ == '__main__':
     d4 = param_update_risk_filter_flag()
     print(d3, type(d3))
     print(d4, type(d4))
+    redis_helper = RedisHelper()
+    d5 = param_update_rule(redis_helper)
+    print(d5, type(d5))
 
 

+ 25 - 31
recommend.py

@@ -13,7 +13,7 @@ from log import Log
 from config import set_config
 from video_recall import PoolRecall
 from video_rank import video_new_rank2, video_sank_pos_rank, video_new_rank, video_rank, refactor_video_rank, \
-    bottom_strategy, video_rank_by_w_h_rate, video_rank_with_old_video, bottom_strategy2, video_new_rank3
+    bottom_strategy, video_rank_by_w_h_rate, video_rank_with_old_video, bottom_strategy2, video_new_rank3, video_new_rank3_4density
 from db_helper import RedisHelper
 import gevent
 from utils import FilterVideos, get_user_has30day_return
@@ -111,7 +111,7 @@ def video_position_recommend(request_id, mid, uid, app_type, videos):
     pos1_vids = filted_list[0]
     pos2_vids = filted_list[1]
 
-    videos = positon_duplicate(pos1_vids, pos2_vids, videos)    
+    videos = positon_duplicate(pos1_vids, pos2_vids, videos)
 
     if pos1_vids is not None and len(pos1_vids) >0 :
         videos.insert(0, {'videoId': int(pos1_vids[0]), 'rovScore': 100,
@@ -537,10 +537,10 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
     elif ab_code == 60068 or ab_code == 60070 or ab_code == 60080 or ab_code == 60081 or ab_code == 60082 \
             or ab_code == 60083 or ab_code == 60084 or ab_code == 60085 or ab_code == 60086 \
             or ab_code == 60092 or ab_code == 60093 or ab_code == 60094 or ab_code == 60095 or ab_code == 60096\
-            or ab_code == 60097:
+            or ab_code == 60097 or ab_code == 60098:
         t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter))
         t.append(gevent.spawn(pool_recall.get_return_video_reall, 'rv2:'))
-    elif ab_code == 60098:
+    elif ab_code == 60099:
         t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter))
         t.append(gevent.spawn(pool_recall.get_return_video_reall, 'rv2:'))
         # todo:zhangbo
@@ -629,7 +629,7 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
             or ab_code == 60080 or ab_code == 60081 or ab_code == 60082 or ab_code == 60083 or ab_code == 60084\
             or ab_code == 60085 or ab_code == 60086 \
             or ab_code == 60092 or ab_code == 60093 or ab_code == 60094 or ab_code == 60095 or ab_code == 60096\
-            or ab_code == 60097:
+            or ab_code == 60097 or ab_code == 60098:
         rov_pool_recall = []
         if len(recall_result_list)>=2:
             region_recall = recall_result_list[0]
@@ -667,7 +667,7 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
             if len(rov_pool_recall)>0:
                 recall_result_list[0] = rov_pool_recall
     # merge新增的recall_strategy_trend_v1 60098
-    if ab_code == 60098:
+    if ab_code == 60099:
         rov_pool_recall = []
         if len(recall_result_list) >= 2:
             region_recall = recall_result_list[0]
@@ -718,10 +718,13 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
     # add_flow_pool_recall_log
     recall_result_list = copy.deepcopy(recall_result_list)
     flow_pool_recall_process = {}
-    # print("zb:" + str(rov_pool_recall))
-    # ####### 排序
+
+    # ------------------排------------------
+    # ------------------序------------------
+    # ------------------逻------------------
+    # ------------------辑------------------
+
     start_rank = time.time()
-    # log_.info('====== rank')
     if app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
         if ab_code in [
             config_.AB_CODE['rov_rank_appType_18_19'],
@@ -730,9 +733,7 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
         ]:
             data = {
                 'rov_pool_recall': recall_result_list[0],
-                # add_flow_pool_recall_log
                 'flow_pool_recall': recall_result_list[1][0]
-                # 'flow_pool_recall': recall_result_list[1]
             }
         else:
             data = {
@@ -740,7 +741,6 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
                 'flow_pool_recall': []
             }
     else:
-        # add_flow_pool_recall_log
         if recall_result_list[1][0]:
             redis_helper = RedisHelper()
             quick_flow_pool_P = redis_helper.get_data_from_redis(
@@ -750,20 +750,14 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
                 flow_pool_P = quick_flow_pool_P
             data = {
                 'rov_pool_recall': recall_result_list[0],
-                # add_flow_pool_recall_log
                 'flow_pool_recall': recall_result_list[1][0]
-                # 'flow_pool_recall': recall_result_list[1]
             }
-            # add_flow_pool_recall_log
             flow_pool_recall_process = recall_result_list[1][1].copy()
         else:
             data = {
                 'rov_pool_recall': recall_result_list[0],
-                # add_flow_pool_recall_log
                 'flow_pool_recall': recall_result_list[2][0]
-                # 'flow_pool_recall': recall_result_list[2]
             }
-            # add_flow_pool_recall_log
             flow_pool_recall_process = recall_result_list[2][1]
 
     # 3. 特征回流
@@ -797,16 +791,16 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
     #4.
     # rank_result, flow_num = video_new_rank2(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), ab_code=ab_code, mid=mid, exp_config=exp_config, env_dict=env_dict)
 
-    # rank_result, flow_num = video_new_rank3(
-    #     data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), rank_key_prefix=rank_key_prefix
-    # )
-
-    # add_flow_pool_recall_log
-
-    rank_result, flow_num, flow_pool_recall_process = video_new_rank3(
-        data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), rank_key_prefix=rank_key_prefix,
-        flow_pool_recall_process=flow_pool_recall_process
-    )
+    if ab_code == 60098:
+        rank_result, flow_num, flow_pool_recall_process = video_new_rank3_4density(
+            data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), rank_key_prefix=rank_key_prefix,
+            flow_pool_recall_process=flow_pool_recall_process, app_type=app_type
+        )
+    else:
+        rank_result, flow_num, flow_pool_recall_process = video_new_rank3(
+            data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), rank_key_prefix=rank_key_prefix,
+            flow_pool_recall_process=flow_pool_recall_process
+        )
 
     #print(rank_result)
     if rank_result:
@@ -887,7 +881,7 @@ def new_video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
         region_code = province_code
     if region_code == '':
         region_code = '-1'
-    
+
     #print("region_code:", region_code)
 
     #size =1000
@@ -1013,7 +1007,7 @@ def new_video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
             #print(pushFrom, rank_id)
             flowPoolFlag = ''
             if rank_id in add_flow_set:
-                flowPoolFlag = flowFlag_dict.get(rank_id,'') 
+                flowPoolFlag = flowFlag_dict.get(rank_id,'')
             rank_result.append({'videoId': rank_id, 'flowPool': flowPoolFlag,
                      'rovScore': rank_score, 'pushFrom': pushFrom,
                      'abCode': ab_code})
@@ -1657,7 +1651,7 @@ def get_recommend_params(recommend_type, ab_exp_info, ab_info_data, mid, app_typ
             #         data_key = religion_param.get('data_key')
             #         rule_key_30day = religion_param.get('30day_rule_key')
 
-            
+
             for code, param in config_.AB_EXP_CODE.items():
                 if code in ab_exp_code_list:
                     ab_code = param.get('ab_code')

+ 179 - 23
utils.py

@@ -19,6 +19,115 @@ from parameter_update import param_update_risk_filter_flag
 config_ = set_config()
 log_ = Log()
 
+HOLIDAY_KV = {
+"2023-12-25":"圣诞节",
+"2024-01-01":"元旦",
+"2024-01-18":"腊八节",
+"2024-02-02":"小年",
+"2024-02-03":"小年",
+"2024-02-09":"除夕",
+"2024-02-10":"春节",
+"2024-02-14":"情人节",
+"2024-02-24":"元宵节",
+"2024-03-11":"龙抬头",
+"2024-03-08":"妇女节",
+"2024-05-01":"劳动节",
+"2024-05-12":"母亲节",
+"2024-06-01":"儿童节",
+"2024-06-10":"端午节",
+"2024-06-16":"父亲节",
+"2024-07-01":"建党节",
+"2024-07-07":"七七事变",
+"2024-08-01":"建军节",
+"2024-08-10":"七夕节",
+"2024-08-18":"中元节",
+"2024-09-17":"中秋节",
+"2024-09-09":"毛主席逝世",
+"2024-10-01":"国庆节",
+"2024-10-11":"重阳节",
+"2024-11-28":"感恩节",
+"2024-12-13":"公祭日",
+"2024-12-24":"平安夜",
+"2024-12-25":"圣诞节",
+"2024-12-26":"毛主席诞辰",
+"2024-01-06":"小寒",
+"2024-01-20":"大寒",
+"2024-02-04":"立春",
+"2024-02-19":"雨水",
+"2024-03-05":"惊蛰",
+"2024-03-20":"春分",
+"2024-04-04":"清明",
+"2024-04-19":"谷雨",
+"2024-05-05":"立夏",
+"2024-05-20":"小满",
+"2024-06-05":"芒种",
+"2024-06-21":"夏至",
+"2024-07-06":"小暑",
+"2024-07-22":"大暑",
+"2024-08-07":"立秋",
+"2024-08-22":"处暑",
+"2024-09-07":"白露",
+"2024-09-22":"秋分",
+"2024-10-08":"寒露",
+"2024-10-23":"霜降",
+"2024-11-07":"立冬",
+"2024-11-22":"小雪",
+"2024-12-06":"大雪",
+"2024-12-21":"冬至",
+"2025-01-01":"元旦",
+"2025-01-07":"腊八节",
+"2025-01-22":"小年",
+"2025-01-23":"小年",
+"2025-01-28":"除夕",
+"2025-01-29":"春节",
+"2025-02-14":"情人节",
+"2025-02-22":"元宵节",
+"2025-03-01":"龙抬头",
+"2025-03-08":"妇女节",
+"2025-05-01":"劳动节",
+"2025-05-11":"母亲节",
+"2025-06-01":"儿童节",
+"2025-05-31":"端午节",
+"2025-06-15":"父亲节",
+"2025-07-01":"建党节",
+"2054-07-07":"七七事变",
+"2025-08-01":"建军节",
+"2025-08-29":"七夕节",
+"2025-09-06":"中元节",
+"2025-10-06":"中秋节",
+"2025-09-09":"毛主席逝世",
+"2025-10-01":"国庆节",
+"2025-10-29":"重阳节",
+"2024-11-27":"感恩节",
+"2025-12-13":"公祭日",
+"2025-12-24":"平安夜",
+"2025-12-25":"圣诞节",
+"2025-12-26":"毛主席诞辰",
+"2025-01-05":"小寒",
+"2025-01-20":"大寒",
+"2025-02-03":"立春",
+"2025-02-18":"雨水",
+"2025-03-05":"惊蛰",
+"2025-03-20":"春分",
+"2025-04-04":"清明",
+"2025-04-20":"谷雨",
+"2025-05-05":"立夏",
+"2025-05-21":"小满",
+"2025-06-05":"芒种",
+"2025-06-21":"夏至",
+"2025-07-07":"小暑",
+"2025-07-22":"大暑",
+"2025-08-07":"立秋",
+"2025-08-23":"处暑",
+"2025-09-07":"白露",
+"2025-09-23":"秋分",
+"2025-10-08":"寒露",
+"2025-10-23":"霜降",
+"2025-11-07":"立冬",
+"2025-11-22":"小雪",
+"2025-12-07":"大雪",
+"2025-12-21":"冬至",
+}
 
 def send_msg_to_feishu(msg_text):
     """发送消息到飞书"""
@@ -892,7 +1001,7 @@ class FilterVideos(object):
         else:
             return video_ids[:min(self.force_truncation, len(video_ids))]
 
-    def filter_videos_for_group(self, region_code=None, videos=None):
+    def filter_videos_for_group(self, region_code=None, videos=None, video_tag_dict=None, tags_rule=None, tags_filter_flag=None):
         """视频过滤"""
         videos_filtered = self.filter_videos_with_risk_video(videos, self.app_type, region_code)
         filtered_pre_result = self.filter_video_previewed(videos_filtered)
@@ -901,30 +1010,77 @@ class FilterVideos(object):
         filtered_viewed_result = self.filter_video_viewed_status(video_ids=filtered_pre_result)
         if not filtered_viewed_result:
             return None
-        filtered_viewed_videos = [int(video_id) for video_id in filtered_viewed_result]
-        return filtered_viewed_videos
+
+        if tags_filter_flag:
+            result = self.filter_videos_with_tags_rule([int(video_id) for video_id in filtered_viewed_result],
+                                                       video_tag_dict, tags_rule)
+            result = [int(video_id) for video_id in result]
+        else:
+            result = [int(video_id) for video_id in filtered_viewed_result]
+        return result
+
+    def filter_videos_with_tags_rule(self, video_ids: list, video_tag_dict: dict, tags_rule: dict):
+        # 1 获取当日节日信息和小时数字
+        hour = datetime.now().hour
+        date = datetime.strftime(datetime.today(), '%Y-%m-%d')
+        holiday_cn = HOLIDAY_KV[date] if date in HOLIDAY_KV.keys() else ""
+        # 2 确认命中规则: 先处理天级别,后处理年级别
+        tag_days = ["早上好", "中午好", "下午好", "晚上好", "晚安"]
+        filter_tags = []
+        for tag_day in tag_days:
+            rules = tags_rule[tag_day] if tag_day in tags_rule.keys() else {}
+            start = rules["start"] if "start" in rules.keys() else 0
+            end = rules["end"] if "end" in rules.keys() else 23
+            if hour < start or hour > end:
+                filter_tags.append(tag_day)
+        if len(holiday_cn) != 0:
+            rules = tags_rule[holiday_cn] if holiday_cn in tags_rule.keys() else {}
+            start = rules["start"] if "start" in rules.keys() else 0
+            end = rules["end"] if "end" in rules.keys() else 9
+            if hour < start or hour > end:
+                filter_tags.append(holiday_cn)
+        if len(filter_tags) == 0:
+            return video_ids
+        # 3 获取视频的tag 进行过滤
+        video_id_result = []
+        for _, id in enumerate(video_ids):
+            tags = video_tag_dict[id]
+            if_filter_video = set(filter_tags) & set(tags)
+            if len(if_filter_video) > 0:
+                pass
+            else:
+                video_id_result.append(id)
+        return video_id_result
+
+
 
 if __name__ == '__main__':
-    user = [
-        ('weixin_openid_o0w175fDc8pNnywrYN49E341tKfI', ''),
-        ('weixin_openid_o0w175YwC3hStzcR5DAQdbgzdMeI', ''),
-        ('weixin_openid_o0w175ftZDl6VJVDx9la3WVPh7mU', '15900461'),
-        ('weixin_openid_o0w175SPqpCVRcp7x1XvnX4qpIvI', '19659040'),
-        ('weixin_openid_o0w175cOnguapyWIrDrHkOWl4oFQ', '31210128'),
-        ('weixin_openid_o0w175UXYId-o71e1Q3SOheYNteQ', '33099722'),
-        ('weixin_openid_o0w175QQ5b42AtOe50bchrFgcttA', ''),
-        ('weixin_openid_o0w175bgaPlfLsp3YLDKWqLWtXX8', '35371534'),
-        ('weixin_openid_o0w175eRpvbmV6nOhM1VTyyLICWA', '30488803'),
-        ('weixin_openid_o0w175TZYvG47pQkOjyJFoxQuqsw', '')
-    ]
-    video_df = pd.read_csv('./data/videoids.csv')
-    videoid_list = video_df['videoid'].tolist()
-    for mid, uid in user:
-        video_ids = random.sample(videoid_list, 1000)
-        start_time = time.time()
-        filter_ = FilterVideos(request_id=f'{mid} - {uid}', app_type=0, mid=mid, uid=uid, video_ids=video_ids)
-        res = filter_.filter_videos_new()
-        print(f"res: {res}\nexecute_time: {(time.time() - start_time) * 1000}")
+    pass
+    # video_ids = [17736990, 17734880, 17734759, 17726977]
+    # video_tag_dict = {17734880: ['国庆节'], 17726977: ['圣诞节'], 17736990: ['早上好'], 17734759: ['晚上好']}
+    # tags_rule = {'早上好': {'start': 0, 'end': 9}, '中午好': {'start': 11, 'end': 13}, '冬至': {'start': 0, 'end': 9}, '祝福': {'start': 0, 'end': 23}}
+    # f = FilterVideos("request_id", "app_type", video_ids)
+    # f.filter_videos_with_tags_rule(video_ids, video_tag_dict, tags_rule)
+    # user = [
+    #     ('weixin_openid_o0w175fDc8pNnywrYN49E341tKfI', ''),
+    #     ('weixin_openid_o0w175YwC3hStzcR5DAQdbgzdMeI', ''),
+    #     ('weixin_openid_o0w175ftZDl6VJVDx9la3WVPh7mU', '15900461'),
+    #     ('weixin_openid_o0w175SPqpCVRcp7x1XvnX4qpIvI', '19659040'),
+    #     ('weixin_openid_o0w175cOnguapyWIrDrHkOWl4oFQ', '31210128'),
+    #     ('weixin_openid_o0w175UXYId-o71e1Q3SOheYNteQ', '33099722'),
+    #     ('weixin_openid_o0w175QQ5b42AtOe50bchrFgcttA', ''),
+    #     ('weixin_openid_o0w175bgaPlfLsp3YLDKWqLWtXX8', '35371534'),
+    #     ('weixin_openid_o0w175eRpvbmV6nOhM1VTyyLICWA', '30488803'),
+    #     ('weixin_openid_o0w175TZYvG47pQkOjyJFoxQuqsw', '')
+    # ]
+    # video_df = pd.read_csv('./data/videoids.csv')
+    # videoid_list = video_df['videoid'].tolist()
+    # for mid, uid in user:
+    #     video_ids = random.sample(videoid_list, 1000)
+    #     start_time = time.time()
+    #     filter_ = FilterVideos(request_id=f'{mid} - {uid}', app_type=0, mid=mid, uid=uid, video_ids=video_ids)
+    #     res = filter_.filter_videos_new()
+    #     print(f"res: {res}\nexecute_time: {(time.time() - start_time) * 1000}")
     # filter_.filter_video_status(video_ids=[1, 3, 5])
 
     # videos = [{'videoId': 9034659, 'flowPool': '3#11#3#1637824188547'}, {'videoId': 9035052, 'flowPool': '3#11#3#1637824172827'}]

+ 254 - 22
video_rank.py

@@ -2,6 +2,8 @@ import copy
 import json
 import random
 import numpy
+from typing import Dict
+from typing import Set
 
 from log import Log
 from config import set_config
@@ -9,6 +11,7 @@ from video_recall import PoolRecall
 from db_helper import RedisHelper
 from utils import FilterVideos, send_msg_to_feishu
 from  rank_service import get_featurs, get_tf_serving_sores
+from parameter_update import param_update_rule
 
 log_ = Log()
 config_ = set_config()
@@ -809,14 +812,11 @@ def video_new_rank3(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1
     """
     redis_helper = RedisHelper()
 
-    # add_flow_pool_recall_log
     if flow_pool_recall_process is None:
         flow_pool_recall_process = {}
 
     if not data['rov_pool_recall'] and not data['flow_pool_recall']:
-        # add_flow_pool_recall_log
         return [], 0, flow_pool_recall_process
-        # return [], 0
 
     rov_recall_rank = data['rov_pool_recall']
     vid_keys = []
@@ -847,7 +847,6 @@ def video_new_rank3(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1
     )
     rank_result = []
 
-    # add_flow_pool_recall_log
     flow_pool_recall_process['recall_duplicate_res'] = {'rov_recall_rank': rov_recall_rank,
                                                         'flow_recall_rank': copy.deepcopy(flow_recall_rank)}
 
@@ -864,12 +863,8 @@ def video_new_rank3(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1
     while i < size - top_K:
         # 随机生成[0, 1)浮点数
         rand = random.random()
-
-        # add_flow_pool_recall_log
         flow_pool_recall_process['flow_pool_P'] = flow_pool_P
         flow_pool_recall_process[f'{i}_rand'] = rand
-
-        # log_.info('rand: {}'.format(rand))
         if rand < flow_pool_P:
             if flow_recall_rank:
                 rank_result.append(flow_recall_rank[0])
@@ -887,7 +882,137 @@ def video_new_rank3(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1
         i += 1
     return rank_result[:size], flow_num, flow_pool_recall_process
 
+def video_new_rank3_4density(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1:',
+                             flow_pool_recall_process=None,
+                             app_type=None):
+    """
+        视频分发排序
+        :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
+        :param size: 请求数
+        :param top_K: 保证topK为召回池视频 type-int
+        :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
+        :param rank_key_prefix:
+        :return: rank_result
+    """
+    # 1 读取多样性密度控制规则 todo 没有判断非字典数据,有报错风险,先用try兜底。
+    redis_helper = RedisHelper()
+    density_rules = {}
+    rules_all = param_update_rule(redis_helper)
+    try:
+        if len(rules_all) != 0:
+            for k, v in rules_all.items():
+                if str(app_type) in v.keys():
+                    app_type_rule = v[str(app_type)]
+                    if "density" in app_type_rule.keys():
+                        density_rules[k] = app_type_rule["density"]
+    except Exception as e:
+        log_.error("something is wrong in parsing density_rules:{}".format(e))
+        density_rules = {}
+
+    if flow_pool_recall_process is None:
+        flow_pool_recall_process = {}
+
+    if not data['rov_pool_recall'] and not data['flow_pool_recall']:
+        return [], 0, flow_pool_recall_process
+
+    rov_recall_rank = data['rov_pool_recall']
+    vid_keys = []
+    rec_recall_item_list = []
+    rec_recall_vid_list = []
+    for recall_item in data['rov_pool_recall']:
+        try:
+            vid = int(recall_item.get("videoId", 0))
+            rec_recall_vid_list.append(vid)
+            rec_recall_item_list.append(recall_item)
+            vid_keys.append(f"{rank_key_prefix}{vid}")
+        except:
+            continue
+    video_scores = redis_helper.get_batch_key(vid_keys)
+    if video_scores and len(rec_recall_item_list) > 0 and len(rec_recall_item_list) == len(video_scores):
+        for i in range(len(video_scores)):
+            try:
+                if video_scores[i] is None:
+                    rec_recall_item_list[i]['sort_score'] = 0.0
+                else:
+                    rec_recall_item_list[i]['sort_score'] = float(video_scores[i])
+            except Exception:
+                rec_recall_item_list[i]['sort_score'] = 0.0
+        rov_recall_rank = sorted(rec_recall_item_list, key=lambda k: k.get('sort_score', 0), reverse=True)
+    flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
+    rov_recall_rank, flow_recall_rank = remove_duplicate(
+        rov_recall=rov_recall_rank, flow_recall=flow_recall_rank, top_K=top_K
+    )
+    # 2 多样性需求,给video添加tag todo
+    video_ids = []
+    video_ids.extend([v["videoId"] for v in rov_recall_rank])
+    video_ids.extend([v["videoId"] for v in flow_recall_rank])
+    video_ids = list(set(video_ids))
+    video_tag_dict = get_video_tags(redis_helper, video_ids)
+    for v in rov_recall_rank:
+        v["tags"] = video_tag_dict.get(v["videoId"], [])
+    for v in flow_recall_rank:
+        v["tags"] = video_tag_dict.get(v["videoId"], [])
+
+    rank_result = []
+    flow_pool_recall_process['recall_duplicate_res'] = {'rov_recall_rank': rov_recall_rank,
+                                                        'flow_recall_rank': copy.deepcopy(flow_recall_rank)}
+    # 从ROV召回池中获取top k
+    if len(rov_recall_rank) > 0:
+        rank_result.extend(rov_recall_rank[:top_K])
+        rov_recall_rank = rov_recall_rank[top_K:]
+    else:
+        rank_result.extend(flow_recall_rank[:top_K])
+        flow_recall_rank = flow_recall_rank[top_K:]
+        # 按概率 p 及score排序获取 size - k 个视频
+    flow_num = 0
+    i = 0
+    # print("zb-flow_pool_recall_process:" + str(flow_pool_recall_process))
+    # print("zb-density_rules:" + str(density_rules))
+    # print("zb-2:" + str([i for i in rov_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]]))
+    # print("zb-3:" + str([i for i in flow_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]]))
 
+    while i < size - top_K:
+        # 随机生成[0, 1)浮点数
+        rand = random.random()
+        flow_pool_recall_process['flow_pool_P'] = flow_pool_P
+        flow_pool_recall_process[f'{i}_rand'] = rand
+        if rand < flow_pool_P:
+            if flow_recall_rank:
+                rank_result.append(flow_recall_rank[0])
+                flow_recall_rank.remove(flow_recall_rank[0])
+            else:
+                rank_result.extend(rov_recall_rank[:size - top_K - i])
+                # todo zhangbo rank
+                result = merge_density_control(
+                    rank_result[:size],
+                    [i for i in rov_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]],
+                    [i for i in flow_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]],
+                    density_rules
+                )
+                return result, flow_num, flow_pool_recall_process
+        else:
+            if rov_recall_rank:
+                rank_result.append(rov_recall_rank[0])
+                rov_recall_rank.remove(rov_recall_rank[0])
+            else:
+                rank_result.extend(flow_recall_rank[:size - top_K - i])
+                # todo zhangbo rank
+                result = merge_density_control(
+                    rank_result[:size],
+                    [i for i in rov_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]],
+                    [i for i in flow_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]],
+                    density_rules
+                )
+                return result, flow_num, flow_pool_recall_process
+        i += 1
+    # todo zhangbo rank
+    result = merge_density_control(
+        rank_result[:size],
+        [i for i in rov_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]],
+        [i for i in flow_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]],
+        density_rules
+    )
+    return result, flow_num, flow_pool_recall_process
 # 排序服务兜底
 def sup_rank(video_scores, recall_list):
     if video_scores and len(recall_list) > 0:
@@ -1306,19 +1431,126 @@ def video_sank_pos_rank(data, size, top_K, flow_pool_P, ab_Code='', exp_config=N
             i += 1
     return rank_result[:size], flow_num
 
-
+def merge_density_control(
+        data: list, rov: list, flow: list, rule: dict
+) -> list:
+    # 1 判断是否满足规则
+    status_cur: Dict[str, int] = {}
+    for d in data:
+        if "tags" in d.keys() and len(d["tags"]) != 0:
+            for t in d["tags"]:
+                if t in rule.keys():
+                    status_cur[t] = 1 + status_cur[t] if t in status_cur.keys() else 1
+    status_cur_illegal: Dict[str, int] = {}
+    for k, v in status_cur.items():
+        if k in rule.keys() and rule[k] < v:
+            status_cur_illegal[k] = v - rule[k]
+    if len(status_cur_illegal) == 0:
+        return data
+    # 2 反向遍历,直到status_cur_illegal满足,记录要替换的index和召回池标记。
+    indexes = []
+    pushes = []
+    var1 = len(data)
+    for i in range(var1-1, -1, -1):
+        d = data[i]
+        tags = d["tags"] if "tags" in d.keys() else []
+        inters = set(tags) & set(status_cur_illegal.keys())
+        if len(inters) == 0:
+            continue
+        indexes.append(i)
+        pushes.append(d["flowPool"] if "flowPool" in d.keys() and len(d["flowPool"]) != 0 else "")
+        for inter in inters:
+            status_cur_illegal[inter] = status_cur_illegal[inter] - 1
+            if status_cur_illegal[inter] == 0:
+                status_cur_illegal.pop(inter)
+            status_cur[inter] = status_cur[inter] - 1
+            if status_cur[inter] == 0:
+                status_cur.pop(inter)
+    # 3 反向遍历index,再正向遍历增补列表,取可替换的video
+    for index, push in zip(reversed(indexes), reversed(pushes)):
+        if len(push) > 0:
+            # 5 如果是flow的video 取不到 不做替换
+            candidate = flow
+        else:
+            # 5 如果是rov的video  取不到 不做替换
+            candidate = rov
+        for i, d in enumerate(candidate):
+            judge_rule_set = judge_rule(rule=rule, status=status_cur)
+            tags = set(d["tags"] if "tags" in d.keys() else [])
+            if len(judge_rule_set & tags) != 0:
+                continue
+            # 开始插入
+            tmp = copy.deepcopy(data[index])
+            data[index] = d
+            candidate[i] = tmp
+            # 更新状态
+            for tag in tags:
+                status_cur[tag] = status_cur[tag] + 1 if tag in status_cur.keys() else 1
+            break
+        if len(push) > 0:
+            flow = candidate
+        else:
+            rov = candidate
+    return data
+def judge_rule(rule: dict, status: dict) -> Set:
+    result = set()
+    for k, v in status.items():
+        if k in rule.keys() and rule[k] <= v:
+            result.add(k)
+    return result
+def get_video_tags(redis_helper, video_ids) -> dict:
+    REDIS_PREFIX = "alg_recsys_video_tags_"
+    redis_keys = [REDIS_PREFIX + str(i) for i in video_ids]
+    video_tags = redis_helper.get_batch_key(redis_keys)
+    video_tag_dict = {}
+    if video_tags is not None:
+        for i, tags_str in enumerate(video_tags):
+            tags = []
+            if tags_str is not None and len(tags_str) != 0:
+                tags = str(tags_str).split(",")
+            video_tag_dict[video_ids[i]] = tags
+    return video_tag_dict
 
 if __name__ == '__main__':
-    d_test = [{'videoId': 10028734, 'rovScore': 99.977, 'pushFrom': 'recall_pool', 'abCode': 10000},
-              {'videoId': 1919925, 'rovScore': 99.974, 'pushFrom': 'recall_pool', 'abCode': 10000},
-              {'videoId': 9968118, 'rovScore': 99.972, 'pushFrom': 'recall_pool', 'abCode': 10000},
-              {'videoId': 9934863, 'rovScore': 99.971, 'pushFrom': 'recall_pool', 'abCode': 10000},
-              {'videoId': 10219869, 'flowPool': '1#1#1#1640830818883', 'rovScore': 82.21929728934731, 'pushFrom': 'flow_pool', 'abCode': 10000},
-              {'videoId': 10212814, 'flowPool': '1#1#1#1640759014984', 'rovScore': 81.26694187726412, 'pushFrom': 'flow_pool', 'abCode': 10000},
-              {'videoId': 10219437, 'flowPool': '1#1#1#1640827620520', 'rovScore': 81.21634156641908, 'pushFrom': 'flow_pool', 'abCode': 10000},
-              {'videoId': 1994050, 'rovScore': 99.97, 'pushFrom': 'recall_pool', 'abCode': 10000},
-              {'videoId': 9894474, 'rovScore': 99.969, 'pushFrom': 'recall_pool', 'abCode': 10000},
-              {'videoId': 10028081, 'rovScore': 99.966, 'pushFrom': 'recall_pool', 'abCode': 10000}]
-    res = video_rank_by_w_h_rate(videos=d_test)
-    for tmp in res:
-        print(tmp)
+    data: list = [
+        {'videoId': 1, 'flowPool': '', 'tags': ['下午好','元旦','祝福']},
+        {'videoId': 2, 'flowPool': '', 'tags': ['下午好','祝福']},
+        {'videoId': 3, 'flowPool': '', 'tags': ['早上好']},
+        {'videoId': 4, 'flowPool': 'flow', 'tags': ['下午好','元旦','祝福']},
+    ]
+    rov = [
+        {'videoId': 10, 'flowPool': '', 'tags': ['下午好']},
+        {'videoId': 11, 'flowPool': '', 'tags': ['祝福']},
+        {'videoId': 12, 'flowPool': '', 'tags': ['下午好','元旦','祝福']},
+        {'videoId': 13, 'flowPool': '', 'tags': ['元旦', "下午好"]},
+        # {'videoId': 14, 'flowPool': '', 'tags': []},
+        # {'videoId': 15, 'flowPool': '', 'tags': []}
+    ]
+    flow = [
+        {'videoId': 20, 'flowPool': 'flow', 'tags': ['下午好']},
+        {'videoId': 21, 'flowPool': 'flow', 'tags': ['下午好']},
+        {'videoId': 22, 'flowPool': 'flow', 'tags': []},
+        {'videoId': 23, 'flowPool': 'flow', 'tags': []}
+    ]
+    rule = {
+        '下午好': 2,
+        '早上好': 1,
+        '祝福': 1
+    }
+    result = merge_density_control(data, rov, flow, rule)
+    print(result)
+
+    # d_test = [{'videoId': 10028734, 'rovScore': 99.977, 'pushFrom': 'recall_pool', 'abCode': 10000},
+    #           {'videoId': 1919925, 'rovScore': 99.974, 'pushFrom': 'recall_pool', 'abCode': 10000},
+    #           {'videoId': 9968118, 'rovScore': 99.972, 'pushFrom': 'recall_pool', 'abCode': 10000},
+    #           {'videoId': 9934863, 'rovScore': 99.971, 'pushFrom': 'recall_pool', 'abCode': 10000},
+    #           {'videoId': 10219869, 'flowPool': '1#1#1#1640830818883', 'rovScore': 82.21929728934731, 'pushFrom': 'flow_pool', 'abCode': 10000},
+    #           {'videoId': 10212814, 'flowPool': '1#1#1#1640759014984', 'rovScore': 81.26694187726412, 'pushFrom': 'flow_pool', 'abCode': 10000},
+    #           {'videoId': 10219437, 'flowPool': '1#1#1#1640827620520', 'rovScore': 81.21634156641908, 'pushFrom': 'flow_pool', 'abCode': 10000},
+    #           {'videoId': 1994050, 'rovScore': 99.97, 'pushFrom': 'recall_pool', 'abCode': 10000},
+    #           {'videoId': 9894474, 'rovScore': 99.969, 'pushFrom': 'recall_pool', 'abCode': 10000},
+    #           {'videoId': 10028081, 'rovScore': 99.966, 'pushFrom': 'recall_pool', 'abCode': 10000}]
+    # res = video_rank_by_w_h_rate(videos=d_test)
+    # for tmp in res:
+    #     print(tmp)
+    pass

+ 10 - 24
video_recall.py

@@ -2039,42 +2039,19 @@ class PoolRecall(object):
         if region_code == '':
             region_code = '-1'
 
-        # if self.ab_code == config_.AB_CODE['region_rank_by_h'].get('abtest_139'):
-        #     if region_code == '-1':
-        #         t = [
-        #             gevent.spawn(self.recall_update_by_day, size, '30day'),
-        #             gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
-        #             gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
-        #         ]
-        #     else:
-        #         t = [
-        #             gevent.spawn(self.recall_update_by_day, size, '30day'),
-        #             gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_h', expire_time),
-        #             gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_24h', expire_time),
-        #             gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
-        #             gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
-        #         ]
-        # else:
-
         if region_code == '-1':
             t = [
-                # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h'),
                 gevent.spawn(self.recall_region_dup_24h, region_code, size, 'h', expire_time),
                 gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
                 gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
             ]
         else:
             t = [
-                #add recall video
-                # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size),
-                 # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'region_24h'),
-                 # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h'),
-                gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_h', expire_time),
+               gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_h', expire_time),
                 gevent.spawn(self.recall_region_dup_24h, region_code, size, 'h', expire_time),
                 gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_24h', expire_time),
                 gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
                 gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
-                 #
             ]
 
         gevent.joinall(t)
@@ -2093,6 +2070,11 @@ class PoolRecall(object):
         #             recall_num = size
         # if recall_num<size:
         #     recall_num = size
+
+        # todo zhangbo rank 放开地域的截断,给后续排序更多供给 做规则调控
+        if self.ab_code == 60098:
+            recall_num = size * 3
+
         for region_result in region_recall_result_list:
             for video in region_result:
                 video_id = video.get('videoId')
@@ -3959,7 +3941,11 @@ class PoolRecall(object):
                 if len(data2_list) > 0:
                     data_for_filter.extend(
                         [data2_list[i:i + group_size] for i in range(0, len(data2_list), group_size)])
+            else:
+                data2_list = []
             data_for_filter = [i for i in data_for_filter if len(i) > 0]
+            # data_for_filter.append([17736990, 17734880, 17734759, 17726977])
+            # data1_list.extend([17736990, 17734880, 17734759, 17726977])
             # 4 视频过滤
             filter_ = FilterVideos(request_id=self.request_id,
                                    app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=None,