瀏覽代碼

add send_msg_to_feishu & update

liqian 3 年之前
父節點
當前提交
db0df88abd
共有 3 個文件被更改,包括 102 次插入和 62 次刪除
  1. + 49 - 43
      pool_predict.py
  2. + 22 - 15
      rov_train.py
  3. + 31 - 4
      utils.py

+ 49 - 43
pool_predict.py

@@ -1,8 +1,9 @@
 import time
 import os
+import traceback
 
 from config import set_config
-from utils import request_post, filter_video_status
+from utils import request_post, filter_video_status, send_msg_to_feishu
 from log import Log
 from db_helper import RedisHelper
 
@@ -69,48 +70,53 @@ def predict(app_type):
     :param app_type: 产品标识 type-int
     :return: None
     """
-    # 从流量池获取数据
-    videos = get_videos_from_flow_pool(app_type=app_type)
-    if len(videos) <= 0:
-        log_.info('流量池中无需分发的视频')
-        return None
-    # video_id 与 flow_pool 进行mapping
-    video_ids = set()
-    log_.info('流量池中视频数:{}'.format(len(videos)))
-    mapping = {}
-    for video in videos:
-        video_id = video['videoId']
-        video_ids.add(video_id)
-        if video_id in mapping:
-            mapping[video_id].append(video['flowPool'])
-        else:
-            mapping[video_id] = [video['flowPool']]
-
-    # 对视频状态进行过滤
-    filtered_videos = filter_video_status(list(video_ids))
-    log_.info('filter videos status finished, filtered_videos nums={}'.format(len(filtered_videos)))
-    if not filtered_videos:
-        log_.info('流量池中视频状态不符合分发')
-        return None
-    # 预测
-    video_score = get_score(filtered_videos)
-    log_.info('predict finished!')
-    # 上传数据到redis
-    redis_data = {}
-    for i in range(len(video_score)):
-        video_id = filtered_videos[i]
-        score = video_score[i]
-        for flow_pool in mapping.get(video_id):
-            value = '{}-{}'.format(video_id, flow_pool)
-            redis_data[value] = score
-    key_name = config_.FLOWPOOL_KEY_NAME_PREFIX + str(app_type)
-    redis_helper = RedisHelper()
-    # 如果key已存在,删除key
-    if redis_helper.key_exists(key_name):
-        redis_helper.del_keys(key_name)
-    # 写入redis
-    redis_helper.add_data_with_zset(key_name=key_name, data=redis_data, expire_time=24 * 3600)
-    log_.info('data to redis finished!')
+    try:
+        # 从流量池获取数据
+        videos = get_videos_from_flow_pool(app_type=app_type)
+        if len(videos) <= 0:
+            log_.info('流量池中无需分发的视频')
+            return None
+        # video_id 与 flow_pool 进行mapping
+        video_ids = set()
+        log_.info('流量池中视频数:{}'.format(len(videos)))
+        mapping = {}
+        for video in videos:
+            video_id = video['videoId']
+            video_ids.add(video_id)
+            if video_id in mapping:
+                mapping[video_id].append(video['flowPool'])
+            else:
+                mapping[video_id] = [video['flowPool']]
+
+        # 对视频状态进行过滤
+        filtered_videos = filter_video_status(list(video_ids))
+        log_.info('filter videos status finished, filtered_videos nums={}'.format(len(filtered_videos)))
+        if not filtered_videos:
+            log_.info('流量池中视频状态不符合分发')
+            return None
+        # 预测
+        video_score = get_score(filtered_videos)
+        log_.info('predict finished!')
+        # 上传数据到redis
+        redis_data = {}
+        for i in range(len(video_score)):
+            video_id = filtered_videos[i]
+            score = video_score[i]
+            for flow_pool in mapping.get(video_id):
+                value = '{}-{}'.format(video_id, flow_pool)
+                redis_data[value] = score
+        key_name = config_.FLOWPOOL_KEY_NAME_PREFIX + str(app_type)
+        redis_helper = RedisHelper()
+        # 如果key已存在,删除key
+        if redis_helper.key_exists(key_name):
+            redis_helper.del_keys(key_name)
+        # 写入redis
+        redis_helper.add_data_with_zset(key_name=key_name, data=redis_data, expire_time=24 * 3600)
+        log_.info('data to redis finished!')
+    except Exception as e:
+        log_.error('流量池更新失败, appType: {} exception: {}, traceback: {}'.format(
+            app_type, e, traceback.format_exc()))
+        send_msg_to_feishu('rov-offline生产环境 - 流量池更新失败, appType: {}, exception: {}'.format(app_type, e))
 
 
 if __name__ == '__main__':

+ 22 - 15
rov_train.py

@@ -1,6 +1,7 @@
 import os
 import random
 import time
+import traceback
 
 import lightgbm as lgb
 import pandas as pd
@@ -9,7 +10,8 @@ from sklearn.model_selection import train_test_split
 from sklearn.metrics import mean_absolute_error, r2_score, mean_absolute_percentage_error
 
 from config import set_config
-from utils import read_from_pickle, write_to_pickle, data_normalization, request_post, filter_video_status
+from utils import read_from_pickle, write_to_pickle, data_normalization, \
+    request_post, filter_video_status, send_msg_to_feishu
 from log import Log
 from db_helper import RedisHelper, MysqlHelper
 
@@ -209,17 +211,22 @@ def predict_test():
 
 
 if __name__ == '__main__':
-    log_.info('rov model train start...')
-    train_start = time.time()
-    train_filename = config_.TRAIN_DATA_FILENAME
-    X, Y, videos, fea = process_data(filename=train_filename)
-    log_.info('X_shape = {}, Y_sahpe = {}'.format(X.shape, Y.shape))
-    train(X, Y, features=fea)
-    train_end = time.time()
-    log_.info('rov model train end, execute time = {}ms'.format((train_end - train_start)*1000))
-
-    log_.info('rov model predict start...')
-    predict_start = time.time()
-    predict()
-    predict_end = time.time()
-    log_.info('rov model predict end, execute time = {}ms'.format((predict_end - predict_start)*1000))
+    try:
+        log_.info('rov model train start...')
+        train_start = time.time()
+        train_filename = config_.TRAIN_DATA_FILENAME
+        X, Y, videos, fea = process_data(filename=train_filename)
+        log_.info('X_shape = {}, Y_sahpe = {}'.format(X.shape, Y.shape))
+        train(X, Y, features=fea)
+        train_end = time.time()
+        log_.info('rov model train end, execute time = {}ms'.format((train_end - train_start)*1000))
+
+        log_.info('rov model predict start...')
+        predict_start = time.time()
+        predict()
+        predict_end = time.time()
+        log_.info('rov model predict end, execute time = {}ms'.format((predict_end - predict_start)*1000))
+    except Exception as e:
+        log_.error('ROV召回池更新失败, exception: {}, traceback: {}'.format(e, traceback.format_exc()))
+        send_msg_to_feishu('rov-offline生产环境 - ROV召回池更新失败, exception: {}'.format(e))
+

+ 31 - 4
utils.py

@@ -3,12 +3,15 @@ import pickle
 import os
 import requests
 import json
+import traceback
 
 from odps import ODPS
 from config import set_config
 from db_helper import HologresHelper
+from log import Log
 
 config_ = set_config()
+log_ = Log()
 
 
 def execute_sql_from_odps(project, sql, connect_timeout=3000, read_timeout=500000,
@@ -84,6 +87,23 @@ def read_from_pickle(filename, filepath=config_.DATA_DIR_PATH):
     return data
 
 
+def send_msg_to_feishu(msg_text):
+    """发送消息到飞书"""
+    # webhook地址
+    webhook = 'https://open.feishu.cn/open-apis/bot/v2/hook/8de4de35-30ed-4692-8854-7a154e89b2f2'
+    # 自定义关键词key_word
+    key_word = '服务报警'
+    headers = {'Content-Type': 'application/json'}
+    payload_message = {
+        "msg_type": "text",
+        "content": {
+            "text": '{}: {}'.format(key_word, msg_text)
+        }
+    }
+    response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
+    print(response.text)
+
+
 def request_post(request_url, request_data):
     """
     post 请求 HTTP接口
@@ -91,10 +111,17 @@ def request_post(request_url, request_data):
     :param request_data: 请求参数
     :return: res_data json格式
     """
-    response = requests.post(url=request_url, json=request_data)
-    if response.status_code == 200:
-        res_data = json.loads(response.text)
-        return res_data
+    try:
+        response = requests.post(url=request_url, json=request_data)
+        if response.status_code == 200:
+            res_data = json.loads(response.text)
+            return res_data
+        else:
+            return None
+    except Exception as e:
+        log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
+        send_msg_to_feishu('rov-offline生产环境 - 接口请求失败:{}, exception: {}'.format(request_url, e))
+        return None
 
 
 def data_normalization(data):