소스 검색

add videos filter task & update

liqian 3 년 전
부모
커밋
765a14aecf
4개의 변경된 파일, 186개의 추가 그리고 6개의 삭제
  1. 6 5
      db_helper.py
  2. 1 1
      utils.py
  3. 178 0
      videos_filter.py
  4. 1 0
      videos_filter_task.sh

+ 6 - 5
db_helper.py

@@ -101,10 +101,11 @@ class RedisHelper(object):
         if not conn.exists(key_name):
         if not conn.exists(key_name):
             return None
             return None
         data = conn.zrange(key_name, start, end, desc, with_scores)
         data = conn.zrange(key_name, start, end, desc, with_scores)
-        if with_scores:
-            return data
-        else:
-            return [eval(value) for value in data]
+        return data
+        # if with_scores:
+        #     return data
+        # else:
+        #     return [eval(value) for value in data]
 
 
     def get_score_with_value(self, key_name, value):
     def get_score_with_value(self, key_name, value):
         """
         """
@@ -140,7 +141,7 @@ class RedisHelper(object):
         :return: None
         :return: None
         """
         """
         conn = self.connect()
         conn = self.connect()
-        conn.zrem(key_name, value)
+        conn.zrem(key_name, *value)
 
 
     def remove_by_rank_from_zset(self, key_name, start, stop):
     def remove_by_rank_from_zset(self, key_name, start, stop):
         """
         """

+ 1 - 1
utils.py

@@ -167,7 +167,7 @@ def filter_video_status(video_ids):
 
 
     hologres_helper = HologresHelper()
     hologres_helper = HologresHelper()
     data = hologres_helper.get_data(sql=sql)
     data = hologres_helper.get_data(sql=sql)
-    filtered_videos = [temp[0] for temp in data]
+    filtered_videos = [int(temp[0]) for temp in data]
     return filtered_videos
     return filtered_videos
 
 
 
 

+ 178 - 0
videos_filter.py

@@ -0,0 +1,178 @@
+import time
+import traceback
+from datetime import date, timedelta
+
+from utils import filter_video_status, send_msg_to_feishu
+from db_helper import RedisHelper
+from config import set_config
+from log import Log
+
+config_ = set_config()  # shared settings object; the functions below read Redis key names and APP_TYPE from it
+log_ = Log()  # shared logger used by every filter function in this module
+
+
+def filter_rov_pool():
+    """ROV召回池视频过滤"""
+    log_.info("rov recall pool filter start ...")
+    # 拼接redis-key
+    key_name, _ = get_pool_redis_key(pool_type='rov')
+    # 获取视频
+    redis_helper = RedisHelper()
+    data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1)
+    if data is None:
+        log_.info("data is None")
+        log_.info("rov recall pool filter end!")
+        return
+    # 过滤
+    video_ids = [int(video_id) for video_id in data]
+    filtered_result = filter_video_status(video_ids=video_ids)
+    # 求差集,获取需要过滤掉的视频,并从redis中移除
+    filter_videos = set(video_ids) - set(filtered_result)
+    log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
+                                                                                  len(filtered_result),
+                                                                                  len(filter_videos)))
+    if len(filter_videos) == 0:
+        log_.info("rov recall pool filter end!")
+        return
+    redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
+    log_.info("rov recall pool filter end!")
+
+
+def filter_flow_pool():
+    """流量池视频过滤"""
+    log_.info("flow pool filter start ...")
+    for _, app_type in config_.APP_TYPE.items():
+        log_.info('app_type {} videos filter start...'.format(app_type))
+        # 拼接redis-key
+        key_name = get_pool_redis_key(pool_type='flow', app_type=app_type)
+        # 获取视频
+        redis_helper = RedisHelper()
+        data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1)
+        if data is None:
+            log_.info("data is None")
+            log_.info("app_type {} videos filter end!".format(app_type))
+            continue
+        # videoId与flowPool做mapping
+        video_ids = []
+        mapping = {}
+        for video in data:
+            video_id, flow_pool = video.split('-')
+            video_id = int(video_id)
+            if video_id not in video_ids:
+                video_ids.append(video_id)
+                mapping[video_id] = [flow_pool]
+            else:
+                mapping[video_id].append(flow_pool)
+        # 过滤
+        filtered_result = filter_video_status(video_ids=video_ids)
+        # 求差集,获取需要过滤掉的视频,并从redis中移除
+        filter_videos = set(video_ids) - set(filtered_result)
+        log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format(
+            len(data), len(video_ids), len(filtered_result), len(filter_videos)))
+        # 移除
+        if len(filter_videos) == 0:
+            log_.info("app_type {} videos filter end!".format(app_type))
+            continue
+        remove_videos = ['{}-{}'.format(video_id, flow_pool)
+                         for video_id in filter_videos
+                         for flow_pool in mapping[video_id]]
+        redis_helper.remove_value_from_zset(key_name=key_name, value=remove_videos)
+        log_.info("app_type {} videos filter end!".format(app_type))
+    log_.info("flow pool filter end!")
+
+
+def filter_bottom():
+    """兜底视频过滤"""
+    log_.info("bottom videos filter start ...")
+    # 获取视频
+    redis_helper = RedisHelper()
+    data = redis_helper.get_data_zset_with_index(key_name=config_.BOTTOM_KEY_NAME, start=0, end=-1)
+    if data is None:
+        log_.info("data is None")
+        log_.info("bottom videos filter end!")
+        return
+    # 过滤
+    video_ids = [int(video_id) for video_id in data]
+    filtered_result = filter_video_status(video_ids=video_ids)
+    # 求差集,获取需要过滤掉的视频,并从redis中移除
+    filter_videos = set(video_ids) - set(filtered_result)
+    log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
+                                                                                  len(filtered_result),
+                                                                                  len(filter_videos)))
+    if len(filter_videos) == 0:
+        log_.info("bottom videos filter end!")
+        return
+    redis_helper.remove_value_from_zset(key_name=config_.BOTTOM_KEY_NAME, value=list(filter_videos))
+    log_.info("bottom videos filter end!")
+
+
+def filter_rov_updated():
+    """修改过ROV的视频过滤"""
+    log_.info("update rov videos filter start ...")
+    # 获取视频
+    redis_helper = RedisHelper()
+    data = redis_helper.get_data_zset_with_index(key_name=config_.UPDATE_ROV_KEY_NAME, start=0, end=-1)
+    if data is None:
+        log_.info("data is None")
+        log_.info("update rov videos filter end!")
+        return
+    # 过滤
+    video_ids = [int(video_id) for video_id in data]
+    filtered_result = filter_video_status(video_ids=video_ids)
+    # 求差集,获取需要过滤掉的视频,并从redis中移除
+    filter_videos = set(video_ids) - set(filtered_result)
+    log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
+                                                                                  len(filtered_result),
+                                                                                  len(filter_videos)))
+    if len(filter_videos) == 0:
+        log_.info("update rov videos filter end!")
+        return
+    redis_helper.remove_value_from_zset(key_name=config_.UPDATE_ROV_KEY_NAME, value=list(filter_videos))
+    log_.info("update rov videos filter end!")
+
+
+def get_pool_redis_key(pool_type, app_type=None):
+    """
+    拼接key
+    :param pool_type: type-string {'rov': rov召回池, 'flow': 流量池}
+    :param app_type: 产品标识
+    :return: key_name
+    """
+    redis_helper = RedisHelper()
+    if pool_type == 'rov':
+        # 判断热度列表是否更新,未更新则使用前一天的热度列表
+        key_name = config_.RECALL_KEY_NAME_PREFIX + time.strftime('%Y%m%d')
+        if redis_helper.key_exists(key_name):
+            redis_date = date.today().strftime('%Y%m%d')
+        else:
+            redis_date = (date.today() - timedelta(days=1)).strftime('%Y%m%d')
+            key_name = config_.RECALL_KEY_NAME_PREFIX + redis_date
+        return key_name, redis_date
+
+    elif pool_type == 'flow':
+        # 流量池
+        return config_.FLOWPOOL_KEY_NAME_PREFIX + str(app_type)
+
+    else:
+        log_.error('pool type error')
+        return None, None
+
+
+def main():
+    try:
+        # ROV召回池视频过滤
+        filter_rov_pool()
+        # 流量池视频过滤
+        filter_flow_pool()
+        # 兜底视频过滤
+        filter_bottom()
+        # 修改过ROV的视频过滤
+        filter_rov_updated()
+    except Exception as e:
+        log_.error(traceback.format_exc())
+        send_msg_to_feishu('生产环境 - 过滤失败 \n {}'.format(traceback.format_exc()))
+        return
+
+
+if __name__ == '__main__':
+    main()

+ 1 - 0
videos_filter_task.sh

@@ -0,0 +1 @@
+cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/videos_filter.py