Browse Source

config opt

liqian 3 years ago
parent
commit
559e361e44
17 changed files with 855 additions and 121 deletions
  1. 7 0
      bottom_task.sh
  2. 172 0
      bottom_videos.py
  3. 126 6
      config.py
  4. 31 21
      db_helper.py
  5. 7 1
      flow_pool_task.sh
  6. 1 1
      get_data.py
  7. 1 0
      log.py
  8. 59 48
      pool_predict.py
  9. 23 11
      rov_data_check.py
  10. 7 1
      rov_pool_task.sh
  11. 80 20
      rov_train.py
  12. 7 1
      rov_train_recall_pool_update.sh
  13. 44 0
      top_video_list.py
  14. 7 0
      top_video_task.sh
  15. 94 11
      utils.py
  16. 182 0
      videos_filter.py
  17. 7 0
      videos_filter_task.sh

+ 7 - 0
bottom_task.sh

@@ -0,0 +1,7 @@
+source /etc/profile
+echo $ROV_OFFLINE_ENV
+if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
+    cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/bottom_videos.py
+elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
+    cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/bottom_videos.py
+fi

+ 172 - 0
bottom_videos.py

@@ -0,0 +1,172 @@
+# coding:utf-8
+import datetime
+import json
+import traceback
+import os
+import json
+
+from utils import execute_sql_from_odps, request_post, update_video_w_h_rate
+from db_helper import RedisHelper
+from config import set_config
+from log import Log
+
+config_, env = set_config()
+log_ = Log()
+
+
+def get_bottom_videos_test():
+    """获取测试环境兜底视频"""
+    try:
+        # 获取总播放量top1000的视频
+        sql = "SELECT  id " \
+              ",play_count_total " \
+              "FROM    videoods.wx_video_test " \
+              "WHERE   transcode_status = 3 " \
+              "AND     STATUS = 1 " \
+              "AND     recommend_status IN ( - 6, 1) " \
+              "ORDER BY play_count_total DESC " \
+              "LIMIT   2000" \
+              ";"
+
+        records = execute_sql_from_odps(project='videoods', sql=sql)
+
+        # 视频按照总播放量写入redis
+        video_id_list = []
+        videos = {}
+        with records.open_reader() as reader:
+            for record in reader:
+                video_id = int(record['id'])
+                video_id_list.append(video_id)
+                videos[video_id] = record['play_count_total']
+
+        return video_id_list, videos
+
+    except Exception as e:
+        log_.error(traceback.format_exc())
+
+
+def get_bottom_videos_pro(now_date):
+    """获取生产环境兜底视频"""
+    try:
+        # 获取昨日播放量top1000的视频
+        delta_date = now_date - datetime.timedelta(days=1)
+
+        sql = "SELECT  video_playcount.videoid " \
+              ",video_playcount.play_count " \
+              "FROM    ( " \
+              "SELECT  videoid " \
+              ",COUNT(*) play_count " \
+              "FROM    loghubods.video_action_log_applet " \
+              "WHERE   dt = {} " \
+              "AND     business = 'videoPlay' " \
+              "GROUP BY videoid " \
+              ") video_playcount INNER " \
+              "JOIN    ( " \
+              "SELECT  DISTINCT videoid " \
+              "FROM    videoods.dim_video " \
+              "WHERE   video_edit = '通过' " \
+              "AND     video_data_stat = '有效' " \
+              "AND     video_recommend IN ( '待推荐', '普通推荐') " \
+              "AND     charge = '免费' " \
+              "AND     is_pwd = '未加密' " \
+              ") video_status " \
+              "ON      video_playcount.videoid = video_status.videoid " \
+              "ORDER BY video_playcount.play_count DESC " \
+              "LIMIT   2000 " \
+              ";".format(delta_date.strftime('%Y%m%d'))
+
+        records = execute_sql_from_odps(project='loghubods', sql=sql)
+
+        # 视频按照昨日播放量写入redis
+        video_id_list = []
+        videos = {}
+        with records.open_reader() as reader:
+            for record in reader:
+                video_id = int(record['videoid'])
+                video_id_list.append(video_id)
+                videos[video_id] = record['play_count']
+
+        return video_id_list, videos
+
+    except Exception as e:
+        log_.error(traceback.format_exc())
+
+
+def update_bottom_videos():
+    """更新兜底视频"""
+    try:
+        # 获取对应环境的兜底视频
+        now_date = datetime.datetime.today()
+        if env in ['dev', 'test']:
+            video_id_list, videos = get_bottom_videos_test()
+        elif env in ['pre', 'pro']:
+            video_id_list, videos = get_bottom_videos_pro(now_date=now_date)
+        else:
+            log_.error('env error')
+            return
+
+        # redis数据更新
+        redis_helper = RedisHelper()
+        redis_helper.del_keys(key_name=config_.BOTTOM_KEY_NAME)
+        redis_helper.add_data_with_zset(key_name=config_.BOTTOM_KEY_NAME, data=videos)
+        # 与原有兜底视频排序,保留top1000
+        redis_helper.remove_by_rank_from_zset(key_name=config_.BOTTOM_KEY_NAME, start=config_.BOTTOM_NUM, stop=-1)
+        # 移除bottom key的过期时间,将其转换为永久状态
+        redis_helper.persist_key(key_name=config_.BOTTOM_KEY_NAME)
+        log_.info('{} update bottom videos success!, count = {}'.format(now_date, config_.BOTTOM_NUM))
+
+        # 更新视频的宽高比数据
+        video_ids = redis_helper.get_data_zset_with_index(key_name=config_.BOTTOM_KEY_NAME, start=0, end=-1)
+        if video_ids:
+            update_video_w_h_rate(video_ids=video_ids,
+                                  key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['bottom_last'])
+            log_.info('update video w_h_rate to redis finished!')
+
+        # 获取今日兜底视频的json,并存入redis
+        video_json_list = []
+        for i in range(0, len(video_id_list)//10):
+            video_json = get_video_info_json(video_ids=video_id_list[i*10:(i+1)*10])
+            if video_json is not None:
+                video_json_list.extend(video_json)
+            if len(video_json_list) >= 1000:
+                break
+        # 写入redis,先删除key,再重新写入
+        redis_helper.del_keys(config_.BOTTOM_JSON_KEY_NAME)
+        redis_helper.add_data_with_set(key_name=config_.BOTTOM_JSON_KEY_NAME, values=video_json_list[:1000])
+        # 移除过期时间,将其转换为永久状态
+        redis_helper.persist_key(key_name=config_.BOTTOM_JSON_KEY_NAME)
+
+        log_.info('{} update bottom videos info json success!, count = {}'.format(now_date,
+                                                                                  len(video_json_list[:1000])))
+
+    except Exception as e:
+        log_.error(traceback.format_exc())
+
+
+def get_video_info_json(video_ids):
+    """
+    获取视频对应json
+    :param video_ids: type-list [int, int, ...]
+    :return: json_data_list
+    """
+    request_data = {
+        "appType": 4,
+        "platform": "android",
+        "versionCode": 295,
+        "pageSize": 10,
+        "machineCode": "weixin_openid_otjoB5WWWmkRjpMzkV5ltZ3osg3A",
+        "uid": 6281917,
+        "videoIds": video_ids
+    }
+    res = request_post(request_url=config_.BOTTOM_JSON_URL, request_data=request_data)
+    if res is None:
+        return None
+    if res['code'] != 0:
+        log_.info('获取视频json失败!')
+        return None
+    json_data_list = [json.dumps(item) for item in res['data']]
+    return json_data_list
+
+
+if __name__ == '__main__':
+    update_bottom_videos()

+ 126 - 6
config.py

@@ -1,10 +1,19 @@
+# coding:utf-8
+import os
+from log import Log
+log_ = Log()
+
+
 class BaseConfig(object):
     # 产品标识
     APP_TYPE = {
         'VLOG': 0,  # vlog
         'LOVE_LIVE': 4,  # 票圈视频
         'LONG_VIDEO': 5,  # 内容精选
-        'SHORT_VIDEO': 6
+        'SHORT_VIDEO': 6,  # 票圈短视频
+        'WAN_NENG_VIDEO': 17,  # 万能影视屋
+        'LAO_HAO_KAN_VIDEO': 18,  # 老好看视频
+        'ZUI_JING_QI': 19,  # 票圈最惊奇
     }
     # 数据存放路径
     DATA_DIR_PATH = './data'
@@ -37,12 +46,40 @@ class BaseConfig(object):
 
     # 小程序离线ROV模型结果存放 redis key前缀,完整格式:com.weiqu.video.recall.hot.item.score.{date}
     RECALL_KEY_NAME_PREFIX = 'com.weiqu.video.recall.hot.item.score.'
+
     # 流量池离线模型结果存放 redis key前缀,完整格式 com.weiqu.video.flowpool.hot.item.score.{appType}
     FLOWPOOL_KEY_NAME_PREFIX = 'com.weiqu.video.flowpool.hot.item.score.'
 
+    # 兜底视频redis存储key
+    BOTTOM_KEY_NAME = 'com.weiqu.video.bottom'
+    # 兜底视频数量
+    BOTTOM_NUM = 1000
+    # 首页兜底视频json存储 redis-key
+    BOTTOM_JSON_KEY_NAME = 'com.weiqu.video.homepage.bottom.info.json.item'
+
+    # 修改ROV的视频 redis key
+    UPDATE_ROV_KEY_NAME = 'com.weiqu.video.update.rov.item.score'
+
+    # 生效中的置顶视频列表 redis key
+    TOP_VIDEO_LIST_KEY_NAME = 'com.weiqu.video.top.item.score.area'
+
+    # rovScore公差
+    ROV_SCORE_D = 0.001
+
+    # width : height > 1 的视频列表 redis key, zset存储,value为videoId,score为w_h_rate
+    W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME = {
+        'rov_recall': 'com.weiqu.video.rov.w.h.rate.1.item',  # rov召回池视频
+        'bottom_last': 'com.weiqu.video.bottom.last.w.h.rate.1.item'  # 二次兜底视频
+    }
+
 
 class DevelopmentConfig(BaseConfig):
     """开发环境配置"""
+    # 报警内容 环境区分
+    ENV_TEXT = "开发环境"
+    # 项目存放目录
+    PROJECT_PATH = '/data2/rov-offline'
+
     # 测试环境redis地址
     REDIS_INFO = {
         'host': 'r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com',
@@ -59,6 +96,16 @@ class DevelopmentConfig(BaseConfig):
         'password': '4BEcOgxREOPq7t3A7EWkjciVULeQGj'
     }
 
+    # 测试环境mysql地址
+    MYSQL_INFO = {
+        'host': 'rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com',
+        'port': 3306,
+        'user': 'wx2016_longvideo',
+        'password': 'wx2016_longvideoP@assword1234',
+        'db': 'longvideo',
+        'charset': 'utf8'
+    }
+
     # Hologres视频状态存储表名
     VIDEO_STATUS = 'longvideo_test.dwd_mdm_item_video_stat'
 
@@ -68,6 +115,10 @@ class DevelopmentConfig(BaseConfig):
     GET_REMAIN_VIEW_COUNT_URL = 'http://testapi-internal.piaoquantv.com/flowpool/video/remainViewCount'
     # 计算完ROV通知后端接口地址
     NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL = 'http://videotest-internal.yishihui.com/longvideoapi/openapi/recommend/updateRovScore'
+    # 获取置顶视频列表接口地址
+    TOP_VIDEO_LIST_URL = 'http://videotest-internal.yishihui.com/longvideoapi/openapi/recommend/topVideoList'
+    # 获取首页兜底视频json接口地址
+    BOTTOM_JSON_URL = 'http://videotest-internal.yishihui.com/longvideoapi/openapi/video/distribute/structure/video/list'
 
     # logs 上传oss 目标Bucket指定目录
     OSS_FOLDER_LOGS = 'rov-offline/dev/logs/'
@@ -77,6 +128,11 @@ class DevelopmentConfig(BaseConfig):
 
 class TestConfig(BaseConfig):
     """测试环境配置"""
+    # 报警内容 环境区分
+    ENV_TEXT = "测试环境"
+    # 项目存放目录
+    PROJECT_PATH = '/data2/rov-offline'
+
     # 测试环境redis地址
     REDIS_INFO = {
         'host': 'r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com',
@@ -93,6 +149,16 @@ class TestConfig(BaseConfig):
         'password': '4BEcOgxREOPq7t3A7EWkjciVULeQGj'
     }
 
+    # 测试环境mysql地址
+    MYSQL_INFO = {
+        'host': 'rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com',
+        'port': 3306,
+        'user': 'wx2016_longvideo',
+        'password': 'wx2016_longvideoP@assword1234',
+        'db': 'longvideo',
+        'charset': 'utf8'
+    }
+
     # Hologres视频状态存储表名
     VIDEO_STATUS = 'longvideo_test.dwd_mdm_item_video_stat'
 
@@ -102,6 +168,10 @@ class TestConfig(BaseConfig):
     GET_REMAIN_VIEW_COUNT_URL = 'http://testapi-internal.piaoquantv.com/flowpool/video/remainViewCount'
     # 计算完ROV通知后端接口地址
     NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL = 'http://videotest-internal.yishihui.com/longvideoapi/openapi/recommend/updateRovScore'
+    # 获取置顶视频列表接口地址
+    TOP_VIDEO_LIST_URL = 'http://videotest-internal.yishihui.com/longvideoapi/openapi/recommend/topVideoList'
+    # 获取首页兜底视频json接口地址
+    BOTTOM_JSON_URL = 'http://videotest-internal.yishihui.com/longvideoapi/openapi/video/distribute/structure/video/list'
 
     # logs 上传oss 目标Bucket指定目录
     OSS_FOLDER_LOGS = 'rov-offline/test/logs/'
@@ -111,6 +181,11 @@ class TestConfig(BaseConfig):
 
 class PreProductionConfig(BaseConfig):
     """预发布环境配置"""
+    # 报警内容 环境区分
+    ENV_TEXT = "预发布环境"
+    # 项目存放目录
+    PROJECT_PATH = '/data/rov-offline'
+
     # redis地址
     REDIS_INFO = {
         'host': 'r-bp1fogs2mflr1ybfot.redis.rds.aliyuncs.com',
@@ -127,6 +202,16 @@ class PreProductionConfig(BaseConfig):
         'password': '4BEcOgxREOPq7t3A7EWkjciVULeQGj'
     }
 
+    # 生产环境mysql地址
+    MYSQL_INFO = {
+        'host': 'rr-bp1x9785e8h5452bi157.mysql.rds.aliyuncs.com',
+        'port': 3306,
+        'user': 'wx2016_longvideo',
+        'password': 'wx2016_longvideoP@assword1234',
+        'db': 'longvideo',
+        'charset': 'utf8'
+    }
+
     # Hologres视频状态存储表名
     VIDEO_STATUS = 'longvideo.dwd_mdm_item_video_stat'
 
@@ -136,6 +221,10 @@ class PreProductionConfig(BaseConfig):
     GET_REMAIN_VIEW_COUNT_URL = 'http://preapi-internal.piaoquantv.com/flowpool/video/remainViewCount'
     # 计算完ROV通知后端接口地址
     NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL = 'http://videopre-internal.piaoquantv.com/longvideoapi/openapi/recommend/updateRovScore'
+    # 获取置顶视频列表接口地址
+    TOP_VIDEO_LIST_URL = 'http://speedpre.wx.com/longvideoapi/openapi/recommend/topVideoList'
+    # 获取首页兜底视频json接口地址
+    BOTTOM_JSON_URL = 'http://speedpre.wx.com/longvideoapi/openapi/video/distribute/structure/video/list'
 
     # logs 上传oss 目标Bucket指定目录
     OSS_FOLDER_LOGS = 'rov-offline/pre/logs/'
@@ -145,6 +234,11 @@ class PreProductionConfig(BaseConfig):
 
 class ProductionConfig(BaseConfig):
     """生产环境配置"""
+    # 报警内容 环境区分
+    ENV_TEXT = "生产环境"
+    # 项目存放目录
+    PROJECT_PATH = '/data/rov-offline'
+
     # 线上环境redis地址
     REDIS_INFO = {
         'host': 'r-bp1fogs2mflr1ybfot.redis.rds.aliyuncs.com',
@@ -161,6 +255,16 @@ class ProductionConfig(BaseConfig):
         'password': '4BEcOgxREOPq7t3A7EWkjciVULeQGj'
     }
 
+    # 生产环境mysql地址
+    MYSQL_INFO = {
+        'host': 'rr-bp1x9785e8h5452bi157.mysql.rds.aliyuncs.com',
+        'port': 3306,
+        'user': 'wx2016_longvideo',
+        'password': 'wx2016_longvideoP@assword1234',
+        'db': 'longvideo',
+        'charset': 'utf8'
+    }
+
     # Hologres视频状态存储表名
     VIDEO_STATUS = 'longvideo.dwd_mdm_item_video_stat'
 
@@ -169,7 +273,11 @@ class ProductionConfig(BaseConfig):
     # 获取视频在流量池中的剩余可分发数接口地址
     GET_REMAIN_VIEW_COUNT_URL = 'http://api-internal.piaoquantv.com/flowpool/video/remainViewCount'
     # 计算完ROV通知后端接口地址
-    NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL = 'http://longvideoapi-internal.piaoquantv.com/longvideoapi/openapi/recommend/updateRovScore'
+    NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL = 'http://recommend-common-internal.piaoquantv.com/longvideoapi/openapi/recommend/updateRovScore'
+    # 获取置顶视频列表接口地址
+    TOP_VIDEO_LIST_URL = 'http://recommend-common-internal.piaoquantv.com/longvideoapi/openapi/recommend/topVideoList'
+    # 获取首页兜底视频json接口地址
+    BOTTOM_JSON_URL = 'http://recommend-common-internal.piaoquantv.com/longvideoapi/openapi/video/distribute/structure/video/list'
 
     # logs 上传oss 目标Bucket指定目录
     OSS_FOLDER_LOGS = 'rov-offline/pro/logs/'
@@ -178,7 +286,19 @@ class ProductionConfig(BaseConfig):
 
 
 def set_config():
-    # return DevelopmentConfig()
-    # return TestConfig()
-    return PreProductionConfig()
-    # return ProductionConfig()
+    # 获取环境变量 ROV_OFFLINE_ENV
+    env = os.environ.get('ROV_OFFLINE_ENV')
+    if env is None:
+        log_.error('ENV ERROR: is None!')
+        return
+    if env == 'dev':
+        return DevelopmentConfig(), env
+    elif env == 'test':
+        return TestConfig(), env
+    elif env == 'pre':
+        return PreProductionConfig(), env
+    elif env == 'pro':
+        return ProductionConfig(), env
+    else:
+        log_.error('ENV ERROR: is {}'.format(env))
+        return

+ 31 - 21
db_helper.py

@@ -1,10 +1,11 @@
+# coding:utf-8
 import redis
 import psycopg2
 import pymysql
 from config import set_config
 from log import Log
 
-config_ = set_config()
+config_, _ = set_config()
 log = Log()
 
 
@@ -100,10 +101,11 @@ class RedisHelper(object):
         if not conn.exists(key_name):
             return None
         data = conn.zrange(key_name, start, end, desc, with_scores)
-        if with_scores:
-            return data
-        else:
-            return [eval(value) for value in data]
+        return data
+        # if with_scores:
+        #     return data
+        # else:
+        #     return [eval(value) for value in data]
 
     def get_score_with_value(self, key_name, value):
         """
@@ -139,7 +141,18 @@ class RedisHelper(object):
         :return: None
         """
         conn = self.connect()
-        conn.zrem(key_name, value)
+        conn.zrem(key_name, *value)
+
+    def remove_by_rank_from_zset(self, key_name, start, stop):
+        """
+        移除有序集中,指定排名(rank)区间内的所有成员
+        :param key_name: key
+        :param start: 开始位
+        :param stop: 结束位
+        :return: None
+        """
+        conn = self.connect()
+        conn.zremrangebyrank(name=key_name, min=start, max=stop)
 
     def get_index_with_data(self, key_name, value):
         """
@@ -197,6 +210,15 @@ class RedisHelper(object):
         conn = self.connect()
         conn.srem(key_name, *values)
 
+    def persist_key(self, key_name):
+        """
+        移除key的过期时间,将其转换为永久状态
+        :param key_name: key
+        :return:
+        """
+        conn = self.connect()
+        conn.persist(key_name)
+
 
 class HologresHelper(object):
     def __init__(self):
@@ -220,16 +242,11 @@ class HologresHelper(object):
 
 
 class MysqlHelper(object):
-    def __init__(self, mysql_info):
+    def __init__(self):
         """
         初始化mysql连接信息
-        :param mysql_info: mysql连接信息, 格式:dict, {'host': '', 'port': '', 'user':'', 'password': '', 'db': ''}
         """
-        self.host = mysql_info['host']
-        self.port = mysql_info['port']
-        self.user = mysql_info['user']
-        self.password = mysql_info['password']
-        self.db = mysql_info['db']
+        self.mysql_info = config_.MYSQL_INFO
 
     def get_data(self, sql):
         """
@@ -238,14 +255,7 @@ class MysqlHelper(object):
         :return: data
         """
         # 连接数据库
-        conn = pymysql.connect(
-            host=self.host,
-            port=self.port,
-            user=self.user,
-            password=self.password,
-            db=self.db,
-            charset='utf8'
-        )
+        conn = pymysql.connect(**self.mysql_info)
         # 创建游标
         cursor = conn.cursor()
         try:

+ 7 - 1
flow_pool_task.sh

@@ -1 +1,7 @@
-python /data/rov-offline/pool_predict.py
+source /etc/profile
+echo $ROV_OFFLINE_ENV
+if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
+    cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/pool_predict.py
+elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
+    cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/pool_predict.py
+fi

+ 1 - 1
get_data.py

@@ -7,7 +7,7 @@ from config import set_config
 from utils import get_data_from_odps, write_to_pickle
 from log import Log
 
-config_ = set_config()
+config_, _ = set_config()
 log_ = Log()
 
 

+ 1 - 0
log.py

@@ -1,3 +1,4 @@
+# coding:utf-8
 import os
 import logging
 import time

+ 59 - 48
pool_predict.py

@@ -1,12 +1,15 @@
+import random
 import time
 import os
+import traceback
+import random
 
 from config import set_config
-from utils import request_post, filter_video_status
+from utils import request_post, filter_video_status, send_msg_to_feishu
 from log import Log
 from db_helper import RedisHelper
 
-config_ = set_config()
+config_, _ = set_config()
 log_ = Log()
 
 
@@ -60,7 +63,8 @@ def get_videos_remain_view_count(app_type, videos_info):
 
 
 def get_score(video_ids):
-    return [1] * len(video_ids)
+    # 以[0, 100]之间的随机浮点数作为score
+    return [random.uniform(0, 100) for _ in range(len(video_ids))]
 
 
 def predict(app_type):
@@ -69,54 +73,61 @@ def predict(app_type):
     :param app_type: 产品标识 type-int
     :return: None
     """
-    # 从流量池获取数据
-    videos = get_videos_from_flow_pool(app_type=app_type)
-    if len(videos) <= 0:
-        log_.info('流量池中无需分发的视频')
-        return None
-    # video_id 与 flow_pool 进行mapping
-    video_ids = set()
-    log_.info('流量池中视频数:{}'.format(len(videos)))
-    mapping = {}
-    for video in videos:
-        video_id = video['videoId']
-        video_ids.add(video_id)
-        if video_id in mapping:
-            mapping[video_id].append(video['flowPool'])
-        else:
-            mapping[video_id] = [video['flowPool']]
-
-    # 对视频状态进行过滤
-    filtered_videos = filter_video_status(list(video_ids))
-    log_.info('filter videos status finished, filtered_videos nums={}'.format(len(filtered_videos)))
-    if not filtered_videos:
-        log_.info('流量池中视频状态不符合分发')
-        return None
-    # 预测
-    video_score = get_score(filtered_videos)
-    log_.info('predict finished!')
-    # 上传数据到redis
-    redis_data = {}
-    for i in range(len(video_score)):
-        video_id = filtered_videos[i]
-        score = video_score[i]
-        for flow_pool in mapping.get(video_id):
-            value = '{}-{}'.format(video_id, flow_pool)
-            redis_data[value] = score
-    key_name = config_.FLOWPOOL_KEY_NAME_PREFIX + str(app_type)
-    redis_helper = RedisHelper()
-    # 如果key已存在,删除key
-    if redis_helper.key_exists(key_name):
-        redis_helper.del_keys(key_name)
-    # 写入redis
-    redis_helper.add_data_with_zset(key_name=key_name, data=redis_data, expire_time=24 * 3600)
-    log_.info('data to redis finished!')
+    try:
+        # 从流量池获取数据
+        videos = get_videos_from_flow_pool(app_type=app_type)
+        if len(videos) <= 0:
+            log_.info('流量池中无需分发的视频')
+            return None
+        # video_id 与 flow_pool 进行mapping
+        video_ids = set()
+        log_.info('流量池中视频数:{}'.format(len(videos)))
+        mapping = {}
+        for video in videos:
+            video_id = video['videoId']
+            video_ids.add(video_id)
+            if video_id in mapping:
+                mapping[video_id].append(video['flowPool'])
+            else:
+                mapping[video_id] = [video['flowPool']]
+
+        # 对视频状态进行过滤
+        filtered_videos = filter_video_status(list(video_ids))
+        log_.info('filter videos status finished, filtered_videos nums={}'.format(len(filtered_videos)))
+        if not filtered_videos:
+            log_.info('流量池中视频状态不符合分发')
+            return None
+        # 预测
+        video_score = get_score(filtered_videos)
+        log_.info('predict finished!')
+        # 上传数据到redis
+        redis_data = {}
+        for i in range(len(video_score)):
+            video_id = filtered_videos[i]
+            score = video_score[i]
+            for flow_pool in mapping.get(video_id):
+                value = '{}-{}'.format(video_id, flow_pool)
+                redis_data[value] = score
+        key_name = config_.FLOWPOOL_KEY_NAME_PREFIX + str(app_type)
+        redis_helper = RedisHelper()
+        # 如果key已存在,删除key
+        if redis_helper.key_exists(key_name):
+            redis_helper.del_keys(key_name)
+        # 写入redis
+        redis_helper.add_data_with_zset(key_name=key_name, data=redis_data, expire_time=24 * 3600)
+        log_.info('data to redis finished!')
+    except Exception as e:
+        log_.error('流量池更新失败, appType: {} exception: {}, traceback: {}'.format(
+            app_type, e, traceback.format_exc()))
+        send_msg_to_feishu('rov-offline{} - 流量池更新失败, appType: {}, exception: {}'.format(
+            config_.ENV_TEXT, app_type, e))
 
 
 if __name__ == '__main__':
     # res = get_videos_from_pool(app_type=0)
     # res = get_videos_remain_view_count(app_type=0, videos_info=[('12345', '#2#1#111')])
     # print(res)
+
     log_.info('flow pool predict start...')
     for app_name, app_type in config_.APP_TYPE.items():
         log_.info('{} predict start...'.format(app_name))
@@ -124,6 +135,6 @@ if __name__ == '__main__':
         log_.info('{} predict end...'.format(app_name))
     log_.info('flow pool predict end...')
     # 将日志上传到oss
-    log_cmd = "ossutil cp -r -f {} oss://{}/{}".format(log_.logname, config_.BUCKET_NAME,
-                                                       config_.OSS_FOLDER_LOGS + 'flow_pool/')
-    os.system(log_cmd)
+    # log_cmd = "ossutil cp -r -f {} oss://{}/{}".format(log_.logname, config_.BUCKET_NAME,
+    #                                                    config_.OSS_FOLDER_LOGS + 'flow_pool/')
+    # os.system(log_cmd)

+ 23 - 11
rov_data_check.py

@@ -1,27 +1,34 @@
 import datetime
 import os
+import traceback
 
 from odps import ODPS
 from datetime import datetime as dt
 from threading import Timer
 from config import set_config
 from log import Log
+from utils import send_msg_to_feishu
 
-config_ = set_config()
+config_, env = set_config()
 log_ = Log()
 
 
 def rov_train_recall_pool_update():
     # 训练数据和预测数据都准备好时,更新模型,预测
-    os.system('sh /data/rov-offline/rov_train_recall_pool_update.sh')
-    # 将日志上传到oss
-    log_cmd = "ossutil cp -r -f {} oss://{}/{}".format(log_.logname, config_.BUCKET_NAME,
-                                                       config_.OSS_FOLDER_LOGS + 'rov_recall_pool/')
-    os.system(log_cmd)
-    # 将data上传到oss
-    data_cmd = "ossutil cp -r -f {} oss://{}/{}".format("/data/rov-offline/data", config_.BUCKET_NAME,
-                                                        config_.OSS_FOLDER_DATA)
-    os.system(data_cmd)
+    os.system('cd {} && sh {}/rov_train_recall_pool_update.sh'.format(config_.PROJECT_PATH, config_.PROJECT_PATH))
+    # # 将日志上传到oss
+    # log_cmd = "ossutil cp -r -f {} oss://{}/{}".format(log_.logname, config_.BUCKET_NAME,
+    #                                                    config_.OSS_FOLDER_LOGS + 'rov_recall_pool/')
+    # os.system(log_cmd)
+
+    # 生产环境 - 将预测得到的ROV数据文件predict.csv上传到oss
+    if env == 'pro':
+        data_cmd = "ossutil cp -r -f {} oss://{}/{}dt={}/{}".format("{}/data/predict.csv".format(config_.PROJECT_PATH),
+                                                                    config_.BUCKET_NAME,
+                                                                    config_.OSS_FOLDER_DATA,
+                                                                    datetime.datetime.today().strftime('%Y%m%d'),
+                                                                    'predict.csv')
+        os.system(data_cmd)
 
 
 def data_check(project, table, date):
@@ -69,4 +76,9 @@ def timer_check():
 
 
 if __name__ == '__main__':
-    timer_check()
+    try:
+        timer_check()
+    except Exception as e:
+        log_.error('ROV召回池更新失败, exception: {}, traceback: {}'.format(e, traceback.format_exc()))
+        send_msg_to_feishu('rov-offline{} - ROV召回池更新失败, exception: {}'.format(config_.ENV_TEXT, e))
+

+ 7 - 1
rov_pool_task.sh

@@ -1 +1,7 @@
-python /data/rov-offline/rov_data_check.py
+source /etc/profile
+echo $ROV_OFFLINE_ENV
+if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
+    cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/rov_data_check.py
+elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
+    cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/rov_data_check.py
+fi

+ 80 - 20
rov_train.py

@@ -9,11 +9,12 @@ from sklearn.model_selection import train_test_split
 from sklearn.metrics import mean_absolute_error, r2_score, mean_absolute_percentage_error
 
 from config import set_config
-from utils import read_from_pickle, write_to_pickle, data_normalization, request_post, filter_video_status
+from utils import read_from_pickle, write_to_pickle, data_normalization, \
+    request_post, filter_video_status, update_video_w_h_rate
 from log import Log
 from db_helper import RedisHelper, MysqlHelper
 
-config_ = set_config()
+config_, env = set_config()
 log_ = Log()
 
 
@@ -124,7 +125,7 @@ def pack_result_to_csv(filename, sort_columns=None, filepath=config_.DATA_DIR_PA
     :param sort_columns: 指定排序列名列名,type-list, 默认为None
     :param filepath: csv文件存放路径,默认为config_.DATA_DIR_PATH
     :param ascending: 是否按指定列的数组升序排列,默认为True,即升序排列
-    :param data: 数据
+    :param data: 数据, type-dict
     :return: None
     """
     if not os.path.exists(filepath):
@@ -136,6 +137,26 @@ def pack_result_to_csv(filename, sort_columns=None, filepath=config_.DATA_DIR_PA
     df.to_csv(file, index=False)
 
 
+def pack_list_result_to_csv(filename, data, columns=None, sort_columns=None, filepath=config_.DATA_DIR_PATH, ascending=True):
+    """
+    打包数据并存入csv, 数据为字典列表
+    :param filename: csv文件名
+    :param data: 数据,type-list [{}, {},...]
+    :param columns: 列名顺序
+    :param sort_columns: 指定排序列名列名,type-list, 默认为None
+    :param filepath: csv文件存放路径,默认为config_.DATA_DIR_PATH
+    :param ascending: 是否按指定列的数组升序排列,默认为True,即升序排列
+    :return: None
+    """
+    if not os.path.exists(filepath):
+        os.makedirs(filepath)
+    file = os.path.join(filepath, filename)
+    df = pd.DataFrame(data=data)
+    if sort_columns:
+        df = df.sort_values(by=sort_columns, ascending=ascending)
+    df.to_csv(file, index=False, columns=columns)
+
+
 def predict():
     """预测"""
     # 读取预测数据并进行清洗
@@ -146,48 +167,74 @@ def predict():
     # 预测
     y_ = model.predict(x)
     log_.info('predict finished!')
+
     # 将结果进行归一化到[0, 100]
     normal_y_ = data_normalization(list(y_))
     log_.info('normalization finished!')
+
+    # 按照normal_y_降序排序
+    predict_data = []
+    for i, video_id in enumerate(video_ids):
+        data = {'video_id': video_id, 'normal_y_': normal_y_[i], 'y_': y_[i], 'y': y[i]}
+        predict_data.append(data)
+    predict_data_sorted = sorted(predict_data, key=lambda temp: temp['normal_y_'], reverse=True)
+
+    # 按照排序,从100以固定差值做等差递减,以该值作为rovScore
+    predict_result = []
+    redis_data = {}
+    json_data = []
+    video_id_list = []
+    for j, item in enumerate(predict_data_sorted):
+        video_id = int(item['video_id'])
+        rov_score = 100 - j * config_.ROV_SCORE_D
+        item['rov_score'] = rov_score
+        predict_result.append(item)
+        redis_data[video_id] = rov_score
+        json_data.append({'videoId': video_id, 'rovScore': rov_score})
+        video_id_list.append(video_id)
+
     # 打包预测结果存入csv
-    predict_data = {'normal_y_': normal_y_, 'y_': y_, 'y': y, 'video_ids': video_ids}
     predict_result_filename = 'predict.csv'
-    pack_result_to_csv(filename=predict_result_filename, sort_columns=['normal_y_'], ascending=False, **predict_data)
+    pack_list_result_to_csv(filename=predict_result_filename,
+                            data=predict_result,
+                            columns=['video_id', 'rov_score', 'normal_y_', 'y_', 'y'],
+                            sort_columns=['rov_score'],
+                            ascending=False)
+
     # 上传redis
-    redis_data = {}
-    json_data = []
-    for i in range(len(video_ids)):
-        redis_data[video_ids[i]] = normal_y_[i]
-        json_data.append({'videoId': video_ids[i], 'rovScore': normal_y_[i]})
     key_name = config_.RECALL_KEY_NAME_PREFIX + time.strftime('%Y%m%d')
     redis_helper = RedisHelper()
     redis_helper.add_data_with_zset(key_name=key_name, data=redis_data)
     log_.info('data to redis finished!')
+
+    # 清空修改ROV的视频数据
+    redis_helper.del_keys(key_name=config_.UPDATE_ROV_KEY_NAME)
+
     # 通知后端更新数据
+    log_.info('json_data count = {}'.format(len(json_data)))
     result = request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL, request_data={'videos': json_data})
     if result['code'] == 0:
         log_.info('notify backend success!')
     else:
         log_.error('notify backend fail!')
 
+    # 更新视频的宽高比数据
+    if video_id_list:
+        update_video_w_h_rate(video_ids=video_id_list,
+                              key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['rov_recall'])
+        log_.info('update video w_h_rate to redis finished!')
+
 
 def predict_test():
     """测试环境数据生成"""
     # 获取测试环境中最近发布的40000条视频
-    mysql_info = {
-        'host': 'rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com',
-        'port': 3306,
-        'user': 'wx2016_longvideo',
-        'password': 'wx2016_longvideoP@assword1234',
-        'db': 'longvideo'
-    }
     sql = "SELECT id FROM wx_video ORDER BY id DESC LIMIT 40000;"
-    mysql_helper = MysqlHelper(mysql_info=mysql_info)
+    mysql_helper = MysqlHelper()
     data = mysql_helper.get_data(sql=sql)
     video_ids = [video[0] for video in data]
     # 视频状态过滤
     filtered_videos = filter_video_status(video_ids)
-    log_.info('filtered_videos nums={}'.format(len(filtered_videos)))
+    log_.info('filtered_videos count = {}'.format(len(filtered_videos)))
     # 随机生成 0-100 数作为分数
     redis_data = {}
     json_data = []
@@ -195,17 +242,25 @@ def predict_test():
         score = random.uniform(0, 100)
         redis_data[video_id] = score
         json_data.append({'videoId': video_id, 'rovScore': score})
+    log_.info('json_data count = {}'.format(len(json_data)))
     # 上传Redis
     redis_helper = RedisHelper()
     key_name = config_.RECALL_KEY_NAME_PREFIX + time.strftime('%Y%m%d')
     redis_helper.add_data_with_zset(key_name=key_name, data=redis_data)
     log_.info('test data to redis finished!')
+    # 清空修改ROV的视频数据
+    redis_helper.del_keys(key_name=config_.UPDATE_ROV_KEY_NAME)
     # 通知后端更新数据
     result = request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL, request_data={'videos': json_data})
     if result['code'] == 0:
         log_.info('notify backend success!')
     else:
         log_.error('notify backend fail!')
+    # 更新视频的宽高比数据
+    if filtered_videos:
+        update_video_w_h_rate(video_ids=filtered_videos,
+                              key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['rov_recall'])
+        log_.info('update video w_h_rate to redis finished!')
 
 
 if __name__ == '__main__':
@@ -220,6 +275,11 @@ if __name__ == '__main__':
 
     log_.info('rov model predict start...')
     predict_start = time.time()
-    predict()
+    if env in ['dev', 'test']:
+        predict_test()
+    elif env in ['pre', 'pro']:
+        predict()
+    else:
+        log_.error('env error')
     predict_end = time.time()
     log_.info('rov model predict end, execute time = {}ms'.format((predict_end - predict_start)*1000))

+ 7 - 1
rov_train_recall_pool_update.sh

@@ -1 +1,7 @@
source /etc/profile
echo $ROV_OFFLINE_ENV
# Dispatch on deployment environment: test boxes deploy under /data2,
# production under /data. Any other value is a silent no-op.
case "$ROV_OFFLINE_ENV" in
    test)
        /root/anaconda3/bin/python /data2/rov-offline/get_data.py && /root/anaconda3/bin/python /data2/rov-offline/rov_train.py
        ;;
    pro)
        /root/anaconda3/bin/python /data/rov-offline/get_data.py && /root/anaconda3/bin/python /data/rov-offline/rov_train.py
        ;;
esac

+ 44 - 0
top_video_list.py

@@ -0,0 +1,44 @@
+import traceback
+from utils import request_post, filter_video_status
+from db_helper import RedisHelper
+from config import set_config
+from log import Log
+
+config_, _ = set_config()
+log_ = Log()
+
+
def get_top_video_list():
    """接口获取生效中的置顶视频列表,并存入redis

    Fetch the active pinned-video list from the backend, keep only the
    entries whose videos pass the status filter, and cache the result in
    redis with a 5-minute TTL. Failures are logged and swallowed so the
    scheduled task never crashes.
    """
    try:
        result = request_post(request_url=config_.TOP_VIDEO_LIST_URL, request_data={})
        # request_post returns None on network failure / non-200 status
        if result is None:
            return
        if result['code'] != 0:
            log_.info('获取置顶视频失败!')
            return

        data = result['data']
        if not data:
            log_.info('无置顶视频!')
            return

        # 视频状态过滤
        video_ids = [item['videoId'] for item in data]
        log_.info('video_ids = {}'.format(video_ids))
        alive_ids = filter_video_status(video_ids=video_ids)
        log_.info('filter_videos = {}'.format(alive_ids))
        alive_set = set(alive_ids)
        value = [item for item in data if item['videoId'] in alive_set]
        log_.info('value = {}'.format(value))

        # 写入redis (stored as the Python repr of the list, 5-minute expiry)
        redis_helper = RedisHelper()
        redis_helper.set_data_to_redis(key_name=config_.TOP_VIDEO_LIST_KEY_NAME, value=str(value), expire_time=5 * 60)
        log_.info('置顶视频更新成功!')

    except Exception:
        log_.error('置顶视频更新失败!')
        log_.error(traceback.format_exc())


if __name__ == '__main__':
    get_top_video_list()

+ 7 - 0
top_video_task.sh

@@ -0,0 +1,7 @@
source /etc/profile
echo $ROV_OFFLINE_ENV
# Run the pinned-video refresh from the environment-specific checkout:
# /data2 on test machines, /data in production; other values do nothing.
case "$ROV_OFFLINE_ENV" in
    test)
        cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/top_video_list.py
        ;;
    pro)
        cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/top_video_list.py
        ;;
esac

+ 94 - 11
utils.py

@@ -1,13 +1,33 @@
+# coding:utf-8
 import pickle
 import os
 import requests
 import json
+import traceback
 
 from odps import ODPS
 from config import set_config
-from db_helper import HologresHelper
+from db_helper import HologresHelper, MysqlHelper, RedisHelper
+from log import Log
 
-config_ = set_config()
+config_, env = set_config()
+log_ = Log()
+
+
def execute_sql_from_odps(project, sql, connect_timeout=3000, read_timeout=500000,
                          pool_maxsize=1000, pool_connections=1000):
    """Run *sql* on MaxCompute (ODPS) and return the result records.

    :param project: ODPS project name to run against
    :param sql: SQL statement to execute
    :param connect_timeout: client connect timeout (units per odps SDK)
    :param read_timeout: client read timeout (units per odps SDK)
    :param pool_maxsize: HTTP connection pool max size
    :param pool_connections: HTTP connection pool count
    :return: odps result object (read via ``with records.open_reader()``)
    """
    # NOTE(review): credentials are hard-coded in source; they should be
    # moved to config/environment rather than committed to the repo.
    client = ODPS(
        access_id='LTAI4FtW5ZzxMvdw35aNkmcp',
        secret_access_key='0VKnydcaHK3ITjylbgUsLubX6rwiwc',
        project=project,
        endpoint='http://service.cn.maxcompute.aliyun.com/api',
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections,
    )
    return client.execute_sql(sql=sql)
 
 
 def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
@@ -67,6 +87,23 @@ def read_from_pickle(filename, filepath=config_.DATA_DIR_PATH):
     return data
 
 
def send_msg_to_feishu(msg_text):
    """发送消息到飞书 — post a text alert to the Feishu webhook bot.

    :param msg_text: alert body; the keyword prefix is prepended for the bot
    :return: None (the webhook response text is printed for debugging)
    """
    # webhook地址 — NOTE(review): hard-coded endpoint, consider moving to config
    webhook = 'https://open.feishu.cn/open-apis/bot/v2/hook/8de4de35-30ed-4692-8854-7a154e89b2f2'
    # 自定义关键词key_word — prefixed to every message
    key_word = '服务报警'
    payload_message = {
        "msg_type": "text",
        "content": {
            "text": '{}: {}'.format(key_word, msg_text)
        }
    }
    response = requests.post(
        url=webhook,
        headers={'Content-Type': 'application/json'},
        data=json.dumps(payload_message),
    )
    print(response.text)
+
+
 def request_post(request_url, request_data):
     """
     post 请求 HTTP接口
@@ -74,10 +111,17 @@ def request_post(request_url, request_data):
     :param request_data: 请求参数
     :return: res_data json格式
     """
-    response = requests.post(url=request_url, json=request_data)
-    if response.status_code == 200:
-        res_data = json.loads(response.text)
-        return res_data
+    try:
+        response = requests.post(url=request_url, json=request_data)
+        if response.status_code == 200:
+            res_data = json.loads(response.text)
+            return res_data
+        else:
+            return None
+    except Exception as e:
+        log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
+        send_msg_to_feishu('rov-offline{} - 接口请求失败:{}, exception: {}'.format(config_.ENV_TEXT, request_url, e))
+        return None
 
 
 def data_normalization(data):
@@ -97,7 +141,7 @@ def filter_video_status(video_ids):
     对视频状态进行过滤
     :param video_ids: 视频id列表 type-list
     :return: filtered_videos
-        """
+    """
     if len(video_ids) == 1:
         sql = "set hg_experimental_enable_shard_pruning=off; " \
               "SELECT video_id " \
@@ -106,7 +150,7 @@ def filter_video_status(video_ids):
               "AND applet_rec_status IN (1, -6) " \
               "AND open_status = 1 " \
               "AND payment_status = 0 " \
-              "AND encryption_status IS NULL " \
+              "AND encryption_status != 5 " \
               "AND transcoding_status = 3 " \
               "AND video_id IN ({});".format(config_.VIDEO_STATUS, video_ids[0])
     else:
@@ -117,17 +161,56 @@ def filter_video_status(video_ids):
               "AND applet_rec_status IN (1, -6) " \
               "AND open_status = 1 " \
               "AND payment_status = 0 " \
-              "AND encryption_status IS NULL " \
+              "AND encryption_status != 5 " \
               "AND transcoding_status = 3 " \
               "AND video_id IN {};".format(config_.VIDEO_STATUS, tuple(video_ids))
 
     hologres_helper = HologresHelper()
     data = hologres_helper.get_data(sql=sql)
-    filtered_videos = [temp[0] for temp in data]
+    filtered_videos = [int(temp[0]) for temp in data]
     return filtered_videos
 
 
def update_video_w_h_rate(video_ids, key_name):
    """
    获取横屏视频的宽高比,并存入redis中 (width/height>1)
    :param video_ids: videoId列表 type-list
    :param key_name: redis key
    :return: None
    """
    # 获取数据 — a one-element list needs an "=" clause, because
    # str(tuple([x])) renders as "(x,)" which is not valid SQL
    if len(video_ids) == 1:
        sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id = {};".format(video_ids[0])
    else:
        sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id IN {};".format(tuple(video_ids))

    rows = MysqlHelper().get_data(sql=sql)

    # 计算宽高比,只保留横屏视频 (ratio > 1)
    info_data = {}
    for vid, width, height, rotate in rows:
        w, h = int(width), int(height)
        if w == 0 or h == 0:
            # skip records without usable dimensions
            continue
        # rotate 字段值为 90或270时,width和height的值相反
        if int(rotate) in (90, 270):
            w, h = h, w
        ratio = w / h
        if ratio > 1:
            info_data[int(vid)] = ratio

    redis_helper = RedisHelper()
    # 删除旧数据
    redis_helper.del_keys(key_name=key_name)
    # 写入新数据
    if info_data:
        redis_helper.add_data_with_zset(key_name=key_name, data=info_data)
+
+
if __name__ == '__main__':
    # Ad-hoc manual test entry point — not invoked by any scheduled task.
    # data_test = [9.20273281e+03, 7.00795065e+03, 5.54813112e+03, 9.97402494e-01, 9.96402495e-01, 9.96402494e-01]
    # data_normalization(data_test)
    # request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL, request_data={'videos': []})
    video_ids = [110, 112, 113, 115, 116, 117, 8289883]
    # NOTE(review): key_name='' writes the ratio zset under an empty redis
    # key — acceptable for a smoke test, but confirm this is intentional.
    update_video_w_h_rate(video_ids=video_ids, key_name='')

+ 182 - 0
videos_filter.py

@@ -0,0 +1,182 @@
+import time
+import traceback
+from datetime import date, timedelta
+
+from utils import filter_video_status, send_msg_to_feishu
+from db_helper import RedisHelper
+from config import set_config
+from log import Log
+
+config_, env = set_config()
+log_ = Log()
+
+
def filter_rov_pool():
    """ROV召回池视频过滤

    Re-checks the status of every video in the dated ROV recall zset and
    removes the ones that no longer pass filter_video_status.
    """
    log_.info("rov recall pool filter start ...")
    # 拼接redis-key; the second return value (redis_date) is unused here
    key_name, _ = get_pool_redis_key(pool_type='rov')
    # 获取视频
    redis_helper = RedisHelper()
    data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1)
    # Fix: bail out on an empty result as well as None. An empty list
    # previously fell through to filter_video_status([]), whose SQL is
    # built with an invalid empty "IN ()" clause.
    if not data:
        log_.info("data is empty")
        log_.info("rov recall pool filter end!")
        return
    # 过滤
    video_ids = [int(video_id) for video_id in data]
    filtered_result = filter_video_status(video_ids=video_ids)
    # 求差集,获取需要过滤掉的视频,并从redis中移除
    filter_videos = set(video_ids) - set(filtered_result)
    log_.info("video_ids size = {}, filtered size = {}, filter size = {}".format(len(video_ids),
                                                                                 len(filtered_result),
                                                                                 len(filter_videos)))
    if len(filter_videos) == 0:
        log_.info("rov recall pool filter end!")
        return
    redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
    log_.info("rov recall pool filter end!")
+
+
def filter_flow_pool():
    """流量池视频过滤

    For every configured app_type, re-checks the status of videos in that
    flow-pool zset and removes the invalid ones. Zset members have the
    form "<videoId>-<flowPool>".
    """
    log_.info("flow pool filter start ...")
    # Hoisted out of the loop: one redis client serves all app types
    redis_helper = RedisHelper()
    for _, app_type in config_.APP_TYPE.items():
        log_.info('app_type {} videos filter start...'.format(app_type))
        # 拼接redis-key
        key_name = get_pool_redis_key(pool_type='flow', app_type=app_type)
        # 获取视频
        data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1)
        if data is None:
            log_.info("data is None")
            log_.info("app_type {} videos filter end!".format(app_type))
            continue
        # videoId与flowPool做mapping
        video_ids = []
        mapping = {}
        for video in data:
            # Fix: split only on the first '-' so a flow_pool token that
            # itself contains '-' no longer raises ValueError.
            video_id, flow_pool = video.split('-', 1)
            video_id = int(video_id)
            # O(1) dict-membership check replaces the former O(n) list scan
            if video_id not in mapping:
                video_ids.append(video_id)
                mapping[video_id] = [flow_pool]
            else:
                mapping[video_id].append(flow_pool)
        # 过滤
        if len(video_ids) == 0:
            log_.info("data size = {}, video_ids size = {}, data = {}".format(len(data), len(video_ids), data))
            log_.info("app_type {} videos filter end!".format(app_type))
            continue
        filtered_result = filter_video_status(video_ids=video_ids)
        # 求差集,获取需要过滤掉的视频,并从redis中移除
        filter_videos = set(video_ids) - set(filtered_result)
        log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter size = {}".format(
            len(data), len(video_ids), len(filtered_result), len(filter_videos)))
        # 移除
        if len(filter_videos) == 0:
            log_.info("app_type {} videos filter end!".format(app_type))
            continue
        remove_videos = ['{}-{}'.format(video_id, flow_pool)
                         for video_id in filter_videos
                         for flow_pool in mapping[video_id]]
        redis_helper.remove_value_from_zset(key_name=key_name, value=remove_videos)
        log_.info("app_type {} videos filter end!".format(app_type))
    log_.info("flow pool filter end!")
+
+
def filter_bottom():
    """兜底视频过滤

    Re-checks the status of the bottom (fallback) pool videos and removes
    the ones that no longer pass filter_video_status.
    """
    log_.info("bottom videos filter start ...")
    # 获取视频
    redis_helper = RedisHelper()
    data = redis_helper.get_data_zset_with_index(key_name=config_.BOTTOM_KEY_NAME, start=0, end=-1)
    # Fix: bail out on an empty result as well as None. An empty list
    # previously fell through to filter_video_status([]), whose SQL is
    # built with an invalid empty "IN ()" clause.
    if not data:
        log_.info("data is empty")
        log_.info("bottom videos filter end!")
        return
    # 过滤
    video_ids = [int(video_id) for video_id in data]
    filtered_result = filter_video_status(video_ids=video_ids)
    # 求差集,获取需要过滤掉的视频,并从redis中移除
    filter_videos = set(video_ids) - set(filtered_result)
    log_.info("video_ids size = {}, filtered size = {}, filter size = {}".format(len(video_ids),
                                                                                 len(filtered_result),
                                                                                 len(filter_videos)))
    if len(filter_videos) == 0:
        log_.info("bottom videos filter end!")
        return
    redis_helper.remove_value_from_zset(key_name=config_.BOTTOM_KEY_NAME, value=list(filter_videos))
    log_.info("bottom videos filter end!")
+
+
def filter_rov_updated():
    """修改过ROV的视频过滤

    Re-checks the status of videos whose ROV was manually updated and
    removes the ones that no longer pass filter_video_status.
    """
    log_.info("update rov videos filter start ...")
    # 获取视频
    redis_helper = RedisHelper()
    data = redis_helper.get_data_zset_with_index(key_name=config_.UPDATE_ROV_KEY_NAME, start=0, end=-1)
    # Fix: bail out on an empty result as well as None. An empty list
    # previously fell through to filter_video_status([]), whose SQL is
    # built with an invalid empty "IN ()" clause.
    if not data:
        log_.info("data is empty")
        log_.info("update rov videos filter end!")
        return
    # 过滤
    video_ids = [int(video_id) for video_id in data]
    filtered_result = filter_video_status(video_ids=video_ids)
    # 求差集,获取需要过滤掉的视频,并从redis中移除
    filter_videos = set(video_ids) - set(filtered_result)
    log_.info("video_ids size = {}, filtered size = {}, filter size = {}".format(len(video_ids),
                                                                                 len(filtered_result),
                                                                                 len(filter_videos)))
    if len(filter_videos) == 0:
        log_.info("update rov videos filter end!")
        return
    redis_helper.remove_value_from_zset(key_name=config_.UPDATE_ROV_KEY_NAME, value=list(filter_videos))
    log_.info("update rov videos filter end!")
+
+
def get_pool_redis_key(pool_type, app_type=None):
    """
    拼接key
    :param pool_type: type-string {'rov': rov召回池, 'flow': 流量池}
    :param app_type: 产品标识 (only used when pool_type == 'flow')
    :return: 'rov' -> (key_name, redis_date) tuple; 'flow' -> single key
        string; unknown pool_type -> (None, None)
    """
    redis_helper = RedisHelper()

    if pool_type == 'rov':
        # 判断热度列表是否更新,未更新则使用前一天的热度列表
        today = date.today().strftime('%Y%m%d')
        key_name = config_.RECALL_KEY_NAME_PREFIX + today
        if redis_helper.key_exists(key_name):
            return key_name, today
        yesterday = (date.today() - timedelta(days=1)).strftime('%Y%m%d')
        return config_.RECALL_KEY_NAME_PREFIX + yesterday, yesterday

    if pool_type == 'flow':
        # 流量池
        return config_.FLOWPOOL_KEY_NAME_PREFIX + str(app_type)

    log_.error('pool type error')
    return None, None
+
+
def main():
    """依次执行各视频池的状态过滤;任一步失败则上报飞书并结束本次任务"""
    try:
        # ROV召回池视频过滤
        filter_rov_pool()
        # 流量池视频过滤
        filter_flow_pool()
        # 兜底视频过滤
        filter_bottom()
        # 修改过ROV的视频过滤
        filter_rov_updated()
    except Exception:
        log_.error(traceback.format_exc())
        send_msg_to_feishu('{} - 过滤失败 \n {}'.format(config_.ENV_TEXT, traceback.format_exc()))


if __name__ == '__main__':
    main()

+ 7 - 0
videos_filter_task.sh

@@ -0,0 +1,7 @@
source /etc/profile
echo $ROV_OFFLINE_ENV
# Run the pool-filter job from the environment-specific checkout:
# /data2 on test machines, /data in production; other values do nothing.
case "$ROV_OFFLINE_ENV" in
    test)
        cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/videos_filter.py
        ;;
    pro)
        cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/videos_filter.py
        ;;
esac