|
@@ -452,11 +452,11 @@ class PoolRecall(object):
|
|
|
if freq > config_.MAX_FREQ_FROM_FLOW_POOL:
|
|
|
break
|
|
|
# 获取数据
|
|
|
- st_get = time.time()
|
|
|
+ # st_get = time.time()
|
|
|
data = self.redis_helper.get_data_zset_with_index(key_name=flow_pool_key,
|
|
|
start=idx, end=idx + get_size - 1,
|
|
|
with_scores=True)
|
|
|
- et_get = time.time()
|
|
|
+ # et_get = time.time()
|
|
|
# log_.info('get data from flow pool redis: freq = {}, data = {}, execute time = {}ms'.format(
|
|
|
# freq, data, (et_get - st_get) * 1000))
|
|
|
if not data:
|
|
@@ -490,13 +490,12 @@ class PoolRecall(object):
|
|
|
ge = gevent.spawn(filter_.filter_videos, pool_type='flow', province_code=province_code)
|
|
|
ge.join()
|
|
|
filtered_result = ge.get()
|
|
|
- #filtered_result = filter_.filter_videos()
|
|
|
# 检查可分发数
|
|
|
if filtered_result:
|
|
|
st_check = time.time()
|
|
|
ge = gevent.spawn(self.check_video_counts, video_ids=filtered_result, flow_pool_mapping=video_mapping)
|
|
|
ge.join()
|
|
|
- check_result, error_flag = ge.get()
|
|
|
+ check_result = ge.get()
|
|
|
log_.info({
|
|
|
'logTimestamp': int(time.time() * 1000),
|
|
|
'request_id': self.request_id,
|
|
@@ -506,7 +505,6 @@ class PoolRecall(object):
|
|
|
'operation': 'check_video_counts',
|
|
|
'executeTime': (time.time() - st_check) * 1000
|
|
|
})
|
|
|
- #check_result, error_flag = self.check_video_counts(video_ids=filtered_result, flow_pool_mapping=video_mapping)
|
|
|
|
|
|
for item in check_result:
|
|
|
video_id = int(item[0])
|
|
@@ -519,19 +517,16 @@ class PoolRecall(object):
|
|
|
'rovScore': video_score[video_id], 'pushFrom': config_.PUSH_FROM['flow_recall'],
|
|
|
'abCode': self.ab_code}
|
|
|
)
|
|
|
- # flow_pool_recall_result.append(
|
|
|
- # {'videoId': item[0], 'flowPool': item[1], 'distributeCount': item[2],
|
|
|
- # 'rovScore': video_score[item[0]], 'pushFrom': 'flow_pool', 'abCode': self.ab_code}
|
|
|
- # )
|
|
|
+
|
|
|
flow_pool_recall_videos.append(video_id)
|
|
|
- et_check = time.time()
|
|
|
+ # et_check = time.time()
|
|
|
# log_.info('check result: result = {}, execute time = {}ms'.format(
|
|
|
# check_result, (et_check - st_check) * 1000))
|
|
|
|
|
|
- # 判断错误标记, True为错误
|
|
|
- if error_flag:
|
|
|
- # 结束流量池召回
|
|
|
- break
|
|
|
+ # # 判断错误标记, True为错误
|
|
|
+ # if error_flag:
|
|
|
+ # # 结束流量池召回
|
|
|
+ # break
|
|
|
|
|
|
idx += get_size
|
|
|
|
|
@@ -551,7 +546,7 @@ class PoolRecall(object):
|
|
|
:return: check_result, error_flag
|
|
|
"""
|
|
|
# flow_pool_key = self.get_pool_redis_key('flow')
|
|
|
- videos = []
|
|
|
+ # videos = []
|
|
|
check_result = []
|
|
|
for video_id in video_ids:
|
|
|
video_id = int(video_id)
|
|
@@ -560,16 +555,26 @@ class PoolRecall(object):
|
|
|
cur_count = get_videos_local_distribute_count(video_id=video_id, flow_pool=flow_pool)
|
|
|
# 无记录
|
|
|
if cur_count is None:
|
|
|
- videos.append({'videoId': video_id, 'flowPool': flow_pool})
|
|
|
+ # videos.append({'videoId': video_id, 'flowPool': flow_pool})
|
|
|
+ continue
|
|
|
# 本地分发数 cur_count > 0
|
|
|
elif cur_count > 0:
|
|
|
check_result.append((video_id, flow_pool))
|
|
|
- # 本地分发数 cur_count <= 0,从所有的流量召回池移除
|
|
|
+ # 本地分发数 cur_count <= 0,从所有的流量召回池移除,删除本地分发记录key
|
|
|
else:
|
|
|
+ remain_count_key = f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{video_id}:{flow_pool}"
|
|
|
+ self.redis_helper.del_keys(remain_count_key)
|
|
|
value = '{}-{}'.format(video_id, flow_pool)
|
|
|
for item in config_.APP_TYPE:
|
|
|
flow_pool_key = f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{config_.APP_TYPE.get(item)}"
|
|
|
self.redis_helper.remove_value_from_zset(key_name=flow_pool_key, value=value)
|
|
|
+ quick_flow_pool_key = f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{config_.APP_TYPE.get(item)}" \
|
|
|
+ f":{config_.QUICK_FLOW_POOL_ID}"
|
|
|
+ self.redis_helper.remove_value_from_zset(key_name=quick_flow_pool_key, value=value)
|
|
|
+
|
|
|
+ return check_result
|
|
|
+
|
|
|
+ """
|
|
|
# 本次视频都有本地记录
|
|
|
if len(videos) == 0:
|
|
|
error_flag = False
|
|
@@ -618,6 +623,7 @@ class PoolRecall(object):
|
|
|
flow_pool_key = f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{config_.APP_TYPE.get(item)}"
|
|
|
self.redis_helper.remove_value_from_zset(key_name=flow_pool_key, value=value)
|
|
|
return check_result, error_flag
|
|
|
+ """
|
|
|
|
|
|
def get_pool_redis_key(self, pool_type, flow_pool_id=None):
|
|
|
"""
|