import time
import traceback
import random
from datetime import date, timedelta, datetime
from log import Log
from db_helper import RedisHelper
from config import set_config
from utils import FilterVideos, get_videos_remain_view_count, get_videos_local_distribute_count, send_msg_to_feishu
import gevent
import json
import sys
from parameter_update import param_update_expansion_factor
from parameter_update import param_update_risk_filter_flag
from parameter_update import param_update_risk_rule
from parameter_update import param_update_risk_videos
# Module-level logger instance used by the recall methods below.
log_ = Log()
# Configuration object returned by set_config(); read below as config_.<NAME>.
config_ = set_config()
class PoolRecall(object):
"""召回"""
def __init__(self, request_id, app_type, client_info=None, mid='', uid='', ab_code='',
             rule_key='', data_key='', no_op_flag=False, params=None, rule_key_30day=None, shield_config=None,
             video_id=None, level_weight=None, h_data_key=None, h_rule_key=None):
    """
    Store the per-request recall context and load the tunable parameters.

    :param request_id: request_id
    :param app_type: product identifier, type-int
    :param client_info: user location info, e.g. {"cityCode": "100000"}; may be None
    :param mid: mid, type-string
    :param uid: uid, type-string
    :param ab_code: ab_code, type-int
    :param rule_key: rule segment used when building hourly recall redis keys
    :param data_key: data segment used when building hourly recall redis keys
    :param no_op_flag: when truthy, rov_pool_recall skips top videos and rov-updated videos
    :param params: forwarded unchanged to RedisHelper
    :param rule_key_30day: stored as-is on the instance
    :param shield_config: forwarded to FilterVideos.filter_videos by the flow-pool recalls
    :param video_id: stored as-is on the instance
    :param level_weight: stored as-is on the instance
    :param h_data_key: stored as-is on the instance
    :param h_rule_key: stored as-is on the instance
    """
    self.request_id = request_id
    self.app_type = app_type
    self.mid = mid
    self.uid = uid
    self.video_id = video_id
    self.ab_code = ab_code
    self.client_info = client_info
    self.rule_key = rule_key
    self.data_key = data_key
    self.no_op_flag = no_op_flag
    self.rule_key_30day = rule_key_30day
    self.shield_config = shield_config
    self.level_weight = level_weight
    self.redis_helper = RedisHelper(params=params)
    # Previously assigned from data_key, which silently discarded the
    # h_data_key argument; h_rule_key below was already handled correctly.
    self.h_data_key = h_data_key
    self.h_rule_key = h_rule_key
    self.expansion_factor = param_update_expansion_factor()
    self.risk_filter_flag = param_update_risk_filter_flag()
    if self.risk_filter_flag:
        # Risk rules are only loaded when risk filtering is switched on
        self.app_region_filtered = param_update_risk_rule()
        self.videos_with_risk = param_update_risk_videos()
    else:
        self.app_region_filtered = {}
        self.videos_with_risk = []
def copy_redis_zset_data(self, from_key_name, to_key_name):
    """
    Copy every member of one sorted set into another, replacing the target.

    :param from_key_name: redis key of the sorted set to read
    :param to_key_name: redis key of the sorted set to (re)write
    :return: False when the source read returns None, True otherwise
    """
    # Read the whole source sorted set together with its scores
    source_rows = self.redis_helper.get_data_zset_with_index(
        key_name=from_key_name, start=0, end=-1, with_scores=True)
    if source_rows is None:
        return False
    # Members are video ids; they are stored back as ints
    payload = {int(member): member_score for member, member_score in source_rows}
    # Drop any previous copy so the target holds exactly the new data
    if self.redis_helper.key_exists(to_key_name):
        self.redis_helper.del_keys(key_name=to_key_name)
    self.redis_helper.add_data_with_zset(key_name=to_key_name, data=payload, expire_time=1*3600)
    return True
def update_mid_data(self, h_recall_mid_key, h_record_key, key_prefix):
    """
    Rebuild the per-mid hourly list from the shared hourly list and record which hour was used.

    The shared list of the current hour is used when its key exists; otherwise
    the list of the previous hour is tried. The record is written only when the
    copy succeeded.

    :param h_recall_mid_key: redis key of the per-mid list to overwrite
    :param h_record_key: redis key that stores {'date': ..., 'h': ...} of the copied list
    :param key_prefix: prefix of the shared hourly list keys
    :return: None
    """
    # Read the clock once so the date and the hour always describe the same
    # instant (two separate reads could straddle an hour/midnight boundary).
    now = datetime.now()
    source_dt = now.strftime('%Y%m%d')
    source_h = now.hour
    source_key = f"{key_prefix}{self.app_type}.{self.data_key}.{self.rule_key}.{source_dt}.{source_h}"
    if not self.redis_helper.key_exists(key_name=source_key):
        # Current hour not published yet: fall back to the previous hour
        # (which is hour 23 of yesterday when the current hour is 0).
        previous = now - timedelta(hours=1)
        source_dt = previous.strftime('%Y%m%d')
        source_h = previous.hour
        source_key = f"{key_prefix}{self.app_type}.{self.data_key}.{self.rule_key}.{source_dt}.{source_h}"
    if self.copy_redis_zset_data(from_key_name=source_key, to_key_name=h_recall_mid_key):
        value = {'date': source_dt, 'h': source_h}
        self.redis_helper.set_data_to_redis(key_name=h_record_key, value=str(value), expire_time=1*3600)
def get_mid_h_key(self, province_code, key_flag=''):
    """
    Return the redis key of this mid's hourly recall list, refreshing the list first when it is stale.

    :param province_code: province code inserted into region-specific key prefixes
    :param key_flag: '' (default), '24h', 'day_24h' or 'region_24h'; selects which key family is used
    :return: redis key of the per-mid hourly list
    """
    # Function-scope import so this block does not depend on the file's import list
    import ast

    # Per-mid list key and the key recording which date/hour that list was copied from
    if key_flag == 'region_24h':
        # region-grouped, relative 24h
        recall_prefix = config_.H_WITH_MID_RECALL_KEY_NAME_PREFIX_REGION_24H
        record_prefix = config_.H_WITH_MID_RECORD_KEY_NAME_PREFIX_REGION_24H
    elif key_flag in ('24h', 'day_24h'):
        # relative 24h
        recall_prefix = config_.H_WITH_MID_RECALL_KEY_NAME_PREFIX_24H
        record_prefix = config_.H_WITH_MID_RECORD_KEY_NAME_PREFIX_24H
    else:
        recall_prefix = config_.H_WITH_MID_RECALL_KEY_NAME_PREFIX
        record_prefix = config_.H_WITH_MID_RECORD_KEY_NAME_PREFIX
    h_recall_mid_key = f"{recall_prefix}{self.app_type}.{self.mid}"
    h_record_key = f"{record_prefix}{self.app_type}.{self.mid}"

    # Prefix of the shared hourly lists the per-mid list is copied from
    region_ab_codes = config_.AB_CODE['region_rank_by_h']
    if self.ab_code in region_ab_codes.values():
        if key_flag == 'region_24h':
            if self.ab_code == region_ab_codes.get('region_rule_rank2'):
                key_prefix = f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H}{province_code}."
            else:
                key_prefix = f"{config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H}{province_code}."
        elif key_flag == 'day_24h':
            key_prefix = f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H}{province_code}."
        else:
            key_prefix = f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{province_code}."
    elif self.ab_code in config_.AB_CODE['rank_by_24h'].values():
        key_prefix = config_.RECALL_KEY_NAME_PREFIX_BY_24H
    elif key_flag == '24h':
        key_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP_24H_H
    else:
        key_prefix = config_.RECALL_KEY_NAME_PREFIX_BY_H

    if not self.redis_helper.key_exists(key_name=h_record_key):
        # No record yet: copy the list and write the record
        self.update_mid_data(h_recall_mid_key=h_recall_mid_key, h_record_key=h_record_key, key_prefix=key_prefix)
        return h_recall_mid_key

    # The record is written as str({'date': ..., 'h': ...}); parse it as a
    # literal instead of eval() so data stored in redis is never executed.
    raw_record = self.redis_helper.get_data_from_redis(key_name=h_record_key)
    if isinstance(raw_record, bytes):
        raw_record = raw_record.decode('utf-8')
    try:
        record = ast.literal_eval(raw_record)
        record_dt = record.get('date')
        record_h = int(record.get('h'))
    except (ValueError, SyntaxError, TypeError, AttributeError):
        # Record expired between the two redis calls, or is unreadable: rebuild it
        self.update_mid_data(h_recall_mid_key=h_recall_mid_key, h_record_key=h_record_key, key_prefix=key_prefix)
        return h_recall_mid_key

    # Single clock read so date and hour describe the same instant
    now = datetime.now()
    now_dt = now.strftime('%Y%m%d')
    h = now.hour
    if record_dt == now_dt and record_h == h:
        # The per-mid list already holds the current hour's data
        pass
    elif (record_dt == now_dt and h - record_h == 1) or (h == 0 and record_h == 23):
        # Record is exactly one hour old: switch over only once the current
        # hour's shared list has been published, otherwise keep the old list
        now_h_recall_key = f"{key_prefix}{self.app_type}.{self.data_key}.{self.rule_key}.{now_dt}.{h}"
        if self.redis_helper.key_exists(key_name=now_h_recall_key):
            flag = self.copy_redis_zset_data(from_key_name=now_h_recall_key, to_key_name=h_recall_mid_key)
            if flag:
                new_record = {'date': now_dt, 'h': h}
                self.redis_helper.set_data_to_redis(key_name=h_record_key, value=str(new_record), expire_time=2*3600)
    else:
        # Record is older than one hour: rebuild list and record
        self.update_mid_data(h_recall_mid_key=h_recall_mid_key, h_record_key=h_record_key, key_prefix=key_prefix)
    return h_recall_mid_key
def rov_pool_recall_by_h(self, size=10, expire_time=24*3600):
    """
    Recall videos from the hourly-updated ROV list, topping up from the ROV pool when short.

    :param size: number of videos to return
    :param expire_time: redis expiry passed through to rov_pool_recall
    :return: list of at most `size` dicts with videoId / rovScore / pushFrom / abCode
    """
    start_time = time.time()
    # client_info defaults to None in __init__ (rov_pool_recall already allows
    # that); treat a missing location as the unknown province '-1'
    client_info = self.client_info or {}
    province_code = client_info.get('provinceCode', '-1')
    if province_code == '':
        province_code = '-1'
    if self.ab_code in config_.AB_CODE['rank_by_24h'].values():
        push_from = config_.PUSH_FROM['rov_recall_24h']
    else:
        push_from = config_.PUSH_FROM['rov_recall_h']
    # Per-mid hourly list key (refreshed by get_mid_h_key when stale)
    h_recall_mid_key = self.get_mid_h_key(province_code=province_code)
    if not self.redis_helper.key_exists(h_recall_mid_key):
        recall_result = self.rov_pool_recall(size=size, expire_time=expire_time)
    else:
        # Ids rejected by the filter; removed from the per-mid list afterwards
        fil_video_ids = []
        recall_result = []
        # Number of entries fetched per round
        get_size = size * 5
        # Number of rounds done so far
        freq = 0
        while len(recall_result) < size:
            freq += 1
            if freq > config_.MAX_FREQ_FROM_ROV_POOL:
                break
            data = self.redis_helper.get_data_zset_with_index(key_name=h_recall_mid_key,
                                                              start=(freq - 1) * get_size, end=freq * get_size - 1,
                                                              with_scores=True)
            if not data:
                # Hourly list exhausted
                break
            # Convert ids to int and keep {videoId: score}
            video_ids = []
            video_score = {}
            for value in data:
                video_id = int(value[0])
                video_ids.append(video_id)
                video_score[video_id] = value[1]
            filter_ = FilterVideos(request_id=self.request_id,
                                   app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
            ge = gevent.spawn(filter_.filter_videos_h, self.rule_key, self.ab_code, province_code)
            ge.join()
            filtered_result = ge.get()
            if filtered_result:
                # Attach the source markers pushFrom / abCode
                temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
                                'pushFrom': push_from, 'abCode': self.ab_code}
                               for item in filtered_result if video_score.get(int(item)) is not None]
                recall_result.extend(temp_result)
                kept_ids = {item['videoId'] for item in temp_result}
                fil_video_ids.extend(set(video_ids) - kept_ids)
            else:
                fil_video_ids.extend(video_ids)
        # Removal happens after paging so the index windows above stay valid
        for value in fil_video_ids:
            self.redis_helper.remove_value_from_zset(key_name=h_recall_mid_key, value=value)
        if len(recall_result) < size:
            # Not enough hourly videos: top up from the ROV pool, skipping duplicates
            rov_recall_result = self.rov_pool_recall(size=size, expire_time=expire_time)
            seen_video_ids = {item.get('videoId') for item in recall_result}
            for video in rov_recall_result:
                vid = video.get('videoId')
                if vid in seen_video_ids:
                    continue
                recall_result.append(video)
                seen_video_ids.add(vid)
                if len(recall_result) >= size:
                    break
    log_.info({
        'logTimestamp': int(time.time() * 1000),
        'request_id': self.request_id,
        'operation': 'rov_pool_recall_by_h',
        'executeTime': (time.time() - start_time) * 1000
    })
    return recall_result[:size]
def rov_pool_recall_by_day(self, size=4, expire_time=24*3600):
    """
    Recall videos from the daily rule-updated list, topping up from the ROV pool when short.

    :param size: number of videos to return
    :param expire_time: redis expiry of the stored last-position record
    :return: list of at most `size` recalled video dicts
    """
    began_at = time.time()
    # Redis keys of the daily list plus the index this user reached last time
    rule_key_name, last_rule_day_recall_key, idx = self.get_video_last_idx_day()
    if rule_key_name:
        recall_result = []
        # Entries fetched per round
        batch = size * 5
        # Rounds done so far
        attempts = 0
        while len(recall_result) < size:
            attempts += 1
            if attempts > config_.MAX_FREQ_FROM_ROV_POOL:
                break
            data = self.redis_helper.get_data_zset_with_index(
                key_name=rule_key_name, start=idx, end=idx + batch - 1, with_scores=True)
            if not data:
                # Daily list exhausted
                break
            # Ids as int, and {videoId: score} for the lookup below
            candidate_ids = [int(entry[0]) for entry in data]
            score_by_id = {int(entry[0]): entry[1] for entry in data}
            video_filter = FilterVideos(request_id=self.request_id, app_type=self.app_type,
                                        mid=self.mid, uid=self.uid, video_ids=candidate_ids)
            worker = gevent.spawn(video_filter.filter_videos)
            worker.join()
            passed = worker.get()
            if not passed:
                # Nothing survived: store the last fetched id so the next call
                # can locate its start position
                self.redis_helper.set_data_to_redis(
                    key_name=last_rule_day_recall_key, value=data[-1][0], expire_time=expire_time)
            else:
                # Attach the source markers pushFrom / abCode
                for item in passed:
                    if score_by_id.get(int(item)) is None:
                        continue
                    recall_result.append({
                        'videoId': int(item),
                        'rovScore': score_by_id[int(item)],
                        'pushFrom': config_.PUSH_FROM['rov_recall_day'],
                        'abCode': self.ab_code,
                    })
            idx += batch
        if len(recall_result) < size:
            # Top up from the ROV pool, skipping ids already present
            supplement = self.rov_pool_recall(size=size, expire_time=expire_time)
            known_ids = [item.get('videoId') for item in recall_result]
            for video in supplement:
                vid = video.get('videoId')
                if vid in known_ids:
                    continue
                recall_result.append(video)
                known_ids.append(vid)
                if len(recall_result) >= size:
                    break
    else:
        # No daily list available: use the ROV pool directly
        recall_result = self.rov_pool_recall(size=size, expire_time=expire_time)
    log_.info({
        'logTimestamp': int(time.time() * 1000),
        'request_id': self.request_id,
        'operation': 'rov_pool_recall_by_day',
        'executeTime': (time.time() - began_at) * 1000
    })
    return recall_result[:size]
def rov_pool_recall(self, size=10, expire_time=24*3600, video_type='', push_from=config_.PUSH_FROM['rov_recall']):
    """
    Recall videos from the ROV pool, merged with active top videos and rov-updated videos.

    :param size: number of videos to return
    :param expire_time: redis expiry of the stored last-position record
    :param video_type: video list category, forwarded to get_video_last_idx
    :param push_from: source marker written into each pool result
    :return: list of at most `size` video dicts sorted by rovScore descending
    """
    # Active top videos: skipped when no_op_flag is set or there is no location info
    if self.no_op_flag or self.client_info is None:
        top_video_ids, top_video_result = [], []
    else:
        top_video_ids, top_video_result = self.get_top_videos()
    # Videos whose rov was modified: skipped when no_op_flag is set
    if self.no_op_flag:
        update_rov_result = []
    else:
        _, update_rov_result = self.get_update_rov_videos()
    # Drop rov-updated videos that are already top videos
    update_rov_dup_result = [item for item in update_rov_result if item['videoId'] not in top_video_ids]
    # Ids that must not be taken from the pool again
    skip_ids = set(top_video_ids)
    skip_ids.update(item['videoId'] for item in update_rov_dup_result)
    # Pool keys and the index this user reached last time
    rov_pool_key, last_rov_recall_key, idx = self.get_video_last_idx(video_type=video_type)
    if not rov_pool_key:
        # Pool is empty: return only top / rov-updated videos
        if (not update_rov_dup_result) and (not top_video_result):
            return []
        # list.extend() returns None, so the previous
        # `x = top_video_result.extend(...)` crashed on the following sort
        rov_pool_recall_result = top_video_result + update_rov_dup_result
        rov_pool_recall_result.sort(key=lambda x: x.get('rovScore', 0), reverse=True)
        return rov_pool_recall_result[:size]
    rov_pool_recall_result = []
    # Entries fetched per round
    get_size = size * 5
    # Rounds done so far
    freq = 0
    while len(rov_pool_recall_result) < size:
        freq += 1
        if freq > config_.MAX_FREQ_FROM_ROV_POOL:
            break
        data = self.redis_helper.get_data_zset_with_index(key_name=rov_pool_key,
                                                          start=idx, end=idx + get_size - 1,
                                                          with_scores=True)
        if not data:
            # Pool exhausted
            break
        # Convert ids to int and keep {videoId: score}
        video_ids = []
        video_score = {}
        for value in data:
            video_id = int(value[0])
            if video_id in skip_ids:
                continue
            video_ids.append(video_id)
            video_score[video_id] = value[1]
        filter_ = FilterVideos(request_id=self.request_id,
                               app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
        ge = gevent.spawn(filter_.filter_videos)
        ge.join()
        filtered_result = ge.get()
        if filtered_result:
            # Attach the source markers pushFrom / abCode
            temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
                            'pushFrom': push_from, 'abCode': self.ab_code}
                           for item in filtered_result if video_score.get(int(item)) is not None]
            rov_pool_recall_result.extend(temp_result)
        elif self.mid:
            # Nothing survived: store the last fetched id so the next call can
            # locate its start position (no record is kept when mid is empty)
            self.redis_helper.set_data_to_redis(key_name=last_rov_recall_key, value=data[-1][0],
                                                expire_time=expire_time)
        idx += get_size
    # Merge top videos, rov-updated videos and pool videos, then order by score
    if top_video_result:
        rov_pool_recall_result.extend(top_video_result)
    if update_rov_dup_result:
        rov_pool_recall_result.extend(update_rov_dup_result)
    rov_pool_recall_result.sort(key=lambda x: x.get('rovScore', 0), reverse=True)
    return rov_pool_recall_result[:size]
def flow_pool_recall(self, size=10, flow_pool_id=None, flow_pool_abtest_group=None):
    """
    Recall videos from the flow pool sorted set.

    :param size: number of videos to return
    :param flow_pool_id: forwarded to get_pool_redis_key
    :param flow_pool_abtest_group: copied into every result item
    :return: (list of at most `size` result dicts, dict of intermediate data for logging)
    """
    # Intermediate data collected for the recall log
    flow_pool_recall_process = {}
    # client_info defaults to None in __init__; fall back to an empty dict so a
    # request without location data resolves to region '-1' instead of raising
    client_info = self.client_info or {}
    province_code = client_info.get('provinceCode', '-1')
    city_code = client_info.get('cityCode', '-1')
    # Cities listed in config_.CITY_CODE use the city code as region
    region_code = city_code if city_code in config_.CITY_CODE.values() else province_code
    if region_code == '':
        region_code = '-1'
    flow_pool_key = self.get_pool_redis_key('flow', flow_pool_id=flow_pool_id)
    flow_pool_recall_process['flow_pool_key'] = flow_pool_key
    flow_pool_recall_result = []
    # Video ids already placed in the result
    recalled_ids = set()
    # Entries fetched per round
    get_size = size * 5
    # Rounds done so far
    freq = 0
    idx = 0
    while len(flow_pool_recall_result) < size:
        freq += 1
        if freq > config_.MAX_FREQ_FROM_FLOW_POOL:
            break
        data = self.redis_helper.get_data_zset_with_index(key_name=flow_pool_key,
                                                          start=idx, end=idx + get_size - 1,
                                                          with_scores=True)
        flow_pool_recall_process['initial_data'] = data
        if not data:
            # Flow pool exhausted
            break
        # Members look like "<video_id>-<flow_pool>"; build id list,
        # {video_id: [flow_pool, ...]} and {video_id: score}
        video_ids = []
        video_mapping = {}
        video_score = {}
        for value in data:
            try:
                raw_id, flow_pool = value[0].split('-')
                # int() is inside the try so one malformed member is logged
                # and skipped instead of aborting the whole recall
                video_id = int(raw_id)
            except Exception:
                log_.error({
                    'request_id': self.request_id,
                    'app_type': self.app_type,
                    'flow_pool_value': value
                })
                continue
            if video_id not in video_mapping:
                video_ids.append(video_id)
                # The score of the first member seen for a video is kept
                video_score[video_id] = value[1]
                video_mapping[video_id] = [flow_pool]
            else:
                video_mapping[video_id].append(flow_pool)
        filter_ = FilterVideos(request_id=self.request_id,
                               app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids,
                               expansion_factor=self.expansion_factor,
                               risk_filter_flag=self.risk_filter_flag,
                               app_region_filtered=self.app_region_filtered,
                               videos_with_risk=self.videos_with_risk
                               )
        ge = gevent.spawn(filter_.filter_videos, pool_type='flow',
                          region_code=region_code, shield_config=self.shield_config)
        ge.join()
        filtered_result = ge.get()
        flow_pool_recall_process['filtered_data'] = filtered_result
        if filtered_result:
            # Keep only videos that still have distributable count
            ge = gevent.spawn(self.check_video_counts, video_ids=filtered_result, flow_pool_mapping=video_mapping)
            ge.join()
            check_result = ge.get()
            flow_pool_recall_process['check_counts_data'] = check_result
            for item in check_result:
                video_id = int(item[0])
                flow_pool = item[1]
                if video_id in recalled_ids:
                    continue
                # One flow_pool per video is reported; attach pushFrom / abCode
                flow_pool_recall_result.append(
                    {'videoId': video_id, 'flowPool': flow_pool,
                     'rovScore': video_score[video_id], 'pushFrom': config_.PUSH_FROM['flow_recall'],
                     'abCode': self.ab_code, 'flow_pool_abtest_group': flow_pool_abtest_group}
                )
                recalled_ids.add(video_id)
        idx += get_size
    return flow_pool_recall_result[:size], flow_pool_recall_process
def flow_pool_recall_new(self, size=10, flow_pool_id=None):
    """
    Recall videos from the flow pool stored as a redis set.

    Each result gets a random rovScore in [0, 100].

    :param size: number of videos to return
    :param flow_pool_id: forwarded to get_pool_redis_key
    :return: list of at most `size` result dicts
    """
    # client_info defaults to None in __init__; fall back to an empty dict so a
    # request without location data resolves to region '-1' instead of raising
    client_info = self.client_info or {}
    province_code = client_info.get('provinceCode', '-1')
    city_code = client_info.get('cityCode', '-1')
    # Cities listed in config_.CITY_CODE use the city code as region
    region_code = city_code if city_code in config_.CITY_CODE.values() else province_code
    if region_code == '':
        region_code = '-1'
    flow_pool_key = self.get_pool_redis_key('flow_set', flow_pool_id=flow_pool_id)
    flow_pool_recall_result = []
    # Video ids already placed in the result
    recalled_ids = set()
    # Members requested per round
    get_size = size * 5
    # Rounds done so far
    freq = 0
    while len(flow_pool_recall_result) < size:
        freq += 1
        if freq > config_.MAX_FREQ_FROM_FLOW_POOL:
            break
        data = self.redis_helper.get_data_with_count_from_set(key_name=flow_pool_key, count=get_size)
        if not data:
            # Flow pool exhausted
            break
        # Members look like "<video_id>-<flow_pool>"; build id list and
        # {video_id: [flow_pool, ...]}
        video_ids = []
        video_mapping = {}
        for value in data:
            try:
                raw_id, flow_pool = value.split('-')
                # int() is inside the try so one malformed member is logged
                # and skipped instead of aborting the whole recall
                video_id = int(raw_id)
            except Exception:
                log_.error({
                    'request_id': self.request_id,
                    'app_type': self.app_type,
                    'flow_pool_value': value
                })
                continue
            if video_id not in video_mapping:
                video_ids.append(video_id)
                video_mapping[video_id] = [flow_pool]
            else:
                video_mapping[video_id].append(flow_pool)
        filter_ = FilterVideos(request_id=self.request_id,
                               app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
        ge = gevent.spawn(filter_.filter_videos, pool_type='flow',
                          region_code=region_code, shield_config=self.shield_config)
        ge.join()
        filtered_result = ge.get()
        if filtered_result:
            # Keep only videos that still have distributable count
            ge = gevent.spawn(self.check_video_counts_new, video_ids=filtered_result, flow_pool_mapping=video_mapping)
            ge.join()
            check_result = ge.get()
            for item in check_result:
                video_id = int(item[0])
                flow_pool = item[1]
                if video_id in recalled_ids:
                    continue
                # One flow_pool per video is reported; attach pushFrom / abCode
                flow_pool_recall_result.append(
                    {'videoId': video_id, 'flowPool': flow_pool,
                     'rovScore': random.uniform(0, 100), 'pushFrom': config_.PUSH_FROM['flow_recall'],
                     'abCode': self.ab_code}
                )
                recalled_ids.add(video_id)
    return flow_pool_recall_result[:size]
def flow_pool_recall_new_with_level(self, size=10, flow_pool_id=None, flow_pool_abtest_group=None):
    """
    Recall videos from a level-specific flow pool stored as a redis set.

    Each result gets a random rovScore in [0, 100] and carries the level
    returned by get_pool_redis_key.

    :param size: number of videos to return
    :param flow_pool_id: forwarded to get_pool_redis_key
    :param flow_pool_abtest_group: copied into every result item
    :return: (list of at most `size` result dicts, dict of intermediate data for logging)
    """
    # Intermediate data collected for the recall log
    flow_pool_recall_process = {}
    # client_info defaults to None in __init__; fall back to an empty dict so a
    # request without location data resolves to region '-1' instead of raising
    client_info = self.client_info or {}
    province_code = client_info.get('provinceCode', '-1')
    city_code = client_info.get('cityCode', '-1')
    # Cities listed in config_.CITY_CODE use the city code as region
    region_code = city_code if city_code in config_.CITY_CODE.values() else province_code
    if region_code == '':
        region_code = '-1'
    flow_pool_key, level = self.get_pool_redis_key('flow_set_level', flow_pool_id=flow_pool_id)
    flow_pool_recall_process['flow_pool_key'] = flow_pool_key
    flow_pool_recall_process['level'] = level
    if flow_pool_key is None:
        return [], flow_pool_recall_process
    flow_pool_recall_result = []
    # Video ids already placed in the result
    recalled_ids = set()
    # Members requested per round
    get_size = size * 5
    # Rounds done so far
    freq = 0
    while len(flow_pool_recall_result) < size:
        freq += 1
        if freq > config_.MAX_FREQ_FROM_FLOW_POOL:
            break
        data = self.redis_helper.get_data_with_count_from_set(key_name=flow_pool_key, count=get_size)
        flow_pool_recall_process['initial_data'] = data
        if not data:
            # Flow pool exhausted
            break
        # Members look like "<video_id>-<flow_pool>"; build id list and
        # {video_id: [flow_pool, ...]}
        video_ids = []
        video_mapping = {}
        for value in data:
            try:
                raw_id, flow_pool = value.split('-')
                # int() is inside the try so one malformed member is logged
                # and skipped instead of aborting the whole recall
                video_id = int(raw_id)
            except Exception:
                log_.error({
                    'request_id': self.request_id,
                    'app_type': self.app_type,
                    'flow_pool_value': value
                })
                continue
            if video_id not in video_mapping:
                video_ids.append(video_id)
                video_mapping[video_id] = [flow_pool]
            else:
                video_mapping[video_id].append(flow_pool)
        filter_ = FilterVideos(request_id=self.request_id,
                               app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids,
                               expansion_factor=self.expansion_factor,
                               risk_filter_flag=self.risk_filter_flag,
                               app_region_filtered=self.app_region_filtered,
                               videos_with_risk=self.videos_with_risk
                               )
        ge = gevent.spawn(filter_.filter_videos, pool_type='flow',
                          region_code=region_code, shield_config=self.shield_config)
        ge.join()
        filtered_result = ge.get()
        flow_pool_recall_process['filtered_data'] = filtered_result
        if filtered_result:
            # Keep only videos that still have distributable count
            ge = gevent.spawn(self.check_video_counts_new_with_level, video_ids=filtered_result, flow_pool_mapping=video_mapping)
            ge.join()
            check_result = ge.get()
            flow_pool_recall_process['check_counts_data'] = check_result
            for item in check_result:
                video_id = int(item[0])
                flow_pool = item[1]
                if video_id in recalled_ids:
                    continue
                # One flow_pool per video is reported; attach pushFrom / abCode
                flow_pool_recall_result.append(
                    {'videoId': video_id, 'flowPool': flow_pool, 'level': level,
                     'rovScore': random.uniform(0, 100), 'pushFrom': config_.PUSH_FROM['flow_recall'],
                     'abCode': self.ab_code, 'flow_pool_abtest_group': flow_pool_abtest_group}
                )
                recalled_ids.add(video_id)
    return flow_pool_recall_result[:size], flow_pool_recall_process
def flow_pool_recall_new_with_level_score(self, size=10, flow_pool_id=None, flow_pool_abtest_group=None):
    """
    Recall videos from a level-specific flow pool stored as a scored sorted set.

    The rovScore of each result is the sorted-set score of the
    "<video_id>-<flow_pool>" member that was selected.

    :param size: number of videos to return
    :param flow_pool_id: forwarded to get_pool_redis_key
    :param flow_pool_abtest_group: copied into every result item
    :return: (list of at most `size` result dicts, dict of intermediate data for logging)
    """
    # Intermediate data collected for the recall log
    flow_pool_recall_process = {}
    # client_info defaults to None in __init__; fall back to an empty dict so a
    # request without location data resolves to region '-1' instead of raising
    client_info = self.client_info or {}
    province_code = client_info.get('provinceCode', '-1')
    city_code = client_info.get('cityCode', '-1')
    # Cities listed in config_.CITY_CODE use the city code as region
    region_code = city_code if city_code in config_.CITY_CODE.values() else province_code
    if region_code == '':
        region_code = '-1'
    flow_pool_key, level = self.get_pool_redis_key('flow_set_level_score', flow_pool_id=flow_pool_id)
    flow_pool_recall_process['flow_pool_key'] = flow_pool_key
    flow_pool_recall_process['level'] = level
    if flow_pool_key is None:
        return [], flow_pool_recall_process
    flow_pool_recall_result = []
    # Video ids already placed in the result
    recalled_ids = set()
    # Entries fetched per round
    get_size = size * 5
    # Rounds done so far
    freq = 0
    idx = 0
    while len(flow_pool_recall_result) < size:
        freq += 1
        if freq > config_.MAX_FREQ_FROM_FLOW_POOL:
            break
        data = self.redis_helper.get_data_zset_with_index(key_name=flow_pool_key,
                                                          start=idx, end=idx + get_size - 1,
                                                          with_scores=True)
        flow_pool_recall_process['initial_data'] = data
        if not data:
            # Flow pool exhausted
            break
        # Members look like "<video_id>-<flow_pool>"; build id list,
        # {video_id: [flow_pool, ...]} and {member: score}
        video_ids = []
        video_mapping = {}
        video_score = {}
        for value in data:
            try:
                raw_id, flow_pool = value[0].split('-')
                # int() is inside the try so one malformed member is logged
                # and skipped instead of aborting the whole recall
                video_id = int(raw_id)
            except Exception:
                log_.error({
                    'request_id': self.request_id,
                    'app_type': self.app_type,
                    'flow_pool_value': value
                })
                continue
            video_score[value[0]] = value[1]
            if video_id not in video_mapping:
                video_ids.append(video_id)
                video_mapping[video_id] = [flow_pool]
            else:
                video_mapping[video_id].append(flow_pool)
        filter_ = FilterVideos(request_id=self.request_id,
                               app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
        ge = gevent.spawn(filter_.filter_videos, pool_type='flow',
                          region_code=region_code, shield_config=self.shield_config)
        ge.join()
        filtered_result = ge.get()
        flow_pool_recall_process['filtered_data'] = filtered_result
        if filtered_result:
            # Keep only videos that still have distributable count
            ge = gevent.spawn(self.check_video_counts_new_with_level_score,
                              video_ids=filtered_result, flow_pool_mapping=video_mapping)
            ge.join()
            check_result = ge.get()
            flow_pool_recall_process['check_counts_data'] = check_result
            for item in check_result:
                video_id = int(item[0])
                flow_pool = item[1]
                if video_id in recalled_ids:
                    continue
                # One flow_pool per video is reported; attach pushFrom / abCode
                flow_pool_recall_result.append(
                    {'videoId': video_id, 'flowPool': flow_pool, 'level': level,
                     'rovScore': video_score[f"{video_id}-{flow_pool}"], 'pushFrom': config_.PUSH_FROM['flow_recall'],
                     'abCode': self.ab_code, 'flow_pool_abtest_group': flow_pool_abtest_group}
                )
                recalled_ids.add(video_id)
        idx += get_size
    return flow_pool_recall_result[:size], flow_pool_recall_process
def flow_pool_recall_new_with_level_score2(self, size=10, flow_pool_id=None, flow_pool_abtest_group=None):
    """
    Recall videos from a level-specific scored flow pool in a single pass.

    One window of the sorted set is read, the distributable-count check runs
    first, and only the best-scored `size * 5` distinct videos are sent
    through the filter.

    :param size: number of videos to return
    :param flow_pool_id: forwarded to get_pool_redis_key
    :param flow_pool_abtest_group: copied into every result item
    :return: (list of at most `size` result dicts, dict of intermediate data for logging)
    """
    # Intermediate data collected for the recall log
    flow_pool_recall_process = {}
    # client_info defaults to None in __init__; fall back to an empty dict so a
    # request without location data resolves to region '-1' instead of raising
    client_info = self.client_info or {}
    province_code = client_info.get('provinceCode', '-1')
    city_code = client_info.get('cityCode', '-1')
    # Cities listed in config_.CITY_CODE use the city code as region
    region_code = city_code if city_code in config_.CITY_CODE.values() else province_code
    if region_code == '':
        region_code = '-1'
    flow_pool_key, level = self.get_pool_redis_key('flow_set_level_score', flow_pool_id=flow_pool_id)
    flow_pool_recall_process['flow_pool_key'] = flow_pool_key
    flow_pool_recall_process['level'] = level
    if flow_pool_key is None:
        return [], flow_pool_recall_process
    flow_pool_recall_result = []
    # Number of videos passed to the filter
    get_size = size * 5
    # The fetch window is five times the filter window
    idx = 0
    data = self.redis_helper.get_data_zset_with_index(key_name=flow_pool_key,
                                                      start=idx, end=idx + get_size * 5 - 1,
                                                      with_scores=True)
    flow_pool_recall_process['initial_data'] = data
    if not data:
        return [], flow_pool_recall_process
    # Members look like "<video_id>-<flow_pool>"; build id list,
    # {video_id: [flow_pool, ...]} and {member: score}
    video_ids = []
    video_mapping = {}
    video_score = {}
    for value in data:
        try:
            raw_id, flow_pool = value[0].split('-')
            # int() is inside the try so one malformed member is logged and
            # skipped instead of aborting the whole recall
            video_id = int(raw_id)
        except Exception:
            log_.error({
                'request_id': self.request_id,
                'app_type': self.app_type,
                'flow_pool_value': value
            })
            continue
        video_score[value[0]] = value[1]
        if video_id not in video_mapping:
            video_ids.append(video_id)
            video_mapping[video_id] = [flow_pool]
        else:
            video_mapping[video_id].append(flow_pool)
    # Keep only (video, flow_pool) pairs that still have distributable count
    ge = gevent.spawn(self.check_video_counts_new_with_level_score,
                      video_ids=video_ids, flow_pool_mapping=video_mapping)
    ge.join()
    check_result = ge.get()
    flow_pool_recall_process['check_counts_data'] = check_result
    # Rank the surviving pairs by score, then keep the best pair per video.
    # The previous dedupe check tested a list that was never filled, so a
    # video in several flow pools took several slots of the filter window
    # and was reported with the last-seen pool rather than the ranked one.
    scored_pairs = []
    for item in check_result or []:
        video_id = int(item[0])
        flow_pool = item[1]
        scored_pairs.append((video_id, flow_pool, video_score[f"{video_id}-{flow_pool}"]))
    scored_pairs.sort(key=lambda pair: pair[2], reverse=True)
    check_result_mapping = {}
    ranked_video_ids = []
    for video_id, flow_pool, score in scored_pairs:
        if video_id in check_result_mapping:
            continue
        check_result_mapping[video_id] = (flow_pool, score)
        ranked_video_ids.append(video_id)
    to_filter_videos = ranked_video_ids[:get_size]
    filter_ = FilterVideos(request_id=self.request_id,
                           app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=to_filter_videos,
                           expansion_factor=self.expansion_factor,
                           risk_filter_flag=self.risk_filter_flag,
                           app_region_filtered=self.app_region_filtered,
                           videos_with_risk=self.videos_with_risk
                           )
    ge = gevent.spawn(filter_.filter_videos, pool_type='flow',
                      region_code=region_code, shield_config=self.shield_config)
    ge.join()
    filtered_result = ge.get()
    flow_pool_recall_process['filtered_data'] = filtered_result
    if filtered_result:
        emitted_ids = set()
        for item in filtered_result:
            video_id = int(item)
            if video_id in emitted_ids:
                continue
            emitted_ids.add(video_id)
            flow_pool, score = check_result_mapping[video_id]
            # Attach the source markers pushFrom / abCode
            flow_pool_recall_result.append(
                {'videoId': video_id, 'flowPool': flow_pool, 'level': level,
                 'rovScore': score,
                 'pushFrom': config_.PUSH_FROM['flow_recall'],
                 'abCode': self.ab_code, 'flow_pool_abtest_group': flow_pool_abtest_group}
            )
    return flow_pool_recall_result[:size], flow_pool_recall_process
def check_video_counts(self, video_ids, flow_pool_mapping):
    """
    Keep the (video, flow pool) pairs that still have local distribution quota.

    For every pair the locally recorded remaining count is looked up:
    no record -> the pair is skipped; count > 0 -> the pair is kept;
    count <= 0 -> the local count key is deleted and the pair is removed from
    the regular and the quick flow-pool zset of every configured app type.

    :param video_ids: video ids, type-list
    :param flow_pool_mapping: video id -> list of flow pool tags, type-dict
    :return: check_result, list of (video_id, flow_pool) tuples
    """
    check_result = []
    for vid in map(int, video_ids):
        for pool_tag in flow_pool_mapping.get(vid, []):
            remaining = get_videos_local_distribute_count(video_id=vid, flow_pool=pool_tag)
            if remaining is None:
                # No local distribution record for this pair.
                continue
            if remaining > 0:
                check_result.append((vid, pool_tag))
                continue
            # Quota used up: drop the local record, then purge the pair from every pool.
            self.redis_helper.del_keys(f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{vid}:{pool_tag}")
            member = f"{vid}-{pool_tag}"
            removed_any = False
            for app_name in config_.APP_TYPE:
                app_type_value = config_.APP_TYPE.get(app_name)
                pool_keys = (
                    f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{app_type_value}",
                    f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{app_type_value}:{config_.QUICK_FLOW_POOL_ID}",
                )
                for pool_key in pool_keys:
                    if self.redis_helper.remove_value_from_zset(key_name=pool_key, value=member) > 0:
                        removed_any = True
            # Log once per pair, and only when something was actually removed.
            if removed_any:
                log_.info({'tag': 'remove video_id from flow_pool', 'video_id': vid, 'flow_pool': pool_tag})
    return check_result
def check_video_counts_new(self, video_ids, flow_pool_mapping):
    """
    Keep the (video, flow pool) pairs that still have local distribution quota.

    Same contract as the zset based check, but exhausted pairs are removed from
    the set based flow pools (regular and quick) of every configured app type.

    :param video_ids: video ids, type-list
    :param flow_pool_mapping: video id -> list of flow pool tags, type-dict
    :return: check_result, list of (video_id, flow_pool) tuples
    """
    check_result = []
    for raw_id in video_ids:
        vid = int(raw_id)
        for pool_tag in flow_pool_mapping.get(vid, []):
            remaining = get_videos_local_distribute_count(video_id=vid, flow_pool=pool_tag)
            if remaining is None:
                # No local distribution record for this pair.
                continue
            if remaining > 0:
                check_result.append((vid, pool_tag))
                continue
            # Quota used up: delete the local count key and purge the pair everywhere.
            count_key = f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{vid}:{pool_tag}"
            self.redis_helper.del_keys(count_key)
            member = f"{vid}-{pool_tag}"
            removed_flags = []
            for app_name in config_.APP_TYPE:
                app_type_value = config_.APP_TYPE.get(app_name)
                regular_key = f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET}{app_type_value}"
                removed_flags.append(
                    self.redis_helper.remove_value_from_set(key_name=regular_key, values=(member, )) > 0
                )
                quick_key = (
                    f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX_SET}{app_type_value}"
                    f":{config_.QUICK_FLOW_POOL_ID}"
                )
                removed_flags.append(
                    self.redis_helper.remove_value_from_set(key_name=quick_key, values=(member, )) > 0
                )
            # Log once per pair, and only when something was actually removed.
            if any(removed_flags):
                log_.info({'tag': 'remove video_id from flow_pool', 'video_id': vid, 'flow_pool': pool_tag})
    return check_result
def check_video_counts_new_with_level(self, video_ids, flow_pool_mapping):
    """
    Keep the (video, flow pool) pairs that still have local distribution quota.

    Exhausted pairs (local count <= 0) lose their local count key and are removed
    from the per-level flow-pool sets of every app type, for every level named in
    self.level_weight (levels '1'..'6' when it is None), and from the quick
    flow-pool set.

    :param video_ids: video ids, type-list
    :param flow_pool_mapping: video id -> list of flow pool tags, type-dict
    :return: check_result, list of (video_id, flow_pool) tuples
    """
    level_weight = self.level_weight
    if level_weight is None:
        level_weight = {'1': 1, '2': 1, '3': 1, '4': 1, '5': 1, '6': 1}
    levels = list(level_weight)

    check_result = []
    for raw_id in video_ids:
        vid = int(raw_id)
        for pool_tag in flow_pool_mapping.get(vid, []):
            remaining = get_videos_local_distribute_count(video_id=vid, flow_pool=pool_tag)
            if remaining is None:
                # No local distribution record for this pair.
                continue
            if remaining > 0:
                check_result.append((vid, pool_tag))
                continue
            # Quota used up: delete the local count key and purge the pair everywhere.
            self.redis_helper.del_keys(f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{vid}:{pool_tag}")
            member = f"{vid}-{pool_tag}"
            removed_any = False
            for app_name in config_.APP_TYPE:
                app_type_value = config_.APP_TYPE.get(app_name)
                for level in levels:
                    level_key = f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL}{app_type_value}:{level}"
                    if self.redis_helper.remove_value_from_set(key_name=level_key, values=(member, )) > 0:
                        removed_any = True
                # NOTE(review): the quick pool key does not depend on the level, so it is
                # handled once per app type - confirm against the original nesting.
                quick_key = (
                    f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX_SET}{app_type_value}"
                    f":{config_.QUICK_FLOW_POOL_ID}"
                )
                if self.redis_helper.remove_value_from_set(key_name=quick_key, values=(member, )) > 0:
                    removed_any = True
            # Log once per pair, and only when something was actually removed.
            if removed_any:
                log_.info({'tag': 'remove video_id from flow_pool', 'video_id': vid, 'flow_pool': pool_tag})
    return check_result
def check_video_counts_new_with_level_score(self, video_ids, flow_pool_mapping):
    """
    Keep the (video, flow pool) pairs that still have local distribution quota.

    Exhausted pairs (local count <= 0) lose their local count key and are removed
    from the per-level scored flow-pool zsets of every app type, for every level
    named in self.level_weight (levels '1'..'6' when it is None), and from the
    quick flow-pool zset.

    :param video_ids: video ids, type-list
    :param flow_pool_mapping: video id -> list of flow pool tags, type-dict
    :return: check_result, list of (video_id, flow_pool) tuples
    """
    level_weight = self.level_weight
    if level_weight is None:
        level_weight = {'1': 1, '2': 1, '3': 1, '4': 1, '5': 1, '6': 1}
    levels = list(level_weight)

    check_result = []
    for vid in map(int, video_ids):
        for pool_tag in flow_pool_mapping.get(vid, []):
            remaining = get_videos_local_distribute_count(video_id=vid, flow_pool=pool_tag)
            if remaining is None:
                # No local distribution record for this pair.
                continue
            if remaining > 0:
                check_result.append((vid, pool_tag))
                continue
            # Quota used up: delete the local count key and purge the pair everywhere.
            count_key = f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{vid}:{pool_tag}"
            self.redis_helper.del_keys(count_key)
            member = f"{vid}-{pool_tag}"
            removed_any = False
            for app_name in config_.APP_TYPE:
                app_type_value = config_.APP_TYPE.get(app_name)
                for level in levels:
                    level_key = f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SCORE}{app_type_value}:{level}"
                    if self.redis_helper.remove_value_from_zset(key_name=level_key, value=member) > 0:
                        removed_any = True
                # NOTE(review): the quick pool key does not depend on the level, so it is
                # handled once per app type - confirm against the original nesting.
                quick_key = f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{app_type_value}:{config_.QUICK_FLOW_POOL_ID}"
                if self.redis_helper.remove_value_from_zset(key_name=quick_key, value=member) > 0:
                    removed_any = True
            # Log once per pair, and only when something was actually removed.
            if removed_any:
                log_.info({'tag': 'remove video_id from flow_pool', 'video_id': vid, 'flow_pool': pool_tag})
    return check_result
"""
# 本次视频都有本地记录
if len(videos) == 0:
error_flag = False
return check_result, error_flag
# 本地无记录视频,检查实时分发数
st_remain_view_count = time.time()
view_count_result, error_flag = get_videos_remain_view_count(app_type=self.app_type, videos=videos)
log_.info({
'logTimestamp': int(time.time() * 1000),
'request_id': self.request_id,
'app_type': self.app_type,
'mid': self.mid,
'uid': self.uid,
'operation': 'remainViewCount',
'executeTime': (time.time() - st_remain_view_count) * 1000
})
# 判断返回的错误标记,True为错误
if error_flag:
return check_result, error_flag
# 从流量召回池移除视频videos
# for item in videos:
# value = '{}-{}'.format(item['videoId'], item['flowPool'])
# self.redis_helper.remove_value_from_zset(key_name=flow_pool_key, value=value)
# redis_helper = RedisHelper()
for item in view_count_result:
try:
# 接口超时,item[2]可能为None
remain_count = int(item[2])
except Exception as e:
# log_.error('remain_count type error...')
log_.error(traceback.format_exc())
continue
if remain_count > 0:
# viewCount > 0
check_result.append(item)
# 将分发数更新到本地记录
key_name = f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{item[0]}:{item[1]}"
self.redis_helper.setnx_key(key_name=key_name, value=remain_count, expire_time=5 * 60)
else:
# viewCount <= 0
# 从流量召回池移除
value = '{}-{}'.format(item[0], item[1])
for item in config_.APP_TYPE:
flow_pool_key = f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{config_.APP_TYPE.get(item)}"
self.redis_helper.remove_value_from_zset(key_name=flow_pool_key, value=value)
return check_result, error_flag
"""
def get_pool_redis_key(self, pool_type, flow_pool_id=None):
    """
    Build the redis key of a recall pool.

    The return shape depends on pool_type:
      - 'rov' / 'special': (key_name, redis_date); today's key when it exists in
        redis, otherwise yesterday's key (returned without an existence check).
      - 'flow' / 'flow_set': a single key string (quick pool key when
        flow_pool_id equals config_.QUICK_FLOW_POOL_ID).
      - 'flow_set_level' / 'flow_set_level_score': (key_name, level); level is
        None for the quick pool, and (None, None) when no level can be picked.
      - anything else: logs an error and returns (None, None).

    :param pool_type: type-string, one of the values listed above
    :param flow_pool_id: flow pool id
    :return: see above
    """
    is_quick_pool = flow_pool_id == config_.QUICK_FLOW_POOL_ID
    if pool_type == 'rov':
        # appType = 13 (APP) has its own rov recall list.
        if self.app_type == config_.APP_TYPE['APP']:
            key_name_prefix = config_.RECALL_KEY_NAME_PREFIX_APP
        else:
            key_name_prefix = config_.RECALL_KEY_NAME_PREFIX
        return self._daily_key_or_previous(key_name_prefix)
    if pool_type == 'flow':
        if is_quick_pool:
            return f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{self.app_type}:{config_.QUICK_FLOW_POOL_ID}"
        return f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{self.app_type}"
    if pool_type == 'flow_set':
        if is_quick_pool:
            return f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX_SET}{self.app_type}:{config_.QUICK_FLOW_POOL_ID}"
        return f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET}{self.app_type}"
    if pool_type == 'flow_set_level':
        if is_quick_pool:
            return f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX_SET}{self.app_type}:{config_.QUICK_FLOW_POOL_ID}", None
        return self._pick_level_key(config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL)
    if pool_type == 'flow_set_level_score':
        if is_quick_pool:
            return f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{self.app_type}:{config_.QUICK_FLOW_POOL_ID}", None
        return self._pick_level_key(config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SCORE)
    if pool_type == 'special':
        return self._daily_key_or_previous(config_.KEY_NAME_PREFIX_SPECIAL_VIDEOS)
    log_.error('pool type error')
    return None, None

def _daily_key_or_previous(self, key_name_prefix):
    """Return (key, date) for today's list, or yesterday's when today's key is missing."""
    # A single clock reading, so the key that is checked and the date that is
    # returned can never straddle midnight.
    today = date.today()
    redis_date = today.strftime('%Y%m%d')
    if not self.redis_helper.key_exists(f"{key_name_prefix}{redis_date}"):
        redis_date = (today - timedelta(days=1)).strftime('%Y%m%d')
    return f"{key_name_prefix}{redis_date}", redis_date

def _pick_level_key(self, level_key_prefix):
    """Pick one existing flow-pool level key at random, proportionally to its weight."""
    level_weight = self.level_weight
    if level_weight is None:
        level_weight = {'1': 1, '2': 1, '3': 1, '4': 1, '5': 1, '6': 1}
    # Only levels with a positive weight and an existing redis key can be drawn.
    candidates = []
    for level, weight in level_weight.items():
        # One numeric conversion, used for both the total and the running sum.
        weight = float(weight)
        if weight <= 0:
            continue
        level_key = f"{level_key_prefix}{self.app_type}:{level}"
        if self.redis_helper.key_exists(key_name=level_key):
            candidates.append((level, level_key, weight))
    if not candidates:
        # Nothing to distribute, or every available level has weight 0
        # (this used to raise ZeroDivisionError).
        return None, None
    candidates.sort(key=lambda candidate: candidate[2])
    total_weight = sum(candidate[2] for candidate in candidates)
    # Draw a point in [0, total_weight) and walk the cumulative weights. The bounds
    # are no longer rounded to 2 decimals, so small weights keep their exact share.
    threshold = random.random() * total_weight
    cumulative = 0.0
    for level, level_key, weight in candidates:
        cumulative += weight
        if threshold < cumulative:
            return level_key, level
    # Float accumulation can leave the last bound marginally below the total.
    level, level_key, _ = candidates[-1]
    return level_key, level
def get_video_last_idx(self, video_type=''):
    """
    Locate where this user's next rov recall should start.

    The recall list key is chosen from self.ab_code / video_type, the last video
    recorded for this user (app_type + mid) is read from redis, and the start
    index is the position right after that video in the recall list.

    :param video_type: 'whole_movies' selects the whole-movies list
    :return: (rov_pool_key, last_rov_recall_key, idx); (None, None, None) when no
        recall list key could be built
    """
    ab_codes = config_.AB_CODE
    hourly_codes = (
        list(ab_codes['rank_by_h'].values())
        + list(ab_codes['region_rank_by_h'].values())
        + [ab_codes['rov_rank_appType_18_19'], ab_codes['rov_rank_appType_19'],
           ab_codes['top_video_relevant_appType_19']]
        + list(ab_codes['rank_by_24h'].values())
    )
    if self.ab_code in hourly_codes or video_type == 'whole_movies':
        rov_pool_key, redis_date = self.get_pool_redis_key_with_h('rov', video_type=video_type)
    elif self.ab_code in ab_codes['rank_by_day'].values():
        rov_pool_key, redis_date = self.get_pool_redis_key_with_day('dup')
    else:
        rov_pool_key, redis_date = self.get_pool_redis_key('rov')
    if not rov_pool_key:
        return None, None, None

    if self.ab_code in ab_codes['rank_by_day'].values():
        # Before 07:00 a different last-video key prefix is used.
        if datetime.now().hour < 7:
            last_key_prefix = config_.LAST_VIDEO_FROM_ROV_POOL_PRE_PREFIX
        else:
            last_key_prefix = config_.LAST_VIDEO_FROM_ROV_POOL_NOW_PREFIX
    elif video_type == 'whole_movies':
        last_key_prefix = config_.LAST_VIDEO_FROM_WHOLE_MOVIES_PREFIX
    else:
        last_key_prefix = config_.LAST_VIDEO_FROM_ROV_POOL_PREFIX
    last_rov_recall_key = f'{last_key_prefix}{self.app_type}:{self.mid}:{redis_date}'

    idx = 0
    value = self.redis_helper.get_data_from_redis(last_rov_recall_key)
    if value:
        rank = self.redis_helper.get_index_with_data(rov_pool_key, value)
        # Rank 0 is a real position (the first element); the old `if not idx` test
        # treated it as "not found" and restarted from 0 instead of moving on to 1.
        # NOTE(review): assumes get_index_with_data returns None for a missing
        # member (redis ZRANK/ZREVRANK semantics) - confirm in db_helper.
        if rank is not None:
            idx = rank + 1
    return rov_pool_key, last_rov_recall_key, idx
def get_update_rov_videos(self):
    """
    Fetch the videos whose ROV score was modified, filtered for this user.

    Any exception is logged and turned into an empty result.

    :return: update_rov_video_ids, update_rov_result
    """
    try:
        # The APP product (appType = APP) keeps its own list of modified videos.
        is_app = self.app_type == config_.APP_TYPE['APP']
        key_name = config_.UPDATE_ROV_KEY_NAME_APP if is_app else config_.UPDATE_ROV_KEY_NAME
        data = self.redis_helper.get_data_zset_with_index(key_name=key_name,
                                                          start=0, end=-1, with_scores=True)
        if data is None:
            return [], []
        # (video id as int, score) pairs in zset order, plus an id -> score lookup.
        scored = [(int(entry[0]), entry[1]) for entry in data]
        video_ids = [vid for vid, _ in scored]
        video_score = dict(scored)
        # Run the filter in a greenlet and wait for its result.
        video_filter = FilterVideos(request_id=self.request_id,
                                    app_type=self.app_type, video_ids=video_ids, mid=self.mid, uid=self.uid)
        job = gevent.spawn(video_filter.filter_videos)
        job.join()
        filtered_result = job.get()
        if not filtered_result:
            return [], []
        # Attach the source fields pushFrom and abCode to every surviving video.
        update_rov_video_ids = []
        update_rov_result = []
        for item in filtered_result:
            vid = int(item)
            rov_score = video_score.get(vid)
            if rov_score is None:
                continue
            update_rov_video_ids.append(vid)
            update_rov_result.append({'videoId': vid, 'rovScore': rov_score,
                                      'pushFrom': config_.PUSH_FROM['rov_recall'], 'abCode': self.ab_code})
        return update_rov_video_ids, update_rov_result
    except Exception:
        log_.error(traceback.format_exc())
        return [], []
def get_top_videos(self):
    """
    Fetch the currently active pinned (top) videos that apply to this user.

    A pinned video applies when its cityCode list contains the user's cityCode or
    config_.ALL_AREA_CODE. When a video id appears more than once, the entry with
    the larger score wins. The survivors are passed through FilterVideos.
    Any exception is logged and turned into an empty result.

    :return: top_video_ids, top_video_result
    """
    try:
        if self.app_type == config_.APP_TYPE['APP']:
            key_name = config_.TOP_VIDEO_LIST_KEY_NAME_APP
        else:
            key_name = config_.TOP_VIDEO_LIST_KEY_NAME
        data = self.redis_helper.get_data_from_redis(key_name=key_name)
        if data is None:
            return [], []
        # client_info may be None (the constructor default). Without this guard the
        # AttributeError was swallowed by the except below and every pinned video,
        # including the all-area ones, was silently dropped.
        client_city_code = (self.client_info or {}).get('cityCode')
        # video id -> {'score', 'cityCode'}; insertion order is the recall order.
        video_info = {}
        # SECURITY NOTE(review): eval() executes whatever is stored under this redis
        # key. If the payload is always a plain literal, ast.literal_eval or
        # json.loads would be the safe replacement - confirm the stored format first.
        for item in eval(data):
            video_id = int(item['videoId'])
            # Match the video's recommended area against the user's city (city level).
            city_code_list = item['cityCode'].split(',')
            if client_city_code not in city_code_list and config_.ALL_AREA_CODE not in city_code_list:
                continue
            known = video_info.get(video_id)
            # Duplicate video id: keep the entry with the larger score.
            if known is not None and not (item['score'] > known.get('score')):
                continue
            video_info[video_id] = {'score': item['score'], 'cityCode': city_code_list}
        video_ids = list(video_info)
        # Run the filter in a greenlet and wait for its result.
        filter_ = FilterVideos(request_id=self.request_id,
                               app_type=self.app_type, video_ids=video_ids, mid=self.mid, uid=self.uid)
        ge = gevent.spawn(filter_.filter_videos)
        ge.join()
        filtered_result = ge.get()
        # Attach the source fields pushFrom and abCode.
        top_video_ids, top_video_result = [], []
        if not filtered_result:
            return top_video_ids, top_video_result
        for item in filtered_result:
            video_id = int(item)
            item_info = video_info.get(video_id)
            if item_info is None:
                continue
            top_video_ids.append(video_id)
            top_video_result.append({'videoId': video_id, 'rovScore': item_info.get('score'),
                                     'pushFrom': config_.PUSH_FROM['top'], 'abCode': self.ab_code})
        return top_video_ids, top_video_result
    except Exception:
        log_.error(traceback.format_exc())
        return [], []
def get_pool_redis_key_with_h(self, pool_type, video_type=''):
    """
    Build the redis key of a recall list that is refreshed hourly.

    For the 'rov' pool the key of the current hour is returned when it exists in
    redis; otherwise the key of the previous hour is returned without checking
    that it exists.

    :param pool_type: type-string {'rov': rov recall pool, 'flow': flow pool}
    :param video_type: list selector, 'whole_movies' selects the whole-movies list
    :return: (key_name, hour) for 'rov'; a single key string for 'flow';
        (None, None) for any other pool_type
    """
    if pool_type == 'flow':
        return config_.FLOW_POOL_KEY_NAME_PREFIX + str(self.app_type)
    if pool_type != 'rov':
        return None, None

    # One clock reading for date and hour. Reading them separately could pair the
    # old date with hour 0 right at midnight and select a 24h-old key.
    now = datetime.now()
    ab_codes = config_.AB_CODE
    if self.ab_code in [ab_codes['rov_rank_appType_18_19'], ab_codes['rov_rank_appType_19'],
                        ab_codes['top_video_relevant_appType_19']]:
        # Per-appType hourly list: <prefix><appType>:<date>:<hour>
        head = f"{config_.RECALL_KEY_NAME_PREFIX_APP_TYPE}{self.app_type}:"
        return self._hourly_key_or_previous(head, ':', now)
    if video_type == 'whole_movies':
        # Whole-movies list: <prefix><date>.<hour>
        head = f"{config_.RECALL_KEY_NAME_PREFIX_WHOLE_MOVIES}"
        return self._hourly_key_or_previous(head, '.', now)

    region_rank_codes = ab_codes['region_rank_by_h']
    if self.ab_code in region_rank_codes.values():
        # Use the city code when it is one of the configured city codes,
        # otherwise fall back to the province code.
        province_code = self.client_info.get('provinceCode', '-1')
        city_code = self.client_info.get('cityCode', '-1')
        region_code = city_code if city_code in config_.CITY_CODE.values() else province_code
        if region_code == '':
            region_code = '-1'
        if self.ab_code == region_rank_codes.get('region_rule_rank2'):
            key_prefix = f"{config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_24H}{region_code}."
        else:
            key_prefix = f"{config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H}{region_code}:"
    elif self.ab_code in ab_codes['rank_by_24h'].values():
        key_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP_24H
    else:
        key_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP_H
    # Rule based list: <prefix><data_key>:<rule_key>:<date>:<hour>
    head = f"{key_prefix}{self.data_key}:{self.rule_key}:"
    return self._hourly_key_or_previous(head, ':', now)

def _hourly_key_or_previous(self, head, sep, now):
    """Return (key, hour) for the hour of `now`, or for the hour before when that key is missing."""
    key_name = f"{head}{now:%Y%m%d}{sep}{now.hour}"
    if self.redis_helper.key_exists(key_name):
        return key_name, now.hour
    # Subtracting one hour rolls over to 23:00 of the previous day when now.hour == 0.
    previous = now - timedelta(hours=1)
    return f"{head}{previous:%Y%m%d}{sep}{previous.hour}", previous.hour
def flow_pool_recall_18_19(self, size=4, push_from=config_.PUSH_FROM['flow_recall']):
    """
    Recall videos from the flow pool zset.

    The pool is read in windows of size * 5 entries; each window is filtered and
    the survivors are collected until `size` videos are found, the pool is
    exhausted, or config_.MAX_FREQ_FROM_FLOW_POOL_18_19 reads have been made.

    :param size: number of videos to return
    :param push_from: value stored in the 'pushFrom' field of each result
    :return: list of result dicts, at most `size` long
    """
    started_at = time.time()
    pool_key = self.get_pool_redis_key('flow')
    recalled = []
    # Number of entries fetched per read.
    window = size * 5
    offset = 0
    reads = 0
    while len(recalled) < size:
        reads += 1
        if reads > config_.MAX_FREQ_FROM_FLOW_POOL_18_19:
            break
        data = self.redis_helper.get_data_zset_with_index(key_name=pool_key,
                                                          start=offset, end=offset + window - 1,
                                                          with_scores=True)
        if not data:
            # The flow pool has no more entries.
            break
        # Collect the ids of this window and remember each id's score.
        video_ids = []
        video_score = {}
        for entry in data:
            vid = int(entry[0])
            video_ids.append(vid)
            video_score[vid] = entry[1]
        # Run the filter in a greenlet and wait for its result.
        video_filter = FilterVideos(request_id=self.request_id,
                                    app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
        job = gevent.spawn(video_filter.filter_videos)
        job.join()
        filtered_result = job.get()
        if filtered_result:
            # Attach the source fields pushFrom and abCode.
            for item in filtered_result:
                vid = int(item)
                score = video_score.get(vid)
                if score is not None:
                    recalled.append({'videoId': vid, 'rovScore': score,
                                     'pushFrom': push_from, 'abCode': self.ab_code})
        offset += window
    log_.info({
        'logTimestamp': int(time.time() * 1000),
        'request_id': self.request_id,
        'operation': 'flow_pool_recall_18_19',
        'executeTime': (time.time() - started_at) * 1000
    })
    return recalled[:size]
def get_pool_redis_key_with_day(self, pool_type):
    """
    Build the redis key of a recall list that is refreshed by the daily rule.

    For 'rov' and 'dup' today's key is returned when it exists in redis; otherwise
    yesterday's key is returned without checking that it exists.

    :param pool_type: type-string, 'rov', 'dup' or 'flow'
    :return: (key_name, date string) for 'rov' and 'dup'; a single key string for
        'flow'; (None, None) for any other pool_type
    """
    today = datetime.today()
    today_dt = today.strftime('%Y%m%d')
    if pool_type == 'flow':
        return config_.FLOW_POOL_KEY_NAME_PREFIX + str(self.app_type)
    if pool_type == 'rov':
        prefix_attr = 'RECALL_KEY_NAME_PREFIX_BY_DAY'
    elif pool_type == 'dup':
        # Before 07:00 the "pre" dedup list is used, afterwards the "now" list.
        if datetime.now().hour < 7:
            prefix_attr = 'RECALL_KEY_NAME_PREFIX_DUP_DAY_PRE'
        else:
            prefix_attr = 'RECALL_KEY_NAME_PREFIX_DUP_DAY_NOW'
    else:
        return None, None

    key_today = f'{getattr(config_, prefix_attr)}{self.rule_key}.{today_dt}'
    if self.redis_helper.key_exists(key_name=key_today):
        return key_today, today_dt
    # Today's list has not been written yet: point at yesterday's.
    previous_dt = (today - timedelta(days=1)).strftime('%Y%m%d')
    return f'{getattr(config_, prefix_attr)}{self.rule_key}.{previous_dt}', previous_dt
def get_video_last_idx_day(self):
    """
    Locate where this user's next recall from the daily-rule list should start.

    :return: (rule_key_name, last_rule_day_recall_key, idx); (None, None, None)
        when no list key could be built
    """
    rule_key_name, redis_dt = self.get_pool_redis_key_with_day('rov')
    if not rule_key_name:
        return None, None, None
    last_rule_day_recall_key = \
        f'{config_.LAST_VIDEO_FROM_RULE_DAY_POOL_PREFIX}{self.app_type}.{self.mid}.{redis_dt}'
    idx = 0
    value = self.redis_helper.get_data_from_redis(last_rule_day_recall_key)
    if value:
        rank = self.redis_helper.get_index_with_data(rule_key_name, value)
        # Rank 0 is a real position (the first element); the old `if not idx` test
        # treated it as "not found" and restarted from 0 instead of moving on to 1.
        # NOTE(review): assumes get_index_with_data returns None for a missing
        # member (redis ZRANK/ZREVRANK semantics) - confirm in db_helper.
        if rank is not None:
            idx = rank + 1
    return rule_key_name, last_rule_day_recall_key, idx
def old_videos_recall(self, size):
    """
    Recall old videos.

    Reads today's old-video set from redis, filters it for this user, and returns
    a random selection of size + 1 result dicts (fewer when not enough survive).

    :param size: requested number of videos
    :return: list of result dicts, [] when the set is empty or nothing survives
    """
    today_dt = datetime.today().strftime('%Y%m%d')
    old_videos = self.redis_helper.get_data_from_set(
        key_name=f'{config_.RECALL_KEY_NAME_PREFIX_OLD_VIDEOS}{today_dt}')
    if not old_videos:
        return []
    # Run the filter in a greenlet and wait for its result.
    video_filter = FilterVideos(request_id=self.request_id,
                                app_type=self.app_type, video_ids=list(map(int, old_videos)),
                                mid=self.mid, uid=self.uid)
    job = gevent.spawn(video_filter.filter_videos)
    job.join()
    survivors = job.get()
    if not survivors:
        return []
    # Attach the source fields pushFrom and abCode; old videos carry rovScore 0.
    old_video_result = []
    for item in survivors:
        old_video_result.append({'videoId': int(item), 'rovScore': 0,
                                 'pushFrom': config_.PUSH_FROM['old_video'], 'abCode': self.ab_code})
    # Shuffle in place, then keep the first size + 1 entries.
    random.shuffle(old_video_result)
    return old_video_result[:size+1]
def get_region_dup_video_last_idx_h(self, province_code, region_dup=None):
    """
    Locate where this user's next recall from a region dedup list should start.

    The list of the current hour is used when its key exists in redis, otherwise
    the list of the previous hour (without an existence check). The last-video
    key always carries the hour of the list that was selected.

    :param province_code: region code used in the list key
    :param region_dup: 1 or 2 selects the dedup list family; any other value
        leaves the list key without a prefix
    :return: (key_name, last_region_dup_key, idx)
    """
    # One clock reading for date and hour. Reading them separately could pair the
    # old date with hour 0 right at midnight and select a 24h-old key.
    now = datetime.now()
    if region_dup == 1:
        # Region daily result deduplicated against the region hourly result.
        key_name_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_DAY_H
    elif region_dup == 2:
        if self.ab_code == config_.AB_CODE['region_rank_by_h'].get('region_rule_rank2'):
            # Daily result deduplicated against the region hourly 24h result.
            key_name_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_DAY_24H
        else:
            # Daily result deduplicated against the region daily / region hourly results.
            key_name_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_DAY_H
    else:
        key_name_prefix = ''
    key_head = f"{key_name_prefix}{province_code}.{self.rule_key}."
    last_key_head = f"{config_.LAST_VIDEO_FROM_REGION_DUP_PREFIX}{region_dup}.{self.app_type}.{self.mid}."

    slot = now
    key_name = f"{key_head}{slot:%Y%m%d}.{slot.hour}"
    if not self.redis_helper.key_exists(key_name=key_name):
        # Subtracting one hour rolls over to 23:00 of the previous day when now.hour == 0.
        slot = now - timedelta(hours=1)
        key_name = f"{key_head}{slot:%Y%m%d}.{slot.hour}"
    last_region_dup_key = f"{last_key_head}{slot.hour}"

    idx = 0
    value = self.redis_helper.get_data_from_redis(last_region_dup_key)
    if value:
        rank = self.redis_helper.get_index_with_data(key_name, value)
        # Rank 0 is a real position (the first element); the old `if not idx` test
        # treated it as "not found" and restarted from 0 instead of moving on to 1.
        # NOTE(review): assumes get_index_with_data returns None for a missing
        # member (redis ZRANK/ZREVRANK semantics) - confirm in db_helper.
        if rank is not None:
            idx = rank + 1
    return key_name, last_region_dup_key, idx
def rov_pool_recall_with_region_process(self, size=4, expire_time=24*3600):
    """
    Recall videos from the region-grouped pools.

    Several ``recall_region_dup_24h`` lookups are run concurrently (one per
    list flag), their results are merged in list order with duplicates
    dropped, and the merge is cut to ``recall_num`` videos. When both
    ``h_data_key`` and ``h_rule_key`` are set, the non-regional hour-level
    list (``recall_h``) is appended afterwards without any further cut.

    :param size: number of videos to fetch from each list
    :param expire_time: Redis TTL of the "last video" position record
    :return: list of recalled video dicts
    """
    # client_info defaults to None in __init__; treat that as "location unknown"
    # instead of failing on None.get(...).
    client_info = self.client_info or {}
    # City codes for which a city-level grouping exists
    city_code_list = list(config_.CITY_CODE.values())
    province_code = client_info.get('provinceCode', '-1')
    city_code = client_info.get('cityCode', '-1')
    # Prefer the city-level grouping when it exists, otherwise use the province
    region_code = city_code if city_code in city_code_list else province_code
    if region_code == '':
        region_code = '-1'
    if region_code == '-1':
        # Unknown region: only the non-regional lists apply
        key_flags = ['h', '24h_dup2', '24h_dup3']
    else:
        key_flags = ['region_h', 'h', 'region_24h', '24h_dup2', '24h_dup3']
    t = [gevent.spawn(self.recall_region_dup_24h, region_code, size, key_flag, expire_time)
         for key_flag in key_flags]
    gevent.joinall(t)
    region_recall_result_list = [i.get() for i in t]

    recall_num = size
    # ab_code 60098 lifts the regional cut so the downstream ranking gets more supply
    if self.ab_code == 60098:
        recall_num = size * 3

    # Merge in list order, dropping duplicates, until recall_num videos are collected
    recall_result = []
    seen_video_ids = set()
    for region_result in region_recall_result_list:
        for video in region_result:
            video_id = video.get('videoId')
            if video_id in seen_video_ids:
                continue
            seen_video_ids.add(video_id)
            recall_result.append(video)
            if len(recall_result) >= recall_num:
                break
        if len(recall_result) >= recall_num:
            break
    recall_result = recall_result[:recall_num]

    if self.h_data_key is None or self.h_rule_key is None:
        return recall_result

    # Append the non-regional hour-level results (not deduplicated offline)
    recall_h_h_result = self.recall_h(province_code=region_code, size=size, expire_time=expire_time)
    recall_video_ids = {video.get('videoId') for video in recall_result}
    for video in recall_h_h_result:
        video_id = video.get('videoId')
        if video_id not in recall_video_ids:
            recall_result.append(video)
            recall_video_ids.add(video_id)
    return recall_result
def rov_pool_recall_with_region(self, size=4, expire_time=24*3600):
    """
    Recall region-pool videos and tag the ones that are also in a flow pool.

    For every recalled video both the quick-exposure flow pool and the regular
    flow pool are queried concurrently. When at least one lookup returns a
    non-empty tag, the first such tag (quick pool first) is stored under
    'flowPool' and 'isInFlowPool' is set to 1.

    :param size: number of videos to recall
    :param expire_time: Redis TTL forwarded to the recall step
    :return: list of recalled video dicts, tagged in place
    """
    recalled = self.rov_pool_recall_with_region_process(size=size, expire_time=expire_time)
    tagged = []
    for video in recalled:
        vid = video['videoId']
        lookups = [gevent.spawn(self.get_video_flow_pool, vid, quick) for quick in (True, False)]
        gevent.joinall(lookups)
        tags = [lookup.get() for lookup in lookups]
        found = [tag for tag in tags if tag != '']
        if found:
            video['flowPool'] = found[0]
            video['isInFlowPool'] = 1
        tagged.append(video)
    return tagged
def get_video_flow_pool(self, video_id, quick_flow_pool=False):
    """
    Return one flowPool tag for the given video, or '' when there is none.

    :param video_id: videoId
    :param quick_flow_pool: True to look in the quick-exposure flow pool,
        False (default) to look in the regular flow pool
    :return: a flowPool tag taken from the video's tag set, or '' when the video
        is not in the pool or its tag set is empty
    """
    if quick_flow_pool is True:
        membership_key = \
            f"{config_.QUICK_FLOWPOOL_VIDEO_ID_KEY_NAME_PREFIX}{self.app_type}:{config_.QUICK_FLOW_POOL_ID}"
        tags_key = \
            f"{config_.QUICK_FLOWPOOL_VIDEO_INFO_KEY_NAME_PREFIX}{self.app_type}:" \
            f"{config_.QUICK_FLOW_POOL_ID}:{video_id}"
    else:
        membership_key = \
            f"{config_.FLOWPOOL_VIDEO_ID_KEY_NAME_PREFIX}{self.app_type}"
        tags_key = \
            f"{config_.FLOWPOOL_VIDEO_INFO_KEY_NAME_PREFIX}{self.app_type}:{video_id}"
    # Guard clause: the video is not a member of this flow pool
    if not self.redis_helper.data_exists_with_set(key_name=membership_key, value=video_id):
        return ''
    # Ask the set for a single tag
    candidates = self.redis_helper.get_data_with_count_from_set(key_name=tags_key, count=1)
    return candidates[0] if len(candidates) > 0 else ''
def get_flow_pool_videos(self):
    """
    Collect the flow-pool videos that can currently be distributed.

    Both the quick-exposure flow pool and the regular flow pool of this
    app_type are read. Each member has the form "<videoId>-<flowPool>". A
    member whose local distribute count is missing is skipped; a member whose
    count is <= 0 is purged (its count key is deleted and the member is removed
    from the flow-pool lists of every app type).

    :return: {'video_id_list': [videoId, ...] in first-seen order,
              'videos_flow_pool': {videoId: [flowPool, ...]}}
    """
    video_id_list = []
    videos_flow_pool = {}
    # Quick-exposure flow pool
    key_name_quick = f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{self.app_type}:{config_.QUICK_FLOW_POOL_ID}"
    # Regular flow pool
    key_name_other = f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{self.app_type}"
    for key_name in (key_name_quick, key_name_other):
        data = self.redis_helper.get_all_data_from_zset(key_name=key_name, desc=True, with_scores=False)
        if not data:
            continue
        for item in data:
            # Split on the first '-' only, so a flowPool tag that itself contains
            # '-' cannot break the two-way unpacking.
            video_id, flow_pool = item.split('-', 1)
            video_id = int(video_id)
            # Check the remaining local distribute count of this video
            cur_count = get_videos_local_distribute_count(video_id=video_id, flow_pool=flow_pool)
            if cur_count is None:
                # No record
                continue
            if cur_count <= 0:
                # Exhausted: drop the count key and remove the member from every
                # app type's flow-pool lists.
                remain_count_key = f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{video_id}:{flow_pool}"
                self.redis_helper.del_keys(remain_count_key)
                for app_name in config_.APP_TYPE:
                    app_type_value = config_.APP_TYPE.get(app_name)
                    flow_pool_key = f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{app_type_value}"
                    quick_flow_pool_key = f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}" \
                                          f"{app_type_value}:{config_.QUICK_FLOW_POOL_ID}"
                    self.redis_helper.remove_value_from_zset(key_name=flow_pool_key, value=item)
                    self.redis_helper.remove_value_from_zset(key_name=quick_flow_pool_key, value=item)
                continue
            # Still distributable. The dict and the list are filled together, so
            # the dict answers "already seen?" without scanning the list.
            if video_id in videos_flow_pool:
                videos_flow_pool[video_id].append(flow_pool)
            else:
                videos_flow_pool[video_id] = [flow_pool]
                video_id_list.append(video_id)
    return {'video_id_list': video_id_list, 'videos_flow_pool': videos_flow_pool}
def rov_pool_recall_with_region_by_h(self, province_code, size=4, key_flag=''):
"""
Recall videos from the region-grouped hour-level list of the current mid.

The list is read page by page (size * 5 entries per page) and each page is
passed through FilterVideos.filter_videos_h until `size` videos are collected,
the list runs out, or config_.MAX_FREQ_FROM_ROV_POOL pages have been read.
Ids rejected by the filter are removed from the list in Redis.

:param size: number of videos to return
:param province_code: province code used to resolve the list key
:param key_flag: 'region_24h' / 'day_24h' select the matching pushFrom tag;
any other value uses the region hour-level tag
:return: at most `size` dicts with videoId, rovScore, pushFrom and abCode
"""
if key_flag == 'region_24h':
push_from = config_.PUSH_FROM['rov_recall_region_24h']
elif key_flag == 'day_24h':
push_from = config_.PUSH_FROM['rov_recall_24h']
else:
push_from = config_.PUSH_FROM['rov_recall_region_h']
# Resolve the Redis key of the hour-level list that belongs to this mid
h_recall_mid_key = self.get_mid_h_key(province_code=province_code, key_flag=key_flag)
if not self.redis_helper.key_exists(h_recall_mid_key):
recall_result = []
else:
# Ids rejected by the filter (removed from the list further down)
fil_video_ids = []
recall_result = []
# Number of entries fetched per page
get_size = size * 5
# Number of pages fetched so far
freq = 0
while len(recall_result) < size:
freq += 1
if freq > config_.MAX_FREQ_FROM_ROV_POOL:
break
# Fetch page number `freq` by index window
data = self.redis_helper.get_data_zset_with_index(key_name=h_recall_mid_key,
start=(freq - 1) * get_size, end=freq * get_size - 1,
with_scores=True)
if not data:
# The list is exhausted
break
# Convert the ids to int and remember each score as {videoId: score}
video_ids = []
video_score = {}
for value in data:
video_id = int(value[0])
video_ids.append(video_id)
video_score[video_id] = value[1]
# Filter the page (the filter runs in a greenlet that is joined immediately)
filter_ = FilterVideos(request_id=self.request_id,
app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
ge = gevent.spawn(filter_.filter_videos_h, self.rule_key, self.data_key,
self.ab_code, province_code, key_flag)
ge.join()
filtered_result = ge.get()
if filtered_result:
# Attach the source fields pushFrom and abCode
temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
'pushFrom': push_from, 'abCode': self.ab_code}
for item in filtered_result if video_score.get(int(item)) is not None]
recall_result.extend(temp_result)
# Everything on this page that did not survive the filter counts as rejected
fil_video_ids.extend(list(set(video_ids) - set([item.get('videoId') for item in temp_result])))
else:
# Nothing survived: the whole page is rejected
fil_video_ids.extend(video_ids)
# Remove the rejected videos from the list
for value in fil_video_ids:
self.redis_helper.remove_value_from_zset(key_name=h_recall_mid_key, value=value)
return recall_result[:size]
def region_dup_recall(self, province_code, region_dup, size=4, expire_time=23*3600):
    """
    Recall videos from a region-dedup list.

    The list is read in windows of size * 5 entries starting at the position
    recorded for this mid; each window goes through FilterVideos.filter_videos
    until `size` videos are collected, the list runs out, or
    config_.MAX_FREQ_FROM_ROV_POOL windows have been read. When a window is
    filtered out entirely and mid is non-empty, the last id of that window is
    stored so the next request can resume after it.

    :param province_code: region code used to resolve the list key
    :param region_dup: 1 -> region day-level list, 2 -> day-level list; any
        other value has no pushFrom tag and yields an empty result
    :param size: number of videos to return
    :param expire_time: Redis TTL of the resume-position record
    :return: at most `size` dicts with videoId, rovScore, pushFrom and abCode
    """
    if region_dup == 1:
        push_from = config_.PUSH_FROM['rov_recall_region_day']
    elif region_dup == 2:
        push_from = config_.PUSH_FROM['rov_recall_day']
    else:
        # No pushFrom tag exists for other values. Previously this left
        # `push_from` unbound and raised UnboundLocalError as soon as a video
        # passed the filter.
        return []
    # Resolve the list key, the resume-record key and the resume position
    key_name, last_region_dup_key, idx = self.get_region_dup_video_last_idx_h(
        province_code=province_code, region_dup=region_dup)
    if not key_name:
        return []
    recall_result = []
    # Number of entries fetched per window
    get_size = size * 5
    # Number of windows fetched so far
    freq = 0
    while len(recall_result) < size:
        freq += 1
        if freq > config_.MAX_FREQ_FROM_ROV_POOL:
            break
        data = self.redis_helper.get_data_zset_with_index(key_name=key_name,
                                                          start=idx, end=idx + get_size - 1,
                                                          with_scores=True)
        if not data:
            # The list is exhausted
            break
        # Convert the ids to int and remember each score as {videoId: score}
        video_ids = []
        video_score = {}
        for value in data:
            video_id = int(value[0])
            video_ids.append(video_id)
            video_score[video_id] = value[1]
        # Filter the window (the filter runs in a greenlet that is joined immediately)
        filter_ = FilterVideos(request_id=self.request_id,
                               app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
        ge = gevent.spawn(filter_.filter_videos)
        ge.join()
        filtered_result = ge.get()
        if filtered_result:
            # Attach the source fields pushFrom and abCode
            temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
                            'pushFrom': push_from, 'abCode': self.ab_code}
                           for item in filtered_result if video_score.get(int(item)) is not None]
            recall_result.extend(temp_result)
        elif self.mid:
            # Whole window rejected: remember its last id so the next request
            # resumes after it. Nothing is recorded for an empty mid.
            self.redis_helper.set_data_to_redis(key_name=last_region_dup_key, value=data[-1][0],
                                                expire_time=expire_time)
        idx += get_size
    return recall_result[:size]
def rule_recall_by_h(self, size=4, expire_time=24*3600):
    """
    Recall videos from the hour-level lists, topping up from the rov pool.

    The hour-level list and its '24h' variant are queried concurrently and
    merged in that order without duplicates. If fewer than `size` videos were
    found, the rov recall pool supplies the remainder. The elapsed time is logged.

    :param size: number of videos to return
    :param expire_time: Redis TTL of the "last video" position record
    :return: at most `size` video dicts
    """
    began_at = time.time()
    jobs = [gevent.spawn(self.rov_pool_recall_h, size),
            gevent.spawn(self.rov_pool_recall_h, size, '24h')]
    gevent.joinall(jobs)
    batches = [job.get() for job in jobs]

    # Ordered merge without duplicates
    merged = []
    merged_ids = []
    for batch in batches:
        for candidate in batch:
            candidate_id = candidate.get('videoId')
            if candidate_id in merged_ids:
                continue
            merged.append(candidate)
            merged_ids.append(candidate_id)
            if len(merged) >= size:
                break

    # Not enough hour-level videos: top up from the rov pool
    if len(merged) < size:
        for candidate in self.rov_pool_recall(size=size, expire_time=expire_time):
            candidate_id = candidate.get('videoId')
            if candidate_id in merged_ids:
                continue
            merged.append(candidate)
            merged_ids.append(candidate_id)
            if len(merged) >= size:
                break

    log_.info({
        'logTimestamp': int(time.time() * 1000),
        'request_id': self.request_id,
        'operation': 'rule_recall_by_h',
        'executeTime': (time.time() - began_at) * 1000
    })
    return merged[:size]
def rov_pool_recall_h(self, size=4, key_flag=''):
"""
Recall videos from the hour-level list of the current mid.

The list is read page by page (size * 5 entries per page) and each page is
passed through FilterVideos.filter_videos_h until `size` videos are collected,
the list runs out, or config_.MAX_FREQ_FROM_ROV_POOL pages have been read.
Ids rejected by the filter are removed from the list in Redis.

:param size: number of videos to return
:param key_flag: '24h' selects the 24h pushFrom tag; any other value uses
the hour-level tag
:return: at most `size` dicts with videoId, rovScore, pushFrom and abCode
"""
if key_flag == '24h':
push_from = config_.PUSH_FROM['rov_recall_24h']
else:
push_from = config_.PUSH_FROM['rov_recall_h']
# Resolve the Redis key of the hour-level list that belongs to this mid
h_recall_mid_key = self.get_mid_h_key(province_code='', key_flag=key_flag)
if not self.redis_helper.key_exists(h_recall_mid_key):
recall_result = []
else:
# Ids rejected by the filter (removed from the list further down)
fil_video_ids = []
recall_result = []
# Number of entries fetched per page
get_size = size * 5
# Number of pages fetched so far
freq = 0
while len(recall_result) < size:
freq += 1
if freq > config_.MAX_FREQ_FROM_ROV_POOL:
break
# Fetch page number `freq` by index window
data = self.redis_helper.get_data_zset_with_index(key_name=h_recall_mid_key,
start=(freq - 1) * get_size, end=freq * get_size - 1,
with_scores=True)
if not data:
# The list is exhausted
break
# Convert the ids to int and remember each score as {videoId: score}
video_ids = []
video_score = {}
for value in data:
video_id = int(value[0])
video_ids.append(video_id)
video_score[video_id] = value[1]
# Filter the page (the filter runs in a greenlet that is joined immediately)
filter_ = FilterVideos(request_id=self.request_id,
app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
# NOTE(review): rov_pool_recall_with_region_by_h passes
# (rule_key, data_key, ab_code, province_code, key_flag) to filter_videos_h, while this
# call passes (rule_key, ab_code, '', key_flag) -- confirm against the filter_videos_h signature.
ge = gevent.spawn(filter_.filter_videos_h, self.rule_key, self.ab_code, '', key_flag)
ge.join()
filtered_result = ge.get()
if filtered_result:
# Attach the source fields pushFrom and abCode
temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
'pushFrom': push_from, 'abCode': self.ab_code}
for item in filtered_result if video_score.get(int(item)) is not None]
recall_result.extend(temp_result)
# Everything on this page that did not survive the filter counts as rejected
fil_video_ids.extend(list(set(video_ids) - set([item.get('videoId') for item in temp_result])))
else:
# Nothing survived: the whole page is rejected
fil_video_ids.extend(video_ids)
# Remove the rejected videos from the list
for value in fil_video_ids:
self.redis_helper.remove_value_from_zset(key_name=h_recall_mid_key, value=value)
return recall_result[:size]
def get_relevant_videos_19(self, video_id, size=4):
    """
    Fetch the videos related to a head video (appType 19 relevant recommendation).

    :param video_id: id of the head video
    :param size: maximum number of related videos to return
    :return: at most `size` dicts with videoId, rovScore, pushFrom and abCode;
        empty when the related list is missing/empty or nothing passes the filter
    """
    source_tag = config_.PUSH_FROM['top_video_relevant_appType_19']
    related_key = f"{config_.MOVIE_RELEVANT_LIST_KEY_NAME_PREFIX}{video_id}"
    if not self.redis_helper.key_exists(key_name=related_key):
        return []
    # Read the whole related list together with the scores
    rows = self.redis_helper.get_data_zset_with_index(key_name=related_key, start=0, end=-1,
                                                      with_scores=True)
    if not rows:
        return []
    # Ids as int, plus a {videoId: score} lookup
    related_ids = [int(row[0]) for row in rows]
    score_by_id = dict(zip(related_ids, (row[1] for row in rows)))
    # Filter (the filter runs in a greenlet that is joined immediately)
    filter_ = FilterVideos(request_id=self.request_id, app_type=self.app_type, mid=self.mid, uid=self.uid,
                           video_ids=related_ids)
    job = gevent.spawn(filter_.filter_videos)
    job.join()
    survivors = job.get()
    if not survivors:
        return []
    # Attach the source fields pushFrom and abCode
    related = [{'videoId': int(vid), 'rovScore': score_by_id[int(vid)],
                'pushFrom': source_tag, 'abCode': self.ab_code}
               for vid in survivors if score_by_id.get(int(vid)) is not None]
    return related[:size]
def relevant_recall_19(self, video_id, size=4, expire_time=24*3600):
    """
    Recall for the appType 19 relevant recommendation.

    The related-video list of the head video and the rov recall pool are
    queried concurrently; related videos come first, duplicates are dropped.

    :param video_id: id of the head video
    :param size: number of videos to return
    :param expire_time: Redis TTL forwarded to the rov pool recall
    :return: at most `size` video dicts
    """
    jobs = [gevent.spawn(self.get_relevant_videos_19, video_id, size),
            gevent.spawn(self.rov_pool_recall, size, expire_time)]
    gevent.joinall(jobs)
    batches = [job.get() for job in jobs]
    # Ordered merge without duplicates
    merged = []
    merged_ids = []
    for batch in batches:
        for candidate in batch:
            candidate_id = candidate.get('videoId')
            if candidate_id in merged_ids:
                continue
            merged.append(candidate)
            merged_ids.append(candidate_id)
    return merged[:size]
def rov_pool_recall_19(self, size=4, expire_time=24*3600):
    """
    Recall for appType 19.

    The 'whole_movies' rov pool and the flow pool (tagged 'talk_videos') are
    queried concurrently and merged in that order without duplicates. If fewer
    than `size` videos were found, the default rov pool supplies the remainder.

    :param size: number of videos to return
    :param expire_time: Redis TTL of the "last video" record used by the top-up recall
    :return: at most `size` video dicts
    """
    jobs = [
        gevent.spawn(self.rov_pool_recall, size, expire_time=3600, video_type='whole_movies',
                     push_from=config_.PUSH_FROM['whole_movies']),
        gevent.spawn(self.flow_pool_recall_18_19, size, push_from=config_.PUSH_FROM['talk_videos']),
    ]
    gevent.joinall(jobs)
    batches = [job.get() for job in jobs]

    # Ordered merge without duplicates
    picked = []
    picked_ids = []
    for batch in batches:
        for candidate in batch:
            candidate_id = candidate.get('videoId')
            if candidate_id in picked_ids:
                continue
            picked.append(candidate)
            picked_ids.append(candidate_id)
            if len(picked) >= size:
                break

    # Not enough videos yet: top up from the default rov pool
    if len(picked) < size:
        for candidate in self.rov_pool_recall(size=size, expire_time=expire_time):
            candidate_id = candidate.get('videoId')
            if candidate_id in picked_ids:
                continue
            picked.append(candidate)
            picked_ids.append(candidate_id)
            if len(picked) >= size:
                break
    return picked[:size]
def update_last_video_record(self, record_key, pool_key_prefix, province_code, data_key=None, rule_key=None):
    """
    Pick the hour-level regional list to read and record which hour was picked.

    The list of the current hour is used when it exists in Redis; otherwise the
    list of the previous hour is used (23h of yesterday when the current hour
    is 0). The chosen date/hour is written to ``record_key`` as
    ``str({'date': 'YYYYMMDD', 'h': hour})`` with a 2-hour TTL.

    :param record_key: Redis key that stores the chosen date/hour
    :param pool_key_prefix: prefix of the list keys
    :param province_code: region code embedded in the list key
    :param data_key: data key for the list; falls back to self.data_key/self.rule_key
        when either data_key or rule_key is None
    :param rule_key: rule key for the list (see data_key)
    :return: Redis key of the chosen list
    """
    if data_key is None or rule_key is None:
        data_key = self.data_key
        rule_key = self.rule_key
    # Read the clock once so the date and the hour can never disagree at midnight.
    chosen = datetime.now()
    pool_recall_key = (f"{pool_key_prefix}{province_code}:{data_key}:{rule_key}:"
                       f"{chosen.strftime('%Y%m%d')}:{chosen.hour}")
    if not self.redis_helper.key_exists(key_name=pool_recall_key):
        # Current hour not published yet: step back one hour (this also rolls the
        # date back when the current hour is 0).
        chosen = chosen - timedelta(hours=1)
        pool_recall_key = (f"{pool_key_prefix}{province_code}:{data_key}:{rule_key}:"
                           f"{chosen.strftime('%Y%m%d')}:{chosen.hour}")
    value = {'date': chosen.strftime('%Y%m%d'), 'h': chosen.hour}
    self.redis_helper.set_data_to_redis(key_name=record_key, value=str(value), expire_time=2 * 3600)
    return pool_recall_key
def update_last_video_record_without_region(self, record_key, pool_key_prefix, data_key=None, rule_key=None):
    """
    Pick the non-regional hour-level list to read and record which hour was picked.

    The list of the current hour is used when it exists in Redis; otherwise the
    list of the previous hour is used (23h of yesterday when the current hour
    is 0). The chosen date/hour is written to ``record_key`` as
    ``str({'date': 'YYYYMMDD', 'h': hour})`` with a 2-hour TTL.

    :param record_key: Redis key that stores the chosen date/hour
    :param pool_key_prefix: prefix of the list keys
    :param data_key: data key for the list; falls back to self.data_key/self.rule_key
        when either data_key or rule_key is None
    :param rule_key: rule key for the list (see data_key)
    :return: Redis key of the chosen list
    """
    if data_key is None or rule_key is None:
        data_key = self.data_key
        rule_key = self.rule_key
    # Read the clock once so the date and the hour can never disagree at midnight.
    chosen = datetime.now()
    pool_recall_key = f"{pool_key_prefix}{data_key}:{rule_key}:{chosen.strftime('%Y%m%d')}:{chosen.hour}"
    if not self.redis_helper.key_exists(key_name=pool_recall_key):
        # Current hour not published yet: step back one hour (this also rolls the
        # date back when the current hour is 0).
        chosen = chosen - timedelta(hours=1)
        pool_recall_key = f"{pool_key_prefix}{data_key}:{rule_key}:{chosen.strftime('%Y%m%d')}:{chosen.hour}"
    value = {'date': chosen.strftime('%Y%m%d'), 'h': chosen.hour}
    self.redis_helper.set_data_to_redis(key_name=record_key, value=str(value), expire_time=2 * 3600)
    return pool_recall_key
def get_video_idx(self, pool_recall_key, last_video_key):
    """
    Return the list index to resume reading from.

    :param pool_recall_key: Redis key of the video list
    :param last_video_key: Redis key that stores the last recorded video
    :return: 0 when nothing is recorded or the recorded video is no longer in
        the list, otherwise the index right after the recorded video
    """
    value = self.redis_helper.get_data_from_redis(last_video_key)
    if not value:
        return 0
    rank = self.redis_helper.get_index_with_data(key_name=pool_recall_key, value=value)
    # Rank 0 is a valid position (first element), so only a missing member restarts
    # from the top. Assumes the helper returns None when the member is absent
    # (Redis ZRANK semantics) -- TODO confirm against RedisHelper.
    if rank is None:
        return 0
    return rank + 1
def get_last_recommend_video_idx(self, province_code, record_key_prefix, pool_key_prefix, last_video_key_prefix,
                                 data_key=None, rule_key=None):
    """
    Resolve which hour-level regional list to read for this mid and where to resume.

    A per-mid record stores the date/hour of the list that was read last:
    - no usable record, or a record older than one hour: the position is reset
      and the list is chosen by ``update_last_video_record``;
    - record of the current hour: keep reading that list after the last video;
    - record of the previous hour: switch to the current hour's list (from
      index 0) once it exists, otherwise keep reading the recorded list.

    :param province_code: region code embedded in the list key
    :param record_key_prefix: prefix of the per-mid date/hour record key
    :param pool_key_prefix: prefix of the list keys
    :param last_video_key_prefix: prefix of the per-mid "last video" key
    :param data_key: data key for the list; falls back to self.data_key/self.rule_key
        when either data_key or rule_key is None
    :param rule_key: rule key for the list (see data_key)
    :return: (pool_recall_key, idx)
    """
    # Local import: the module-level import block is outside this method.
    import ast

    if data_key is None or rule_key is None:
        data_key = self.data_key
        rule_key = self.rule_key
    record_key = f"{record_key_prefix}{self.app_type}:{self.mid}"
    last_video_key = f'{last_video_key_prefix}{self.app_type}:{self.mid}'

    def pool_key_for(dt, hour):
        return f"{pool_key_prefix}{province_code}:{data_key}:{rule_key}:{dt}:{hour}"

    record_dt = record_h = None
    if self.redis_helper.key_exists(key_name=record_key):
        raw = self.redis_helper.get_data_from_redis(key_name=record_key)
        if isinstance(raw, bytes):
            raw = raw.decode('utf-8')
        # The record is written as str(dict). It is parsed with ast.literal_eval
        # rather than eval(), so data coming from Redis is never executed as code.
        # A record that expired in the meantime or cannot be parsed is treated
        # like a missing record.
        try:
            parsed = ast.literal_eval(raw)
            record_dt = parsed.get('date')
            record_h = int(parsed.get('h'))
        except (ValueError, SyntaxError, TypeError, AttributeError):
            record_dt = record_h = None

    if record_h is not None:
        # Read the clock once so the date and the hour always belong together.
        now = datetime.now()
        now_dt = now.strftime('%Y%m%d')
        h = now.hour
        if record_dt == now_dt and record_h == h:
            # Already reading the list of the current hour
            pool_recall_key = pool_key_for(now_dt, h)
            idx = self.get_video_idx(pool_recall_key=pool_recall_key, last_video_key=last_video_key)
            return pool_recall_key, idx
        if (record_dt == now_dt and h - record_h == 1) or (h == 0 and record_h == 23):
            # The record is one hour old: switch only if the new list is published
            now_pool_recall_key = pool_key_for(now_dt, h)
            if self.redis_helper.key_exists(key_name=now_pool_recall_key):
                new_record = {'date': now_dt, 'h': h}
                self.redis_helper.set_data_to_redis(key_name=record_key, value=str(new_record),
                                                    expire_time=2*3600)
                self.redis_helper.del_keys(key_name=last_video_key)
                return now_pool_recall_key, 0
            pool_recall_key = pool_key_for(record_dt, record_h)
            idx = self.get_video_idx(pool_recall_key=pool_recall_key, last_video_key=last_video_key)
            return pool_recall_key, idx

    # No usable record, or the record is too old: reset the position
    self.redis_helper.del_keys(key_name=last_video_key)
    pool_recall_key = self.update_last_video_record(record_key=record_key, pool_key_prefix=pool_key_prefix,
                                                    province_code=province_code,
                                                    data_key=data_key, rule_key=rule_key)
    return pool_recall_key, 0
def get_last_recommend_video_idx_without_region(self, record_key_prefix, pool_key_prefix, last_video_key_prefix,
                                                data_key=None, rule_key=None):
    """
    Resolve which non-regional hour-level list to read for this mid and where to resume.

    A per-mid record stores the date/hour of the list that was read last:
    - no usable record, or a record older than one hour: the position is reset
      and the list is chosen by ``update_last_video_record_without_region``;
    - record of the current hour: keep reading that list after the last video;
    - record of the previous hour: switch to the current hour's list (from
      index 0) once it exists, otherwise keep reading the recorded list.

    :param record_key_prefix: prefix of the per-mid date/hour record key
    :param pool_key_prefix: prefix of the list keys
    :param last_video_key_prefix: prefix of the per-mid "last video" key
    :param data_key: data key for the list; falls back to self.data_key/self.rule_key
        when either data_key or rule_key is None
    :param rule_key: rule key for the list (see data_key)
    :return: (pool_recall_key, idx)
    """
    # Local import: the module-level import block is outside this method.
    import ast

    if data_key is None or rule_key is None:
        data_key = self.data_key
        rule_key = self.rule_key
    record_key = f"{record_key_prefix}{self.app_type}:{self.mid}"
    last_video_key = f'{last_video_key_prefix}{self.app_type}:{self.mid}'

    def pool_key_for(dt, hour):
        return f"{pool_key_prefix}{data_key}:{rule_key}:{dt}:{hour}"

    record_dt = record_h = None
    if self.redis_helper.key_exists(key_name=record_key):
        raw = self.redis_helper.get_data_from_redis(key_name=record_key)
        if isinstance(raw, bytes):
            raw = raw.decode('utf-8')
        # The record is written as str(dict). It is parsed with ast.literal_eval
        # rather than eval(), so data coming from Redis is never executed as code.
        # A record that expired in the meantime or cannot be parsed is treated
        # like a missing record.
        try:
            parsed = ast.literal_eval(raw)
            record_dt = parsed.get('date')
            record_h = int(parsed.get('h'))
        except (ValueError, SyntaxError, TypeError, AttributeError):
            record_dt = record_h = None

    if record_h is not None:
        # Read the clock once so the date and the hour always belong together.
        now = datetime.now()
        now_dt = now.strftime('%Y%m%d')
        h = now.hour
        if record_dt == now_dt and record_h == h:
            # Already reading the list of the current hour
            pool_recall_key = pool_key_for(now_dt, h)
            idx = self.get_video_idx(pool_recall_key=pool_recall_key, last_video_key=last_video_key)
            return pool_recall_key, idx
        if (record_dt == now_dt and h - record_h == 1) or (h == 0 and record_h == 23):
            # The record is one hour old: switch only if the new list is published
            now_pool_recall_key = pool_key_for(now_dt, h)
            if self.redis_helper.key_exists(key_name=now_pool_recall_key):
                new_record = {'date': now_dt, 'h': h}
                self.redis_helper.set_data_to_redis(key_name=record_key, value=str(new_record),
                                                    expire_time=2*3600)
                self.redis_helper.del_keys(key_name=last_video_key)
                return now_pool_recall_key, 0
            pool_recall_key = pool_key_for(record_dt, record_h)
            idx = self.get_video_idx(pool_recall_key=pool_recall_key, last_video_key=last_video_key)
            return pool_recall_key, idx

    # No usable record, or the record is too old: reset the position
    self.redis_helper.del_keys(key_name=last_video_key)
    pool_recall_key = self.update_last_video_record_without_region(record_key=record_key,
                                                                   pool_key_prefix=pool_key_prefix,
                                                                   data_key=data_key, rule_key=rule_key)
    return pool_recall_key, 0
def recall_region_dup_24h(self, province_code, size=4, key_flag='', expire_time=2*3600):
"""
Recall videos from one of the hourly-updated dedup lists selected by key_flag.

The list and the resume position are resolved via get_last_recommend_video_idx;
the list is then read in windows of size * self.expansion_factor entries and
each window is passed through FilterVideos.filter_videos until `size` videos are
collected, the list runs out, or config_.MAX_FREQ_FROM_ROV_POOL windows have
been read. The collected videos are sorted by rovScore in descending order.

:param province_code: region code used to resolve the list key and passed to the filter
:param size: number of videos to return
:param key_flag: list selector -- 'region_h', 'h', 'region_24h', '24h_dup2',
'24h_dup3', '48h_dup2' or '48h_dup3'; any other value returns []
:param expire_time: Redis TTL of the "last video" position record
:return: at most `size` dicts with videoId, rovScore, pushFrom and abCode
"""
# NOTE(review): start_time is only referenced by the disabled log call at the end
start_time = time.time()
if key_flag == 'region_h':
# Regional hour-level filtered result
# Per-mid record of the date/hour that was read last
record_key_prefix = config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_REGION_H
# Video list
pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H
# Per-mid record of the last video
last_video_key_prefix = config_.LAST_VIDEO_FROM_REGION_H_PREFIX
push_from = config_.PUSH_FROM['rov_recall_region_h']
elif key_flag == 'h':
# Non-regional hour-level filtered result
# Per-mid record of the date/hour that was read last
record_key_prefix = config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_REGION_DUP_H
# Video list
pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP_H_H
# Per-mid record of the last video
last_video_key_prefix = config_.LAST_VIDEO_FROM_REGION_DUP_H_PREFIX
push_from = config_.PUSH_FROM['rov_recall_h_h']
elif key_flag == 'region_24h':
# Regional filtered result relative to 24h
# Per-mid record of the date/hour that was read last
record_key_prefix = config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_REGION_DUP1_24H
# Video list
pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H
# Per-mid record of the last video
last_video_key_prefix = config_.LAST_VIDEO_FROM_REGION_DUP1_24H_PREFIX
push_from = config_.PUSH_FROM['rov_recall_region_24h']
elif key_flag == '24h_dup2':
# Non-regional filtered result relative to 24h
# Per-mid record of the date/hour that was read last
record_key_prefix = config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_REGION_DUP2_24H
# Video list
pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H
# Per-mid record of the last video
last_video_key_prefix = config_.LAST_VIDEO_FROM_REGION_DUP2_24H_PREFIX
push_from = config_.PUSH_FROM['rov_recall_24h']
elif key_flag == '24h_dup3':
# Non-regional data left over after the 24h filtering
record_key_prefix = config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_REGION_DUP3_24H
pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H
last_video_key_prefix = config_.LAST_VIDEO_FROM_REGION_DUP3_24H_PREFIX
push_from = config_.PUSH_FROM['rov_recall_24h_dup']
elif key_flag == '48h_dup2':
# Non-regional filtered result relative to 48h
# Per-mid record of the date/hour that was read last
record_key_prefix = config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_REGION_DUP2_48H
# Video list
pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_48H_H
# Per-mid record of the last video
last_video_key_prefix = config_.LAST_VIDEO_FROM_REGION_DUP2_48H_PREFIX
push_from = config_.PUSH_FROM['rov_recall_48h']
elif key_flag == '48h_dup3':
# Non-regional data left over after the 48h filtering
record_key_prefix = config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_REGION_DUP3_48H
pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_48H_H
last_video_key_prefix = config_.LAST_VIDEO_FROM_REGION_DUP3_48H_PREFIX
push_from = config_.PUSH_FROM['rov_recall_48h_dup']
else:
return []
# Resolve the list key and the position this user reached last time
pool_key, idx = self.get_last_recommend_video_idx(province_code=province_code,
record_key_prefix=record_key_prefix,
pool_key_prefix=pool_key_prefix,
last_video_key_prefix=last_video_key_prefix)
if not pool_key:
return []
# Every raw entry fetched, across all windows
recall_data = []
pool_recall_result = []
# Number of entries fetched per window (previously the fixed value size * 5)
get_size = size * self.expansion_factor
# Number of windows fetched so far
freq = 0
while len(pool_recall_result) < size:
freq += 1
if freq > config_.MAX_FREQ_FROM_ROV_POOL:
break
# Fetch the next window
data = self.redis_helper.get_data_zset_with_index(key_name=pool_key,
start=idx, end=idx + get_size - 1,
with_scores=True)
if not data:
break
recall_data.extend(data)
# Convert the ids to int and remember each score as {videoId: score}
video_ids = []
video_score = {}
for value in data:
video_id = int(value[0])
video_ids.append(video_id)
video_score[video_id] = value[1]
# Filter the window (the filter runs in a greenlet that is joined immediately)
filter_ = FilterVideos(request_id=self.request_id,
app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids,
expansion_factor=self.expansion_factor,
risk_filter_flag=self.risk_filter_flag,
app_region_filtered=self.app_region_filtered,
videos_with_risk=self.videos_with_risk,
force_truncation=20
)
ge = gevent.spawn(filter_.filter_videos, '', province_code, None)
ge.join()
filtered_result = ge.get()
if filtered_result:
# Attach the source fields pushFrom and abCode
temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
'pushFrom': push_from, 'abCode': self.ab_code}
for item in filtered_result if video_score.get(int(item)) is not None]
pool_recall_result.extend(temp_result)
# (A disabled else-branch used to record the last id of every rejected window here;
# the position is now recorded once, after the loop.)
idx += get_size
pool_recall_result.sort(key=lambda x: x.get('rovScore', 0), reverse=True)
# (This condition used to be restricted to the 'abtest_112' ab_code; that restriction is disabled.)
if len(recall_data) > 0 and len(pool_recall_result) == 0 and self.mid:
# Data was fetched, nothing survived the filter and mid is non-empty:
# store the last fetched video id under the position key
last_video_key = f'{last_video_key_prefix}{self.app_type}:{self.mid}'
self.redis_helper.set_data_to_redis(key_name=last_video_key, value=recall_data[-1][0],
expire_time=expire_time)
# (A timing/result log call keyed by push_from used to be emitted here; it is disabled.)
return pool_recall_result[:size]
def recall_h(self, province_code, size=4, expire_time=2*3600):
    """
    Recall videos from the non-regional hour-level list.

    The list is resolved with self.h_data_key / self.h_rule_key and read in
    windows of size * 5 entries; each window goes through
    FilterVideos.filter_videos until at least `size` videos are collected, the
    list runs out, or config_.MAX_FREQ_FROM_ROV_POOL windows have been read.
    The result is sorted by rovScore in descending order and is not cut to `size`.

    :param province_code: region code passed to the filter
    :param size: number of videos wanted
    :param expire_time: Redis TTL of the "last video" position record
    :return: list of dicts with videoId, rovScore, pushFrom and abCode
    """
    # Per-mid date/hour record, list prefix, per-mid last-video record, source tag
    record_key_prefix = config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_H
    pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_BY_H_H
    last_video_key_prefix = config_.LAST_VIDEO_FROM_H_PREFIX
    source_tag = config_.PUSH_FROM['rov_recall_h_h_without_dup']
    # Resolve the list key and the position this user reached last time
    pool_key, idx = self.get_last_recommend_video_idx_without_region(
        record_key_prefix=record_key_prefix,
        pool_key_prefix=pool_key_prefix,
        last_video_key_prefix=last_video_key_prefix,
        data_key=self.h_data_key,
        rule_key=self.h_rule_key)
    if not pool_key:
        return []

    fetched_rows = []
    kept = []
    window = size * 5
    attempts = 0
    while len(kept) < size:
        attempts += 1
        if attempts > config_.MAX_FREQ_FROM_ROV_POOL:
            break
        rows = self.redis_helper.get_data_zset_with_index(key_name=pool_key,
                                                          start=idx, end=idx + window - 1,
                                                          with_scores=True)
        if not rows:
            break
        fetched_rows.extend(rows)
        # Ids as int, plus a {videoId: score} lookup
        window_ids = [int(row[0]) for row in rows]
        score_by_id = dict(zip(window_ids, (row[1] for row in rows)))
        # Filter the window (the filter runs in a greenlet that is joined immediately)
        filter_ = FilterVideos(request_id=self.request_id,
                               app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=window_ids)
        job = gevent.spawn(filter_.filter_videos, pool_type='normal',
                           region_code=province_code, shield_config=self.shield_config)
        job.join()
        survivors = job.get()
        if survivors:
            # Attach the source fields pushFrom and abCode
            kept.extend([{'videoId': int(vid), 'rovScore': score_by_id[int(vid)],
                          'pushFrom': source_tag, 'abCode': self.ab_code}
                         for vid in survivors if score_by_id.get(int(vid)) is not None])
        idx += window

    kept.sort(key=lambda entry: entry.get('rovScore', 0), reverse=True)
    if fetched_rows and not kept and self.mid:
        # Data was fetched, nothing survived the filter and mid is non-empty:
        # store the last fetched video id under the position key
        last_video_key = f'{last_video_key_prefix}{self.app_type}:{self.mid}'
        self.redis_helper.set_data_to_redis(key_name=last_video_key, value=fetched_rows[-1][0],
                                            expire_time=expire_time)
    return kept
def update_last_video_record_by_day(self, record_key, pool_key_prefix, expire_time):
    """
    Pick the day-level list to read and record which date was picked.

    Today's list is used when it exists in Redis, otherwise yesterday's. The
    chosen date is written to ``record_key`` as ``str({'date': 'YYYYMMDD'})``.

    :param record_key: Redis key that stores the chosen date
    :param pool_key_prefix: prefix of the list keys
    :param expire_time: Redis TTL of the date record
    :return: Redis key of the chosen list
    """
    today = datetime.today()
    chosen_dt = today.strftime('%Y%m%d')
    pool_key = f"{pool_key_prefix}{self.data_key}:{self.rule_key_30day}:{chosen_dt}"
    if not self.redis_helper.key_exists(key_name=pool_key):
        # Today's list is not published yet: fall back to yesterday's
        chosen_dt = (today - timedelta(days=1)).strftime('%Y%m%d')
        pool_key = f"{pool_key_prefix}{self.data_key}:{self.rule_key_30day}:{chosen_dt}"
    record = {'date': chosen_dt}
    self.redis_helper.set_data_to_redis(key_name=record_key, value=str(record), expire_time=expire_time)
    return pool_key
def get_last_recommend_video_idx_by_day(self, record_key_prefix, pool_key_prefix, last_video_key_prefix, expire_time):
    """Resolve which day-level pool to read for this user and the index to resume from.

    The per-user record (written as str({'date': 'YYYYMMDD'})) decides the branch:
      * record date is today: keep reading today's pool from the stored position;
      * record date is yesterday: switch to today's pool if it exists (position reset),
        otherwise keep reading the recorded day's pool from the stored position;
      * no record, unreadable record, or an older date: reset the position and let
        update_last_video_record_by_day pick the pool.

    :param record_key_prefix: prefix of the per-user date record key
    :param pool_key_prefix: prefix of the day-level pool key
    :param last_video_key_prefix: prefix of the per-user "last video" key
    :param expire_time: expiry forwarded to redis when the record is rewritten
    :return: (pool_recall_key, idx)
    """
    import ast  # function-scope import: only this method needs it

    record_key = f"{record_key_prefix}{self.app_type}:{self.mid}"
    last_video_key = f'{last_video_key_prefix}{self.app_type}:{self.mid}'
    pool_key_stem = f"{pool_key_prefix}{self.data_key}:{self.rule_key_30day}:"

    record_dt = None
    if self.redis_helper.key_exists(key_name=record_key):
        record = self.redis_helper.get_data_from_redis(key_name=record_key)
        if isinstance(record, bytes):
            record = record.decode('utf-8', errors='replace')
        # SECURITY: this value used to go through eval(); literal_eval only accepts
        # Python literals, so a tampered redis value cannot execute code.
        try:
            parsed = ast.literal_eval(record)
        except (ValueError, SyntaxError, TypeError):
            # The key may have expired between key_exists and the read, or hold garbage;
            # treat both like a missing record.
            parsed = None
        if isinstance(parsed, dict):
            record_dt = parsed.get('date')

    now_date = datetime.today()
    now_dt = now_date.strftime('%Y%m%d')
    yesterday_dt = (now_date - timedelta(days=1)).strftime('%Y%m%d')

    if record_dt == now_dt:
        # Already reading today's data.
        pool_recall_key = pool_key_stem + now_dt
        idx = self.get_video_idx(pool_recall_key=pool_recall_key, last_video_key=last_video_key)
    elif record_dt == yesterday_dt:
        # Record is one day old: check whether today's data has been published.
        now_pool_recall_key = pool_key_stem + now_dt
        if self.redis_helper.key_exists(key_name=now_pool_recall_key):
            self.redis_helper.set_data_to_redis(key_name=record_key,
                                                value=str({'date': now_dt}),
                                                expire_time=expire_time)
            idx = 0
            self.redis_helper.del_keys(key_name=last_video_key)
            pool_recall_key = now_pool_recall_key
        else:
            pool_recall_key = pool_key_stem + record_dt
            idx = self.get_video_idx(pool_recall_key=pool_recall_key, last_video_key=last_video_key)
    else:
        # No usable record or a stale one: start over from the head of the pool.
        idx = 0
        self.redis_helper.del_keys(key_name=last_video_key)
        pool_recall_key = self.update_last_video_record_by_day(record_key=record_key,
                                                               pool_key_prefix=pool_key_prefix,
                                                               expire_time=expire_time)
    return pool_recall_key, idx
def recall_update_by_day(self, size=4, key_flag='', expire_time=24*3600):
    """
    Recall videos from the day-level updated list.
    :param size: number of videos to return
    :param key_flag: selects the video table; only '30day' is handled, any other value returns []
    :param expire_time: expiry passed to redis for the "last video" position record
    :return: up to `size` dicts (videoId, rovScore, pushFrom, abCode), highest rovScore first
    """
    if key_flag != '30day':
        return []
    # Selection computed relative to the 30-day list.
    pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_30DAY
    # Per-mid record of which day's list was last read.
    record_key_prefix = config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_30DAY
    # Per-mid record of the last video that was read.
    last_video_key_prefix = config_.LAST_VIDEO_FROM_30DAY_PREFIX
    push_from = config_.PUSH_FROM['rov_recall_30day']

    # Resolve the redis key and the position the user reached last time.
    pool_key, cursor = self.get_last_recommend_video_idx_by_day(
        record_key_prefix=record_key_prefix,
        pool_key_prefix=pool_key_prefix,
        last_video_key_prefix=last_video_key_prefix,
        expire_time=expire_time)
    if not pool_key:
        return []

    fetched_rows = []
    candidates = []
    batch_size = size * 5  # rows pulled per redis round trip
    attempts = 0
    while len(candidates) < size:
        attempts += 1
        if attempts > config_.MAX_FREQ_FROM_ROV_POOL:
            break
        batch = self.redis_helper.get_data_zset_with_index(key_name=pool_key,
                                                           start=cursor, end=cursor + batch_size - 1,
                                                           with_scores=True)
        if not batch:
            break
        fetched_rows.extend(batch)
        # Normalise ids to int and remember each id's score.
        pairs = [(int(row[0]), row[1]) for row in batch]
        batch_ids = [vid for vid, _ in pairs]
        score_by_id = dict(pairs)
        # Run the filter in a greenlet and wait for its result.
        video_filter = FilterVideos(request_id=self.request_id,
                                    app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=batch_ids)
        job = gevent.spawn(video_filter.filter_videos)
        job.join()
        kept = job.get()
        if kept:
            # Attach the source markers pushFrom and abCode.
            candidates.extend(
                {'videoId': int(item), 'rovScore': score_by_id[int(item)],
                 'pushFrom': push_from, 'abCode': self.ab_code}
                for item in kept if score_by_id.get(int(item)) is not None
            )
        cursor += batch_size

    candidates.sort(key=lambda rec: rec.get('rovScore', 0), reverse=True)
    if fetched_rows and not candidates and self.mid:
        # Data was fetched but everything was filtered out: remember the last fetched
        # video id so the next request can resume after it.
        last_video_key = f'{last_video_key_prefix}{self.app_type}:{self.mid}'
        self.redis_helper.set_data_to_redis(key_name=last_video_key, value=fetched_rows[-1][0],
                                            expire_time=expire_time)
    return candidates[:size]
#linfan
def get_sim_hot_item_reall(self):
    """Read the "sim_hot_<video_id>" list from redis without any filtering.

    The stored value is decoded as JSON; each item is read as item[0] = video id,
    item[1] = score.

    :return: at most 200 recall dicts (videoId, flowPool, rovScore, pushFrom, abCode);
             [] when the key holds nothing
    """
    raw = self.redis_helper.get_data_from_redis(key_name="sim_hot_" + str(self.video_id))
    if raw is None:
        return []
    similar = [
        {'videoId': entry[0], 'flowPool': '',
         'rovScore': entry[1], 'pushFrom': config_.PUSH_FROM['sim_hot_vid_recall'],
         'abCode': self.ab_code}
        for entry in json.loads(raw)
    ]
    return similar[:200]
def get_sim_hot_item_reall_filter(self):
    """Recall from "sim_hot_<video_id>" and keep only videos that pass the status filter.

    Only the first 20 parsed ids are sent to FilterVideos.filter_videos_status; items
    that fail to parse are skipped individually.

    :return: recall dicts in the order returned by the filter; [] when there is no
             video_id, no data, or the filter returns None
    """
    if self.video_id is None:
        return []
    raw = self.redis_helper.get_data_from_redis(key_name="sim_hot_" + str(self.video_id))
    by_id = {}
    ordered_ids = []
    if raw is not None:
        for entry in json.loads(raw):
            try:
                vid = int(entry[0])
                ordered_ids.append(vid)
                by_id[vid] = {'videoId': vid, 'flowPool': '',
                              'rovScore': entry[1], 'pushFrom': config_.PUSH_FROM['sim_hot_vid_recall'],
                              'abCode': self.ab_code}
            except Exception:
                # Best effort: a bad item does not invalidate the rest of the list.
                continue
    if not ordered_ids:
        return []
    status_filter = FilterVideos(request_id=self.request_id,
                                 app_type=self.app_type, mid=self.mid, uid=self.uid,
                                 video_ids=ordered_ids[:20],
                                 expansion_factor=self.expansion_factor,
                                 risk_filter_flag=self.risk_filter_flag,
                                 app_region_filtered=self.app_region_filtered,
                                 videos_with_risk=self.videos_with_risk
                                 )
    survivors = status_filter.filter_videos_status(pool_type='normal', region_code=self.get_region_code())
    if survivors is None:
        return []
    return [by_id[vid] for vid in survivors if vid in by_id]
# get region_hour_recall
def get_region_hour_recall(self, size=4, region_code='-1'):
    """Read the hour-level regional recall list from redis.

    Key: RECALL_KEY_NAME_PREFIX_REGION_BY_H + "<region_code>:<data_key>:<rule_key>".
    The value is decoded as JSON; each item is read as item[0] = video id, item[1] = score.

    :param size: not used by this method
    :param region_code: region segment of the redis key
    :return: at most 30 recall dicts; [] when the key holds nothing
    """
    redis_key = f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region_code}:{self.data_key}:{self.rule_key}"
    raw = self.redis_helper.get_data_from_redis(key_name=redis_key)
    if raw is None:
        return []
    rows = [
        {'videoId': entry[0], 'flowPool': '',
         'rovScore': entry[1], 'pushFrom': config_.PUSH_FROM['rov_recall_region_h'],
         'abCode': self.ab_code}
        for entry in json.loads(raw)
    ]
    return rows[:30]
# get region_day_recall
def get_region_day_recall(self, size=4,region_code='-1'):
    """Read the regional 24h recall list (DUP1 key family) from redis.

    Key: RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H + "<region_code>:<data_key>:<rule_key>".
    The value is decoded as JSON; each item is read as item[0] = video id, item[1] = score.

    :param size: not used by this method
    :param region_code: region segment of the redis key
    :return: at most 200 recall dicts; [] when the key holds nothing
    """
    redis_key = f"{config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H}{region_code}:{self.data_key}:{self.rule_key}"
    raw = self.redis_helper.get_data_from_redis(key_name=redis_key)
    if raw is None:
        return []
    rows = [
        {'videoId': entry[0], 'flowPool': '',
         'rovScore': entry[1], 'pushFrom': config_.PUSH_FROM['rov_recall_region_24h'],
         'abCode': self.ab_code}
        for entry in json.loads(raw)
    ]
    return rows[:200]
def get_selected_recall(self, size=4, region_code='-1'):
    """Read the "selected" 24h recall list (DUP2 key family) from redis.

    Key: RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H + "<region_code>:<data_key>:<rule_key>".
    The value is decoded as JSON; each item is read as item[0] = video id, item[1] = score.

    :param size: not used by this method
    :param region_code: region segment of the redis key
    :return: at most 200 recall dicts; [] when the key holds nothing
    """
    redis_key = f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H}{region_code}:{self.data_key}:{self.rule_key}"
    raw = self.redis_helper.get_data_from_redis(key_name=redis_key)
    if raw is None:
        return []
    rows = [
        {'videoId': entry[0], 'flowPool': '',
         'rovScore': entry[1], 'pushFrom': config_.PUSH_FROM['rov_recall_24h'],
         'abCode': self.ab_code}
        for entry in json.loads(raw)
    ]
    return rows[:200]
def get_no_selected_recall(self, size=4, region_code='-1'):
    """Read the "not selected" 24h recall list (DUP3 key family) from redis.

    Key: RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H + "<region_code>:<data_key>:<rule_key>".
    The value is decoded as JSON; each item is read as item[0] = video id, item[1] = score.

    :param size: not used by this method
    :param region_code: region segment of the redis key
    :return: at most 200 recall dicts; [] when the key holds nothing
    """
    redis_key = f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H}{region_code}:{self.data_key}:{self.rule_key}"
    raw = self.redis_helper.get_data_from_redis(key_name=redis_key)
    if raw is None:
        return []
    rows = [
        {'videoId': entry[0], 'flowPool': '',
         'rovScore': entry[1], 'pushFrom': config_.PUSH_FROM['rov_recall_24h_dup'],
         'abCode': self.ab_code}
        for entry in json.loads(raw)
    ]
    return rows[:200]
def get_fast_flow_pool_recall(self, size=4):
    """Read the quick flow-pool recall list for this app from redis.

    Key: QUICK_FLOWPOOL_KEY_NAME_PREFIX + "<app_type>:" + QUICK_FLOW_POOL_ID.
    The value is decoded as JSON; each item is read as item[0] = video id, item[1] = score.

    :param size: not used by this method
    :return: every stored item as a recall dict (no truncation); [] when the key holds nothing
    """
    redis_key = f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{self.app_type}:{config_.QUICK_FLOW_POOL_ID}"
    raw = self.redis_helper.get_data_from_redis(key_name=redis_key)
    if raw is None:
        return []
    return [
        {'videoId': entry[0], 'flowPool': '',
         'rovScore': entry[1], 'pushFrom': config_.PUSH_FROM['fast_flow_recall'],
         'abCode': self.ab_code}
        for entry in json.loads(raw)
    ]
def get_flow_pool_recall(self, size=4):
    """Read the regular flow-pool recall list for this app from redis.

    Key: FLOWPOOL_KEY_NAME_PREFIX + "<app_type>".
    The value is decoded as JSON; each item is read as item[0] = video id, item[1] = score.

    :param size: not used by this method
    :return: every stored item as a recall dict (no truncation); [] when the key holds nothing
    """
    redis_key = f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{self.app_type}"
    raw = self.redis_helper.get_data_from_redis(key_name=redis_key)
    if raw is None:
        return []
    return [
        {'videoId': entry[0], 'flowPool': '',
         'rovScore': entry[1], 'pushFrom': config_.PUSH_FROM['normal_flow_recall'],
         'abCode': self.ab_code}
        for entry in json.loads(raw)
    ]
def new_flow_pool_recall(self, size=10, flow_pool_id=None):
    """Fetch videos from the flow pool zset, keeping those that pass check_video_counts.

    The zset is read in pages of size * 5 members, at most MAX_FREQ_FROM_FLOW_POOL pages.
    Each member is expected to look like "<video_id>-<flow_pool>"; members that do not
    split into exactly two parts are logged and skipped.

    :param size: number of videos wanted
    :param flow_pool_id: forwarded to get_pool_redis_key to pick the pool
    :return: up to `size` dicts (videoId, flowPool, rovScore, pushFrom, abCode), one per video
    """
    pool_key = self.get_pool_redis_key('flow', flow_pool_id=flow_pool_id)
    picked = []
    picked_ids = []
    batch_size = size * 5  # members pulled per redis round trip
    attempts = 0
    cursor = 0
    while len(picked) < size:
        attempts += 1
        if attempts > config_.MAX_FREQ_FROM_FLOW_POOL:
            break
        batch = self.redis_helper.get_data_zset_with_index(key_name=pool_key,
                                                           start=cursor, end=cursor + batch_size - 1,
                                                           with_scores=True)
        if not batch:
            # Nothing left in the flow pool.
            break
        # Map each video id to its flow pools and to the score of its first occurrence.
        batch_ids = []
        pools_by_id = {}
        score_by_id = {}
        for member in batch:
            try:
                raw_id, pool_name = member[0].split('-')
            except Exception:
                log_.error({
                    'request_id': self.request_id,
                    'app_type': self.app_type,
                    'flow_pool_value': member
                })
                continue
            vid = int(raw_id)
            if vid not in batch_ids:
                batch_ids.append(vid)
                score_by_id[vid] = member[1]
            pools_by_id.setdefault(vid, []).append(pool_name)
        # Check how many distributions remain for the collected videos.
        if batch_ids:
            for checked in self.check_video_counts(video_ids=batch_ids, flow_pool_mapping=pools_by_id):
                vid = int(checked[0])
                pool_name = checked[1]
                if vid in picked_ids:
                    continue
                # Keep a single flow pool per video and attach pushFrom / abCode.
                picked.append(
                    {'videoId': vid, 'flowPool': pool_name,
                     'rovScore': score_by_id[vid], 'pushFrom': config_.PUSH_FROM['flow_recall'],
                     'abCode': self.ab_code}
                )
                picked_ids.append(vid)
        cursor += batch_size
    return picked[:size]
def get_3days_hot_item_reall(self, exp_config=None):
    """Recall from the "hot_3day:" list and keep only videos that pass the status filter.

    The stored value is decoded as JSON; each item is read as item[0] = video id,
    item[1] = score. Any parse error discards the whole list.

    :param exp_config: optional mapping; a truthy 'recall_get_num' entry overrides how many
                       leading candidates are sent to the filter (default 20)
    :return: recall dicts in the order returned by the filter; [] on missing or invalid
             data, or when the filter returns None
    """
    data = self.redis_helper.get_data_from_redis(key_name="hot_3day:")
    if data is None or data == "":
        return []
    recall_dict = {}
    video_ids = []
    try:
        for per_item in json.loads(data):
            vid = int(per_item[0])
            video_ids.append(vid)
            recall_dict[vid] = {'videoId': vid, 'flowPool': '',
                                'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['hot_3_day_recall'],
                                'abCode': self.ab_code}
    except Exception:
        return []
    if not video_ids:
        return []
    recall_num = 20
    try:
        if exp_config and exp_config['recall_get_num']:
            recall_num = int(exp_config['recall_get_num'])
    except Exception:
        # Was a bare `except:`, which also swallowed BaseException subclasses
        # (KeyboardInterrupt, SystemExit, ...). A bad config keeps the default.
        recall_num = 20
    filter_ = FilterVideos(request_id=self.request_id,
                           app_type=self.app_type, mid=self.mid, uid=self.uid,
                           video_ids=video_ids[:recall_num])
    filtered_viewed_videos = filter_.filter_videos_status(pool_type='normal')
    if filtered_viewed_videos is None:
        return []
    return [recall_dict[vid] for vid in filtered_viewed_videos if vid in recall_dict]
def get_hot_item_reall(self,exp_config=None):
    """Recall from the "hot_video:" list and keep only videos that pass the 'rov' filter.

    The stored value is decoded as JSON; each item is read as item[0] = video id,
    item[1] = score. Any parse error discards the whole list.

    :param exp_config: optional mapping; a truthy 'recall_get_num' entry overrides how many
                       leading candidates are sent to the filter (default 20)
    :return: recall dicts in the order returned by the filter; [] on missing or invalid
             data, or when the filter returns None
    """
    data = self.redis_helper.get_data_from_redis(key_name="hot_video:")
    if data is None or data == "":
        return []
    recall_dict = {}
    video_ids = []
    try:
        for per_item in json.loads(data):
            vid = int(per_item[0])
            video_ids.append(vid)
            recall_dict[vid] = {'videoId': vid, 'flowPool': '',
                                'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['hot_recall'],
                                'abCode': self.ab_code}
    except Exception:
        # The handler used to be the bare expression `recall_result` (a no-op), so a
        # half-parsed list was still sent to the filter. Return empty instead, as the
        # sibling recalls do.
        return []
    if not video_ids:
        return []
    recall_num = 20
    try:
        if exp_config and exp_config['recall_get_num']:
            recall_num = int(exp_config['recall_get_num'])
    except Exception:
        # Was a bare `except:`; a bad config keeps the default.
        recall_num = 20
    filter_ = FilterVideos(request_id=self.request_id,
                           app_type=self.app_type, mid=self.mid, uid=self.uid,
                           video_ids=video_ids[:recall_num])
    filtered_viewed_videos = filter_.filter_videos(pool_type='rov')
    if filtered_viewed_videos is None:
        return []
    return [recall_dict[vid] for vid in filtered_viewed_videos if vid in recall_dict]
def get_title_recall(self):
    """Recall from "title_I2I:<video_id>" and keep only videos that pass the filter.

    The stored value is decoded as JSON; each item is read as item[0] = video id,
    item[1] = score. Any parse error discards the whole list. Only the first 50 ids
    are sent to FilterVideos.filter_videos.

    :return: recall dicts in the order returned by the filter; [] when there is no
             video_id, no usable data, or the filter returns None
    """
    if self.video_id is None:
        return []
    raw = self.redis_helper.get_data_from_redis(key_name="title_I2I:" + str(self.video_id))
    if raw is None or raw == "":
        return []
    by_id = {}
    ordered_ids = []
    try:
        for entry in json.loads(raw):
            vid = int(entry[0])
            ordered_ids.append(vid)
            by_id[vid] = {'videoId': vid, 'flowPool': '',
                          'rovScore': entry[1], 'pushFrom': config_.PUSH_FROM['title_i2i_recall'],
                          'abCode': self.ab_code}
    except Exception:
        return []
    if not ordered_ids:
        return []
    video_filter = FilterVideos(request_id=self.request_id,
                                app_type=self.app_type, mid=self.mid, uid=self.uid,
                                video_ids=ordered_ids[:50])
    survivors = video_filter.filter_videos(pool_type='normal')
    if survivors is None:
        return []
    return [by_id[vid] for vid in survivors if vid in by_id]
def get_word2vec_item_reall(self, exp_config=None):
    """Recall from "w2v:<video_id>" and keep only videos that pass the status filter.

    The stored value is read as a comma-separated list of video ids (no scores, so
    rovScore is 0.0). Any id that fails int() discards the whole list.

    :param exp_config: optional mapping; a truthy 'recall_get_num' entry overrides how many
                       leading candidates are sent to the filter (default 20)
    :return: at most 30 recall dicts in the order returned by the filter; [] when there
             is no video_id, no usable data, or the filter returns None
    """
    if self.video_id is None:
        return []
    data = self.redis_helper.get_data_from_redis(key_name="w2v:" + str(self.video_id))
    if data is None or data.strip() == "":
        return []
    recall_dict = {}
    video_ids = []
    try:
        for per_item in data.strip().split(","):
            vid = int(per_item)
            video_ids.append(vid)
            recall_dict[vid] = {'videoId': vid, 'flowPool': '',
                                'rovScore': 0.0, 'pushFrom': config_.PUSH_FROM['w2v_recall'],
                                'abCode': self.ab_code}
    except Exception:
        return []
    if not video_ids:
        return []
    recall_num = 20
    try:
        if exp_config and exp_config['recall_get_num']:
            recall_num = int(exp_config['recall_get_num'])
    except Exception:
        # Was a bare `except:`, which also swallowed BaseException subclasses.
        recall_num = 20
    filter_ = FilterVideos(request_id=self.request_id,
                           app_type=self.app_type, mid=self.mid, uid=self.uid,
                           video_ids=video_ids[:recall_num])
    filtered_viewed_videos = filter_.filter_videos_status(pool_type='normal')
    if filtered_viewed_videos is None:
        return []
    recall_result = [recall_dict[vid] for vid in filtered_viewed_videos if vid in recall_dict]
    return recall_result[:30]
def get_test_config(self):
    """Load the JSON config stored in redis under "test_exp_config_pos".

    :return: the decoded JSON value, or None when the key is missing, empty, or not valid JSON
    """
    raw = self.redis_helper.get_data_from_redis(key_name="test_exp_config_pos")
    if raw is None or raw == "":
        return None
    try:
        return json.loads(raw)
    except Exception:
        return None
def get_w2v_config(self):
    """Load the JSON config stored in redis under "w2v_exp_config_pos_range".

    :return: the decoded JSON value, or None when the key is missing, empty, or not valid JSON
    """
    raw = self.redis_helper.get_data_from_redis(key_name="w2v_exp_config_pos_range")
    if raw is None or raw == "":
        return None
    try:
        return json.loads(raw)
    except Exception:
        return None
def get_simrecall_config(self):
    """Load the JSON config stored in redis under "simrecall_exp_config_pos".

    :return: the decoded JSON value, or None when the key is missing or not valid JSON
    """
    raw = self.redis_helper.get_data_from_redis(key_name="simrecall_exp_config_pos")
    if raw is None:
        return None
    try:
        return json.loads(raw)
    except Exception:
        return None
def get_simrecall_config_new(self):
    """Load the JSON config stored in redis under "simrecall_exp_config_range".

    :return: the decoded JSON value, or None when the key is missing, empty, or not valid JSON
    """
    raw = self.redis_helper.get_data_from_redis(key_name="simrecall_exp_config_range")
    if raw is None or raw == "":
        return None
    try:
        return json.loads(raw)
    except Exception:
        return None
def get_hotrecall_config(self):
    """Load the JSON config stored in redis under "ht_exp_config".

    :return: the decoded JSON value, or None when the key is missing or not valid JSON
    """
    raw = self.redis_helper.get_data_from_redis(key_name="ht_exp_config")
    if raw is None:
        return None
    try:
        return json.loads(raw)
    except Exception:
        return None
def get_U2I_config(self):
    """Load the JSON config stored in redis under "u2i_exp_config_pos_range_new".

    :return: the decoded JSON value, or None when the key is missing, empty, or not valid JSON
    """
    raw = self.redis_helper.get_data_from_redis(key_name="u2i_exp_config_pos_range_new")
    if raw is None or raw == "":
        return None
    try:
        return json.loads(raw)
    except Exception:
        return None
def get_u2u2i_config(self):
    """Load the JSON config stored in redis under "u2u2i_exp_config_range".

    :return: the decoded JSON value, or None when the key is missing or not valid JSON
    """
    raw = self.redis_helper.get_data_from_redis(key_name="u2u2i_exp_config_range")
    if raw is None:
        return None
    try:
        return json.loads(raw)
    except Exception:
        return None
def get_hotrecall_flow_config(self):
    """Return the raw redis value stored under "ht_flow_config:" (no decoding is done here)."""
    return self.redis_helper.get_data_from_redis(key_name="ht_flow_config:")
def get_w2v_flow_config(self):
    """Return the raw redis value stored under "w2v_flow_config:" (no decoding is done here)."""
    return self.redis_helper.get_data_from_redis(key_name="w2v_flow_config:")
def get_flow_config(self):
    """Return the raw redis value stored under "flow_config:" (no decoding is done here)."""
    return self.redis_helper.get_data_from_redis(key_name="flow_config:")
def get_simrecall_flow_config(self):
    """Return the raw redis value stored under "simrecall_flow_config:" (no decoding is done here)."""
    return self.redis_helper.get_data_from_redis(key_name="simrecall_flow_config:")
def get_flow_exp_7_config(self):
    """Load the JSON config stored in redis under "exp7_exp_config".

    :return: the decoded JSON value, or None when the key is missing or not valid JSON
    """
    raw = self.redis_helper.get_data_from_redis(key_name="exp7_exp_config")
    if raw is None:
        return None
    try:
        return json.loads(raw)
    except Exception:
        return None
def get_flow_exp_8_config(self):
    """Load the JSON config stored in redis under "exp8_exp_config".

    :return: the decoded JSON value, or None when the key is missing or not valid JSON
    """
    raw = self.redis_helper.get_data_from_redis(key_name="exp8_exp_config")
    if raw is None:
        return None
    try:
        return json.loads(raw)
    except Exception:
        return None
def get_sort_ab_codel_config(self):
    """Load the JSON config stored in redis under "sort_ab_config2".

    :return: the decoded JSON value, or None when the key is missing or not valid JSON
    """
    raw = self.redis_helper.get_data_from_redis(key_name="sort_ab_config2")
    if raw is None:
        return None
    try:
        return json.loads(raw)
    except Exception:
        return None
def get_U2I_reall(self, mid):
    """Recall from "u2i:<mid>" and keep only videos that pass the status filter.

    The stored value is decoded as JSON; each item is read as item[0] = video id,
    item[1] = score (converted to float). Any parse error discards the whole list.
    Only the first 20 ids are sent to FilterVideos.filter_videos_status.

    :param mid: user id used to build the redis key (the filter itself uses self.mid)
    :return: recall dicts in the order returned by the filter; [] when mid is empty,
             there is no usable data, or the filter returns None
    """
    if not mid:
        return []
    raw = self.redis_helper.get_data_from_redis(key_name="u2i:" + mid)
    if raw is None or raw == "":
        return []
    by_id = {}
    ordered_ids = []
    try:
        for entry in json.loads(raw):
            vid = int(entry[0])
            ordered_ids.append(vid)
            by_id[vid] = {'videoId': vid, 'flowPool': '',
                          'rovScore': float(entry[1]), 'pushFrom': config_.PUSH_FROM['u2i_tag_recall'],
                          'abCode': self.ab_code}
    except Exception:
        return []
    if not ordered_ids:
        return []
    status_filter = FilterVideos(request_id=self.request_id,
                                 app_type=self.app_type, mid=self.mid, uid=self.uid,
                                 video_ids=ordered_ids[:20],
                                 expansion_factor=self.expansion_factor,
                                 risk_filter_flag=self.risk_filter_flag,
                                 app_region_filtered=self.app_region_filtered,
                                 videos_with_risk=self.videos_with_risk
                                 )
    survivors = status_filter.filter_videos_status(pool_type='normal', region_code=self.get_region_code())
    if survivors is None:
        return []
    return [by_id[vid] for vid in survivors if vid in by_id]
def get_U2U2I_reall(self, mid, exp_config=None):
    """Recall from "u2u2i:<mid>" and keep only videos that pass the status filter.

    The stored value is decoded as JSON; each item is read as item[0] = video id,
    item[1] = score (converted to float). Items that fail to parse are skipped
    individually; an empty or undecodable payload yields [].

    :param mid: user id used to build the redis key (the filter itself uses self.mid)
    :param exp_config: optional mapping; a truthy 'recall_get_num' entry overrides how many
                       leading candidates are sent to the filter (default 20)
    :return: recall dicts in the order returned by the filter; [] when mid is empty,
             there is no usable data, or the filter returns None
    """
    if not mid:
        return []
    data = self.redis_helper.get_data_from_redis(key_name="u2u2i:" + mid)
    if data is None or data == "":
        return []
    try:
        json_result = json.loads(data)
    except Exception:
        # Previously unguarded: an empty or corrupt value raised out of the recall.
        return []
    recall_dict = {}
    video_ids = []
    for per_item in json_result:
        try:
            vid = int(per_item[0])
            record = {'videoId': vid, 'flowPool': '',
                      'rovScore': float(per_item[1]), 'pushFrom': config_.PUSH_FROM['u2u2i_recall'],
                      'abCode': self.ab_code}
        except Exception:
            # Best effort: a bad item does not invalidate the rest of the list.
            continue
        # Queue the id only once its record exists, so an unusable item does not
        # take up one of the recall_num filter slots.
        video_ids.append(vid)
        recall_dict[vid] = record
    if not video_ids:
        return []
    recall_num = 20
    try:
        if exp_config and exp_config['recall_get_num']:
            recall_num = int(exp_config['recall_get_num'])
    except Exception:
        # Was a bare `except:`, which also swallowed BaseException subclasses.
        recall_num = 20
    filter_ = FilterVideos(request_id=self.request_id,
                           app_type=self.app_type, mid=self.mid, uid=self.uid,
                           video_ids=video_ids[:recall_num])
    filtered_viewed_videos = filter_.filter_videos_status(pool_type='normal')
    if filtered_viewed_videos is None:
        return []
    return [recall_dict[vid] for vid in filtered_viewed_videos if vid in recall_dict]
def get_video_recall_config(self):
    """Load the JSON config stored in redis under "vr_exp_pos_config_range".

    :return: the decoded JSON value, or None when the key is missing, empty, or not valid JSON
    """
    raw = self.redis_helper.get_data_from_redis(key_name="vr_exp_pos_config_range")
    if raw is None or raw == "":
        return None
    try:
        return json.loads(raw)
    except Exception:
        return None
def get_return_video_reall(self, pre_key=None):
    """Recall from "<prefix><video_id>" and keep only videos that pass the status filter.

    The prefix is pre_key when it is truthy, otherwise "rv:". The stored value is decoded
    as JSON; each item is read as item[0] = video id, item[1] = score (converted to
    float). Any parse error discards the whole list. Only the first 20 ids are sent to
    FilterVideos.filter_videos_status.

    :param pre_key: optional redis key prefix replacing the default "rv:"
    :return: recall dicts in the order returned by the filter; [] when there is no
             video_id, no usable data, or the filter returns None
    """
    if self.video_id is None:
        return []
    prefix = pre_key if pre_key else "rv:"
    raw = self.redis_helper.get_data_from_redis(key_name=prefix + str(self.video_id))
    if raw is None or raw == "":
        return []
    by_id = {}
    ordered_ids = []
    try:
        for entry in json.loads(raw):
            vid = int(entry[0])
            ordered_ids.append(vid)
            by_id[vid] = {'videoId': vid, 'flowPool': '',
                          'rovScore': float(entry[1]), 'pushFrom': config_.PUSH_FROM['return_video_recall'],
                          'abCode': self.ab_code}
    except Exception:
        return []
    if not ordered_ids:
        return []
    status_filter = FilterVideos(request_id=self.request_id,
                                 app_type=self.app_type, mid=self.mid, uid=self.uid,
                                 video_ids=ordered_ids[:20],
                                 expansion_factor=self.expansion_factor,
                                 risk_filter_flag=self.risk_filter_flag,
                                 app_region_filtered=self.app_region_filtered,
                                 videos_with_risk=self.videos_with_risk
                                 )
    survivors = status_filter.filter_videos_status(pool_type='normal', region_code=self.get_region_code())
    if survivors is None:
        return []
    return [by_id[vid] for vid in survivors if vid in by_id]
def get_play_reall(self, mid, exp_config=None):
    """Recall from "u2i_play:<mid>" and keep only videos that pass the status filter.

    The stored value is decoded as JSON; each item is read as item[0] = video id,
    item[1] = score (converted to float). Any parse error discards the whole list.

    :param mid: user id used to build the redis key (the filter itself uses self.mid)
    :param exp_config: optional mapping; a truthy 'recall_get_num' entry overrides how many
                       leading candidates are sent to the filter (default 20)
    :return: recall dicts in the order returned by the filter; [] when mid is empty,
             there is no usable data, or the filter returns None
    """
    if not mid:
        return []
    data = self.redis_helper.get_data_from_redis(key_name="u2i_play:" + mid)
    if data is None or data == "":
        return []
    recall_dict = {}
    video_ids = []
    try:
        for per_item in json.loads(data):
            vid = int(per_item[0])
            video_ids.append(vid)
            recall_dict[vid] = {'videoId': vid, 'flowPool': '',
                                'rovScore': float(per_item[1]), 'pushFrom': config_.PUSH_FROM['u2i_tag_play_recall'],
                                'abCode': self.ab_code}
    except Exception:
        return []
    if not video_ids:
        return []
    recall_num = 20
    try:
        if exp_config and exp_config['recall_get_num']:
            recall_num = int(exp_config['recall_get_num'])
    except Exception:
        # Was a bare `except:`, which also swallowed BaseException subclasses.
        recall_num = 20
    filter_ = FilterVideos(request_id=self.request_id,
                           app_type=self.app_type, mid=self.mid, uid=self.uid,
                           video_ids=video_ids[:recall_num])
    filtered_viewed_videos = filter_.filter_videos_status(pool_type='normal')
    if filtered_viewed_videos is None:
        return []
    return [recall_dict[vid] for vid in filtered_viewed_videos if vid in recall_dict]
def get_region_code(self):
    """Pick the region code for the current client.

    The client's cityCode is used when it is one of the values of config_.CITY_CODE
    (cities that have their own grouped data); otherwise the provinceCode is used.
    Missing fields default to '-1', and an empty result is normalised to '-1'.

    :return: the city code, the province code, or '-1'
    """
    # client_info defaults to None in __init__; treat that as "no location known"
    # instead of raising AttributeError.
    client_info = self.client_info or {}
    province_code = client_info.get('provinceCode', '-1')
    city_code = client_info.get('cityCode', '-1')
    # A membership test on the values view avoids building a throwaway list.
    if city_code in config_.CITY_CODE.values():
        region_code = city_code
    else:
        region_code = province_code
    if region_code == '':
        region_code = '-1'
    return region_code
def recall_strategy_trend_v1(self):
    """Recall regional 4h trend videos (the "sum" list first, then the "avg" list), filtered in groups.

    Both lists are read from redis as comma-separated video ids keyed by the client's
    provinceCode. Ids in the "avg" list that already appear in the "sum" list are
    dropped. The ids are split into groups of 20 and each group is passed to
    FilterVideos.filter_videos_for_group in its own greenlet.

    :return: recall dicts (rovScore fixed at 0.0) for every id returned by the filters,
             in group order; [] when anything raises (the error is logged)
    """
    def _parse_ids(raw):
        # Tolerant parse: blank or non-numeric tokens (e.g. a trailing comma) are
        # skipped instead of failing the whole recall.
        ids = []
        if raw is not None and not "" == raw:
            for token in raw.split(","):
                try:
                    ids.append(int(token))
                except (TypeError, ValueError):
                    continue
        return ids

    try:
        # 1. Trigger information.
        region_code_province = self.client_info.get('provinceCode', '-1')
        # 2. Redis keys.
        key1 = "alg_recsys_recall_4h_region_trend_sum_" + region_code_province
        key2 = "alg_recsys_recall_4h_region_trend_avg_" + region_code_province
        # 3. Load and chunk the candidates.
        data1 = self.redis_helper.get_data_from_redis(key_name=key1)
        data2 = self.redis_helper.get_data_from_redis(key_name=key2)
        group_size = 20
        data1_list = _parse_ids(data1)
        already_listed = set(data1_list)  # O(1) membership for the dedupe below
        data2_list = [vid for vid in _parse_ids(data2) if vid not in already_listed]
        data_for_filter = [
            id_list[start:start + group_size]
            for id_list in (data1_list, data2_list)
            for start in range(0, len(id_list), group_size)
        ]
        # 4. Filter each group concurrently.
        filter_ = FilterVideos(request_id=self.request_id,
                               app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=None,
                               expansion_factor=self.expansion_factor,
                               risk_filter_flag=self.risk_filter_flag,
                               app_region_filtered=self.app_region_filtered,
                               videos_with_risk=self.videos_with_risk
                               )
        region_code = self.get_region_code()
        jobs = [gevent.spawn(filter_.filter_videos_for_group, region_code, videos) for videos in data_for_filter]
        gevent.joinall(jobs)
        # 5. Assemble the result; each greenlet's value is read once.
        results = []
        for job in jobs:
            group = job.get()
            if group is None or len(group) == 0:
                continue
            for v in group:
                results.append({
                    'videoId': v, 'flowPool': '',
                    'rovScore': 0.0, 'pushFrom': config_.PUSH_FROM['recall_strategy_trend_v1'],
                    'abCode': self.ab_code
                })
        return results
    except Exception as e:
        log_.error("error in recall_strategy_trend_v1:{}".format(e))
        return []