|
@@ -39,7 +39,7 @@ def get_videos_from_flow_pool(app_type, size=1000):
|
|
retry += 1
|
|
retry += 1
|
|
continue
|
|
continue
|
|
if result['code'] != 0:
|
|
if result['code'] != 0:
|
|
- log_.info('batch_flag: {}, 获取流量池视频失败'.format(batch_flag))
|
|
|
|
|
|
+ log_.info('supply batch_flag: {}, 获取流量池视频失败'.format(batch_flag))
|
|
if retry > 2:
|
|
if retry > 2:
|
|
break
|
|
break
|
|
retry += 1
|
|
retry += 1
|
|
@@ -72,11 +72,11 @@ def get_videos_remain_view_count(video_info_list):
|
|
request_data = {'videos': videos}
|
|
request_data = {'videos': videos}
|
|
result = request_post(request_url=config_.GET_REMAIN_VIEW_COUNT_URL,
|
|
result = request_post(request_url=config_.GET_REMAIN_VIEW_COUNT_URL,
|
|
request_data=request_data, timeout=(0.5, 3))
|
|
request_data=request_data, timeout=(0.5, 3))
|
|
- log_.info(f"i = {i}, expend time = {(time.time()-remain_st_time)*1000}")
|
|
|
|
|
|
+ log_.info(f"supply i = {i}, expend time = {(time.time()-remain_st_time)*1000}")
|
|
if result is None:
|
|
if result is None:
|
|
continue
|
|
continue
|
|
if result['code'] != 0:
|
|
if result['code'] != 0:
|
|
- log_.error('获取视频在流量池中的剩余可分发数失败')
|
|
|
|
|
|
+ log_.error('supply 获取视频在流量池中的剩余可分发数失败')
|
|
continue
|
|
continue
|
|
for item in result['data']:
|
|
for item in result['data']:
|
|
if item['distributeCount'] is None:
|
|
if item['distributeCount'] is None:
|
|
@@ -98,7 +98,7 @@ def get_videos_remain_view_count(video_info_list):
|
|
f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SUPPLY}{config_.APP_TYPE.get(type_name)}:{level}"
|
|
f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SUPPLY}{config_.APP_TYPE.get(type_name)}:{level}"
|
|
redis_helper.remove_value_from_set(key_name=flow_pool_key, values=(value, ))
|
|
redis_helper.remove_value_from_set(key_name=flow_pool_key, values=(value, ))
|
|
bu_fen_fa_cnt = bu_fen_fa_cnt + 1
|
|
bu_fen_fa_cnt = bu_fen_fa_cnt + 1
|
|
- log_.info(f"新增加不分发过滤前后整体数量: {len(video_info_list)}:{str(bu_fen_fa_cnt)}")
|
|
|
|
|
|
+ log_.info(f"supply 新增加不分发过滤前后整体数量: {len(video_info_list)}:{str(bu_fen_fa_cnt)}")
|
|
|
|
|
|
|
|
|
|
def get_flow_pool_data(app_type, video_info_list, flow_pool_id_list):
|
|
def get_flow_pool_data(app_type, video_info_list, flow_pool_id_list):
|
|
@@ -110,14 +110,14 @@ def get_flow_pool_data(app_type, video_info_list, flow_pool_id_list):
|
|
try:
|
|
try:
|
|
# 从流量池获取数据
|
|
# 从流量池获取数据
|
|
videos = get_videos_from_flow_pool(app_type=app_type)
|
|
videos = get_videos_from_flow_pool(app_type=app_type)
|
|
- log_.info(f"app_type: {app_type}")
|
|
|
|
|
|
+ log_.info(f"supply app_type: {app_type}")
|
|
# log_.info(f"videos: {videos}")
|
|
# log_.info(f"videos: {videos}")
|
|
if len(videos) <= 0:
|
|
if len(videos) <= 0:
|
|
- log_.info('流量池中无需分发的视频')
|
|
|
|
|
|
+ log_.info('supply 流量池中无需分发的视频')
|
|
return video_info_list
|
|
return video_info_list
|
|
# video_id 与 flow_pool, level 进行mapping
|
|
# video_id 与 flow_pool, level 进行mapping
|
|
video_ids = set()
|
|
video_ids = set()
|
|
- log_.info('流量池中视频数:{}'.format(len(videos)))
|
|
|
|
|
|
+ log_.info('supply 流量池中视频数:{}'.format(len(videos)))
|
|
mapping = {}
|
|
mapping = {}
|
|
for video in videos:
|
|
for video in videos:
|
|
flow_pool_id = video['flowPoolId'] # 召回使用的切分ID是在这里做的。流量池中的视频是不区分ID的,也不区分层。
|
|
flow_pool_id = video['flowPoolId'] # 召回使用的切分ID是在这里做的。流量池中的视频是不区分ID的,也不区分层。
|
|
@@ -131,21 +131,21 @@ def get_flow_pool_data(app_type, video_info_list, flow_pool_id_list):
|
|
mapping[video_id].append(item_info)
|
|
mapping[video_id].append(item_info)
|
|
else:
|
|
else:
|
|
mapping[video_id] = [item_info]
|
|
mapping[video_id] = [item_info]
|
|
- log_.info(f"需更新流量池视频数: {len(video_ids)}")
|
|
|
|
|
|
+ log_.info(f"supply 需更新流量池视频数: {len(video_ids)}")
|
|
|
|
|
|
# 对视频状态进行过滤
|
|
# 对视频状态进行过滤
|
|
if app_type == config_.APP_TYPE['APP']:
|
|
if app_type == config_.APP_TYPE['APP']:
|
|
filtered_videos = filter_video_status_app(list(video_ids))
|
|
filtered_videos = filter_video_status_app(list(video_ids))
|
|
else:
|
|
else:
|
|
filtered_videos = filter_video_status(list(video_ids))
|
|
filtered_videos = filter_video_status(list(video_ids))
|
|
- log_.info('filter videos status finished, filtered_videos nums={}'.format(len(filtered_videos)))
|
|
|
|
|
|
+ log_.info('supply filter videos status finished, filtered_videos nums={}'.format(len(filtered_videos)))
|
|
|
|
|
|
# 涉政视频过滤
|
|
# 涉政视频过滤
|
|
if app_type not in config_.POLITICAL_RECOMMEND_APP_TYPE_LIST:
|
|
if app_type not in config_.POLITICAL_RECOMMEND_APP_TYPE_LIST:
|
|
filtered_videos = filter_political_videos(video_ids=filtered_videos)
|
|
filtered_videos = filter_political_videos(video_ids=filtered_videos)
|
|
|
|
|
|
if not filtered_videos:
|
|
if not filtered_videos:
|
|
- log_.info('流量池中视频状态不符合分发')
|
|
|
|
|
|
+ log_.info('supply 流量池中视频状态不符合分发')
|
|
return video_info_list
|
|
return video_info_list
|
|
|
|
|
|
# 上传数据到redis
|
|
# 上传数据到redis
|
|
@@ -171,7 +171,7 @@ def get_flow_pool_data(app_type, video_info_list, flow_pool_id_list):
|
|
# 普通流量池视频写入redis - 分层存储
|
|
# 普通流量池视频写入redis - 分层存储
|
|
level_list = []
|
|
level_list = []
|
|
for level, videos in redis_data.items():
|
|
for level, videos in redis_data.items():
|
|
- log_.info(f"level: {level}, videos_count: {len(videos)}")
|
|
|
|
|
|
+ log_.info(f"supply level: {level}, videos_count: {len(videos)}")
|
|
level_list.append(level)
|
|
level_list.append(level)
|
|
flow_pool_key_name = f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SUPPLY}{app_type}:{level}"
|
|
flow_pool_key_name = f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SUPPLY}{app_type}:{level}"
|
|
# 如果key已存在,删除key
|
|
# 如果key已存在,删除key
|
|
@@ -189,17 +189,17 @@ def get_flow_pool_data(app_type, video_info_list, flow_pool_id_list):
|
|
if redis_helper.key_exists(flow_pool_key_name):
|
|
if redis_helper.key_exists(flow_pool_key_name):
|
|
redis_helper.del_keys(flow_pool_key_name)
|
|
redis_helper.del_keys(flow_pool_key_name)
|
|
|
|
|
|
- log_.info('data to redis finished!')
|
|
|
|
|
|
+ log_.info('supply data to redis finished!')
|
|
|
|
|
|
return video_info_list
|
|
return video_info_list
|
|
|
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
- log_.error('流量池更新失败, appType: {} exception: {}, traceback: {}'.format(
|
|
|
|
|
|
+ log_.error('supply 流量池更新失败, appType: {} exception: {}, traceback: {}'.format(
|
|
app_type, e, traceback.format_exc()))
|
|
app_type, e, traceback.format_exc()))
|
|
send_msg_to_feishu(
|
|
send_msg_to_feishu(
|
|
webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
|
|
webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
|
|
key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
|
|
key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
|
|
- msg_text='rov-offline{} - 流量池更新失败, appType: {}, exception: {}'.format(config_.ENV_TEXT, app_type, e)
|
|
|
|
|
|
+ msg_text='rov-offline{} - supply 流量池更新失败, appType: {}, exception: {}'.format(config_.ENV_TEXT, app_type, e)
|
|
)
|
|
)
|
|
return video_info_list
|
|
return video_info_list
|
|
|
|
|
|
@@ -229,7 +229,7 @@ if __name__ == '__main__':
|
|
st_time = time.time()
|
|
st_time = time.time()
|
|
# 为避免第一个app_type获取数据不全,等待1min
|
|
# 为避免第一个app_type获取数据不全,等待1min
|
|
time.sleep(60)
|
|
time.sleep(60)
|
|
- log_.info('flow pool predict start...')
|
|
|
|
|
|
+ log_.info('supply flow pool predict start...')
|
|
# 获取对应流量池id列表
|
|
# 获取对应流量池id列表
|
|
redis_helper = RedisHelper()
|
|
redis_helper = RedisHelper()
|
|
flow_pool_abtest_config = redis_helper.get_data_from_redis(key_name=config_.FLOWPOOL_ABTEST_KEY_NAME)
|
|
flow_pool_abtest_config = redis_helper.get_data_from_redis(key_name=config_.FLOWPOOL_ABTEST_KEY_NAME)
|