Kaynağa Gözat

feat:添加告警脚本

zhaohaipeng 9 ay önce
ebeveyn
işleme
c153af8f5e

+ 110 - 0
ad/07_ad_model_update_everyday.sh

@@ -0,0 +1,110 @@
+#!/bin/sh
+set -ex
+
+# 0 全局变量/参数
+originDataSavePath=/dw/recommend/model/31_ad_sample_data_auto/
+bucketFeatureSavePath=/dw/recommend/model/33_ad_train_data_nosparse_auto/
+model_name=model_lr0
+today="$(date +%Y%m%d)"
+today_early_1="$(date -d '1 days ago' +%Y%m%d)"
+beginTime=08
+endTime=23
+beginStr=${today_early_1}${beginTime}
+endStr=${today_early_1}${endTime}
+
+MODEL_PATH=/root/zhaohp/recommend-emr-dataprocess/model
+HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
+FM_HOME=/root/sunmingze/alphaFM
+OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/ad_model/
+
+
+# 1 判断依赖的数据表是否生产完成
+source /root/anaconda3/bin/activate py37
+max_hour=15
+max_minute=00
+while true; do
+  python_return_code=$(python ad/ad_utils.py --excute_program check_ad_origin_hive --partition ${today_early_1} --hh ${endTime})
+  if [ $python_return_code -eq 0 ]; then
+    echo "Python程序返回0,退出循环。"
+    break
+  fi
+  echo "Python程序返回非0值,等待五分钟后再次调用。"
+  sleep 300
+  current_hour=$(date +%H)
+  current_minute=$(date +%M)
+  if (( current_hour > max_hour || (current_hour == max_hour && current_minute >= max_minute) )); then
+    echo "最长等待时间已到,失败:${current_hour}-${current_minute}"
+    msg="广告特征数据校验失败,大数据分区没有数据: ${today_early_1}${endTime}"
+    /root/anaconda3/bin/python utils_monitor.py ${msg}
+    exit 1
+  fi
+done
+
+
+# 2 原始特征生成
+/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+--class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_31_originData_20240620 \
+--master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
+./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+tablePart:64 repartition:32 \
+beginStr:${beginStr} endStr:${endStr} \
+savePath:${originDataSavePath} \
+table:alg_recsys_ad_sample_all_new
+if [ $? -ne 0 ]; then
+   echo "Spark原始样本生产任务执行失败"
+   msg="广告特征数据生成失败,Spark原始样本生产任务执行失败"
+   /root/anaconda3/bin/python utils_monitor.py ${msg}
+   exit 1
+else
+    echo "spark原始样本生产执行成功"
+fi
+
+
+# 3 特征分桶
+/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+--class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_33_bucketData_20240622 \
+--master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
+./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+beginStr:${today_early_1} endStr:${today_early_1} repartition:400 \
+filterNames:XXXXX \
+bucketFileName:20240620_ad_bucket_249_fix.txt \
+readPath:${originDataSavePath} \
+savePath:${bucketFeatureSavePath}
+if [ $? -ne 0 ]; then
+   echo "Spark特征分桶处理任务执行失败"
+   msg="广告特征分桶失败,Spark特征分桶处理任务执行失败"
+   /root/anaconda3/bin/python utils_monitor.py ${msg}
+   exit 1
+else
+   echo "spark特征分桶处理执行成功"
+fi
+
+
+# 4 模型训练
+$HADOOP fs -text ${bucketFeatureSavePath}/${today_early_1}/* | /root/sunmingze/alphaFM/bin/fm_train -m model/${model_name}_${today_early_1}.txt -dim 1,1,0 -core 8
+if [ $? -ne 0 ]; then
+   echo "模型训练失败"
+   /root/anaconda3/bin/python utils_monitor.py "广告模型训练失败"
+   exit 1
+fi
+
+
+# 5 对比AUC
+
+
+# 6 模型格式转换
+cat ${MODEL_PATH}/${model_name}_${today_early_1}.txt \
+| sed '1d' | awk -F " " '{if($2!="0") print $1"\t"$2}' \
+> ${MODEL_PATH}/${model_name}_${today_early_1}_change.txt
+
+
+# 7 模型文件上传OSS
+online_model_path=${OSS_PATH}/${model_name}.txt
+$HADOOP fs -test -e ${online_model_path}
+if [ $? -eq 0 ]; then
+    echo "数据存在, 先删除。"
+    $HADOOP fs -rm -r ${online_model_path}
+else
+    echo "数据不存在"
+fi
+$HADOOP fs -put ${MODEL_PATH}/${model_name}_${today_early_1}_change.txt ${online_model_path}

+ 0 - 0
src/main/python/utils_monitor.py → ad/ad_monitor_util.py


+ 64 - 0
ad/ad_utils.py

@@ -0,0 +1,64 @@
+# -*- coding: utf-8 -*-
+from odps import ODPS
+import argparse
+
+ODPS_CONFIG = {
+    'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
+    'ACCESSID': 'LTAIWYUujJAm7CbH',
+    'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
+}
+
+
+def check_data_hh(project, table, partition, hh) -> int:
+    """检查数据是否准备好,输出数据条数"""
+    odps = ODPS(
+        access_id=ODPS_CONFIG['ACCESSID'],
+        secret_access_key=ODPS_CONFIG['ACCESSKEY'],
+        project=project,
+        endpoint=ODPS_CONFIG['ENDPOINT'],
+        connect_timeout=3000,
+        read_timeout=500000,
+        pool_maxsize=1000,
+        pool_connections=1000
+    )
+    try:
+        t = odps.get_table(name=table)
+        check_res = t.exist_partition(partition_spec=f'dt={partition},hh={hh}')
+        if check_res:
+            sql = f'select * from {project}.{table} where dt = {partition}'
+            with odps.execute_sql(sql=sql).open_reader() as reader:
+                data_count = reader.count
+        else:
+            data_count = 0
+    except Exception as e:
+        print("error:" + str(e))
+        data_count = 0
+    return data_count
+
+
+def check_ad_origin_hive(args):
+    project = "loghubods"
+    table = "alg_recsys_ad_sample_all_new"
+    partition = args.partition
+    hh = args.hh
+    count = check_data_hh(project, table, partition, hh)
+    if count == 0:
+        print("1")
+        exit(1)
+    else:
+        print("0")
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(description='脚本utils')
+    parser.add_argument('--excute_program', type=str, help='执行程序')
+    parser.add_argument('--partition', type=str, help='表分区')
+    parser.add_argument('--hh', type=str, help='小时级分区时的小时')
+    parser.add_argument('--project', type=str, help='表空间')
+    parser.add_argument('--table', type=str, help='表名')
+    args = parser.parse_args()
+    if args.excute_program == "check_ad_origin_hive":
+        check_ad_origin_hive(args)
+    else:
+        print("无合法参数,验证失败。")
+        exit(999)

+ 0 - 9
zhangbo/07_ad_model_update_everyday.sh

@@ -90,12 +90,3 @@ cat ${MODEL_PATH}/${model_name}_${today_early_1}.txt \
 > ${MODEL_PATH}/${model_name}_${today_early_1}_change.txt
 
 # 7 模型文件上传OSS
-online_model_path=${OSS_PATH}/${model_name}.txt
-$HADOOP fs -test -e ${online_model_path}
-if [ $? -eq 0 ]; then
-    echo "数据存在, 先删除。"
-    $HADOOP fs -rm -r ${online_model_path}
-else
-    echo "数据不存在"
-fi
-$HADOOP fs -put ${MODEL_PATH}/${model_name}_${today_early_1}_change.txt ${online_model_path}

+ 0 - 42
zhangbo/utils.py

@@ -35,32 +35,6 @@ def check_data(project, table, partition) -> int:
     return data_count
 
 
-def check_data_hh(project, table, partition, hh) -> int:
-    """检查数据是否准备好,输出数据条数"""
-    odps = ODPS(
-        access_id=ODPS_CONFIG['ACCESSID'],
-        secret_access_key=ODPS_CONFIG['ACCESSKEY'],
-        project=project,
-        endpoint=ODPS_CONFIG['ENDPOINT'],
-        connect_timeout=3000,
-        read_timeout=500000,
-        pool_maxsize=1000,
-        pool_connections=1000
-    )
-    try:
-        t = odps.get_table(name=table)
-        check_res = t.exist_partition(partition_spec=f'dt={partition},hh={hh}')
-        if check_res:
-            sql = f'select * from {project}.{table} where dt = {partition}'
-            with odps.execute_sql(sql=sql).open_reader() as reader:
-                data_count = reader.count
-        else:
-            data_count = 0
-    except Exception as e:
-        print("error:" + str(e))
-        data_count = 0
-    return data_count
-
 def check_origin_hive(args):
     project = "loghubods"
     table = "alg_recsys_view_sample_v2"
@@ -104,24 +78,10 @@ def check_hive(args):
         print("0")
 
 
-def check_ad_origin_hive(args):
-    project = "loghubods"
-    table = "alg_recsys_ad_sample_all_new"
-    partition = args.partition
-    hh = args.hh
-    count = check_data_hh(project, table, partition, hh)
-    if count == 0:
-        print("1")
-        exit(1)
-    else:
-        print("0")
-
-
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(description='脚本utils')
     parser.add_argument('--excute_program', type=str, help='执行程序')
     parser.add_argument('--partition', type=str, help='表分区')
-    parser.add_argument('--hh', type=str, help='小时级分区时的小时')
     parser.add_argument('--project', type=str, help='表空间')
     parser.add_argument('--table', type=str, help='表名')
     args = parser.parse_args()
@@ -131,8 +91,6 @@ if __name__ == '__main__':
         check_item_hive(args)
     elif args.excute_program == "check_user_hive":
         check_user_hive(args)
-    elif args.excute_program == "check_ad_origin_hive":
-        check_ad_origin_hive(args)
     elif args.excute_program == "check_hive":
         check_hive(args)
     else: