Browse Source

merge config-opt

liqian 3 years ago
parent
commit
13d0fd1361
19 changed files with 376 additions and 128 deletions
  1. 2 0
      .gitignore
  2. 7 1
      bottom_task.sh
  3. 78 13
      bottom_videos.py
  4. 63 5
      config.py
  5. 2 1
      db_helper.py
  6. 1 0
      env.conf
  7. 7 1
      flow_pool_task.sh
  8. 1 1
      get_data.py
  9. 1 0
      log.py
  10. 53 49
      pool_predict.py
  11. 19 7
      rov_data_check.py
  12. 7 1
      rov_pool_task.sh
  13. 77 18
      rov_train.py
  14. 7 1
      rov_train_recall_pool_update.sh
  15. 2 2
      top_video_list.py
  16. 7 1
      top_video_task.sh
  17. 33 22
      utils.py
  18. 2 4
      videos_filter.py
  19. 7 1
      videos_filter_task.sh

+ 2 - 0
.gitignore

@@ -63,3 +63,5 @@ target/
 data/
 data/
 *.csv
 *.csv
 *.pickle
 *.pickle
+
+env.conf

+ 7 - 1
bottom_task.sh

@@ -1 +1,7 @@
-cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/bottom_videos.py
+# POSIX sh only (no `source`/`==`); env.conf is read from this script's directory, not the caller's cwd.
+. "$(cd "$(dirname "$0")" && pwd)/env.conf"
+case "$ENV" in
+    dev|test) cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/bottom_videos.py ;;
+    pre|pro)  cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/bottom_videos.py ;;
+    *) echo "bottom_task.sh: unsupported ENV '$ENV' in env.conf" >&2; exit 1 ;;
+esac

+ 78 - 13
bottom_videos.py

@@ -1,4 +1,6 @@
+# coding:utf-8
 import datetime
 import datetime
+import json
 import traceback
 import traceback
 import os
 import os
 import json
 import json
@@ -8,15 +10,14 @@ from db_helper import RedisHelper
 from config import set_config
 from config import set_config
 from log import Log
 from log import Log
 
 
-config_ = set_config()
+config_, env = set_config()
 log_ = Log()
 log_ = Log()
 
 
 
 
-def update_bottom_videos():
-    """更新兜底视频"""
+def get_bottom_videos_test():
+    """获取测试环境兜底视频"""
     try:
     try:
         # 获取总播放量top1000的视频
         # 获取总播放量top1000的视频
-        now_date = datetime.datetime.today()
         sql = "SELECT  id " \
         sql = "SELECT  id " \
               ",play_count_total " \
               ",play_count_total " \
               "FROM    videoods.wx_video_test " \
               "FROM    videoods.wx_video_test " \
@@ -38,13 +39,81 @@ def update_bottom_videos():
                 video_id_list.append(video_id)
                 video_id_list.append(video_id)
                 videos[video_id] = record['play_count_total']
                 videos[video_id] = record['play_count_total']
 
 
+        return video_id_list, videos
+
+    except Exception as e:
+        log_.error(traceback.format_exc())
+
+
+def get_bottom_videos_pro(now_date):
+    """Fetch the fallback ("bottom") videos for the pre-production / production environments.
+
+    Ranks the videos played on the day before ``now_date`` by play count and
+    keeps only those whose status passes the filters in the query below.
+
+    :param now_date: reference datetime; the query reads the ``dt`` partition of the previous day
+    :return: tuple (video_id_list, videos) - ids in descending play-count order,
+             and a dict mapping video id -> play count
+    :raises Exception: any query/reader error is logged and re-raised
+    """
+    try:
+        # Yesterday's partition, formatted as YYYYMMDD.
+        delta_date = now_date - datetime.timedelta(days=1)
+
+        # NOTE(review): the original comment said "top 1000", but the query returns
+        # up to 2000 rows (LIMIT 2000); the caller trims the zset afterwards.
+        sql = "SELECT  video_playcount.videoid " \
+              ",video_playcount.play_count " \
+              "FROM    ( " \
+              "SELECT  videoid " \
+              ",COUNT(*) play_count " \
+              "FROM    loghubods.video_action_log_applet " \
+              "WHERE   dt = {} " \
+              "AND     business = 'videoPlay' " \
+              "GROUP BY videoid " \
+              ") video_playcount INNER " \
+              "JOIN    ( " \
+              "SELECT  DISTINCT videoid " \
+              "FROM    videoods.dim_video " \
+              "WHERE   video_edit = '通过' " \
+              "AND     video_data_stat = '有效' " \
+              "AND     video_recommend IN ( '待推荐', '普通推荐') " \
+              "AND     charge = '免费' " \
+              "AND     is_pwd = '未加密' " \
+              ") video_status " \
+              "ON      video_playcount.videoid = video_status.videoid " \
+              "ORDER BY video_playcount.play_count DESC " \
+              "LIMIT   2000 " \
+              ";".format(delta_date.strftime('%Y%m%d'))
+
+        records = execute_sql_from_odps(project='loghubods', sql=sql)
+
+        # Collect the ids in ranked order plus an id -> play_count mapping
+        # (the caller writes the mapping to redis as zset scores).
+        video_id_list = []
+        videos = {}
+        with records.open_reader() as reader:
+            for record in reader:
+                video_id = int(record['videoid'])
+                video_id_list.append(video_id)
+                videos[video_id] = record['play_count']
+
+        return video_id_list, videos
+
+    except Exception:
+        log_.error(traceback.format_exc())
+        # Re-raise instead of implicitly returning None: the caller unpacks the
+        # result into two names, so None only surfaced later as an unrelated
+        # "cannot unpack NoneType" TypeError that hid the real failure.
+        raise
+
+
+def update_bottom_videos():
+    """更新兜底视频"""
+    try:
+        # 获取对应环境的兜底视频
+        now_date = datetime.datetime.today()
+        if env in ['dev', 'test']:
+            video_id_list, videos = get_bottom_videos_test()
+        elif env in ['pre', 'pro']:
+            video_id_list, videos = get_bottom_videos_pro(now_date=now_date)
+        else:
+            log_.error('env error')
+            return
+
+        # redis数据更新
         redis_helper = RedisHelper()
         redis_helper = RedisHelper()
+        redis_helper.del_keys(key_name=config_.BOTTOM_KEY_NAME)
         redis_helper.add_data_with_zset(key_name=config_.BOTTOM_KEY_NAME, data=videos)
         redis_helper.add_data_with_zset(key_name=config_.BOTTOM_KEY_NAME, data=videos)
         # 与原有兜底视频排序,保留top1000
         # 与原有兜底视频排序,保留top1000
         redis_helper.remove_by_rank_from_zset(key_name=config_.BOTTOM_KEY_NAME, start=config_.BOTTOM_NUM, stop=-1)
         redis_helper.remove_by_rank_from_zset(key_name=config_.BOTTOM_KEY_NAME, start=config_.BOTTOM_NUM, stop=-1)
         # 移除bottom key的过期时间,将其转换为永久状态
         # 移除bottom key的过期时间,将其转换为永久状态
         redis_helper.persist_key(key_name=config_.BOTTOM_KEY_NAME)
         redis_helper.persist_key(key_name=config_.BOTTOM_KEY_NAME)
-        log_.info('{} update bottom videos success! num = {}'.format(now_date, config_.BOTTOM_NUM))
+        log_.info('{} update bottom videos success!, count = {}'.format(now_date, config_.BOTTOM_NUM))
 
 
         # 更新视频的宽高比数据
         # 更新视频的宽高比数据
         video_ids = redis_helper.get_data_zset_with_index(key_name=config_.BOTTOM_KEY_NAME, start=0, end=-1)
         video_ids = redis_helper.get_data_zset_with_index(key_name=config_.BOTTOM_KEY_NAME, start=0, end=-1)
@@ -55,8 +124,8 @@ def update_bottom_videos():
 
 
         # 获取今日兜底视频的json,并存入redis
         # 获取今日兜底视频的json,并存入redis
         video_json_list = []
         video_json_list = []
-        for i in range(0, len(video_id_list) // 10):
-            video_json = get_video_info_json(video_ids=video_id_list[i * 10:(i + 1) * 10])
+        for i in range(0, len(video_id_list)//10):
+            video_json = get_video_info_json(video_ids=video_id_list[i*10:(i+1)*10])
             if video_json is not None:
             if video_json is not None:
                 video_json_list.extend(video_json)
                 video_json_list.extend(video_json)
             if len(video_json_list) >= 1000:
             if len(video_json_list) >= 1000:
@@ -67,8 +136,8 @@ def update_bottom_videos():
         # 移除过期时间,将其转换为永久状态
         # 移除过期时间,将其转换为永久状态
         redis_helper.persist_key(key_name=config_.BOTTOM_JSON_KEY_NAME)
         redis_helper.persist_key(key_name=config_.BOTTOM_JSON_KEY_NAME)
 
 
-        log_.info('{} update bottom videos info json success!, video nums = {}'.format(now_date,
-                                                                                       len(video_json_list[:1000])))
+        log_.info('{} update bottom videos info json success!, count = {}'.format(now_date,
+                                                                                  len(video_json_list[:1000])))
 
 
     except Exception as e:
     except Exception as e:
         log_.error(traceback.format_exc())
         log_.error(traceback.format_exc())
@@ -101,7 +170,3 @@ def get_video_info_json(video_ids):
 
 
 if __name__ == '__main__':
 if __name__ == '__main__':
     update_bottom_videos()
     update_bottom_videos()
-    # 将日志上传到oss
-    # log_cmd = "ossutil cp -r -f {} oss://{}/{}".format(log_.logname, config_.BUCKET_NAME,
-    #                                                    config_.OSS_FOLDER_LOGS + 'bottom_videos/')
-    # os.system(log_cmd)

+ 63 - 5
config.py

@@ -1,3 +1,8 @@
+# coding:utf-8
+from log import Log
+log_ = Log()
+
+
 class BaseConfig(object):
 class BaseConfig(object):
     # 产品标识
     # 产品标识
     APP_TYPE = {
     APP_TYPE = {
@@ -57,6 +62,9 @@ class BaseConfig(object):
     # 生效中的置顶视频列表 redis key
     # 生效中的置顶视频列表 redis key
     TOP_VIDEO_LIST_KEY_NAME = 'com.weiqu.video.top.item.score.area'
     TOP_VIDEO_LIST_KEY_NAME = 'com.weiqu.video.top.item.score.area'
 
 
+    # rovScore公差
+    ROV_SCORE_D = 0.001
+
     # width : height > 1 的视频列表 redis key, zset存储,value为videoId,score为w_h_rate
     # width : height > 1 的视频列表 redis key, zset存储,value为videoId,score为w_h_rate
     W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME = {
     W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME = {
         'rov_recall': 'com.weiqu.video.rov.w.h.rate.1.item',  # rov召回池视频
         'rov_recall': 'com.weiqu.video.rov.w.h.rate.1.item',  # rov召回池视频
@@ -66,6 +74,11 @@ class BaseConfig(object):
 
 
 class DevelopmentConfig(BaseConfig):
 class DevelopmentConfig(BaseConfig):
     """开发环境配置"""
     """开发环境配置"""
+    # 报警内容 环境区分
+    ENV_TEXT = "开发环境"
+    # 项目存放目录
+    PROJECT_PATH = '/data2/rov-offline'
+
     # 测试环境redis地址
     # 测试环境redis地址
     REDIS_INFO = {
     REDIS_INFO = {
         'host': 'r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com',
         'host': 'r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com',
@@ -114,6 +127,11 @@ class DevelopmentConfig(BaseConfig):
 
 
 class TestConfig(BaseConfig):
 class TestConfig(BaseConfig):
     """测试环境配置"""
     """测试环境配置"""
+    # 报警内容 环境区分
+    ENV_TEXT = "测试环境"
+    # 项目存放目录
+    PROJECT_PATH = '/data2/rov-offline'
+
     # 测试环境redis地址
     # 测试环境redis地址
     REDIS_INFO = {
     REDIS_INFO = {
         'host': 'r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com',
         'host': 'r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com',
@@ -162,6 +180,11 @@ class TestConfig(BaseConfig):
 
 
 class PreProductionConfig(BaseConfig):
 class PreProductionConfig(BaseConfig):
     """预发布环境配置"""
     """预发布环境配置"""
+    # 报警内容 环境区分
+    ENV_TEXT = "预发布环境"
+    # 项目存放目录
+    PROJECT_PATH = '/data/rov-offline'
+
     # redis地址
     # redis地址
     REDIS_INFO = {
     REDIS_INFO = {
         'host': 'r-bp1fogs2mflr1ybfot.redis.rds.aliyuncs.com',
         'host': 'r-bp1fogs2mflr1ybfot.redis.rds.aliyuncs.com',
@@ -178,6 +201,16 @@ class PreProductionConfig(BaseConfig):
         'password': '4BEcOgxREOPq7t3A7EWkjciVULeQGj'
         'password': '4BEcOgxREOPq7t3A7EWkjciVULeQGj'
     }
     }
 
 
+    # 生产环境mysql地址
+    MYSQL_INFO = {
+        'host': 'rr-bp1x9785e8h5452bi157.mysql.rds.aliyuncs.com',
+        'port': 3306,
+        'user': 'wx2016_longvideo',
+        'password': 'wx2016_longvideoP@assword1234',
+        'db': 'longvideo',
+        'charset': 'utf8'
+    }
+
     # Hologres视频状态存储表名
     # Hologres视频状态存储表名
     VIDEO_STATUS = 'longvideo.dwd_mdm_item_video_stat'
     VIDEO_STATUS = 'longvideo.dwd_mdm_item_video_stat'
 
 
@@ -200,6 +233,11 @@ class PreProductionConfig(BaseConfig):
 
 
 class ProductionConfig(BaseConfig):
 class ProductionConfig(BaseConfig):
     """生产环境配置"""
     """生产环境配置"""
+    # 报警内容 环境区分
+    ENV_TEXT = "生产环境"
+    # 项目存放目录
+    PROJECT_PATH = '/data/rov-offline'
+
     # 线上环境redis地址
     # 线上环境redis地址
     REDIS_INFO = {
     REDIS_INFO = {
         'host': 'r-bp1fogs2mflr1ybfot.redis.rds.aliyuncs.com',
         'host': 'r-bp1fogs2mflr1ybfot.redis.rds.aliyuncs.com',
@@ -216,6 +254,16 @@ class ProductionConfig(BaseConfig):
         'password': '4BEcOgxREOPq7t3A7EWkjciVULeQGj'
         'password': '4BEcOgxREOPq7t3A7EWkjciVULeQGj'
     }
     }
 
 
+    # 生产环境mysql地址
+    MYSQL_INFO = {
+        'host': 'rr-bp1x9785e8h5452bi157.mysql.rds.aliyuncs.com',
+        'port': 3306,
+        'user': 'wx2016_longvideo',
+        'password': 'wx2016_longvideoP@assword1234',
+        'db': 'longvideo',
+        'charset': 'utf8'
+    }
+
     # Hologres视频状态存储表名
     # Hologres视频状态存储表名
     VIDEO_STATUS = 'longvideo.dwd_mdm_item_video_stat'
     VIDEO_STATUS = 'longvideo.dwd_mdm_item_video_stat'
 
 
@@ -224,7 +272,7 @@ class ProductionConfig(BaseConfig):
     # 获取视频在流量池中的剩余可分发数接口地址
     # 获取视频在流量池中的剩余可分发数接口地址
     GET_REMAIN_VIEW_COUNT_URL = 'http://api-internal.piaoquantv.com/flowpool/video/remainViewCount'
     GET_REMAIN_VIEW_COUNT_URL = 'http://api-internal.piaoquantv.com/flowpool/video/remainViewCount'
     # 计算完ROV通知后端接口地址
     # 计算完ROV通知后端接口地址
-    NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL = 'http://longvideoapi-internal.piaoquantv.com/longvideoapi/openapi/recommend/updateRovScore'
+    NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL = 'http://recommend-common-internal.piaoquantv.com/longvideoapi/openapi/recommend/updateRovScore'
     # 获取置顶视频列表接口地址
     # 获取置顶视频列表接口地址
     TOP_VIDEO_LIST_URL = 'http://recommend-common-internal.piaoquantv.com/longvideoapi/openapi/recommend/topVideoList'
     TOP_VIDEO_LIST_URL = 'http://recommend-common-internal.piaoquantv.com/longvideoapi/openapi/recommend/topVideoList'
     # 获取首页兜底视频json接口地址
     # 获取首页兜底视频json接口地址
@@ -237,7 +285,17 @@ class ProductionConfig(BaseConfig):
 
 
 
 
 def set_config():
 def set_config():
-    # return DevelopmentConfig()
-    return TestConfig()
-    # return PreProductionConfig()
-    # return ProductionConfig()
+    """Build the config object for the environment named in env.conf.
+
+    env.conf holds a line of the form ``ENV=<name>`` where <name> is one of
+    dev / test / pre / pro.  The file is looked up next to this module first
+    and in the current working directory as a fallback, so scripts started
+    from another directory still find it.
+
+    :return: tuple (config instance, env name)
+    :raises ValueError: if env.conf has no ENV entry or names an unknown environment
+    """
+    import os
+
+    config_classes = {
+        'dev': DevelopmentConfig,
+        'test': TestConfig,
+        'pre': PreProductionConfig,
+        'pro': ProductionConfig,
+    }
+    conf_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'env.conf')
+    if not os.path.exists(conf_path):
+        conf_path = 'env.conf'
+
+    env = None
+    with open(conf_path, 'r') as env_f:
+        for line in env_f:
+            key, sep, value = line.partition('=')
+            if sep and key.strip() == 'ENV':
+                # The shell tasks `source` the same file, so tolerate shell-style quoting.
+                env = value.strip().strip('\'"')
+                break
+
+    if env not in config_classes:
+        # Every caller unpacks ``config_, env = set_config()``; returning None here
+        # only produced an opaque unpacking TypeError at import time.
+        log_.error('env error')
+        raise ValueError('unsupported ENV {!r} in {}'.format(env, conf_path))
+    return config_classes[env](), env

+ 2 - 1
db_helper.py

@@ -1,10 +1,11 @@
+# coding:utf-8
 import redis
 import redis
 import psycopg2
 import psycopg2
 import pymysql
 import pymysql
 from config import set_config
 from config import set_config
 from log import Log
 from log import Log
 
 
-config_ = set_config()
+config_, _ = set_config()
 log = Log()
 log = Log()
 
 
 
 

+ 1 - 0
env.conf

@@ -0,0 +1 @@
+ENV=test

+ 7 - 1
flow_pool_task.sh

@@ -1 +1,7 @@
-cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/pool_predict.py
+# POSIX sh only (no `source`/`==`); env.conf is read from this script's directory, not the caller's cwd.
+. "$(cd "$(dirname "$0")" && pwd)/env.conf"
+case "$ENV" in
+    dev|test) cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/pool_predict.py ;;
+    pre|pro)  cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/pool_predict.py ;;
+    *) echo "flow_pool_task.sh: unsupported ENV '$ENV' in env.conf" >&2; exit 1 ;;
+esac

+ 1 - 1
get_data.py

@@ -7,7 +7,7 @@ from config import set_config
 from utils import get_data_from_odps, write_to_pickle
 from utils import get_data_from_odps, write_to_pickle
 from log import Log
 from log import Log
 
 
-config_ = set_config()
+config_, _ = set_config()
 log_ = Log()
 log_ = Log()
 
 
 
 

+ 1 - 0
log.py

@@ -1,3 +1,4 @@
+# coding:utf-8
 import os
 import os
 import logging
 import logging
 import time
 import time

+ 53 - 49
pool_predict.py

@@ -1,13 +1,15 @@
 import random
 import random
 import time
 import time
 import os
 import os
+import traceback
+import random
 
 
 from config import set_config
 from config import set_config
-from utils import request_post, filter_video_status
+from utils import request_post, filter_video_status, send_msg_to_feishu
 from log import Log
 from log import Log
 from db_helper import RedisHelper
 from db_helper import RedisHelper
 
 
-config_ = set_config()
+config_, _ = set_config()
 log_ = Log()
 log_ = Log()
 
 
 
 
@@ -71,48 +73,54 @@ def predict(app_type):
     :param app_type: 产品标识 type-int
     :param app_type: 产品标识 type-int
     :return: None
     :return: None
     """
     """
-    # 从流量池获取数据
-    videos = get_videos_from_flow_pool(app_type=app_type)
-    if len(videos) <= 0:
-        log_.info('流量池中无需分发的视频')
-        return None
-    # video_id 与 flow_pool 进行mapping
-    video_ids = set()
-    log_.info('流量池中视频数:{}'.format(len(videos)))
-    mapping = {}
-    for video in videos:
-        video_id = video['videoId']
-        video_ids.add(video_id)
-        if video_id in mapping:
-            mapping[video_id].append(video['flowPool'])
-        else:
-            mapping[video_id] = [video['flowPool']]
-
-    # 对视频状态进行过滤
-    filtered_videos = filter_video_status(list(video_ids))
-    log_.info('filter videos status finished, filtered_videos nums={}'.format(len(filtered_videos)))
-    if not filtered_videos:
-        log_.info('流量池中视频状态不符合分发')
-        return None
-    # 预测
-    video_score = get_score(filtered_videos)
-    log_.info('predict finished!')
-    # 上传数据到redis
-    redis_data = {}
-    for i in range(len(video_score)):
-        video_id = filtered_videos[i]
-        score = video_score[i]
-        for flow_pool in mapping.get(video_id):
-            value = '{}-{}'.format(video_id, flow_pool)
-            redis_data[value] = score
-    key_name = config_.FLOWPOOL_KEY_NAME_PREFIX + str(app_type)
-    redis_helper = RedisHelper()
-    # 如果key已存在,删除key
-    if redis_helper.key_exists(key_name):
-        redis_helper.del_keys(key_name)
-    # 写入redis
-    redis_helper.add_data_with_zset(key_name=key_name, data=redis_data, expire_time=24 * 3600)
-    log_.info('data to redis finished!')
+    try:
+        # 从流量池获取数据
+        videos = get_videos_from_flow_pool(app_type=app_type)
+        if len(videos) <= 0:
+            log_.info('流量池中无需分发的视频')
+            return None
+        # video_id 与 flow_pool 进行mapping
+        video_ids = set()
+        log_.info('流量池中视频数:{}'.format(len(videos)))
+        mapping = {}
+        for video in videos:
+            video_id = video['videoId']
+            video_ids.add(video_id)
+            if video_id in mapping:
+                mapping[video_id].append(video['flowPool'])
+            else:
+                mapping[video_id] = [video['flowPool']]
+
+        # 对视频状态进行过滤
+        filtered_videos = filter_video_status(list(video_ids))
+        log_.info('filter videos status finished, filtered_videos nums={}'.format(len(filtered_videos)))
+        if not filtered_videos:
+            log_.info('流量池中视频状态不符合分发')
+            return None
+        # 预测
+        video_score = get_score(filtered_videos)
+        log_.info('predict finished!')
+        # 上传数据到redis
+        redis_data = {}
+        for i in range(len(video_score)):
+            video_id = filtered_videos[i]
+            score = video_score[i]
+            for flow_pool in mapping.get(video_id):
+                value = '{}-{}'.format(video_id, flow_pool)
+                redis_data[value] = score
+        key_name = config_.FLOWPOOL_KEY_NAME_PREFIX + str(app_type)
+        redis_helper = RedisHelper()
+        # 如果key已存在,删除key
+        if redis_helper.key_exists(key_name):
+            redis_helper.del_keys(key_name)
+        # 写入redis
+        redis_helper.add_data_with_zset(key_name=key_name, data=redis_data, expire_time=24 * 3600)
+        log_.info('data to redis finished!')
+    except Exception as e:
+        log_.error('流量池更新失败, appType: {} exception: {}, traceback: {}'.format(
+            app_type, e, traceback.format_exc()))
+        send_msg_to_feishu('rov-offline{} - 流量池更新失败, appType: {}, exception: {}'.format(
+            config_.ENV_TEXT, app_type, e))
 
 
 
 
 if __name__ == '__main__':
 if __name__ == '__main__':
@@ -126,11 +134,7 @@ if __name__ == '__main__':
         predict(app_type=app_type)
         predict(app_type=app_type)
         log_.info('{} predict end...'.format(app_name))
         log_.info('{} predict end...'.format(app_name))
     log_.info('flow pool predict end...')
     log_.info('flow pool predict end...')
-
-    # # 将日志上传到oss
+    # 将日志上传到oss
     # log_cmd = "ossutil cp -r -f {} oss://{}/{}".format(log_.logname, config_.BUCKET_NAME,
     # log_cmd = "ossutil cp -r -f {} oss://{}/{}".format(log_.logname, config_.BUCKET_NAME,
     #                                                    config_.OSS_FOLDER_LOGS + 'flow_pool/')
     #                                                    config_.OSS_FOLDER_LOGS + 'flow_pool/')
     # os.system(log_cmd)
     # os.system(log_cmd)
-
-    # res = get_score([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
-    # print(res)

+ 19 - 7
rov_data_check.py

@@ -1,27 +1,34 @@
 import datetime
 import datetime
 import os
 import os
+import traceback
 
 
 from odps import ODPS
 from odps import ODPS
 from datetime import datetime as dt
 from datetime import datetime as dt
 from threading import Timer
 from threading import Timer
 from config import set_config
 from config import set_config
 from log import Log
 from log import Log
+from utils import send_msg_to_feishu
 
 
-config_ = set_config()
+config_, env = set_config()
 log_ = Log()
 log_ = Log()
 
 
 
 
 def rov_train_recall_pool_update():
 def rov_train_recall_pool_update():
     # 训练数据和预测数据都准备好时,更新模型,预测
     # 训练数据和预测数据都准备好时,更新模型,预测
-    os.system('sh /data2/rov-offline/rov_train_recall_pool_update.sh')
+    os.system('sh {}/rov_train_recall_pool_update.sh'.format(config_.PROJECT_PATH))
     # # 将日志上传到oss
     # # 将日志上传到oss
     # log_cmd = "ossutil cp -r -f {} oss://{}/{}".format(log_.logname, config_.BUCKET_NAME,
     # log_cmd = "ossutil cp -r -f {} oss://{}/{}".format(log_.logname, config_.BUCKET_NAME,
     #                                                    config_.OSS_FOLDER_LOGS + 'rov_recall_pool/')
     #                                                    config_.OSS_FOLDER_LOGS + 'rov_recall_pool/')
     # os.system(log_cmd)
     # os.system(log_cmd)
-    # # 将data上传到oss
-    # data_cmd = "ossutil cp -r -f {} oss://{}/{}".format("/data/rov-offline/data", config_.BUCKET_NAME,
-    #                                                     config_.OSS_FOLDER_DATA)
-    # os.system(data_cmd)
+
+    # 生产环境 - 将预测得到的ROV数据文件predict.csv上传到oss
+    if env == 'pro':
+        data_cmd = "ossutil cp -r -f {} oss://{}/{}dt={}/{}".format("{}/data/predict.csv".format(config_.PROJECT_PATH),
+                                                                    config_.BUCKET_NAME,
+                                                                    config_.OSS_FOLDER_DATA,
+                                                                    datetime.datetime.today().strftime('%Y%m%d'),
+                                                                    'predict.csv')
+        os.system(data_cmd)
 
 
 
 
 def data_check(project, table, date):
 def data_check(project, table, date):
@@ -69,4 +76,9 @@ def timer_check():
 
 
 
 
 if __name__ == '__main__':
 if __name__ == '__main__':
-    timer_check()
+    try:
+        timer_check()
+    except Exception as e:
+        log_.error('ROV召回池更新失败, exception: {}, traceback: {}'.format(e, traceback.format_exc()))
+        send_msg_to_feishu('rov-offline{} - ROV召回池更新失败, exception: {}'.format(config_.ENV_TEXT, e))
+

+ 7 - 1
rov_pool_task.sh

@@ -1 +1,7 @@
-cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/rov_data_check.py
+# POSIX sh only (no `source`/`==`); env.conf is read from this script's directory, not the caller's cwd.
+. "$(cd "$(dirname "$0")" && pwd)/env.conf"
+case "$ENV" in
+    dev|test) cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/rov_data_check.py ;;
+    pre|pro)  cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/rov_data_check.py ;;
+    *) echo "rov_pool_task.sh: unsupported ENV '$ENV' in env.conf" >&2; exit 1 ;;
+esac

+ 77 - 18
rov_train.py

@@ -9,12 +9,12 @@ from sklearn.model_selection import train_test_split
 from sklearn.metrics import mean_absolute_error, r2_score, mean_absolute_percentage_error
 from sklearn.metrics import mean_absolute_error, r2_score, mean_absolute_percentage_error
 
 
 from config import set_config
 from config import set_config
-from utils import read_from_pickle, write_to_pickle, data_normalization, request_post, filter_video_status, \
-    update_video_w_h_rate
+from utils import read_from_pickle, write_to_pickle, data_normalization, \
+    request_post, filter_video_status, update_video_w_h_rate
 from log import Log
 from log import Log
 from db_helper import RedisHelper, MysqlHelper
 from db_helper import RedisHelper, MysqlHelper
 
 
-config_ = set_config()
+config_, env = set_config()
 log_ = Log()
 log_ = Log()
 
 
 
 
@@ -125,7 +125,7 @@ def pack_result_to_csv(filename, sort_columns=None, filepath=config_.DATA_DIR_PA
     :param sort_columns: 指定排序列名列名,type-list, 默认为None
     :param sort_columns: 指定排序列名列名,type-list, 默认为None
     :param filepath: csv文件存放路径,默认为config_.DATA_DIR_PATH
     :param filepath: csv文件存放路径,默认为config_.DATA_DIR_PATH
     :param ascending: 是否按指定列的数组升序排列,默认为True,即升序排列
     :param ascending: 是否按指定列的数组升序排列,默认为True,即升序排列
-    :param data: 数据
+    :param data: 数据, type-dict
     :return: None
     :return: None
     """
     """
     if not os.path.exists(filepath):
     if not os.path.exists(filepath):
@@ -137,6 +137,26 @@ def pack_result_to_csv(filename, sort_columns=None, filepath=config_.DATA_DIR_PA
     df.to_csv(file, index=False)
     df.to_csv(file, index=False)
 
 
 
 
+def pack_list_result_to_csv(filename, data, columns=None, sort_columns=None, filepath=config_.DATA_DIR_PATH, ascending=True):
+    """
+    Write a list of row dicts to a csv file.
+    :param filename: name of the csv file
+    :param data: rows to write, type-list [{}, {},...]
+    :param columns: columns to write, in output order (passed to DataFrame.to_csv), default None
+    :param sort_columns: column names to sort by, type-list; no sorting when None or empty
+    :param filepath: directory the csv file is written to, defaults to config_.DATA_DIR_PATH
+    :param ascending: sort direction for sort_columns, defaults to True (ascending)
+    :return: None
+    """
+    # exist_ok avoids the check-then-create race when two jobs start together.
+    os.makedirs(filepath, exist_ok=True)
+    df = pd.DataFrame(data=data)
+    if sort_columns:
+        df = df.sort_values(by=sort_columns, ascending=ascending)
+    df.to_csv(os.path.join(filepath, filename), index=False, columns=columns)
+
+
+
 def predict():
 def predict():
     """预测"""
     """预测"""
     # 读取预测数据并进行清洗
     # 读取预测数据并进行清洗
@@ -147,29 +167,62 @@ def predict():
     # 预测
     # 预测
     y_ = model.predict(x)
     y_ = model.predict(x)
     log_.info('predict finished!')
     log_.info('predict finished!')
+
     # 将结果进行归一化到[0, 100]
     # 将结果进行归一化到[0, 100]
     normal_y_ = data_normalization(list(y_))
     normal_y_ = data_normalization(list(y_))
     log_.info('normalization finished!')
     log_.info('normalization finished!')
+
+    # 按照normal_y_降序排序
+    predict_data = []
+    for i, video_id in enumerate(video_ids):
+        data = {'video_id': video_id, 'normal_y_': normal_y_[i], 'y_': y_[i], 'y': y[i]}
+        predict_data.append(data)
+    predict_data_sorted = sorted(predict_data, key=lambda temp: temp['normal_y_'], reverse=True)
+
+    # 按照排序,从100以固定差值做等差递减,以该值作为rovScore
+    predict_result = []
+    redis_data = {}
+    json_data = []
+    video_id_list = []
+    for j, item in enumerate(predict_data_sorted):
+        video_id = int(item['video_id'])
+        rov_score = 100 - j * config_.ROV_SCORE_D
+        item['rov_score'] = rov_score
+        predict_result.append(item)
+        redis_data[video_id] = rov_score
+        json_data.append({'videoId': video_id, 'rovScore': rov_score})
+        video_id_list.append(video_id)
+
     # 打包预测结果存入csv
     # 打包预测结果存入csv
-    predict_data = {'normal_y_': normal_y_, 'y_': y_, 'y': y, 'video_ids': video_ids}
     predict_result_filename = 'predict.csv'
     predict_result_filename = 'predict.csv'
-    pack_result_to_csv(filename=predict_result_filename, sort_columns=['normal_y_'], ascending=False, **predict_data)
+    pack_list_result_to_csv(filename=predict_result_filename,
+                            data=predict_result,
+                            columns=['video_id', 'rov_score', 'normal_y_', 'y_', 'y'],
+                            sort_columns=['rov_score'],
+                            ascending=False)
+
     # 上传redis
     # 上传redis
-    redis_data = {}
-    json_data = []
-    for i in range(len(video_ids)):
-        redis_data[video_ids[i]] = normal_y_[i]
-        json_data.append({'videoId': video_ids[i], 'rovScore': normal_y_[i]})
     key_name = config_.RECALL_KEY_NAME_PREFIX + time.strftime('%Y%m%d')
     key_name = config_.RECALL_KEY_NAME_PREFIX + time.strftime('%Y%m%d')
     redis_helper = RedisHelper()
     redis_helper = RedisHelper()
     redis_helper.add_data_with_zset(key_name=key_name, data=redis_data)
     redis_helper.add_data_with_zset(key_name=key_name, data=redis_data)
     log_.info('data to redis finished!')
     log_.info('data to redis finished!')
+
+    # 清空修改ROV的视频数据
+    redis_helper.del_keys(key_name=config_.UPDATE_ROV_KEY_NAME)
+
     # 通知后端更新数据
     # 通知后端更新数据
-    # result = request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL, request_data={'videos': json_data})
-    # if result['code'] == 0:
-    #     log_.info('notify backend success!')
-    # else:
-    #     log_.error('notify backend fail!')
+    log_.info('json_data count = {}'.format(len(json_data)))
+    result = request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL, request_data={'videos': json_data})
+    if result['code'] == 0:
+        log_.info('notify backend success!')
+    else:
+        log_.error('notify backend fail!')
+
+    # 更新视频的宽高比数据
+    if video_id_list:
+        update_video_w_h_rate(video_ids=video_id_list,
+                              key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['rov_recall'])
+        log_.info('update video w_h_rate to redis finished!')
 
 
 
 
 def predict_test():
 def predict_test():
@@ -205,7 +258,8 @@ def predict_test():
         log_.error('notify backend fail!')
         log_.error('notify backend fail!')
     # 更新视频的宽高比数据
     # 更新视频的宽高比数据
     if filtered_videos:
     if filtered_videos:
-        update_video_w_h_rate(video_ids=filtered_videos, key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['rov_recall'])
+        update_video_w_h_rate(video_ids=filtered_videos,
+                              key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['rov_recall'])
         log_.info('update video w_h_rate to redis finished!')
         log_.info('update video w_h_rate to redis finished!')
 
 
 
 
@@ -221,6 +275,11 @@ if __name__ == '__main__':
 
 
     log_.info('rov model predict start...')
     log_.info('rov model predict start...')
     predict_start = time.time()
     predict_start = time.time()
-    predict_test()
+    if env in ['dev', 'test']:
+        predict_test()
+    elif env in ['pre', 'pro']:
+        predict()
+    else:
+        log_.error('env error')
     predict_end = time.time()
     predict_end = time.time()
     log_.info('rov model predict end, execute time = {}ms'.format((predict_end - predict_start)*1000))
     log_.info('rov model predict end, execute time = {}ms'.format((predict_end - predict_start)*1000))

+ 7 - 1
rov_train_recall_pool_update.sh

@@ -1 +1,7 @@
-/root/anaconda3/bin/python /data2/rov-offline/get_data.py && /root/anaconda3/bin/python /data2/rov-offline/rov_train.py
+# POSIX sh only: rov_data_check.py launches this with `sh`. env.conf is read from this script's directory.
+. "$(cd "$(dirname "$0")" && pwd)/env.conf"
+case "$ENV" in
+    dev|test) /root/anaconda3/bin/python /data2/rov-offline/get_data.py && /root/anaconda3/bin/python /data2/rov-offline/rov_train.py ;;
+    pre|pro)  /root/anaconda3/bin/python /data/rov-offline/get_data.py && /root/anaconda3/bin/python /data/rov-offline/rov_train.py ;;
+    *) echo "rov_train_recall_pool_update.sh: unsupported ENV '$ENV' in env.conf" >&2; exit 1 ;;
+esac

+ 2 - 2
top_video_list.py

@@ -4,7 +4,7 @@ from db_helper import RedisHelper
 from config import set_config
 from config import set_config
 from log import Log
 from log import Log
 
 
-config_ = set_config()
+config_, _ = set_config()
 log_ = Log()
 log_ = Log()
 
 
 
 
@@ -32,7 +32,7 @@ def get_top_video_list():
         log_.info('value = {}'.format(value))
         log_.info('value = {}'.format(value))
         # 写入redis
         # 写入redis
         redis_helper = RedisHelper()
         redis_helper = RedisHelper()
-        redis_helper.set_data_to_redis(key_name=config_.TOP_VIDEO_LIST_KEY_NAME, value=str(value), expire_time=5*60)
+        redis_helper.set_data_to_redis(key_name=config_.TOP_VIDEO_LIST_KEY_NAME, value=str(value), expire_time=5 * 60)
         log_.info('置顶视频更新成功!')
         log_.info('置顶视频更新成功!')
 
 
     except Exception as e:
     except Exception as e:

+ 7 - 1
top_video_task.sh

@@ -1 +1,7 @@
-cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/top_video_list.py
+# Run top_video_list.py from the checkout that matches the deployment
+# environment. ENV is read from env.conf, which is resolved relative to this
+# script so the task does not depend on the caller's working directory.
+. "$(dirname "$0")/env.conf"
+case "$ENV" in
+    test) root=/data2/rov-offline ;;
+    pro) root=/data/rov-offline ;;
+    *)
+        # Fail loudly instead of silently doing nothing on a missing/unknown ENV.
+        echo "top_video_task.sh: unsupported ENV '$ENV'" >&2
+        exit 1
+        ;;
+esac
+cd "$root" && /root/anaconda3/bin/python "$root/top_video_list.py"

+ 33 - 22
utils.py

@@ -1,13 +1,17 @@
+# coding:utf-8
 import pickle
 import pickle
 import os
 import os
 import requests
 import requests
 import json
 import json
+import traceback
 
 
 from odps import ODPS
 from odps import ODPS
 from config import set_config
 from config import set_config
 from db_helper import HologresHelper, MysqlHelper, RedisHelper
 from db_helper import HologresHelper, MysqlHelper, RedisHelper
+from log import Log
 
 
-config_ = set_config()
+config_, env = set_config()
+log_ = Log()
 
 
 
 
 def execute_sql_from_odps(project, sql, connect_timeout=3000, read_timeout=500000,
 def execute_sql_from_odps(project, sql, connect_timeout=3000, read_timeout=500000,
@@ -83,6 +87,23 @@ def read_from_pickle(filename, filepath=config_.DATA_DIR_PATH):
     return data
     return data
 
 
 
 
+def send_msg_to_feishu(msg_text, timeout=10):
+    """Send a text alert to the Feishu bot webhook.
+
+    Alerting is best-effort: this function is called from other functions'
+    exception handlers, so a slow or failing webhook call is logged instead of
+    raised, keeping the original error the one that gets reported.
+
+    :param msg_text: alert body; it is prefixed with ``key_word`` before sending
+    :param timeout: seconds to wait for the webhook before giving up
+    :return: None
+    """
+    # Webhook address.
+    # NOTE(review): the webhook token is committed in source; consider moving it into config.
+    webhook = 'https://open.feishu.cn/open-apis/bot/v2/hook/8de4de35-30ed-4692-8854-7a154e89b2f2'
+    # Custom keyword (key_word) prepended to every message.
+    key_word = '服务报警'
+    headers = {'Content-Type': 'application/json'}
+    payload_message = {
+        "msg_type": "text",
+        "content": {
+            "text": '{}: {}'.format(key_word, msg_text)
+        }
+    }
+    try:
+        # Without a timeout, requests can block indefinitely on an unresponsive endpoint.
+        response = requests.request('POST', url=webhook, headers=headers,
+                                    data=json.dumps(payload_message), timeout=timeout)
+    except requests.exceptions.RequestException as e:
+        log_.error('send msg to feishu failed, exception: {}'.format(e))
+        return
+    print(response.text)
+
+
+
+
 def request_post(request_url, request_data):
 def request_post(request_url, request_data):
     """
     """
     post 请求 HTTP接口
     post 请求 HTTP接口
@@ -90,10 +111,17 @@ def request_post(request_url, request_data):
     :param request_data: 请求参数
     :param request_data: 请求参数
     :return: res_data json格式
     :return: res_data json格式
     """
     """
-    response = requests.post(url=request_url, json=request_data)
-    if response.status_code == 200:
-        res_data = json.loads(response.text)
-        return res_data
+    try:
+        response = requests.post(url=request_url, json=request_data)
+        if response.status_code == 200:
+            res_data = json.loads(response.text)
+            return res_data
+        else:
+            return None
+    except Exception as e:
+        log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
+        send_msg_to_feishu('rov-offline{} - 接口请求失败:{}, exception: {}'.format(config_.ENV_TEXT, request_url, e))
+        return None
 
 
 
 
 def data_normalization(data):
 def data_normalization(data):
@@ -143,23 +171,6 @@ def filter_video_status(video_ids):
     return filtered_videos
     return filtered_videos
 
 
 
 
-def send_msg_to_feishu(msg_text):
-    """发送消息到飞书"""
-    # webhook地址
-    webhook = 'https://open.feishu.cn/open-apis/bot/v2/hook/8de4de35-30ed-4692-8854-7a154e89b2f2'
-    # 自定义关键词key_word
-    key_word = '服务报警'
-    headers = {'Content-Type': 'application/json'}
-    payload_message = {
-        "msg_type": "text",
-        "content": {
-            "text": '{}: {}'.format(key_word, msg_text)
-        }
-    }
-    response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
-    print(response.text)
-
-
 def update_video_w_h_rate(video_ids, key_name):
 def update_video_w_h_rate(video_ids, key_name):
     """
     """
     获取横屏视频的宽高比,并存入redis中 (width/height>1)
     获取横屏视频的宽高比,并存入redis中 (width/height>1)

+ 2 - 4
videos_filter.py

@@ -7,7 +7,7 @@ from db_helper import RedisHelper
 from config import set_config
 from config import set_config
 from log import Log
 from log import Log
 
 
-config_ = set_config()
+config_, env = set_config()
 log_ = Log()
 log_ = Log()
 
 
 
 
@@ -63,13 +63,11 @@ def filter_flow_pool():
                 mapping[video_id] = [flow_pool]
                 mapping[video_id] = [flow_pool]
             else:
             else:
                 mapping[video_id].append(flow_pool)
                 mapping[video_id].append(flow_pool)
-
         # 过滤
         # 过滤
         if len(video_ids) == 0:
         if len(video_ids) == 0:
             log_.info("data size = {}, video_ids size = {}, data = {}".format(len(data), len(video_ids), data))
             log_.info("data size = {}, video_ids size = {}, data = {}".format(len(data), len(video_ids), data))
             log_.info("app_type {} videos filter end!".format(app_type))
             log_.info("app_type {} videos filter end!".format(app_type))
             continue
             continue
-
         filtered_result = filter_video_status(video_ids=video_ids)
         filtered_result = filter_video_status(video_ids=video_ids)
         # 求差集,获取需要过滤掉的视频,并从redis中移除
         # 求差集,获取需要过滤掉的视频,并从redis中移除
         filter_videos = set(video_ids) - set(filtered_result)
         filter_videos = set(video_ids) - set(filtered_result)
@@ -176,7 +174,7 @@ def main():
         filter_rov_updated()
         filter_rov_updated()
     except Exception as e:
     except Exception as e:
         log_.error(traceback.format_exc())
         log_.error(traceback.format_exc())
-        send_msg_to_feishu('测试环境 - 过滤失败 \n {}'.format(traceback.format_exc()))
+        send_msg_to_feishu('{} - 过滤失败 \n {}'.format(config_.ENV_TEXT, traceback.format_exc()))
         return
         return
 
 
 
 

+ 7 - 1
videos_filter_task.sh

@@ -1 +1,7 @@
-cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/videos_filter.py
+# Run videos_filter.py from the checkout that matches the deployment
+# environment. ENV is read from env.conf, which is resolved relative to this
+# script so the task does not depend on the caller's working directory.
+. "$(dirname "$0")/env.conf"
+case "$ENV" in
+    test) root=/data2/rov-offline ;;
+    pro) root=/data/rov-offline ;;
+    *)
+        # Fail loudly instead of silently doing nothing on a missing/unknown ENV.
+        echo "videos_filter_task.sh: unsupported ENV '$ENV'" >&2
+        exit 1
+        ;;
+esac
+cd "$root" && /root/anaconda3/bin/python "$root/videos_filter.py"