Преглед на файлове

add video_limit_distribute: get_data & check

liqian преди 2 години
родител
ревизия
614fd6ebdb
променени са 7 файла, в които са добавени 101 реда и са изтрити 19 реда
  1. 52 6
      check_video_limit_distribute.py
  2. 7 0
      check_video_limit_distribute_task.sh
  3. 12 1
      get_video_limit_list.py
  4. 7 0
      get_video_limit_list_task.sh
  5. 13 2
      region_rule_rank_h.py
  6. 1 1
      region_rule_rank_h_by24h.py
  7. 9 9
      rule_rank_h_by_24h.py

+ 52 - 6
check_video_limit_distribute.py

@@ -1,9 +1,10 @@
 import datetime
+import numpy as np
 from config import set_config
 from log import Log
 from utils import RedisHelper
 
-config_ = set_config()
+config_, _ = set_config()
 log_ = Log()
 redis_helper = RedisHelper()
 
@@ -15,22 +16,58 @@ def update_limit_video_score(initial_videos, key_name):
     :param key_name: 视频列表对应的key
     :return:
     """
+    if not initial_videos:
+        return
     # 获取当前限流视频
     data = redis_helper.get_data_from_redis(key_name=config_.KEY_NAME_PREFIX_LIMIT_VIDEOS)
     if data is None:
         return
-    limit_video_id_list = [int(video[0]) for video in data]
     # 获取限流视频对应的score
-    limit_video_initial_score = {}
-    for video_id in limit_video_id_list:
+    limit_video_initial_score = []
+    for video in data:
+        video_id = int(video[0])
         initial_score = initial_videos.get(video_id, None)
         if initial_score is not None:
-            limit_video_initial_score[video_id] = initial_score
+            limit_video_initial_score.append((video_id, initial_score))
+
+    log_.info(f"limit_video_initial_score = {limit_video_initial_score}")
+
     if len(limit_video_initial_score) == 0:
         return
 
     # 获取原始列表的分数的中位数
-
+    initial_video_score_list = sorted([val for key, val in initial_videos.items()], reverse=False)
+    media_score = np.median(initial_video_score_list)
+    # 取中位数后一位
+    if len(initial_video_score_list) % 2 == 0:
+        temp_index = len(initial_video_score_list)//2
+    else:
+        temp_index = len(initial_video_score_list) // 2 + 1
+    if len(initial_video_score_list) > 1:
+        temp_score = initial_video_score_list[temp_index]
+    else:
+        temp_score = 0
+
+    # 对限流视频score进行调整
+    limit_video_final_score = {}
+    limit_video_initial_score.sort(key=lambda x: x[1], reverse=True)
+    limit_video_id_list = []
+    for video_id, initial_score in limit_video_initial_score:
+        if initial_score > media_score:
+            limit_video_id_list.append(video_id)
+    if len(limit_video_id_list) > 0:
+        limit_score_step = (temp_score - media_score) / (len(limit_video_id_list) + 1)
+        for i, video_id in enumerate(limit_video_id_list):
+            final_score = media_score - limit_score_step * (i + 1)
+            limit_video_final_score[int(video_id)] = final_score
+
+    log_.info(f"media_score = {media_score}, temp_score = {temp_score}, "
+              f"limit_video_final_score = {limit_video_final_score}")
+
+    # 更新限流视频的score
+    if len(limit_video_final_score) == 0:
+        return
+    redis_helper.add_data_with_zset(key_name=key_name, data=limit_video_final_score)
 
 
 def check_videos_distribute():
@@ -66,6 +103,8 @@ def check_region_videos():
 
     # 获取已超分发视频
     stop_distribute_video_id_list = check_videos_distribute()
+    log_.info(f"stop_distribute_video_id_list = {stop_distribute_video_id_list}, "
+              f"count = {len(stop_distribute_video_id_list)}")
     if len(stop_distribute_video_id_list) == 0:
         return
 
@@ -112,6 +151,13 @@ def check_region_videos():
         log_.info(f"region = {region} videos check end!")
     log_.info("region_h videos check end!")
 
+    # 将已超分发视频 移除 原始大列表
+    key_name = f"{config_.RECALL_KEY_NAME_PREFIX}{datetime.datetime.strftime(now_date, '%Y%m%d')}"
+    if not redis_helper.key_exists(key_name=key_name):
+        redis_date = now_date - datetime.timedelta(days=1)
+        key_name = f"{config_.RECALL_KEY_NAME_PREFIX}{datetime.datetime.strftime(redis_date, '%Y%m%d')}"
+    redis_helper.remove_value_from_zset(key_name=key_name, value=stop_distribute_video_id_list)
+
 
 if __name__ == '__main__':
     check_region_videos()

+ 7 - 0
check_video_limit_distribute_task.sh

@@ -0,0 +1,7 @@
+source /etc/profile
+echo $ROV_OFFLINE_ENV
+if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
+    cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/check_video_limit_distribute.py
+elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
+    cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/check_video_limit_distribute.py
+fi

+ 12 - 1
get_video_limit_list.py

@@ -13,10 +13,21 @@ def get_limit_videos(now_date):
     """获取限流视频并存入redis"""
     # 通过接口获取需要限流的视频
     data = request_post(request_url=config_.GET_VIDEO_LIMIT_LIST_URL)
+    if data is None:
+        return
+    limit_data = data.get('data', [])
+    log_.info(f"limit_data = {limit_data}")
     video_limit_list = []  # [(videoId, maxDistributeCount), ...]
     video_id_list = []
+    for item in limit_data:
+        video_id = int(item['videoId'])
+        max_distribute_count = int(item['maxDistributeCount'])
+        video_id_list.append(video_id)
+        video_limit_list.append((video_id, max_distribute_count))
+    if len(video_limit_list) == 0:
+        return
     # 视频对应最大分发数 存入redis
-    redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_PREFIX_LIMIT_VIDEOS, value=video_limit_list)
+    redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_PREFIX_LIMIT_VIDEOS, value=str(video_limit_list))
     # 限流视频videoId 存入当日redis key
     redis_helper.add_data_with_set(
         key_name=f"{config_.KEY_NAME_PREFIX_LIMIT_VIDEO_SET}{datetime.datetime.strftime(now_date, '%Y%m%d')}",

+ 7 - 0
get_video_limit_list_task.sh

@@ -0,0 +1,7 @@
+source /etc/profile
+echo $ROV_OFFLINE_ENV
+if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
+    cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/get_video_limit_list.py
+elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
+    cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/get_video_limit_list.py
+fi

+ 13 - 2
region_rule_rank_h.py

@@ -12,6 +12,7 @@ from threading import Timer
 from utils import MysqlHelper, RedisHelper, get_data_from_odps, filter_video_status
 from config import set_config
 from log import Log
+from check_video_limit_distribute import update_limit_video_score
 
 config_, _ = set_config()
 log_ = Log()
@@ -165,6 +166,8 @@ def video_rank(df, now_date, now_h, rule_key, param, region):
         f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region}.{rule_key}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
     if len(h_recall_result) > 0:
         redis_helper.add_data_with_zset(key_name=h_recall_key_name, data=h_recall_result, expire_time=23 * 3600)
+        # 限流视频score调整
+        update_limit_video_score(initial_videos=h_recall_result, key_name=h_recall_key_name)
         # 清空线上过滤应用列表
         redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER}{region}.{rule_key}")
 
@@ -215,6 +218,10 @@ def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region_24h_rule_key, re
             f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
         if len(region_24h_dup) > 0:
             redis_helper.add_data_with_zset(key_name=region_24h_dup_key_name, data=region_24h_dup, expire_time=23 * 3600)
+            # 限流视频score调整
+            update_limit_video_score(initial_videos=region_24h_dup, key_name=region_24h_dup_key_name)
+            # 清空线上过滤应用列表
+            redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER_24H}{region}.{rule_key}")
 
     # ##### 去重小程序天级更新结果,并另存为redis中
     # day_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_DAY}rule2.{datetime.datetime.strftime(now_date, '%Y%m%d')}"
@@ -251,8 +258,10 @@ def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region_24h_rule_key, re
             f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
         if len(day_dup) > 0:
             redis_helper.add_data_with_zset(key_name=day_dup_key_name, data=day_dup, expire_time=23 * 3600)
-        # 清空线上过滤应用列表
-        redis_helper.del_keys(key_name=f"{config_.H_VIDEO_FILER_24H}{region}.{rule_key}")
+            # 限流视频score调整
+            update_limit_video_score(initial_videos=day_dup, key_name=day_dup_key_name)
+            # 清空线上过滤应用列表
+            redis_helper.del_keys(key_name=f"{config_.H_VIDEO_FILER_24H}{region}.{rule_key}")
 
     # ##### 去重小程序模型更新结果,并另存为redis中
     model_key_name = get_rov_redis_key(now_date=now_date)
@@ -269,6 +278,8 @@ def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region_24h_rule_key, re
         f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
     if len(model_data_dup) > 0:
         redis_helper.add_data_with_zset(key_name=model_data_dup_key_name, data=model_data_dup, expire_time=23 * 3600)
+        # 限流视频score调整
+        update_limit_video_score(initial_videos=model_data_dup, key_name=model_data_dup_key_name)
 
 
 def rank_by_h(project, table, now_date, now_h, rule_params, region_code_list):

+ 1 - 1
region_rule_rank_h_by24h.py

@@ -199,7 +199,7 @@ def video_rank(df, now_date, now_h, rule_key, param, region):
         # 清空线上过滤应用列表
         redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER_24H}{region}.{rule_key}")
     # 与其他召回视频池去重,存入对应的redis
-    dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key, region=region)
+    # dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key, region=region)
 
 
 def rank_by_24h(project, table, now_date, now_h, rule_params, region_code_list):

+ 9 - 9
rule_rank_h_by_24h.py

@@ -173,15 +173,15 @@ def video_rank_h(df, now_date, now_h, rule_key, param):
         redis_helper.del_keys(key_name=f"{config_.H_VIDEO_FILER_24H}{rule_key}")
 
     # 去重更新rov模型结果,并另存为redis中
-    initial_data_dup = {}
-    for video_id, score in initial_data:
-        if int(video_id) not in day_video_ids:
-            initial_data_dup[int(video_id)] = score
-    log_.info(f"initial data dup count = {len(initial_data_dup)}")
-
-    initial_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP_24H}{rule_key}.{now_dt}.{now_h}"
-    if len(initial_data_dup) > 0:
-        redis_helper.add_data_with_zset(key_name=initial_key_name, data=initial_data_dup, expire_time=23 * 3600)
+    # initial_data_dup = {}
+    # for video_id, score in initial_data:
+    #     if int(video_id) not in day_video_ids:
+    #         initial_data_dup[int(video_id)] = score
+    # log_.info(f"initial data dup count = {len(initial_data_dup)}")
+    #
+    # initial_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP_24H}{rule_key}.{now_dt}.{now_h}"
+    # if len(initial_data_dup) > 0:
+    #     redis_helper.add_data_with_zset(key_name=initial_key_name, data=initial_data_dup, expire_time=23 * 3600)
 
 
 def rank_by_h(now_date, now_h, rule_params, project, table):