
feat: self-test of the full-pipeline 02 update script

zhaohaipeng committed 9 months ago
commit f0fb31aec0
3 changed files with 243 additions and 4 deletions
  1. ad/02_ad_model_update_twice_daily_v2.sh (+207 -0)
  2. ad/03_delete_timer_file.sh (+21 -2)
  3. ad/ad_monitor_util.py (+15 -2)

+ 207 - 0
ad/02_ad_model_update_twice_daily_v2.sh

@@ -0,0 +1,207 @@
+#!/bin/bash
+set -x
+
+
+export SPARK_HOME=/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8
+export PATH=$SPARK_HOME/bin:$PATH
+export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
+export JAVA_HOME=/usr/lib/jvm/java-1.8.0
+
+source /root/anaconda3/bin/activate py37
+
+# Global constants
+originDataSavePath=/dw/recommend/model/31_ad_sample_data_v3_auto_test
+bucketFeatureSavePathHome=/dw/recommend/model/33_ad_train_data_v3_auto_test
+model_name=model_bkb8_v3_test
+LAST_MODEL_HOME=/root/zhaohp/model_online_test
+
+MODEL_HOME=/root/zhaohp/recommend-emr-dataprocess/model
+OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/ad_model
+
+PREDICT_PATH=/root/zhaohp/recommend-emr-dataprocess/predict
+HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
+FM_HOME=/root/sunmingze/alphaFM
+
+today="$(date +%Y%m%d)"
+today_early_1="$(date -d '1 days ago' +%Y%m%d)"
+
+start_time=$(date +%s)
+elapsed=0
+LOG_PREFIX="Ad model auto-update task"
+
+# Train and predict data partitions
+train_begin_str=''
+train_end_str=''
+predict_begin_str=''
+predict_end_str=''
+
+# HDFS directories for output data
+trainBucketFeaturePath=${bucketFeatureSavePathHome}
+predictBucketFeaturePath=${bucketFeatureSavePathHome}
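+# global_init later points these at per-day train/ and predict/ subdirectories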
+
+local_model_file_path=${MODEL_HOME}/${model_name}.txt
+local_change_model_file_path=${MODEL_HOME}/${model_name}_change.txt
+
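+# Deadline (hour and minute of day) after which check_ad_hive stops waiting and fails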
+max_hour=21
+max_minute=20
+
+# Global initialization
+global_init() {
+    # Get the current hour to decide which data partition range to use
+    local current_hour="$(date +%H)"
+    # if [ $current_hour -lt 08 ]; then
+        train_begin_str=${today_early_1}14
+        train_end_str=${today_early_1}21
+        predict_begin_str=${today_early_1}22
+        predict_end_str=${today_early_1}23
+
+        trainBucketFeaturePath=${bucketFeatureSavePathHome}/${today_early_1}/train
+        predictBucketFeaturePath=${bucketFeatureSavePathHome}/${today_early_1}/predict
+
+        local_model_file_path=${MODEL_HOME}/${model_name}_${train_end_str}.txt
+        local_change_model_file_path=${MODEL_HOME}/${model_name}_${train_end_str}_change.txt
+        max_hour=12
+    # elif [ $current_hour -ge 20 ]; then 
+    #     train_begin_str=${today_early_1}22
+    #     train_end_str=${today}13
+    #     predict_begin_str=${today}14
+    #     predict_end_str=${today}15
+
+    #     trainBucketFeaturePath=${bucketFeatureSavePathHome}/${today}/train
+    #     predictBucketFeaturePath=${bucketFeatureSavePathHome}/${today}/predict
+
+    #     local_model_file_path=${MODEL_HOME}/${train_end_str}.txt
+    #     local_change_model_file_path=${MODEL_HOME}/${train_end_str}_change.txt
+    #     max_hour=21
+
+    # else
+    #     echo "Unexpected time window: exiting task"
+    #     exit 1
+    # fi
+
+    # Remove the HDFS directories so this run starts from a clean state
+    $HADOOP fs -rm -r -skipTrash ${trainBucketFeaturePath}
+    $HADOOP fs -rm -r -skipTrash ${predictBucketFeaturePath}
+
+    echo "全局变量初始化化: "
+    echo "  train_begin_str=${train_begin_str}"
+    echo "  train_end_str=${train_end_str}"
+    echo "  predict_begin_str=${predict_begin_str}"
+    echo "  predict_end_str=${predict_end_str}"
+    echo "  originDataSavePath=${originDataSavePath}"
+    echo "  trainBucketFeaturePath=${trainBucketFeaturePath}"
+    echo "  predictBucketFeaturePath=${predictBucketFeaturePath}"
+    echo "  local_model_file_path=${local_model_file_path}"
+    echo "  local_change_model_file_path=${local_change_model_file_path}"
+    echo "  max_hour=${max_hour}"
+
+}
+
+# Check a step's exit status; log and exit on failure
+check_run_status() {
+    local status=$1
+    local step_start_time=$2
+    local step_name=$3
+
+    local step_end_time=$(date +%s)
+    local step_elapsed=$(($step_end_time - $step_start_time))
+
+    if [ $status -ne 0 ]; then
+        echo "$LOG_PREFIX -- ${step_name} failed: took ${step_elapsed}s"
+        local elapsed=$(($step_end_time - $start_time))
+        # /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
+        exit 1
+    else
+        echo "$LOG_PREFIX -- ${step_name} succeeded: took ${step_elapsed}s"
+    fi
+}
+
+# Check whether the upstream big-data job has finished producing data
+check_ad_hive() {
+    local step_start_time=$(date +%s)
+    while true; do
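+        # Partition strings are yyyymmddHH: chars 0-7 are the day partition, chars 8-9 the hour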
+        local python_return_code=$(python ad/ad_utils.py --excute_program check_ad_origin_hive --partition ${predict_end_str:0:8} --hh ${predict_end_str:8:2})
+
+        local step_end_time=$(date +%s)
+        local elapsed=$(($step_end_time - $step_start_time))
+        if [ "$python_return_code" -eq 0 ]; then
+            break
+        fi
+        echo "Python程序返回非0值,等待五分钟后再次调用。"
+        sleep 300
+        # %-H/%-M drop the zero padding so 08/09 are not treated as invalid octal in (( ))
+        local current_hour=$(date +%-H)
+        local current_minute=$(date +%-M)
+        if (( current_hour > max_hour || (current_hour == max_hour && current_minute >= max_minute) )); then
+            local msg="Data production check failed, partition: ${predict_end_str}"
+            echo -e "$LOG_PREFIX -- data production check -- ${msg}: took ${elapsed}s"
+            # /root/anaconda3/bin/python ad/ad_monitor_util.py --level error --msg "$msg" --start "$start_time" --elapsed "$elapsed"
+            exit 1
+        fi
+    done
+    echo "$LOG_PREFIX -- data production check -- passed: took ${elapsed}s"
+
+}
+
+# Raw feature generation
+make_origin_data() {
+    local step_start_time=$(date +%s)
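+    # Everything after the jar path is passed to the Spark job as key:value application arguments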
+    /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+    --class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_31_originData_20240620 \
+    --master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
+    ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+    tablePart:64 repartition:16 \
+    beginStr:${train_begin_str} endStr:${predict_end_str} \
+    savePath:${originDataSavePath} \
+    table:alg_recsys_ad_sample_all filterHours:00,01,02,03,04,05,06,07 \
+    idDefaultValue:0.01
+
+    local return_code=$?
+    check_run_status $return_code $step_start_time "Spark raw sample generation job"
+
+}
+
+# Feature bucketing; training data and prediction data go to separate directories
+make_bucket_feature() {
+    local step_start_time=$(date +%s)
+    # Training data
+    /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+    --class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_33_bucketData_20240717 \
+    --master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
+    ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+    beginStr:${train_begin_str} endStr:${train_end_str} repartition:100 \
+    filterNames:adid_,targeting_conversion_ \
+    readPath:${originDataSavePath} \
+    savePath:${trainBucketFeaturePath}
+
+    local return_code=$?
+    check_run_status $return_code $step_start_time "Spark feature bucketing job: training data"
+    
+    # Prediction data
+    /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+    --class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_33_bucketData_20240717 \
+    --master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
+    ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+    beginStr:${predict_begin_str} endStr:${predict_end_str} repartition:100 \
+    filterNames:adid_,targeting_conversion_ \
+    readPath:${originDataSavePath} \
+    savePath:${predictBucketFeaturePath}
+
+    return_code=$?
+    check_run_status $return_code $step_start_time "Spark feature bucketing job: prediction data"
+}
+
+main() {
+
+    global_init
+
+    check_ad_hive
+
+    make_origin_data
+
+    make_bucket_feature
+
+}
+
+
+main
+

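Note on the partition convention used above: the begin/end strings built in global_init are yyyymmddHH values, which is why check_ad_hive can split them with shell substring expansion. A minimal sketch (the value below is illustrative, not from the commit):

    predict_end_str=2024091023        # yyyymmddHH
    echo "${predict_end_str:0:8}"     # 20240910 -> passed as --partition
    echo "${predict_end_str:8:2}"     # 23       -> passed as --hh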
+ 21 - 2
ad/03_delete_timer_file.sh

@@ -2,10 +2,18 @@

 set -x

+export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
+
+HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
+
 PREDICT_HOME=/root/zhaohp/recommend-emr-dataprocess/predict
+origin_data_hdfs_dir=/dw/recommend/model/31_ad_sample_data_v3_auto_test
+bucket_feature_hdfs_dir=/dw/recommend/model/33_ad_train_data_v3_auto_test
+
+
 # Delete prediction result files from five days ago
-delete_predict_early_5d() {
+delete_predict_5d_ago() {
     echo "=========== 开始删除五天前的预测结果文件 $(date "+%Y-%m-%d %H:%M:%d") ==========="
     echo "=========== 开始删除五天前的预测结果文件 $(date "+%Y-%m-%d %H:%M:%d") ==========="
@@ -26,8 +34,19 @@ delete_predict_early_5d() {
     echo "=========== 删除五天前的预测结果文件结束 $(date "+%Y-%m-%d %H:%M:%d") ==========="
     echo "=========== 删除五天前的预测结果文件结束 $(date "+%Y-%m-%d %H:%M:%d") ==========="
 }
 }
+# Delete raw feature data in HDFS from more than five days ago
+delete_hdfs_origin_data_5d_ago() {
+    FIVE_DAYS_AGO=$(date -d "5 days ago" +%s)
+
+    # hadoop fs -ls prints the modification date in column 6 and the path in column 8
+    $HADOOP fs -ls $origin_data_hdfs_dir | grep '^d' | awk '{print $6, $8}' | while read -r dir_date dir_path; do
+        if [ "$(date -d "$dir_date" +%s)" -lt "$FIVE_DAYS_AGO" ]; then
+            echo "Deleting ${dir_path}"
+            $HADOOP fs -rm -r -skipTrash "${dir_path}"
+        fi
+    done
+}
+
 main() {
-    delete_predict_early_5d
+    # delete_predict_5d_ago
 }
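Since 03_delete_timer_file.sh is a timer script, a cron entry is the natural trigger. A hypothetical schedule, assuming the repository path used elsewhere in the commit (the log path is illustrative):

    # Run the cleanup daily at 02:00 and append output to a log
    0 2 * * * /bin/sh /root/zhaohp/recommend-emr-dataprocess/ad/03_delete_timer_file.sh >> /root/zhaohp/logs/03_delete_timer_file.log 2>&1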
+ 15 - 2
ad/ad_monitor_util.py

@@ -1,10 +1,12 @@
 # -*- coding: utf-8 -*-
 import argparse
-import datetime
 import json

+import pytz
 import requests

+from datetime import datetime
+
 server_robot = {
     'webhook': 'https://open.feishu.cn/open-apis/bot/v2/hook/926982f5-e7af-40f5-81fd-27d8f42718e4',
 }
@@ -40,6 +42,17 @@ def send_card_msg_to_feishu(webhook, card_json):
     print(response.text)


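+# Format a unix-seconds timestamp as Asia/Shanghai local time; fall back to the raw string on parse failure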
+def timestamp_format(timestamp: str) -> str:
+    try:
+        return (datetime.utcfromtimestamp(int(timestamp))
+                .replace(tzinfo=pytz.UTC)
+                .astimezone(pytz.timezone('Asia/Shanghai'))
+                .strftime('%Y-%m-%d %H:%M:%S')
+                )
+    except ValueError:
+        return timestamp
+
+
 def seconds_convert(seconds):
     hours = seconds // 3600
     minutes = (seconds % 3600) // 60
@@ -53,7 +66,7 @@ def _monitor(level, msg: str, start, elapsed):
     if now.hour > 6:
         msg = msg.replace("\\n", "\n").replace("\\t", "\t")
         mgs_text = f"- Current time: {now.strftime('%Y-%m-%d %H:%M:%S')}" \
-                   f"\n- Task start time: {start}" \
+                   f"\n- Task start time: {timestamp_format(start)}" \
                    f"\n- 任务状态: {level_task_status_map[level]}" \
                    f"\n- 任务状态: {level_task_status_map[level]}" \
                    f"\n- 任务耗时: {seconds_convert(elapsed)}" \
                    f"\n- 任务耗时: {seconds_convert(elapsed)}" \
                    f"\n- 任务描述: {msg}"
                    f"\n- 任务描述: {msg}"