|
@@ -2,6 +2,8 @@ import copy
|
|
import json
|
|
import json
|
|
import random
|
|
import random
|
|
import numpy
|
|
import numpy
|
|
|
|
+from typing import Dict
|
|
|
|
+from typing import Set
|
|
|
|
|
|
from log import Log
|
|
from log import Log
|
|
from config import set_config
|
|
from config import set_config
|
|
@@ -9,6 +11,7 @@ from video_recall import PoolRecall
|
|
from db_helper import RedisHelper
|
|
from db_helper import RedisHelper
|
|
from utils import FilterVideos, send_msg_to_feishu
|
|
from utils import FilterVideos, send_msg_to_feishu
|
|
from rank_service import get_featurs, get_tf_serving_sores
|
|
from rank_service import get_featurs, get_tf_serving_sores
|
|
|
|
+from parameter_update import param_update_rule
|
|
|
|
|
|
log_ = Log()
|
|
log_ = Log()
|
|
config_ = set_config()
|
|
config_ = set_config()
|
|
@@ -809,14 +812,11 @@ def video_new_rank3(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1
|
|
"""
|
|
"""
|
|
redis_helper = RedisHelper()
|
|
redis_helper = RedisHelper()
|
|
|
|
|
|
- # add_flow_pool_recall_log
|
|
|
|
if flow_pool_recall_process is None:
|
|
if flow_pool_recall_process is None:
|
|
flow_pool_recall_process = {}
|
|
flow_pool_recall_process = {}
|
|
|
|
|
|
if not data['rov_pool_recall'] and not data['flow_pool_recall']:
|
|
if not data['rov_pool_recall'] and not data['flow_pool_recall']:
|
|
- # add_flow_pool_recall_log
|
|
|
|
return [], 0, flow_pool_recall_process
|
|
return [], 0, flow_pool_recall_process
|
|
- # return [], 0
|
|
|
|
|
|
|
|
rov_recall_rank = data['rov_pool_recall']
|
|
rov_recall_rank = data['rov_pool_recall']
|
|
vid_keys = []
|
|
vid_keys = []
|
|
@@ -847,7 +847,6 @@ def video_new_rank3(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1
|
|
)
|
|
)
|
|
rank_result = []
|
|
rank_result = []
|
|
|
|
|
|
- # add_flow_pool_recall_log
|
|
|
|
flow_pool_recall_process['recall_duplicate_res'] = {'rov_recall_rank': rov_recall_rank,
|
|
flow_pool_recall_process['recall_duplicate_res'] = {'rov_recall_rank': rov_recall_rank,
|
|
'flow_recall_rank': copy.deepcopy(flow_recall_rank)}
|
|
'flow_recall_rank': copy.deepcopy(flow_recall_rank)}
|
|
|
|
|
|
@@ -864,12 +863,8 @@ def video_new_rank3(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1
|
|
while i < size - top_K:
|
|
while i < size - top_K:
|
|
# 随机生成[0, 1)浮点数
|
|
# 随机生成[0, 1)浮点数
|
|
rand = random.random()
|
|
rand = random.random()
|
|
-
|
|
|
|
- # add_flow_pool_recall_log
|
|
|
|
flow_pool_recall_process['flow_pool_P'] = flow_pool_P
|
|
flow_pool_recall_process['flow_pool_P'] = flow_pool_P
|
|
flow_pool_recall_process[f'{i}_rand'] = rand
|
|
flow_pool_recall_process[f'{i}_rand'] = rand
|
|
-
|
|
|
|
- # log_.info('rand: {}'.format(rand))
|
|
|
|
if rand < flow_pool_P:
|
|
if rand < flow_pool_P:
|
|
if flow_recall_rank:
|
|
if flow_recall_rank:
|
|
rank_result.append(flow_recall_rank[0])
|
|
rank_result.append(flow_recall_rank[0])
|
|
@@ -887,7 +882,137 @@ def video_new_rank3(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1
|
|
i += 1
|
|
i += 1
|
|
return rank_result[:size], flow_num, flow_pool_recall_process
|
|
return rank_result[:size], flow_num, flow_pool_recall_process
|
|
|
|
|
|
|
|
+def video_new_rank3_4density(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1:',
|
|
|
|
+ flow_pool_recall_process=None,
|
|
|
|
+ app_type=None):
|
|
|
|
+ """
|
|
|
|
+ 视频分发排序
|
|
|
|
+ :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
|
|
|
|
+ :param size: 请求数
|
|
|
|
+ :param top_K: 保证topK为召回池视频 type-int
|
|
|
|
+ :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
|
|
|
|
+ :param rank_key_prefix:
|
|
|
|
+ :return: rank_result
|
|
|
|
+ """
|
|
|
|
+ # 1 读取多样性密度控制规则 todo 没有判断非字典数据,有报错风险,先用try兜底。
|
|
|
|
+ redis_helper = RedisHelper()
|
|
|
|
+ density_rules = {}
|
|
|
|
+ rules_all = param_update_rule(redis_helper)
|
|
|
|
+ try:
|
|
|
|
+ if len(rules_all) != 0:
|
|
|
|
+ for k, v in rules_all.items():
|
|
|
|
+ if str(app_type) in v.keys():
|
|
|
|
+ app_type_rule = v[str(app_type)]
|
|
|
|
+ if "density" in app_type_rule.keys():
|
|
|
|
+ density_rules[k] = app_type_rule["density"]
|
|
|
|
+ except Exception as e:
|
|
|
|
+ log_.error("something is wrong in parsing density_rules:{}".format(e))
|
|
|
|
+ density_rules = {}
|
|
|
|
+
|
|
|
|
+ if flow_pool_recall_process is None:
|
|
|
|
+ flow_pool_recall_process = {}
|
|
|
|
+
|
|
|
|
+ if not data['rov_pool_recall'] and not data['flow_pool_recall']:
|
|
|
|
+ return [], 0, flow_pool_recall_process
|
|
|
|
+
|
|
|
|
+ rov_recall_rank = data['rov_pool_recall']
|
|
|
|
+ vid_keys = []
|
|
|
|
+ rec_recall_item_list = []
|
|
|
|
+ rec_recall_vid_list = []
|
|
|
|
+ for recall_item in data['rov_pool_recall']:
|
|
|
|
+ try:
|
|
|
|
+ vid = int(recall_item.get("videoId", 0))
|
|
|
|
+ rec_recall_vid_list.append(vid)
|
|
|
|
+ rec_recall_item_list.append(recall_item)
|
|
|
|
+ vid_keys.append(f"{rank_key_prefix}{vid}")
|
|
|
|
+ except:
|
|
|
|
+ continue
|
|
|
|
+ video_scores = redis_helper.get_batch_key(vid_keys)
|
|
|
|
+ if video_scores and len(rec_recall_item_list) > 0 and len(rec_recall_item_list) == len(video_scores):
|
|
|
|
+ for i in range(len(video_scores)):
|
|
|
|
+ try:
|
|
|
|
+ if video_scores[i] is None:
|
|
|
|
+ rec_recall_item_list[i]['sort_score'] = 0.0
|
|
|
|
+ else:
|
|
|
|
+ rec_recall_item_list[i]['sort_score'] = float(video_scores[i])
|
|
|
|
+ except Exception:
|
|
|
|
+ rec_recall_item_list[i]['sort_score'] = 0.0
|
|
|
|
+ rov_recall_rank = sorted(rec_recall_item_list, key=lambda k: k.get('sort_score', 0), reverse=True)
|
|
|
|
+ flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
|
|
|
|
+ rov_recall_rank, flow_recall_rank = remove_duplicate(
|
|
|
|
+ rov_recall=rov_recall_rank, flow_recall=flow_recall_rank, top_K=top_K
|
|
|
|
+ )
|
|
|
|
+ # 2 多样性需求,给video添加tag todo
|
|
|
|
+ video_ids = []
|
|
|
|
+ video_ids.extend([v["videoId"] for v in rov_recall_rank])
|
|
|
|
+ video_ids.extend([v["videoId"] for v in flow_recall_rank])
|
|
|
|
+ video_ids = list(set(video_ids))
|
|
|
|
+ video_tag_dict = get_video_tags(redis_helper, video_ids)
|
|
|
|
+ for v in rov_recall_rank:
|
|
|
|
+ v["tags"] = video_tag_dict.get(v["videoId"], [])
|
|
|
|
+ for v in flow_recall_rank:
|
|
|
|
+ v["tags"] = video_tag_dict.get(v["videoId"], [])
|
|
|
|
+
|
|
|
|
+ rank_result = []
|
|
|
|
+ flow_pool_recall_process['recall_duplicate_res'] = {'rov_recall_rank': rov_recall_rank,
|
|
|
|
+ 'flow_recall_rank': copy.deepcopy(flow_recall_rank)}
|
|
|
|
+ # 从ROV召回池中获取top k
|
|
|
|
+ if len(rov_recall_rank) > 0:
|
|
|
|
+ rank_result.extend(rov_recall_rank[:top_K])
|
|
|
|
+ rov_recall_rank = rov_recall_rank[top_K:]
|
|
|
|
+ else:
|
|
|
|
+ rank_result.extend(flow_recall_rank[:top_K])
|
|
|
|
+ flow_recall_rank = flow_recall_rank[top_K:]
|
|
|
|
+ # 按概率 p 及score排序获取 size - k 个视频
|
|
|
|
+ flow_num = 0
|
|
|
|
+ i = 0
|
|
|
|
+ # print("zb-flow_pool_recall_process:" + str(flow_pool_recall_process))
|
|
|
|
+ # print("zb-density_rules:" + str(density_rules))
|
|
|
|
+ # print("zb-2:" + str([i for i in rov_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]]))
|
|
|
|
+ # print("zb-3:" + str([i for i in flow_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]]))
|
|
|
|
|
|
|
|
+ while i < size - top_K:
|
|
|
|
+ # 随机生成[0, 1)浮点数
|
|
|
|
+ rand = random.random()
|
|
|
|
+ flow_pool_recall_process['flow_pool_P'] = flow_pool_P
|
|
|
|
+ flow_pool_recall_process[f'{i}_rand'] = rand
|
|
|
|
+ if rand < flow_pool_P:
|
|
|
|
+ if flow_recall_rank:
|
|
|
|
+ rank_result.append(flow_recall_rank[0])
|
|
|
|
+ flow_recall_rank.remove(flow_recall_rank[0])
|
|
|
|
+ else:
|
|
|
|
+ rank_result.extend(rov_recall_rank[:size - top_K - i])
|
|
|
|
+ # todo zhangbo rank
|
|
|
|
+ result = merge_density_control(
|
|
|
|
+ rank_result[:size],
|
|
|
|
+ [i for i in rov_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]],
|
|
|
|
+ [i for i in flow_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]],
|
|
|
|
+ density_rules
|
|
|
|
+ )
|
|
|
|
+ return result, flow_num, flow_pool_recall_process
|
|
|
|
+ else:
|
|
|
|
+ if rov_recall_rank:
|
|
|
|
+ rank_result.append(rov_recall_rank[0])
|
|
|
|
+ rov_recall_rank.remove(rov_recall_rank[0])
|
|
|
|
+ else:
|
|
|
|
+ rank_result.extend(flow_recall_rank[:size - top_K - i])
|
|
|
|
+ # todo zhangbo rank
|
|
|
|
+ result = merge_density_control(
|
|
|
|
+ rank_result[:size],
|
|
|
|
+ [i for i in rov_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]],
|
|
|
|
+ [i for i in flow_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]],
|
|
|
|
+ density_rules
|
|
|
|
+ )
|
|
|
|
+ return result, flow_num, flow_pool_recall_process
|
|
|
|
+ i += 1
|
|
|
|
+ # todo zhangbo rank
|
|
|
|
+ result = merge_density_control(
|
|
|
|
+ rank_result[:size],
|
|
|
|
+ [i for i in rov_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]],
|
|
|
|
+ [i for i in flow_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]],
|
|
|
|
+ density_rules
|
|
|
|
+ )
|
|
|
|
+ return result, flow_num, flow_pool_recall_process
|
|
# 排序服务兜底
|
|
# 排序服务兜底
|
|
def sup_rank(video_scores, recall_list):
|
|
def sup_rank(video_scores, recall_list):
|
|
if video_scores and len(recall_list) > 0:
|
|
if video_scores and len(recall_list) > 0:
|
|
@@ -1306,19 +1431,126 @@ def video_sank_pos_rank(data, size, top_K, flow_pool_P, ab_Code='', exp_config=N
|
|
i += 1
|
|
i += 1
|
|
return rank_result[:size], flow_num
|
|
return rank_result[:size], flow_num
|
|
|
|
|
|
-
|
|
|
|
|
|
+def merge_density_control(
|
|
|
|
+ data: list, rov: list, flow: list, rule: dict
|
|
|
|
+) -> list:
|
|
|
|
+ # 1 判断是否满足规则
|
|
|
|
+ status_cur: Dict[str, int] = {}
|
|
|
|
+ for d in data:
|
|
|
|
+ if "tags" in d.keys() and len(d["tags"]) != 0:
|
|
|
|
+ for t in d["tags"]:
|
|
|
|
+ if t in rule.keys():
|
|
|
|
+ status_cur[t] = 1 + status_cur[t] if t in status_cur.keys() else 1
|
|
|
|
+ status_cur_illegal: Dict[str, int] = {}
|
|
|
|
+ for k, v in status_cur.items():
|
|
|
|
+ if k in rule.keys() and rule[k] < v:
|
|
|
|
+ status_cur_illegal[k] = v - rule[k]
|
|
|
|
+ if len(status_cur_illegal) == 0:
|
|
|
|
+ return data
|
|
|
|
+ # 2 反向遍历,直到status_cur_illegal满足,记录要替换的index和召回池标记。
|
|
|
|
+ indexes = []
|
|
|
|
+ pushes = []
|
|
|
|
+ var1 = len(data)
|
|
|
|
+ for i in range(var1-1, -1, -1):
|
|
|
|
+ d = data[i]
|
|
|
|
+ tags = d["tags"] if "tags" in d.keys() else []
|
|
|
|
+ inters = set(tags) & set(status_cur_illegal.keys())
|
|
|
|
+ if len(inters) == 0:
|
|
|
|
+ continue
|
|
|
|
+ indexes.append(i)
|
|
|
|
+ pushes.append(d["flowPool"] if "flowPool" in d.keys() and len(d["flowPool"]) != 0 else "")
|
|
|
|
+ for inter in inters:
|
|
|
|
+ status_cur_illegal[inter] = status_cur_illegal[inter] - 1
|
|
|
|
+ if status_cur_illegal[inter] == 0:
|
|
|
|
+ status_cur_illegal.pop(inter)
|
|
|
|
+ status_cur[inter] = status_cur[inter] - 1
|
|
|
|
+ if status_cur[inter] == 0:
|
|
|
|
+ status_cur.pop(inter)
|
|
|
|
+ # 3 反向遍历index,再正向遍历增补列表,取可替换的video
|
|
|
|
+ for index, push in zip(reversed(indexes), reversed(pushes)):
|
|
|
|
+ if len(push) > 0:
|
|
|
|
+ # 5 如果是flow的video 取不到 不做替换
|
|
|
|
+ candidate = flow
|
|
|
|
+ else:
|
|
|
|
+ # 5 如果是rov的video 取不到 不做替换
|
|
|
|
+ candidate = rov
|
|
|
|
+ for i, d in enumerate(candidate):
|
|
|
|
+ judge_rule_set = judge_rule(rule=rule, status=status_cur)
|
|
|
|
+ tags = set(d["tags"] if "tags" in d.keys() else [])
|
|
|
|
+ if len(judge_rule_set & tags) != 0:
|
|
|
|
+ continue
|
|
|
|
+ # 开始插入
|
|
|
|
+ tmp = copy.deepcopy(data[index])
|
|
|
|
+ data[index] = d
|
|
|
|
+ candidate[i] = tmp
|
|
|
|
+ # 更新状态
|
|
|
|
+ for tag in tags:
|
|
|
|
+ status_cur[tag] = status_cur[tag] + 1 if tag in status_cur.keys() else 1
|
|
|
|
+ break
|
|
|
|
+ if len(push) > 0:
|
|
|
|
+ flow = candidate
|
|
|
|
+ else:
|
|
|
|
+ rov = candidate
|
|
|
|
+ return data
|
|
|
|
+def judge_rule(rule: dict, status: dict) -> Set:
|
|
|
|
+ result = set()
|
|
|
|
+ for k, v in status.items():
|
|
|
|
+ if k in rule.keys() and rule[k] <= v:
|
|
|
|
+ result.add(k)
|
|
|
|
+ return result
|
|
|
|
+def get_video_tags(redis_helper, video_ids) -> dict:
|
|
|
|
+ REDIS_PREFIX = "alg_recsys_video_tags_"
|
|
|
|
+ redis_keys = [REDIS_PREFIX + str(i) for i in video_ids]
|
|
|
|
+ video_tags = redis_helper.get_batch_key(redis_keys)
|
|
|
|
+ video_tag_dict = {}
|
|
|
|
+ if video_tags is not None:
|
|
|
|
+ for i, tags_str in enumerate(video_tags):
|
|
|
|
+ tags = []
|
|
|
|
+ if tags_str is not None and len(tags_str) != 0:
|
|
|
|
+ tags = str(tags_str).split(",")
|
|
|
|
+ video_tag_dict[video_ids[i]] = tags
|
|
|
|
+ return video_tag_dict
|
|
|
|
|
|
if __name__ == '__main__':
|
|
if __name__ == '__main__':
|
|
- d_test = [{'videoId': 10028734, 'rovScore': 99.977, 'pushFrom': 'recall_pool', 'abCode': 10000},
|
|
|
|
- {'videoId': 1919925, 'rovScore': 99.974, 'pushFrom': 'recall_pool', 'abCode': 10000},
|
|
|
|
- {'videoId': 9968118, 'rovScore': 99.972, 'pushFrom': 'recall_pool', 'abCode': 10000},
|
|
|
|
- {'videoId': 9934863, 'rovScore': 99.971, 'pushFrom': 'recall_pool', 'abCode': 10000},
|
|
|
|
- {'videoId': 10219869, 'flowPool': '1#1#1#1640830818883', 'rovScore': 82.21929728934731, 'pushFrom': 'flow_pool', 'abCode': 10000},
|
|
|
|
- {'videoId': 10212814, 'flowPool': '1#1#1#1640759014984', 'rovScore': 81.26694187726412, 'pushFrom': 'flow_pool', 'abCode': 10000},
|
|
|
|
- {'videoId': 10219437, 'flowPool': '1#1#1#1640827620520', 'rovScore': 81.21634156641908, 'pushFrom': 'flow_pool', 'abCode': 10000},
|
|
|
|
- {'videoId': 1994050, 'rovScore': 99.97, 'pushFrom': 'recall_pool', 'abCode': 10000},
|
|
|
|
- {'videoId': 9894474, 'rovScore': 99.969, 'pushFrom': 'recall_pool', 'abCode': 10000},
|
|
|
|
- {'videoId': 10028081, 'rovScore': 99.966, 'pushFrom': 'recall_pool', 'abCode': 10000}]
|
|
|
|
- res = video_rank_by_w_h_rate(videos=d_test)
|
|
|
|
- for tmp in res:
|
|
|
|
- print(tmp)
|
|
|
|
|
|
+ data: list = [
|
|
|
|
+ {'videoId': 1, 'flowPool': '', 'tags': ['下午好','元旦','祝福']},
|
|
|
|
+ {'videoId': 2, 'flowPool': '', 'tags': ['下午好','祝福']},
|
|
|
|
+ {'videoId': 3, 'flowPool': '', 'tags': ['早上好']},
|
|
|
|
+ {'videoId': 4, 'flowPool': 'flow', 'tags': ['下午好','元旦','祝福']},
|
|
|
|
+ ]
|
|
|
|
+ rov = [
|
|
|
|
+ {'videoId': 10, 'flowPool': '', 'tags': ['下午好']},
|
|
|
|
+ {'videoId': 11, 'flowPool': '', 'tags': ['祝福']},
|
|
|
|
+ {'videoId': 12, 'flowPool': '', 'tags': ['下午好','元旦','祝福']},
|
|
|
|
+ {'videoId': 13, 'flowPool': '', 'tags': ['元旦', "下午好"]},
|
|
|
|
+ # {'videoId': 14, 'flowPool': '', 'tags': []},
|
|
|
|
+ # {'videoId': 15, 'flowPool': '', 'tags': []}
|
|
|
|
+ ]
|
|
|
|
+ flow = [
|
|
|
|
+ {'videoId': 20, 'flowPool': 'flow', 'tags': ['下午好']},
|
|
|
|
+ {'videoId': 21, 'flowPool': 'flow', 'tags': ['下午好']},
|
|
|
|
+ {'videoId': 22, 'flowPool': 'flow', 'tags': []},
|
|
|
|
+ {'videoId': 23, 'flowPool': 'flow', 'tags': []}
|
|
|
|
+ ]
|
|
|
|
+ rule = {
|
|
|
|
+ '下午好': 2,
|
|
|
|
+ '早上好': 1,
|
|
|
|
+ '祝福': 1
|
|
|
|
+ }
|
|
|
|
+ result = merge_density_control(data, rov, flow, rule)
|
|
|
|
+ print(result)
|
|
|
|
+
|
|
|
|
+ # d_test = [{'videoId': 10028734, 'rovScore': 99.977, 'pushFrom': 'recall_pool', 'abCode': 10000},
|
|
|
|
+ # {'videoId': 1919925, 'rovScore': 99.974, 'pushFrom': 'recall_pool', 'abCode': 10000},
|
|
|
|
+ # {'videoId': 9968118, 'rovScore': 99.972, 'pushFrom': 'recall_pool', 'abCode': 10000},
|
|
|
|
+ # {'videoId': 9934863, 'rovScore': 99.971, 'pushFrom': 'recall_pool', 'abCode': 10000},
|
|
|
|
+ # {'videoId': 10219869, 'flowPool': '1#1#1#1640830818883', 'rovScore': 82.21929728934731, 'pushFrom': 'flow_pool', 'abCode': 10000},
|
|
|
|
+ # {'videoId': 10212814, 'flowPool': '1#1#1#1640759014984', 'rovScore': 81.26694187726412, 'pushFrom': 'flow_pool', 'abCode': 10000},
|
|
|
|
+ # {'videoId': 10219437, 'flowPool': '1#1#1#1640827620520', 'rovScore': 81.21634156641908, 'pushFrom': 'flow_pool', 'abCode': 10000},
|
|
|
|
+ # {'videoId': 1994050, 'rovScore': 99.97, 'pushFrom': 'recall_pool', 'abCode': 10000},
|
|
|
|
+ # {'videoId': 9894474, 'rovScore': 99.969, 'pushFrom': 'recall_pool', 'abCode': 10000},
|
|
|
|
+ # {'videoId': 10028081, 'rovScore': 99.966, 'pushFrom': 'recall_pool', 'abCode': 10000}]
|
|
|
|
+ # res = video_rank_by_w_h_rate(videos=d_test)
|
|
|
|
+ # for tmp in res:
|
|
|
|
+ # print(tmp)
|
|
|
|
+ pass
|