liqian 3 anni fa
parent
commit
a16d4df9e8
17 ha cambiato i file con 203 aggiunte e 35 eliminazioni
  1. 2 0
      .gitignore
  2. 7 1
      bottom_task.sh
  3. 54 4
      bottom_videos.py
  4. 67 4
      config.py
  5. 1 1
      db_helper.py
  6. 1 0
      env.conf
  7. 7 1
      flow_pool_task.sh
  8. 1 1
      get_data.py
  9. 3 2
      pool_predict.py
  10. 12 10
      rov_data_check.py
  11. 7 1
      rov_pool_task.sh
  12. 16 3
      rov_train.py
  13. 7 1
      rov_train_recall_pool_update.sh
  14. 7 1
      top_video_task.sh
  15. 2 2
      utils.py
  16. 2 2
      videos_filter.py
  17. 7 1
      videos_filter_task.sh

+ 2 - 0
.gitignore

@@ -63,3 +63,5 @@ target/
 data/
 *.csv
 *.pickle
+
+env.conf

+ 7 - 1
bottom_task.sh

@@ -1 +1,7 @@
-cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/bottom_videos.py
+source env.conf
+env=$ENV
+if [ $env == 'test' ]; then
+    cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/bottom_videos.py
+elif [ $env == 'pro' ]; then
+    cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/bottom_videos.py
+fi

+ 54 - 4
bottom_videos.py

@@ -9,15 +9,45 @@ from db_helper import RedisHelper
 from config import set_config
 from log import Log
 
-config_ = set_config()
+config_, env = set_config()
 log_ = Log()
 
 
-def update_bottom_videos():
-    """更新兜底视频"""
+def get_bottom_videos_test():
+    """获取测试环境兜底视频"""
+    try:
+        # 获取总播放量top1000的视频
+        sql = "SELECT  id " \
+              ",play_count_total " \
+              "FROM    videoods.wx_video_test " \
+              "WHERE   transcode_status = 3 " \
+              "AND     STATUS = 1 " \
+              "AND     recommend_status IN ( - 6, 1) " \
+              "ORDER BY play_count_total DESC " \
+              "LIMIT   2000" \
+              ";"
+
+        records = execute_sql_from_odps(project='videoods', sql=sql)
+
+        # 视频按照总播放量写入redis
+        video_id_list = []
+        videos = {}
+        with records.open_reader() as reader:
+            for record in reader:
+                video_id = int(record['id'])
+                video_id_list.append(video_id)
+                videos[video_id] = record['play_count_total']
+
+        return video_id_list, videos
+
+    except Exception as e:
+        log_.error(traceback.format_exc())
+
+
+def get_bottom_videos_pro(now_date):
+    """获取生产环境兜底视频"""
     try:
         # 获取昨日播放量top1000的视频
-        now_date = datetime.datetime.today()
         delta_date = now_date - datetime.timedelta(days=1)
 
         sql = "SELECT  video_playcount.videoid " \
@@ -55,6 +85,26 @@ def update_bottom_videos():
                 video_id_list.append(video_id)
                 videos[video_id] = record['play_count']
 
+        return video_id_list, videos
+
+    except Exception as e:
+        log_.error(traceback.format_exc())
+
+
+def update_bottom_videos():
+    """更新兜底视频"""
+    try:
+        # 获取对应环境的兜底视频
+        now_date = datetime.datetime.today()
+        if env in ['dev', 'test']:
+            video_id_list, videos = get_bottom_videos_test()
+        elif env in ['pre', 'pro']:
+            video_id_list, videos = get_bottom_videos_pro(now_date=now_date)
+        else:
+            log_.error('env error')
+            return
+
+        # redis数据更新
         redis_helper = RedisHelper()
         redis_helper.del_keys(key_name=config_.BOTTOM_KEY_NAME)
         redis_helper.add_data_with_zset(key_name=config_.BOTTOM_KEY_NAME, data=videos)

+ 67 - 4
config.py

@@ -1,4 +1,7 @@
 # coding:utf-8
+from log import Log
+log_ = Log()
+
 
 class BaseConfig(object):
     # 产品标识
@@ -71,6 +74,11 @@ class BaseConfig(object):
 
 class DevelopmentConfig(BaseConfig):
     """开发环境配置"""
+    # 报警内容 环境区分
+    ENV_TEXT = "开发环境"
+    # 项目存放目录
+    PROJECT_PATH = '/data2/rov-offline'
+
     # 测试环境redis地址
     REDIS_INFO = {
         'host': 'r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com',
@@ -87,6 +95,16 @@ class DevelopmentConfig(BaseConfig):
         'password': '4BEcOgxREOPq7t3A7EWkjciVULeQGj'
     }
 
+    # 测试环境mysql地址
+    MYSQL_INFO = {
+        'host': 'rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com',
+        'port': 3306,
+        'user': 'wx2016_longvideo',
+        'password': 'wx2016_longvideoP@assword1234',
+        'db': 'longvideo',
+        'charset': 'utf8'
+    }
+
     # Hologres视频状态存储表名
     VIDEO_STATUS = 'longvideo_test.dwd_mdm_item_video_stat'
 
@@ -109,6 +127,11 @@ class DevelopmentConfig(BaseConfig):
 
 class TestConfig(BaseConfig):
     """测试环境配置"""
+    # 报警内容 环境区分
+    ENV_TEXT = "测试环境"
+    # 项目存放目录
+    PROJECT_PATH = '/data2/rov-offline'
+
     # 测试环境redis地址
     REDIS_INFO = {
         'host': 'r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com',
@@ -125,6 +148,16 @@ class TestConfig(BaseConfig):
         'password': '4BEcOgxREOPq7t3A7EWkjciVULeQGj'
     }
 
+    # 测试环境mysql地址
+    MYSQL_INFO = {
+        'host': 'rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com',
+        'port': 3306,
+        'user': 'wx2016_longvideo',
+        'password': 'wx2016_longvideoP@assword1234',
+        'db': 'longvideo',
+        'charset': 'utf8'
+    }
+
     # Hologres视频状态存储表名
     VIDEO_STATUS = 'longvideo_test.dwd_mdm_item_video_stat'
 
@@ -147,6 +180,11 @@ class TestConfig(BaseConfig):
 
 class PreProductionConfig(BaseConfig):
     """预发布环境配置"""
+    # 报警内容 环境区分
+    ENV_TEXT = "预发布环境"
+    # 项目存放目录
+    PROJECT_PATH = '/data/rov-offline'
+
     # redis地址
     REDIS_INFO = {
         'host': 'r-bp1fogs2mflr1ybfot.redis.rds.aliyuncs.com',
@@ -163,6 +201,16 @@ class PreProductionConfig(BaseConfig):
         'password': '4BEcOgxREOPq7t3A7EWkjciVULeQGj'
     }
 
+    # 生产环境mysql地址
+    MYSQL_INFO = {
+        'host': 'rr-bp1x9785e8h5452bi157.mysql.rds.aliyuncs.com',
+        'port': 3306,
+        'user': 'wx2016_longvideo',
+        'password': 'wx2016_longvideoP@assword1234',
+        'db': 'longvideo',
+        'charset': 'utf8'
+    }
+
     # Hologres视频状态存储表名
     VIDEO_STATUS = 'longvideo.dwd_mdm_item_video_stat'
 
@@ -185,6 +233,11 @@ class PreProductionConfig(BaseConfig):
 
 class ProductionConfig(BaseConfig):
     """生产环境配置"""
+    # 报警内容 环境区分
+    ENV_TEXT = "生产环境"
+    # 项目存放目录
+    PROJECT_PATH = '/data/rov-offline'
+
     # 线上环境redis地址
     REDIS_INFO = {
         'host': 'r-bp1fogs2mflr1ybfot.redis.rds.aliyuncs.com',
@@ -232,7 +285,17 @@ class ProductionConfig(BaseConfig):
 
 
 def set_config():
-    # return DevelopmentConfig()
-    # return TestConfig()
-    # return PreProductionConfig()
-    return ProductionConfig()
+    with open('env.conf', 'r') as env_f:
+        line = env_f.read()
+        env = line.split('=')[1].strip()
+    if env == 'dev':
+        return DevelopmentConfig(), env
+    elif env == 'test':
+        return TestConfig(), env
+    elif env == 'pre':
+        return PreProductionConfig(), env
+    elif env == 'pro':
+        return ProductionConfig(), env
+    else:
+        log_.error('env error')
+        return

+ 1 - 1
db_helper.py

@@ -5,7 +5,7 @@ import pymysql
 from config import set_config
 from log import Log
 
-config_ = set_config()
+config_, _ = set_config()
 log = Log()
 
 

+ 1 - 0
env.conf

@@ -0,0 +1 @@
+ENV=dev

+ 7 - 1
flow_pool_task.sh

@@ -1 +1,7 @@
-cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/pool_predict.py
+source env.conf
+env=$ENV
+if [ $env == 'test' ]; then
+    cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/pool_predict.py
+elif [ $env == 'pro' ]; then
+    cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/pool_predict.py
+fi

+ 1 - 1
get_data.py

@@ -7,7 +7,7 @@ from config import set_config
 from utils import get_data_from_odps, write_to_pickle
 from log import Log
 
-config_ = set_config()
+config_, _ = set_config()
 log_ = Log()
 
 

+ 3 - 2
pool_predict.py

@@ -8,7 +8,7 @@ from utils import request_post, filter_video_status, send_msg_to_feishu
 from log import Log
 from db_helper import RedisHelper
 
-config_ = set_config()
+config_, _ = set_config()
 log_ = Log()
 
 
@@ -118,7 +118,8 @@ def predict(app_type):
     except Exception as e:
         log_.error('流量池更新失败, appType: {} exception: {}, traceback: {}'.format(
             app_type, e, traceback.format_exc()))
-        send_msg_to_feishu('rov-offline生产环境 - 流量池更新失败, appType: {}, exception: {}'.format(app_type, e))
+        send_msg_to_feishu('rov-offline{} - 流量池更新失败, appType: {}, exception: {}'.format(
+            config_.ENV_TEXT, app_type, e))
 
 
 if __name__ == '__main__':

+ 12 - 10
rov_data_check.py

@@ -9,24 +9,26 @@ from config import set_config
 from log import Log
 from utils import send_msg_to_feishu
 
-config_ = set_config()
+config_, env = set_config()
 log_ = Log()
 
 
 def rov_train_recall_pool_update():
     # 训练数据和预测数据都准备好时,更新模型,预测
-    os.system('sh /data/rov-offline/rov_train_recall_pool_update.sh')
+    os.system('sh {}/rov_train_recall_pool_update.sh'.format(config_.PROJECT_PATH))
     # # 将日志上传到oss
     # log_cmd = "ossutil cp -r -f {} oss://{}/{}".format(log_.logname, config_.BUCKET_NAME,
     #                                                    config_.OSS_FOLDER_LOGS + 'rov_recall_pool/')
     # os.system(log_cmd)
-    # 将预测得到的ROV数据文件predict.csv上传到oss
-    data_cmd = "ossutil cp -r -f {} oss://{}/{}dt={}/{}".format("/data/rov-offline/data/predict.csv",
-                                                                config_.BUCKET_NAME,
-                                                                config_.OSS_FOLDER_DATA,
-                                                                datetime.datetime.today().strftime('%Y%m%d'),
-                                                                'predict.csv')
-    os.system(data_cmd)
+
+    # 生产环境 - 将预测得到的ROV数据文件predict.csv上传到oss
+    if env == 'pro':
+        data_cmd = "ossutil cp -r -f {} oss://{}/{}dt={}/{}".format("{}/data/predict.csv".format(config_.PROJECT_PATH),
+                                                                    config_.BUCKET_NAME,
+                                                                    config_.OSS_FOLDER_DATA,
+                                                                    datetime.datetime.today().strftime('%Y%m%d'),
+                                                                    'predict.csv')
+        os.system(data_cmd)
 
 
 def data_check(project, table, date):
@@ -78,5 +80,5 @@ if __name__ == '__main__':
         timer_check()
     except Exception as e:
         log_.error('ROV召回池更新失败, exception: {}, traceback: {}'.format(e, traceback.format_exc()))
-        send_msg_to_feishu('rov-offline生产环境 - ROV召回池更新失败, exception: {}'.format(e))
+        send_msg_to_feishu('rov-offline{} - ROV召回池更新失败, exception: {}'.format(config_.ENV_TEXT, e))
 

+ 7 - 1
rov_pool_task.sh

@@ -1 +1,7 @@
-cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/rov_data_check.py
+source env.conf
+env=$ENV
+if [ $env == 'test' ]; then
+    cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/rov_data_check.py
+elif [ $env == 'pro' ]; then
+    cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/rov_data_check.py
+fi

+ 16 - 3
rov_train.py

@@ -14,7 +14,7 @@ from utils import read_from_pickle, write_to_pickle, data_normalization, \
 from log import Log
 from db_helper import RedisHelper, MysqlHelper
 
-config_ = set_config()
+config_, env = set_config()
 log_ = Log()
 
 
@@ -234,7 +234,7 @@ def predict_test():
     video_ids = [video[0] for video in data]
     # 视频状态过滤
     filtered_videos = filter_video_status(video_ids)
-    log_.info('filtered_videos nums={}'.format(len(filtered_videos)))
+    log_.info('filtered_videos count = {}'.format(len(filtered_videos)))
     # 随机生成 0-100 数作为分数
     redis_data = {}
     json_data = []
@@ -242,17 +242,25 @@ def predict_test():
         score = random.uniform(0, 100)
         redis_data[video_id] = score
         json_data.append({'videoId': video_id, 'rovScore': score})
+    log_.info('json_data count = {}'.format(len(json_data)))
     # 上传Redis
     redis_helper = RedisHelper()
     key_name = config_.RECALL_KEY_NAME_PREFIX + time.strftime('%Y%m%d')
     redis_helper.add_data_with_zset(key_name=key_name, data=redis_data)
     log_.info('test data to redis finished!')
+    # 清空修改ROV的视频数据
+    redis_helper.del_keys(key_name=config_.UPDATE_ROV_KEY_NAME)
     # 通知后端更新数据
     result = request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL, request_data={'videos': json_data})
     if result['code'] == 0:
         log_.info('notify backend success!')
     else:
         log_.error('notify backend fail!')
+    # 更新视频的宽高比数据
+    if filtered_videos:
+        update_video_w_h_rate(video_ids=filtered_videos,
+                              key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['rov_recall'])
+        log_.info('update video w_h_rate to redis finished!')
 
 
 if __name__ == '__main__':
@@ -267,6 +275,11 @@ if __name__ == '__main__':
 
     log_.info('rov model predict start...')
     predict_start = time.time()
-    predict()
+    if env in ['dev', 'test']:
+        predict_test()
+    elif env in ['pre', 'pro']:
+        predict()
+    else:
+        log_.error('env error')
     predict_end = time.time()
     log_.info('rov model predict end, execute time = {}ms'.format((predict_end - predict_start)*1000))

+ 7 - 1
rov_train_recall_pool_update.sh

@@ -1 +1,7 @@
-/root/anaconda3/bin/python /data/rov-offline/get_data.py && /root/anaconda3/bin/python /data/rov-offline/rov_train.py
+source env.conf
+env=$ENV
+if [ $env == 'test' ]; then
+    /root/anaconda3/bin/python /data2/rov-offline/get_data.py && /root/anaconda3/bin/python /data2/rov-offline/rov_train.py
+elif [ $env == 'pro' ]; then
+    /root/anaconda3/bin/python /data/rov-offline/get_data.py && /root/anaconda3/bin/python /data/rov-offline/rov_train.py
+fi

+ 7 - 1
top_video_task.sh

@@ -1 +1,7 @@
-cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/top_video_list.py
+source env.conf
+env=$ENV
+if [ $env == 'test' ]; then
+    cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/top_video_list.py
+elif [ $env == 'pro' ]; then
+    cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/top_video_list.py
+fi

+ 2 - 2
utils.py

@@ -10,7 +10,7 @@ from config import set_config
 from db_helper import HologresHelper, MysqlHelper, RedisHelper
 from log import Log
 
-config_ = set_config()
+config_, env = set_config()
 log_ = Log()
 
 
@@ -120,7 +120,7 @@ def request_post(request_url, request_data):
             return None
     except Exception as e:
         log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
-        send_msg_to_feishu('rov-offline生产环境 - 接口请求失败:{}, exception: {}'.format(request_url, e))
+        send_msg_to_feishu('rov-offline{} - 接口请求失败:{}, exception: {}'.format(config_.ENV_TEXT, request_url, e))
         return None
 
 

+ 2 - 2
videos_filter.py

@@ -7,7 +7,7 @@ from db_helper import RedisHelper
 from config import set_config
 from log import Log
 
-config_ = set_config()
+config_, env = set_config()
 log_ = Log()
 
 
@@ -174,7 +174,7 @@ def main():
         filter_rov_updated()
     except Exception as e:
         log_.error(traceback.format_exc())
-        send_msg_to_feishu('生产环境 - 过滤失败 \n {}'.format(traceback.format_exc()))
+        send_msg_to_feishu('{} - 过滤失败 \n {}'.format(config_.ENV_TEXT, traceback.format_exc()))
         return
 
 

+ 7 - 1
videos_filter_task.sh

@@ -1 +1,7 @@
-cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/videos_filter.py
+source env.conf
+env=$ENV
+if [ $env == 'test' ]; then
+    cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/videos_filter.py
+elif [ $env == 'pro' ]; then
+    cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/videos_filter.py
+fi