Browse Source

add old video recall task

liqian 3 years ago
parent
commit
067132fa20
4 changed files with 64 additions and 5 deletions
  1. 2 0
      config.py
  2. 23 5
      old_video_recall.py
  3. 7 0
      old_video_recall_task.sh
  4. 32 0
      videos_filter.py

+ 2 - 0
config.py

@@ -127,6 +127,8 @@ class BaseConfig(object):
     # 完整格式:com.weiqu.video.recall.hot.item.score.dup.day.pre.{rule_key}.{date}
     RECALL_KEY_NAME_PREFIX_DUP_DAY_PRE = 'com.weiqu.video.recall.hot.item.score.dup.day.pre.'
 
+    # 小程序老视频更新结果存放 redis key 前缀,完整格式:'com.weiqu.video.recall.old.item.{date}'
+    RECALL_KEY_NAME_PREFIX_OLD_VIDEOS = 'com.weiqu.video.recall.old.item.'
 
     # app应用 小程序离线ROV模型结果存放 redis key前缀,完整格式:com.weiqu.video.recall.hot.item.score.app.{date}
     RECALL_KEY_NAME_PREFIX_APP = 'com.weiqu.video.recall.hot.item.score.app.'

+ 23 - 5
old_video_recall.py

@@ -5,7 +5,8 @@
 # @Software: PyCharm
 import pandas as pd
 from datetime import datetime
-from utils import get_data_from_odps
+from utils import get_data_from_odps, filter_video_status
+from db_helper import RedisHelper
 from config import set_config
 from log import Log
 
@@ -15,14 +16,31 @@ log_ = Log()
 
 def get_old_videos():
     """获取老旧视频"""
-    now_date = datetime.strftime(datetime.today(), '%Y%m%d')
+    now_date = datetime.strftime(datetime.today(), '%Y%m%d%H')
     log_.info(f"now_date = {now_date}")
     project = config_.OLD_VIDEOS_PROJECT
     table = config_.OLD_VIDEOS_TABLE
     records = get_data_from_odps(project=project, table=table, date=now_date)
-    data = [{'videoid': record['videoid']} for record in records]
-    data_df = pd.DataFrame(data=data)
-    print(data_df)
+    video_ids = [int(record['videoid']) for record in records]
+    log_.info(f'videos count = {len(video_ids)}')
+    if len(video_ids) > 0:
+        # 对视频状态进行过滤
+        filtered_videos = filter_video_status(list(video_ids))
+        log_.info('filter videos status finished, filtered_videos count={}'.format(len(filtered_videos)))
+        if not filtered_videos:
+            log_.info('视频状态不符合分发')
+            return None
+        # 上传数据到redis
+        key_name = f'{config_.RECALL_KEY_NAME_PREFIX_OLD_VIDEOS}{now_date}'
+        redis_helper = RedisHelper()
+        # 如果key已存在,删除key
+        if redis_helper.key_exists(key_name):
+            redis_helper.del_keys(key_name)
+        # 写入redis
+        redis_helper.add_data_with_set(key_name=key_name, values=filtered_videos, expire_time=24 * 3600)
+        log_.info('data to redis finished!')
+    else:
+        log_.info(f'no data!')
 
 
 if __name__ == '__main__':

+ 7 - 0
old_video_recall_task.sh

@@ -0,0 +1,7 @@
+source /etc/profile
+echo $ROV_OFFLINE_ENV
+if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
+    cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/old_video_recall.py
+elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
+    cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/old_video_recall.py
+fi

+ 32 - 0
videos_filter.py

@@ -508,6 +508,36 @@ def filter_rov_day():
     log_.info("rov_day pool filter end!")
 
 
+def filter_old_videos():
+    """过滤老视频数据"""
+    log_.info("old videos filter start ...")
+    redis_helper = RedisHelper()
+    # 获取当前日期
+    now_date = date.today().strftime('%Y%m%d')
+    log_.info(f'now_date = {now_date}.')
+    # 拼接key
+    key_name = f'{config_.RECALL_KEY_NAME_PREFIX_OLD_VIDEOS}{now_date}'
+    # 获取视频
+    data = redis_helper.get_data_from_set(key_name=key_name)
+    if data is None:
+        log_.info("data is None")
+        log_.info("old videos filter end!")
+        return
+    # 过滤
+    video_ids = [int(video_id) for video_id in data]
+    filtered_result = filter_video_status(video_ids=video_ids)
+    # 求差集,获取需要过滤掉的视频,并从redis中移除
+    filter_videos = set(video_ids) - set(filtered_result)
+    log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
+                                                                                  len(filtered_result),
+                                                                                  len(filter_videos)))
+    if len(filter_videos) == 0:
+        log_.info("old videos filter end!")
+        return
+    redis_helper.remove_value_from_set(key_name=key_name, values=filter_videos)
+    log_.info("old videos filter end!")
+
+
 def main():
     try:
         # ROV召回池视频过滤
@@ -537,6 +567,8 @@ def main():
         filter_rov_h()
         # 过滤小程序天级数据
         filter_rov_day()
+        # 过滤老视频数据
+        filter_old_videos()
     except Exception as e:
         log_.error(traceback.format_exc())
         send_msg_to_feishu(