소스 검색

feat:添加定时任务日志

zhaohaipeng 9 달 전
부모
커밋
a7abc3e038
2개의 변경된 파일181개의 추가작업 그리고 77개의 파일을 삭제
  1. 109 55
      ad/01_ad_model_update_everyday.sh
  2. 72 22
      ad/ad_monitor_util.py

+ 109 - 55
ad/01_ad_model_update_everyday.sh

@@ -26,7 +26,6 @@ start_time=$(date "+%Y-%m-%d %H:%M:%S")
 elapsed=0
 LOG_PREFIX=广告模型自动更新任务
 
-
 # 1 判断依赖的数据表是否生产完成
 source /root/anaconda3/bin/activate py37
 while true; do
@@ -50,7 +49,10 @@ done
 echo "$LOG_PREFIX -- 大数据数据生产校验 -- 大数据数据生产校验通过: 耗时 $elapsed"
 
 
+
+
 # 2 原始特征生成
+step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
 /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
 --class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_31_originData_20240620 \
 --master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
@@ -61,18 +63,21 @@ savePath:${originDataSavePath} \
 table:alg_recsys_ad_sample_all filterHours:00,01,02,03,04,05,06,07 \
 idDefaultValue:0.01
 
-elapsed=$(($(date +%s -d "${start_time}") - $(date +%s -d "+%Y-%m-%d %H:%M:%S")))
+step_elapsed=$(($(date +%s -d "$step_start_time") - $(date +%s -d "+%Y-%m-%d %H:%M:%S")))
 if [ $? -ne 0 ]; then
    msg="Spark原始样本生产任务执行失败"
-   echo "$LOG_PREFIX -- 原始样本生产 -- $msg: 耗时 $elapsed"
+   echo "$LOG_PREFIX -- 原始样本生产 -- $msg: 耗时 $step_elapsed"
+   elapsed=$(($(date +%s -d "$start_time") - $(date +%s -d "+%Y-%m-%d %H:%M:%S")))
    /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
    exit 1
-else
-   echo "$LOG_PREFIX -- 原始样本生产 -- Spark原始样本生产任务执行成功: 耗时 $elapsed"
 fi
+echo "$LOG_PREFIX -- 原始样本生产 -- Spark原始样本生产任务执行成功: 耗时 $step_elapsed"
+
+
 
 
 # 3 特征分桶
+step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
 /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
 --class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_33_bucketData_20240622 \
 --master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
@@ -82,69 +87,93 @@ filterNames:adid_,targeting_conversion_ \
 readPath:${originDataSavePath} \
 savePath:${bucketFeatureSavePath}
 
-elapsed=$(($(date +%s -d "${start_time}") - $(date +%s -d "+%Y-%m-%d %H:%M:%S")))
+step_elapsed=$(($(date +%s -d "$step_start_time") - $(date +%s -d "+%Y-%m-%d %H:%M:%S")))
 if [ $? -ne 0 ]; then
    msg="Spark特征分桶处理任务执行失败"
-   echo "$LOG_PREFIX -- 特征分桶处理任务 -- $msg: 耗时 $elapsed"
+   echo "$LOG_PREFIX -- 特征分桶处理任务 -- $msg: 耗时 $step_elapsed"
+   elapsed=$(($(date +%s -d "$start_time") - $(date +%s -d "+%Y-%m-%d %H:%M:%S")))
    /root/anaconda3/bin/python ad/ad_monitor_util.py ${msg}
    exit 1
-else
-   echo "$LOG_PREFIX -- 特征分桶处理任务 -- spark特征分桶处理执行成功: 耗时 $elapsed"
 fi
+echo "$LOG_PREFIX -- 特征分桶处理任务 -- spark特征分桶处理执行成功: 耗时 $step_elapsed"
+
+
 
 
 # 4 模型训练
+step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
 $HADOOP fs -text ${bucketFeatureSavePath}/${today_early_1}/* | ${FM_HOME}/bin/fm_train -m ${MODEL_PATH}/${model_name}_${today_early_1}.txt -dim 1,1,8  -im ${LAST_MODEL_HOME}/model_online.txt -core 8
+step_elapsed=$(($(date +%s -d "$step_start_time") - $(date +%s -d "+%Y-%m-%d %H:%M:%S")))
 if [ $? -ne 0 ]; then
-   echo "模型训练失败"
-   /root/anaconda3/bin/python ad/ad_monitor_util.py "广告模型训练失败"
+   msg "模型训练失败"
+   echo "$LOG_PREFIX -- 原始样本生产 -- $msg: 耗时 $step_elapsed"
+   elapsed=$(($(date +%s -d "$start_time") - $(date +%s -d "+%Y-%m-%d %H:%M:%S")))
+   /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
    exit 1
 fi
+echo "$LOG_PREFIX -- 原始样本生产 -- 模型训练完成: 耗时 $step_elapsed"
+
+
 
 
 # 5 对比AUC
 $HADOOP fs -text ${bucketFeatureSavePath}/${today}/* | ${FM_HOME}/bin/fm_predict -m ${LAST_MODEL_HOME}/model_online.txt -dim 8 -core 8 -out ${PREDICT_PATH}/${model_name}_${today}_online.txt
 $HADOOP fs -text ${bucketFeatureSavePath}/${today}/* | ${FM_HOME}/bin/fm_predict -m ${MODEL_PATH}/${model_name}_${today_early_1}.txt -dim 8 -core 8 -out ${PREDICT_PATH}/${model_name}_${today}_new.txt
-
 # 5.1 计算线上模型的AUC
+step5_start_time=$(date "+%Y-%m-%d %H:%M:%S")
+step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
 online_auc=`cat ${PREDICT_PATH}/${model_name}_${today}_online.txt | /root/sunmingze/AUC/AUC`
+elapsed=$(($(date +%s -d "step_start_time") - $(date +%s -d "+%Y-%m-%d %H:%M:%S")))
 if [ $? -ne 0 ]; then
-   echo "线上模型AUC计算失败"
-   /root/anaconda3/bin/python ad/ad_monitor_util.py "线上模型AUC计算失败"
+   msg="线上模型AUC计算失败"
+   echo "$LOG_PREFIX -- 线上模型AUC计算 -- $msg: 耗时 $step_elapsed"
+   elapsed=$(($(date +%s -d "$start_time") - $(date +%s -d "+%Y-%m-%d %H:%M:%S")))
+   /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
    exit 1
 fi
+echo "$LOG_PREFIX -- 线上模型AUC计算 -- 线上模型AUC计算完成: 耗时 $step_elapsed"
 
 # 5.2 计算新模型的AUC
+step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
 new_auc=`cat ${PREDICT_PATH}/${model_name}_${today}_new.txt | /root/sunmingze/AUC/AUC`
+elapsed=$(($(date +%s -d "step_start_time") - $(date +%s -d "+%Y-%m-%d %H:%M:%S")))
 if [ $? -ne 0 ]; then
-   echo "新模型AUC计算失败"
-   /root/anaconda3/bin/python ad/ad_monitor_util.py "新模型AUC计算失败"
+   msg="新模型AUC计算失败"
+   echo "$LOG_PREFIX -- 新模型AUC计算 -- $msg: 耗时 $step_elapsed"
+   elapsed=$(($(date +%s -d "$start_time") - $(date +%s -d "+%Y-%m-%d %H:%M:%S")))
+   /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
    exit 1
 fi
-
+echo "$LOG_PREFIX -- 新模型AUC计算 -- 新模型AUC计算完成: 耗时 $step_elapsed"
 echo "AUC比对: 线上模型的AUC: ${online_auc}, 新模型的AUC: ${new_auc}"
 
-# 5.3 计算新模型与线上模型的AUC差值
+# 5.3 计算新模型与线上模型的AUC差值的绝对值
 auc_diff=$(echo "$online_auc - $new_auc" | bc -l)
-
-# 5.4 获取差值的绝对值
 auc_diff_abs=$(echo "sqrt(($auc_diff)^2)" | bc -l)
 
-# 5.5 如果差值的绝对值小于0.005且新模型的AUC大于0.73, 则更新模型
+step5_elapsed=$(($(date +%s -d "$step5_start_time") - $(date +%s -d "+%Y-%m-%d %H:%M:%S")))
+# 5.4 如果差值的绝对值小于0.005且新模型的AUC大于0.73, 则更新模型
 if (( $(echo "${online_auc} <= ${new_auc}" | bc -l) )); then
-    echo "新模型优于线上模型: 线上模型AUC: ${online_auc}, 新模型AUC: ${new_auc}"
-    /root/anaconda3/bin/python ad/ad_monitor_util.py "新模型优于线上模型: 线上模型AUC: ${online_auc}, 新模型AUC: ${new_auc}"
+    msg="新模型优于线上模型 \n\t线上模型AUC: ${online_auc} \n\t新模型AUC: ${new_auc}"
+    echo -e "$LOG_PREFIX -- AUC对比 -- $msg: 耗时 $step5_elapsed"
+
 elif (( $(echo "$auc_diff_abs < 0.005" | bc -l) )) && (( $(echo "$new_auc >= 0.73" | bc -l) )); then
-    echo "新模型与线上模型差值小于阈值0.005: 线上模型AUC: ${online_auc}, 新模型AUC: ${new_auc}, 差值为: $auc_diff_abs"
-    /root/anaconda3/bin/python ad/ad_monitor_util.py "新模型与线上模型差值小于阈值0.005: 线上模型AUC: ${online_auc}, 新模型AUC: ${new_auc}, 差值为: $auc_diff_abs"
+    msg="新模型与线上模型差值小于阈值0.005 \n\t线上模型AUC: ${online_auc} \n\t新模型AUC: ${new_auc} \n\t差值为: $auc_diff_abs"
+    echo -e "$LOG_PREFIX -- AUC对比 -- $msg: 耗时 $step5_elapsed"
+
 else
-    echo "新模型与线上模型差值大于等于阈值0.005: 线上模型AUC: ${online_auc}, 新模型AUC: ${new_auc}, 差值为: $auc_diff_abs"
-    /root/anaconda3/bin/python ad/ad_monitor_util.py "新模型与线上模型差值大于等于阈值0.005: 线上模型AUC: ${online_auc}, 新模型AUC: ${new_auc}, 差值为: $auc_diff_abs"
+    msg="新模型与线上模型差值大于等于阈值0.005 \n\t线上模型AUC: ${online_auc} \n\t新模型AUC: ${new_auc} \n\t差值为: $auc_diff_abs"
+    echo -e "$LOG_PREFIX -- AUC对比 -- $msg: 耗时 $step5_elapsed"
+    elapsed=$(($(date +%s -d "$start_time") - $(date +%s -d "+%Y-%m-%d %H:%M:%S")))
+    /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
     exit 1
 fi
 
 
+
 # 6 模型格式转换
+step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
+change_txt_path=${MODEL_PATH}/${model_name}_${today_early_1}_change.txt
 cat ${MODEL_PATH}/${model_name}_${today_early_1}.txt |
 awk -F " " '{
     if (NR == 1) {
@@ -158,39 +187,64 @@ awk -F " " '{
         }
         print line
     }
-}' > ${MODEL_PATH}/${model_name}_${today_early_1}_change.txt
+}' > "$change_txt_path"
+step_elapsed=$(($(date +%s -d "step_start_time") - $(date +%s -d "+%Y-%m-%d %H:%M:%S")))
+
 if [ $? -ne 0 ]; then
-   echo "新模型文件格式转换失败"
-   /root/anaconda3/bin/python ad/ad_monitor_util.py "新模型文件格式转换失败"
+   msg="新模型文件格式转换失败"
+   echo -e "$LOG_PREFIX -- AUC对比 -- $msg: 耗时 $step_elapsed"
+   elapsed=$(($(date +%s -d "$start_time") - $(date +%s -d "+%Y-%m-%d %H:%M:%S")))
+   /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
    exit 1
 fi
+echo -e "$LOG_PREFIX -- 模型文件格式转换 -- 转换后的路径为 [$change_txt_path]: 耗时 $step_elapsed"
 
-# 7 模型文件上传OSS
-online_model_path=${OSS_PATH}/${model_name}.txt
-$HADOOP fs -test -e ${online_model_path}
-if [ $? -eq 0 ]; then
-    echo "数据存在, 先删除。"
-    $HADOOP fs -rm -r -skipTrash ${online_model_path}
-else
-    echo "数据不存在"
-fi
 
-$HADOOP fs -put ${MODEL_PATH}/${model_name}_${today_early_1}_change.txt ${online_model_path}
-if [ $? -eq 0 ]; then
-   echo "推荐模型文件至OSS成功"
-else
-   echo "推荐模型文件至OSS失败"
-   /root/anaconda3/bin/python ad/ad_monitor_util.py "推荐模型文件至OSS失败"
-   exit 1
-fi
 
-# 7.3 本地保存最新的线上使用的模型,用于下一次的AUC验证
-cp -f ${LAST_MODEL_HOME}/model_online.txt ${LAST_MODEL_HOME}/model_online_$(date +\%Y\%m\%d).txt
-cp -f ${MODEL_PATH}/${model_name}_${today_early_1}.txt ${LAST_MODEL_HOME}/model_online.txt
-if [ $? -ne 0 ]; then
-   echo "模型备份失败"
-   /root/anaconda3/bin/python ad/ad_monitor_util.py "模型备份失败 - 最新模型地址: ${MODEL_PATH}/${model_name}_${today_early_1}.txt"
-   exit 1
-fi
+
+## 7 模型文件上传OSS
+#step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
+#online_model_path=${OSS_PATH}/${model_name}.txt
+#$HADOOP fs -test -e ${online_model_path}
+#if [ $? -eq 0 ]; then
+#    echo "数据存在, 先删除。"
+#    $HADOOP fs -rm -r -skipTrash ${online_model_path}
+#else
+#    echo "数据不存在"
+#fi
+#$HADOOP fs -put ${MODEL_PATH}/${model_name}_${today_early_1}_change.txt ${online_model_path}
+#step_elapsed=$(($(date +%s -d "step_start_time") - $(date +%s -d "+%Y-%m-%d %H:%M:%S")))
+#if [ $? -ne 0 ]; then
+#   msg="广告模型文件至OSS失败, OSS模型文件路径: $online_model_path"
+#   echo -e "$LOG_PREFIX -- 模型文件推送至OSS -- $msg: 耗时 $step_elapsed"
+#   elapsed=$(($(date +%s -d "$start_time") - $(date +%s -d "+%Y-%m-%d %H:%M:%S")))
+#   /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
+#   exit 1
+#fi
+#echo -e "$LOG_PREFIX -- 模型文件推送至OSS -- 广告模型文件至OSS成功, OSS模型文件路径 $online_model_path: 耗时 $step_elapsed"
+
+
+
+
+## 8 本地保存最新的线上使用的模型,用于下一次的AUC验证
+#step_start_time=$(date "+%Y-%m-%d %H:%M:%S")
+#cp -f ${LAST_MODEL_HOME}/model_online.txt ${LAST_MODEL_HOME}/model_online_$(date +\%Y\%m\%d).txt
+#cp -f ${MODEL_PATH}/${model_name}_${today_early_1}.txt ${LAST_MODEL_HOME}/model_online.txt
+#step_elapsed=$(($(date +%s -d "step_start_time") - $(date +%s -d "+%Y-%m-%d %H:%M:%S")))
+#if [ $? -ne 0 ]; then
+#   msg="模型备份失败"
+#   echo -e "$LOG_PREFIX -- 模型备份 -- $msg: 耗时 $step_elapsed"
+#   elapsed=$(($(date +%s -d "$start_time") - $(date +%s -d "+%Y-%m-%d %H:%M:%S")))
+#   /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
+#   exit 1
+#fi
+#echo -e "$LOG_PREFIX -- 模型备份 -- 模型备份完成: 耗时 $step_elapsed"
+
+
+
+
+msg="广告模型文件更新完成 \n\t \n\t 新模型AUC: $new_auc \n\t 线上模型AUC: $online_auc AUC差值: $auc_diff_abs \n\t 模型上传路径: $online_model_path \n\t"
+elapsed=$(($(date +%s -d "$start_time") - $(date +%s -d "+%Y-%m-%d %H:%M:%S")))
+/root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
 
 # 32 16 * * * cd /root/zhangbo/recommend-emr-dataprocess && /bin/sh ./ad/01_ad_model_update_everyday.sh > logs/01_update_eventday$(date +\%Y-\%m-\%d_\%H).log 2>&1

+ 72 - 22
ad/ad_monitor_util.py

@@ -1,16 +1,31 @@
 # -*- coding: utf-8 -*-
-import sys
+import argparse
+import datetime
 import json
+
 import requests
-import datetime
 
 server_robot = {
     'webhook': 'https://open.feishu.cn/open-apis/bot/v2/hook/926982f5-e7af-40f5-81fd-27d8f42718e4',
     'key_word': '定时任务告警'
 }
 
+level_header_template_map = {
+    "info": "turquoise",
+    "error": "red",
+    "warn": "yellow"
+}
+
+level_header_title_content_map = {
+    "info": "广告模型自动更新通知",
+    "error": "广告模型自动更新告警",
+    "warn": "广告模型自动更新告警"
+}
+
+
 def send_msg_to_feishu(webhook, key_word, msg_text):
     """发送消息到飞书"""
+    print(f"推送飞书消息内容: {msg_text}")
     headers = {'Content-Type': 'application/json'}
     payload_message = {
         "msg_type": "text",
@@ -18,30 +33,65 @@ def send_msg_to_feishu(webhook, key_word, msg_text):
             "text": '{}: {}'.format(key_word, msg_text)
         }
     }
-    response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
-    print(response.text)
-
-
-def _monitor(dt, hh, msg):
-    """rov模型预测列表"""
-    if hh > 6:
-        msg_text = f"\n- 任务名称: 广告数据模型自动更新任务" \
-                   f"\n- 告警名称: 广告数据模型自动更新" \
-                   f"\n- 所属环境: 线上" \
-                   f"\n- now_date: {dt}" \
-                   f"\n- now_h: {hh}" \
-                   f"\n- 告警描述: {msg}"
-        print(f"msg_text = {msg_text}")
+    # response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
+    # print(response.text)
+
+
+def seconds_convert(seconds):
+    hours = seconds // 3600
+    minutes = (seconds % 3600) // 60
+    seconds = seconds % 60
+    return f"{hours}小时 {minutes}分钟 {seconds}秒"
+
+
+def _monitor(level, msg, start, elapsed):
+    """消息推送"""
+    now = datetime.datetime.now()
+    if now.hour > 6:
+        mgs_text = f"- 当前时间: {now.strftime('%Y-%m-%d %H:%M:%S')}" \
+                   f"\n- 任务开始时间: {start}" \
+                   f"\n- 任务耗时: {seconds_convert(elapsed)}" \
+                   f"\n- 任务描述: {msg}"
+        card_json = {
+            "config": {
+                "wide_screen_mode": "true"
+            },
+            "elements": [
+                {
+                    "tag": "div",
+                    "text": {
+                        "content": mgs_text,
+                        "tag": "lark_md"
+                    }
+                }
+            ],
+            "header": {
+                "template": level_header_template_map[level],
+                "title": {
+                    "content": level_header_title_content_map[level],
+                    "tag": "plain_text"
+                }
+            }
+        }
+
         send_msg_to_feishu(
             webhook=server_robot.get('webhook'),
             key_word=server_robot.get('key_word'),
-            msg_text=msg_text
+            msg_text=json.dumps(card_json)
         )
 
 
 if __name__ == '__main__':
-    dt = datetime.datetime.today().strftime('%Y%m%d')
-    hh = datetime.datetime.now().hour
-    msg = sys.argv[1]
-    _monitor(dt, hh, msg)
-    print("end")
+    parser = argparse.ArgumentParser(description='告警Utils')
+    parser.add_argument('--level', type=str, help='通知级别, info, warn, error', required=True)
+    parser.add_argument('--msg', type=str, help='消息', required=True)
+    parser.add_argument('--start', type=str, help='任务开始时间', required=True)
+    parser.add_argument('--elapsed', type=int, help='任务耗时【秒】', required=True)
+    args = parser.parse_args()
+
+    _monitor(
+        level=args.level,
+        msg=args.msg,
+        start=args.start,
+        elapsed=args.elapsed
+    )