ソースを参照

add region_h_videos filter

liqian 3 年 前
コミット
1db3dfa6ed
2 ファイル変更62 行追加10 行削除
  1. 12 10
      region_rule_rank_h.py
  2. 50 0
      videos_filter.py

+ 12 - 10
region_rule_rank_h.py

@@ -196,15 +196,15 @@ def video_rank(df, now_date, now_h, rule_key, param, region):
         redis_helper.add_data_with_zset(key_name=initial_key_name, data=initial_data_dup, expire_time=23 * 3600)
 
 
-def rank_by_h(project, table, now_date, now_h, rule_params):
+def rank_by_h(project, table, now_date, now_h, rule_params, region_code_list):
     # 获取特征数据
     feature_df = get_feature_data(project=project, table=table, now_date=now_date)
     # 获取所有的region
-    region_list = list(set(feature_df[''].to_list()))
+    # region_code_list = list(set(feature_df[''].to_list()))
     # rank
     for key, value in rule_params.items():
         log_.info(f"rule = {key}, param = {value}")
-        for region in region_list:
+        for region in region_code_list:
             log_.info(f"region = {region}")
             # 计算score
             score_df = cal_score(df=feature_df)
@@ -214,7 +214,7 @@ def rank_by_h(project, table, now_date, now_h, rule_params):
             score_df.to_csv(f'./data/{score_filename}')
 
 
-def h_rank_bottom(now_date, now_h, rule_key, project, table):
+def h_rank_bottom(now_date, now_h, rule_key, region_code_list):
     """未按时更新数据,用上一小时结果作为当前小时的数据"""
     log_.info(f"rule_key = {rule_key}")
     # 获取rov模型结果
@@ -227,9 +227,9 @@ def h_rank_bottom(now_date, now_h, rule_key, project, table):
         redis_h = now_h - 1
 
     key_prefix_list = [config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H, config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H]
-    fea_df = get_feature_data(project=project, table=table, now_date=now_date - datetime.timedelta(hours=1))
-    region_list = list(set(fea_df[''].to_list()))
-    for region in region_list:
+    # fea_df = get_feature_data(project=project, table=table, now_date=now_date - datetime.timedelta(hours=1))
+    # region_list = list(set(fea_df[''].to_list()))
+    for region in region_code_list:
         log_.info(f"region = {region}")
         for key_prefix in key_prefix_list:
             key_name = f"{key_prefix}{region}.{rule_key}.{redis_dt}.{redis_h}"
@@ -250,24 +250,26 @@ def h_timer_check():
     rule_params = config_.RULE_PARAMS_REGION
     project = config_.PROJECT_REGION
     table = config_.TABLE_REGION
+    region_code_list = [code for region, code in region_code.items()]
     now_date = datetime.datetime.today()
     log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
     now_h = datetime.datetime.now().hour
     now_min = datetime.datetime.now().minute
     if now_h == 0:
         for key, _ in rule_params.items():
-            h_rank_bottom(now_date=now_date, now_h=now_h, rule_key=key, project=project, table=table)
+            h_rank_bottom(now_date=now_date, now_h=now_h, rule_key=key, region_code_list=region_code_list)
         return
     # 查看当前小时更新的数据是否已准备好
     h_data_count = h_data_check(project=project, table=table, now_date=now_date)
     if h_data_count > 0:
         log_.info(f'h_data_count = {h_data_count}')
         # 数据准备好,进行更新
-        rank_by_h(now_date=now_date, now_h=now_h, rule_params=rule_params, project=project, table=table)
+        rank_by_h(now_date=now_date, now_h=now_h, rule_params=rule_params,
+                  project=project, table=table, region_code_list=region_code_list)
     elif now_min > 50:
         log_.info('h_recall data is None, use bottom data!')
         for key, _ in rule_params.items():
-            h_rank_bottom(now_date=now_date, now_h=now_h, rule_key=key, project=project, table=table)
+            h_rank_bottom(now_date=now_date, now_h=now_h, rule_key=key, region_code_list=region_code_list)
     else:
         # 数据没准备好,1分钟后重新检查
         Timer(60, h_timer_check).start()

+ 50 - 0
videos_filter.py

@@ -4,6 +4,7 @@ import traceback
 import ast
 from datetime import date, timedelta, datetime
 
+from region_rule_rank_h import region_code
 from utils import filter_video_status, send_msg_to_feishu, filter_video_status_app
 from db_helper import RedisHelper
 from config import set_config
@@ -538,6 +539,53 @@ def filter_old_videos():
     log_.info("old videos filter end!")
 
 
+def filter_region_videos():
+    """Filter the region-grouped rule videos: drop entries rejected by filter_video_status from the per-region hourly zsets."""
+    region_code_list = [code for region, code in region_code.items()]
+    rule_params = config_.RULE_PARAMS_REGION
+    log_.info("region_h videos filter start ...")
+    redis_helper = RedisHelper()
+    # Get the current date
+    now_date = date.today().strftime('%Y%m%d')
+    # Get the current hour
+    now_h = datetime.now().hour
+    log_.info(f'now_date = {now_date}, now_h = {now_h}.')
+    for region in region_code_list:
+        log_.info(f"region = {region}")
+        for key, value in rule_params.items():
+            log_.info(f"rule = {key}, param = {value}")
+            # Two video lists need to be filtered
+            key_prefix_list = [config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H, config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H]
+            for i, key_prefix in enumerate(key_prefix_list):
+                # Build the redis key
+                key_name = f"{key_prefix}{region}.{key}.{now_date}.{now_h}"
+                log_.info(f"key_name: {key_name}")
+                # Fetch the videos
+                data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1)
+                if data is None:
+                    log_.info("data is None")
+                    log_.info("filter end!")
+                    continue
+                # Filter by video status
+                video_ids = [int(video_id) for video_id in data]
+                filtered_result = filter_video_status(video_ids=video_ids)
+                # Take the set difference to get the videos to filter out, and remove them from redis
+                filter_videos = set(video_ids) - set(filtered_result)
+                log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
+                                                                                              len(filtered_result),
+                                                                                              len(filter_videos)))
+                if len(filter_videos) == 0:
+                    log_.info("filter end!")
+                    continue
+                redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
+                if i == 0:
+                    # Add the videos filtered out of the hourly data to the list used for online filtering
+                    redis_helper.add_data_with_set(key_name=f"{config_.REGION_H_VIDEO_FILER}{region}.{key}",
+                                                   values=filter_videos, expire_time=2 * 3600)
+        log_.info(f"region = {region} videos filter end!")
+    log_.info("region_h videos filter end!")
+
+
 def main():
     try:
         # ROV召回池视频过滤
@@ -569,6 +617,8 @@ def main():
         filter_rov_day()
         # 过滤老视频数据
         filter_old_videos()
+        # 过滤地域分组小时级视频
+        filter_region_videos()
     except Exception as e:
         log_.error(traceback.format_exc())
         send_msg_to_feishu(