
feat: add a new training script for the recommendation model

zhaohaipeng 8 months ago
parent
commit
16c6d8ea03

+ 31 - 18
recommend/02_train_go.sh

@@ -1,31 +1,44 @@
 #!/bin/sh
-set -ex
 
-start_date=$1
+# Train the new model, then compute AUC on later days' data to evaluate model performance
+
+set -x
+
+begin_date=$1
 end_date=$2
 model_name=$3
-MODEL_PATH="./model/"
-SAMPLE_PATH=$4
-bias=$5
-HADOOP="/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop"
-FM_TRAIN="/root/sunmingze/alphaFM/bin/fm_train"
+train_dim=$4
+hdfs_path=$5
+
+PROJECT_HOME=/root/zhaohp/recommend-emr-dataprocess/recommend/
+HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
+MODEL_PATH=${PROJECT_HOME}/model/recommend/
+PREDICT_PATH=${PROJECT_HOME}/predict/recommend/
+
+FM_TRAIN=/root/sunmingze/alphaFM/bin/fm_train
+
 
+train_date=$begin_date
 
+main() {
 
-#$HADOOP fs -text /dw/recommend/model/43_recsys_train_data/20240713/* | ${FM_TRAIN} -m /root/joe/recommend-emr-dataprocess/model/${model_name}_all_20240713.txt -dim ${bias} -core 8 -im /root/joe/recommend-emr-dataprocess/model/model_nba8_0709_0712.txt
+    # Incremental training, one day at a time
+    while [ "$train_date" != "$end_date" ]; do
+        echo "==================== Start training model for $train_date ===================="
 
-current_date="$start_date"
+        if [ "$train_date" == "$begin_date" ]; then
+            $HADOOP fs -text ${hdfs_path}/${train_date}/* | ${FM_TRAIN} -m ${MODEL_PATH}/${model_name}_${train_date}.txt -dim ${train_dim} -core 8
+        else
+            yesterday=$(date -d "$train_date -1 day" +%Y%m%d)
+            $HADOOP fs -text ${hdfs_path}/${train_date}/* | ${FM_TRAIN} -m ${MODEL_PATH}/${model_name}_${train_date}.txt -dim ${train_dim} -core 8 -im ${MODEL_PATH}/${model_name}_${yesterday}.txt
+        fi
 
-while [[ "$current_date" != "$end_date" ]]; do
-    echo -------"$current_date"----------
+        echo -e "==================== Finished training model for $train_date ====================\n\n\n\n\n\n"
 
-    yesterday=$(date -d "$current_date - 1 day" +%Y%m%d)
-    echo model-day-$yesterday
-    echo data-day-$current_date
-    $HADOOP fs -text /dw/recommend/model/43_recsys_train_data/$current_date/* | ${FM_TRAIN} -m /root/joe/recommend-emr-dataprocess/model/${model_name}_all_$current_date.txt -dim ${bias} -core 8 -im /root/joe/recommend-emr-dataprocess/model/${model_name}_all_$yesterday.txt
-    current_date=$(date -d "$current_date + 1 day" +%Y%m%d)
-done
+        train_date=$(date -d "$train_date +1 day" +%Y%m%d)
+    done
 
-# nohup sh 02_train_go.sh 20240714 20240715 model_nba8 /dw/recommend/model/43_recsys_train_data/ 1,1,8 > log/p2_model_bkb8_all.log 2>&1 &
+}
 
+main
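A minimal invocation sketch for the new script (assumed usage based on the positional arguments begin_date / end_date / model_name / train_dim / hdfs_path and the old usage comment removed above; the dates, model name and log file are illustrative). AUC evaluation on later days' data is done separately, e.g. with fm_predict as in xunlian.sh below:

    nohup sh 02_train_go.sh 20240801 20240808 model_xxx 1,1,8 /dw/recommend/model/43_recsys_train_data > log/train_model_xxx.log 2>&1 &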
 

+ 0 - 30
recommend/FeishuBot.py

@@ -1,30 +0,0 @@
-import requests
-import argparse
-
-
-class FeishuBot:
-    webhook_url = "https://open.feishu.cn/open-apis/bot/v2/hook/cbf0bc0f-994a-489b-9f77-de0c5be107cd"
-
-    def send_message(self, message_text):
-        headers = {'Content-Type': 'application/json'}
-        data = {
-            "msg_type": "text",
-            "content": {"text": message_text}
-        }
-        response = requests.post(self.webhook_url, json=data, headers=headers)
-        response.raise_for_status()  # Raise an HTTPError if the response status code is not 200
-        return response.json()  # Return the JSON response (if needed)
-
-
-def main():
-    parser = argparse.ArgumentParser(description='Send a message via Feishu Bot')
-    parser.add_argument('message', type=str, help='The message to send to Feishu Bot')
-    args = parser.parse_args()
-
-    bot = FeishuBot()
-    response = bot.send_message(args.message)
-    print(response)  # Print the response so the send result can be inspected
-
-
-if __name__ == '__main__':
-    main()
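For reference, the deleted bot was a one-shot command-line tool; a usage sketch (the message text is illustrative):

    python FeishuBot.py "recommendation model training finished"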

+ 0 - 86
recommend/change_oss.sh

@@ -1,86 +0,0 @@
-#!/bin/sh
-set -x
-
-source /root/anaconda3/bin/activate py37
-
-export SPARK_HOME=/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8
-export PATH=$SPARK_HOME/bin:$PATH
-export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
-export JAVA_HOME=/usr/lib/jvm/java-1.8.0
-
-#  nohup sh handle_rov.sh > "$(date +%Y%m%d_%H%M%S)_handle_rov.log" 2>&1 &
-
-# Source data table name
-table='alg_recsys_sample_all'
-today="$(date +%Y%m%d)"
-today_early_3="$(date -d '3 days ago' +%Y%m%d)"
-#table='alg_recsys_sample_all_test'
-# Partition config: recommendation data is produced with a one-day lag, so at 00:00 on day 5 the new model data is built from day 3's hour 00-23 data
-begin_early_2_Str="$(date -d '2 days ago' +%Y%m%d)"
-end_early_2_Str="$(date -d '2 days ago' +%Y%m%d)"
-beginHhStr=00
-endHhStr=23
-max_hour=05
-max_minute=00
-# Absolute HDFS paths produced by each stage
-# Source data files
-originDataPath=/dw/recommend/model/41_recsys_sample_data/
-# Feature values
-valueDataPath=/dw/recommend/model/14_feature_data/
-# Feature buckets
-bucketDataPath=/dw/recommend/model/43_recsys_train_data/
-# Model data path
-MODEL_PATH=/root/joe/recommend-emr-dataprocess/model
-# Prediction output path
-PREDICT_PATH=/root/joe/recommend-emr-dataprocess/predict
-# Path of the model currently serving online
-LAST_MODEL_HOME=/root/joe/model_online
-# Model file name prefix
-model_name=model_nba8
-# alphaFM binaries
-FM_HOME=/root/sunmingze/alphaFM/bin
-# hadoop
-HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
-OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/zhangbo/
-
-
-cat /root/joe/recommend-emr-dataprocess/model/model_nba8_all_9_22.txt |
-  awk -F " " '{
-      if (NR == 1) {
-          print $1"\t"$2
-      } else {
-          split($0, fields, " ");
-          OFS="\t";
-          line=""
-          for (i = 1; i <= 10 && i <= length(fields); i++) {
-              line = (line ? line "\t" : "") fields[i];
-          }
-              print line
-          }
-      }' > /root/joe/recommend-emr-dataprocess/model/model_nba8_all_9_22_change.txt
-if [ $? -ne 0 ]; then
-   echo "新模型文件格式转换失败"
-fi
-# 4.1.2 Upload the model file to OSS
-online_model_path=${OSS_PATH}/${model_name}.txt
-$HADOOP fs -test -e ${online_model_path}
-if [ $? -eq 0 ]; then
-    echo "数据存在, 先删除。"
-    $HADOOP fs -rm -r -skipTrash ${online_model_path}
-else
-    echo "数据不存在"
-fi
-$HADOOP fs -put /root/joe/recommend-emr-dataprocess/model/model_nba8_all_9_22_change.txt ${online_model_path}
-if [ $? -eq 0 ]; then
-    echo "推荐模型文件至OSS成功"
-    # 4.1.3 本地保存最新的线上使用的模型,用于下一次的AUC验证
-    cp -f ${LAST_MODEL_HOME}/model_online.txt ${LAST_MODEL_HOME}/model_online_$(date +\%Y\%m\%d).txt
-    cp -f /root/joe/recommend-emr-dataprocess/model/model_nba8_all_9_22.txt ${LAST_MODEL_HOME}/model_online.txt
-    if [ $? -ne 0 ]; then
-        echo "模型备份失败"
-    fi
-    /root/anaconda3/bin/python monitor_util.py --level info --msg "Recommendation model update \n[Task]: step model update\n[Status]: success\n[Info]: updated model /root/joe/recommend-emr-dataprocess/model/model_nba8_all_9_22.txt"
-else
-    echo "推荐模型文件至OSS失败"
-    /root/anaconda3/bin/python monitor_util.py --level error --msg "荐模型数据更新 \n【任务名称】:step模型推送oss\n【是否成功】:error\n【信息】:推荐模型文件至OSS失败/root/joe/recommend-emr-dataprocess/model/model_nba8_all_9_22.txt --- ${online_model_path}"
-fi
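The awk block in this deleted script rewrites an alphaFM model file from space-separated to tab-separated fields, keeping at most the first 10 fields per line. A tiny standalone sketch of the same transformation (the sample input values are made up):

    printf 'bias 0.01\nfeat_a 0.1 0.2 0.3\n' |
        awk '{ n = (NF < 10 ? NF : 10); line = $1; for (i = 2; i <= n; i++) line = line "\t" $i; print line }'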

+ 0 - 131
recommend/checkHiveDataUtil.py

@@ -1,131 +0,0 @@
-# -*- coding: utf-8 -*-
-from odps import ODPS
-from FeishuBot import FeishuBot
-
-import argparse
-
-ODPS_CONFIG = {
-    'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
-    'ACCESSID': 'LTAIWYUujJAm7CbH',
-    'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
-}
-
-
-def check_origin_hive(args):
-    project = "loghubods"
-    table = args.table
-    beginStr = args.beginStr
-    endStr = args.endStr
-    # Check each hourly partition between begin and end for data
-    # Alert and exit if any partition is empty
-    time_sequence = generate_time_sequence(beginStr, endStr)
-    # exist_partition = []
-    for time_str in time_sequence:
-        result = split_date_time(time_str)
-        partitionDt = result[0]
-        partitionHh = result[1]
-        count = check_data(project, table, partitionDt, partitionHh)
-        if count == 0:
-            bot = FeishuBot()
-            # msg = (
-            #     f'Recommendation model update \n[Task]: step1 validate hive data source\n[Status]: success\n[Info]: table:{table}, beginStr:{beginStr}, endStr:{endStr}\n[Detail]: {exist_partition}')
-            msg = (
-                f'Recommendation model update \n[Task]: step1 validate hive data source\n[Status]: error\n[Info]: table:{table}, partition {time_str} has no data')
-            bot.send_message(msg)
-            print('1')
-            exit(1)
-        else:
-            continue
-    print('0')
-        # exist_partition.append(f'partition: dt={partitionDt}/hh={partitionHh}, rows: {count}')
-# if len(exist_partition) == 0:
-#     print('1')
-#     exit(1)
-# else:
-#     bot = FeishuBot()
-#     msg = (
-#         f'Recommendation model update \n[Task]: step1 validate hive data source\n[Status]: success\n[Info]: table:{table}, beginStr:{beginStr}, endStr:{endStr}\n[Detail]: {exist_partition}')
-#     bot.send_message(msg)
-# print('0')
-
-
-def check_data(project, table, partitionDt, partitionDtHh) -> int:
-    """检查数据是否准备好,输出数据条数"""
-    odps = ODPS(
-        access_id=ODPS_CONFIG['ACCESSID'],
-        secret_access_key=ODPS_CONFIG['ACCESSKEY'],
-        project=project,
-        endpoint=ODPS_CONFIG['ENDPOINT'],
-        # connect_timeout=300000,
-        # read_timeout=500000,
-        # pool_maxsize=1000,
-        # pool_connections=1000
-    )
-    try:
-        t = odps.get_table(name=table)
-        # check_res = t.exist_partition(partition_spec=f'dt={partition}')
-        # The table has an hh partition
-        # if not {partitionDtHh}:
-        check_res = t.exist_partition(partition_spec=f'dt={partitionDt},hh={partitionDtHh}')
-        if check_res:
-            sql = f'select * from {project}.{table} where dt = {partitionDt} and hh={partitionDtHh}'
-            with odps.execute_sql(sql=sql).open_reader() as reader:
-                data_count = reader.count
-        else:
-            data_count = 0
-        # else:
-        #     check_res = t.exist_partition(partition_spec=f'dt={partitionDt}')
-        #     if check_res:
-        #         sql = f'select * from {project}.{table} where dt = {partitionDt}'
-        #         with odps.execute_sql(sql=sql).open_reader() as reader:
-        #             data_count = reader.count
-        #     else:
-        #         data_count = 0
-    except Exception as e:
-        print("error:" + str(e))
-        data_count = 0
-    return data_count
-
-
-def generate_time_sequence(beginStr, endStr):
-    # Convert the string timestamps into datetime objects
-    from datetime import datetime, timedelta
-
-    # Time format
-    time_format = "%Y%m%d%H"
-
-    # Parse the strings into datetime objects
-    begin_time = datetime.strptime(beginStr, time_format)
-    end_time = datetime.strptime(endStr, time_format)
-
-    # Build the hourly time sequence
-    time_sequence = []
-    current_time = begin_time
-    while current_time <= end_time:
-        # Format the datetime back into a string of the given format
-        time_sequence.append(current_time.strftime(time_format))
-        # Advance one hour
-        current_time += timedelta(hours=1)
-
-    return time_sequence
-
-
-def split_date_time(date_time_str):
-    # date_time_str is assumed to be a 10-character string in the format YYYYMMDDHH
-    # Slice out the date part (first 8 chars) and the hour part (chars 9-10)
-    date_part = date_time_str[:8]
-    time_part = date_time_str[8:10]  # 只取小时部分
-
-    # Return both parts as a list
-    result = [date_part, time_part]
-
-    return result
-
-
-if __name__ == '__main__':
-    parser = argparse.ArgumentParser(description='Hive partition check utility')
-    parser.add_argument('--beginStr', type=str, help='begin partition, format YYYYMMDDHH')
-    parser.add_argument('--endStr', type=str, help='end partition, format YYYYMMDDHH')
-    parser.add_argument('--table', type=str, help='table name')
-    argv = parser.parse_args()
-    check_origin_hive(argv)
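Usage sketch, matching how the deleted handle_rov.sh invoked this check (the date range is illustrative). The script prints 0 and exits normally when every hourly partition in the range has rows; it prints 1 and exits non-zero on the first empty partition:

    python checkHiveDataUtil.py --table alg_recsys_sample_all --beginStr 2024071000 --endStr 2024071023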

+ 0 - 119
recommend/check_auc.sh

@@ -1,119 +0,0 @@
-#!/bin/sh
-set -ex
-
-source /root/anaconda3/bin/activate py37
-
-#  nohup sh handle_rov.sh > "$(date +%Y%m%d_%H%M%S)_handle_rov.log" 2>&1 &
-
-# Source data table name
-table='alg_recsys_sample_all'
-#today="$(date +%Y%m%d)"
-today=20240710
-#today_early_3="$(date -d '3 days ago' +%Y%m%d)"
-today_early_3=20240703
-#table='alg_recsys_sample_all_test'
-# Partition config: recommendation data is produced with a one-day lag, so at 00:00 on day 5 the new model data is built from day 3's hour 00-23 data
-begin_early_2_Str="$(date -d '2 days ago' +%Y%m%d)"
-end_early_2_Str="$(date -d '2 days ago' +%Y%m%d)"
-beginHhStr=00
-endHhStr=23
-max_hour=05
-max_minute=00
-# Absolute HDFS paths produced by each stage
-# Source data files
-originDataPath=/dw/recommend/model/13_sample_data/
-# Feature values
-valueDataPath=/dw/recommend/model/14_feature_data/
-# Feature buckets
-bucketDataPath=/dw/recommend/model/16_train_data/
-# Model data path
-MODEL_PATH=/root/joe/recommend-emr-dataprocess/model
-# Prediction output path
-PREDICT_PATH=/root/joe/recommend-emr-dataprocess/predict
-# Path of the model currently serving online
-LAST_MODEL_HOME=/root/joe/model_online
-# Model file name prefix
-model_name=akaqjl8
-# alphaFM binaries
-FM_HOME=/root/sunmingze/alphaFM/bin
-# hadoop
-HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
-
-
-
-# 0 Compare AUC: evaluate the 2-days-ago model against the online model; if the new model wins, update the online one
-echo "$(date +%Y-%m-%d_%H-%M-%S)----------step4------------start comparing AUC: new model ${MODEL_PATH}/${model_name}_20240703.txt vs the online model"
-#$HADOOP fs -text ${bucketDataPath}/20240707/* | ${FM_HOME}/fm_predict -m ${LAST_MODEL_HOME}/model_online.txt -dim 8 -core 8 -out ${PREDICT_PATH}/${model_name}_${today}_online.txt
-#if [ $? -ne 0 ]; then
-#  echo "推荐线上模型AUC计算失败"
-#  /root/anaconda3/bin/python monitor_util.py --level error --msg "荐模型数据更新 \n【任务名称】:step4新旧模型AUC对比\n【是否成功】:error\n【信息】:推荐线上模型AUC计算失败"
-#else
-##  $HADOOP fs -text ${bucketDataPath}/20240707/* | ${FM_HOME}/fm_predict -m ${MODEL_PATH}/${model_name}_20240703.txt -dim 8 -core 8 -out ${PREDICT_PATH}/${model_name}_${today}_new.txt
-#  if [ $? -ne 0 ]; then
-#     echo "推荐新模型AUC计算失败"
-#     /root/anaconda3/bin/python monitor_util.py --level error --msg "荐模型数据更新 \n【任务名称】:step4新旧模型AUC对比\n【是否成功】:error\n【信息】:推荐新模型AUC计算失败${PREDICT_PATH}/${model_name}_${today}_new.txt"
-#  else
-    online_auc=`cat ${PREDICT_PATH}/${model_name}_${today}_online.txt | /root/sunmingze/AUC/AUC`
-    if [ $? -ne 0 ]; then
-       echo "Online model AUC computation failed"
-       /root/anaconda3/bin/python monitor_util.py --level error --msg "Recommendation model update \n[Task]: step4 new vs old model AUC comparison\n[Status]: error\n[Info]: online model AUC computation failed"
-    else
-      new_auc=`cat ${PREDICT_PATH}/${model_name}_${today}_new.txt | /root/sunmingze/AUC/AUC`
-      if [ $? -ne 0 ]; then
-         echo "New model AUC computation failed"
-         /root/anaconda3/bin/python monitor_util.py --level error --msg "Recommendation model update \n[Task]: step4 new vs old model AUC comparison\n[Status]: error\n[Info]: new model AUC computation failed ${PREDICT_PATH}/${model_name}_${today}_new.txt"
-      else
-        # 4.1 Compare the AUCs to decide whether to update the online model
-        if [ "$online_auc" \< "$new_auc" ]; then
-            echo "New model beats the online model: online AUC: ${online_auc}, new AUC: ${new_auc}"
-            # 4.1.1 Convert the model file format
-            cat ${MODEL_PATH}/${model_name}_20240703.txt |
-            awk -F " " '{
-                if (NR == 1) {
-                    print $1"\t"$2
-                } else {
-                    split($0, fields, " ");
-                    OFS="\t";
-                    line=""
-                    for (i = 1; i <= 10 && i <= length(fields); i++) {
-                        line = (line ? line "\t" : "") fields[i];
-                    }
-                    print line
-                }
-            }' > ${MODEL_PATH}/${model_name}_20240703_change.txt
-            if [ $? -ne 0 ]; then
-               echo "New model file format conversion failed"
-               /root/anaconda3/bin/python monitor_util.py --level error --msg "Recommendation model update \n[Task]: step4 model format conversion\n[Status]: error\n[Info]: new model file format conversion failed ${MODEL_PATH}/${model_name}_20240703.txt"
-            else
-#              # 4.1.2 Upload the model file to OSS
-#              online_model_path=${OSS_PATH}/${model_name}.txt
-#              $HADOOP fs -test -e ${online_model_path}
-#              if [ $? -eq 0 ]; then
-#                  echo "File exists, deleting it first."
-#                  $HADOOP fs -rm -r -skipTrash ${online_model_path}
-#              else
-#                  echo "File does not exist"
-#              fi
-#              $HADOOP fs -put ${MODEL_PATH}/${model_name}_20240703_change.txt ${online_model_path}
-#              if [ $? -eq 0 ]; then
-#                 echo "Recommendation model file uploaded to OSS successfully"
-#                  # 4.1.3 Keep a local copy of the model now serving online, for the next AUC comparison
-                 cp -f ${LAST_MODEL_HOME}/model_online.txt ${LAST_MODEL_HOME}/model_online_$(date +\%Y\%m\%d).txt
-                 cp -f ${MODEL_PATH}/${model_name}_20240703.txt ${LAST_MODEL_HOME}/model_online.txt
-                 if [ $? -ne 0 ]; then
-                     echo "Model backup failed"
-                 fi
-                 /root/anaconda3/bin/python monitor_util.py --level info --msg "Recommendation model update \n[Task]: step4 model update\n[Status]: success\n[Info]: new model beats the online model: online AUC: ${online_auc}, new AUC: ${new_auc}; updated model ${model_name}_20240703.txt"
-#              else
-#                 echo "Failed to upload recommendation model file to OSS"
-#                 /root/anaconda3/bin/python monitor_util.py --level error --msg "Recommendation model update \n[Task]: step4 model push to OSS\n[Status]: error\n[Info]: failed to upload recommendation model file to OSS ${MODEL_PATH}/${model_name}_20240703_change.txt --- ${online_model_path}"
-#              fi
-            fi
-        else
-            echo "New model is worse than the online model: online AUC: ${online_auc}, new AUC: ${new_auc}"
-            /root/anaconda3/bin/python monitor_util.py --level info --msg "Recommendation model update \n[Task]: step4 model update\n[Status]: success\n[Info]: new model is worse than the online model: online AUC: ${online_auc}, new AUC: ${new_auc}"
-        fi
-      fi
-    fi
-#  fi
-#fi
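One caveat on the gate above: [ "$online_auc" \< "$new_auc" ] is a string (lexicographic) comparison, which orders AUC values correctly only while both are printed with the same width. A numeric-comparison sketch that could replace it (an assumed fix, not what the script shipped):

    if awk -v a="$online_auc" -v b="$new_auc" 'BEGIN { exit !(a < b) }'; then
        echo "new model beats the online model"
    fi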

+ 0 - 133
recommend/data_0729.sh

@@ -1,133 +0,0 @@
-#!/bin/sh
-set -x
-
-source /root/anaconda3/bin/activate py37
-
-export SPARK_HOME=/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8
-export PATH=$SPARK_HOME/bin:$PATH
-export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
-export JAVA_HOME=/usr/lib/jvm/java-1.8.0
-
-#  nohup sh handle_rov.sh > "$(date +%Y%m%d_%H%M%S)_handle_rov.log" 2>&1 &
-
-# Source data table name
-table='alg_recsys_sample_all'
-today="$(date +%Y%m%d)"
-today_early_3="$(date -d '3 days ago' +%Y%m%d)"
-#table='alg_recsys_sample_all_test'
-# Partition config: recommendation data is produced with a one-day lag, so at 00:00 on day 5 the new model data is built from day 3's hour 00-23 data
-begin_early_2_Str="$(date -d '2 days ago' +%Y%m%d)"
-end_early_2_Str="$(date -d '2 days ago' +%Y%m%d)"
-beginHhStr=00
-endHhStr=23
-max_hour=05
-max_minute=00
-# Absolute HDFS paths produced by each stage
-# Source data files
-originDataPath=/dw/recommend/model/41_recsys_sample_data/
-# Feature values
-valueDataPath=/dw/recommend/model/14_feature_data/
-# Feature buckets
-bucketDataPath=/dw/recommend/model/43_recsys_train_data/
-# Model data path
-MODEL_PATH=/root/joe/recommend-emr-dataprocess/model
-# Prediction output path
-PREDICT_PATH=/root/joe/recommend-emr-dataprocess/predict
-# Path of the model currently serving online
-LAST_MODEL_HOME=/root/joe/model_online
-# Model file name prefix
-model_name=model_nba8
-# alphaFM binaries
-FM_HOME=/root/sunmingze/alphaFM/bin
-# hadoop
-HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
-OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/zhangbo/
-#OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/qiaojialiang/
-
-
-
-
-# 1 Produce the source data
-echo "$(date +%Y-%m-%d_%H-%M-%S)----------step1------------start producing source data from ${table}"
-#/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
-#--class com.aliyun.odps.spark.examples.makedata_recsys.makedata_recsys_41_originData_20240709 \
-#--master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
-#../target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-#tablePart:64 repartition:32 \
-#beginStr:${begin_early_2_Str}${beginHhStr} endStr:${end_early_2_Str}${endHhStr} \
-#savePath:${originDataPath} \
-#table:${table}
-#if [ $? -ne 0 ]; then
-#   echo "Spark原始样本生产任务执行失败"
-#   /root/anaconda3/bin/python monitor_util.py --level error --msg "荐模型数据更新 \n【任务名称】:step1根据${table}生产原始数据\n【是否成功】:error\n【信息】:Spark原始样本生产任务执行失败"
-#   exit 1
-#else
-#   echo "spark原始样本生产执行成功"
-#fi
-/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
---class com.aliyun.odps.spark.examples.makedata_recsys.makedata_recsys_41_originData_20240709 \
---master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
-../target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-tablePart:64 repartition:32 \
-beginStr:2024072709 endStr:2024072709 \
-savePath:${originDataPath} \
-table:${table}
-echo "$(date +%Y-%m-%d_%H-%M-%S)----------step1------------spark原始样本生产执行成功:2024072709"
-
-/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
---class com.aliyun.odps.spark.examples.makedata_recsys.makedata_recsys_41_originData_20240709 \
---master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
-../target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-tablePart:64 repartition:32 \
-beginStr:2024072713 endStr:2024072715 \
-savePath:${originDataPath} \
-table:${table} &
-
-/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
---class com.aliyun.odps.spark.examples.makedata_recsys.makedata_recsys_41_originData_20240709 \
---master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
-../target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-tablePart:64 repartition:32 \
-beginStr:2024072718 endStr:2024072723 \
-savePath:${originDataPath} \
-table:${table} &
-
-wait
-if [ $? -ne 0 ]; then
-   echo "Spark原始样本生产任务执行失败"
-   /root/anaconda3/bin/python monitor_util.py --level error --msg "荐模型数据更新 \n【任务名称】:step1根据${table}生产原始数据\n【是否成功】:error\n【信息】:Spark原始样本生产任务执行失败"
-   exit 1
-else
-   echo "spark原始样本生产执行成功"
-fi
-
-# 2 Feature bucketing
-echo "$(date +%Y-%m-%d_%H-%M-%S)----------step2------------produce re-scoring feature data via feature bucketing"
-/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
---class com.aliyun.odps.spark.examples.makedata_recsys.makedata_recsys_43_bucketData_20240709 \
---master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
-../target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-readPath:${originDataPath} \
-savePath:${bucketDataPath} \
-beginStr:20240727 endStr:20240727 repartition:500 \
-filterNames:XXXXXXXXX \
-fileName:20240609_bucket_314.txt \
-whatLabel:is_return whatApps:0,4,21,17
-if [ $? -ne 0 ]; then
-   echo "Spark特征分桶处理任务执行失败"
-   /root/anaconda3/bin/python monitor_util.py --level error --msg "荐模型数据更新 \n【任务名称】:step3训练数据产出\n【是否成功】:error\n【信息】:Spark特征分桶处理任务执行失败"
-   exit 1
-else
-   echo "spark特征分桶处理执行成功"
-fi
-
-echo "$(date +%Y-%m-%d_%H-%M-%S)----------step4------------开始模型训练,增量训练:${MODEL_PATH}/${model_name}_${today_early_3}.txt"
-#$HADOOP fs -text ${bucketDataPath}/${begin_early_2_Str}/* | ${FM_HOME}/fm_train -m ${MODEL_PATH}/${model_name}_${begin_early_2_Str}.txt -dim 1,1,8 -im ${LAST_MODEL_HOME}/model_online.txt -core 8
-$HADOOP fs -text ${bucketDataPath}/20240727/* | ${FM_HOME}/fm_train -m ${MODEL_PATH}/${model_name}_20240727.txt -dim 1,1,8 -im /root/joe/model_online/model_online.txt -core 8
-if [ $? -ne 0 ]; then
-   echo "模型训练失败"
-   /root/anaconda3/bin/python monitor_util.py --level error --msg "荐模型数据更新 \n【任务名称】:step5模型训练\n【是否成功】:error\n【信息】:${bucketDataPath}/20240727训练失败"
-fi
-
-echo "$(date +%Y-%m-%d_%H-%M-%S)----------step5------------模型训练完成:${MODEL_PATH}/${model_name}_20240727.txt"
-
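A note on the parallel-submit pattern used here and in handle_rov.sh below: "wait" with no arguments does not propagate a failing child's exit status, so the "if [ $? -ne 0 ]" after it can never see a failed Spark job. A sketch of a variant that does catch failures (an assumed fix; spark_job_a and spark_job_b stand in for the spark-class2 commands):

    spark_job_a & pid_a=$!
    spark_job_b & pid_b=$!
    fail=0
    wait "$pid_a" || fail=1
    wait "$pid_b" || fail=1
    if [ "$fail" -ne 0 ]; then
        echo "at least one Spark job failed"
        exit 1
    fi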

+ 0 - 45
recommend/delPredictFile.sh

@@ -1,45 +0,0 @@
-#!/bin/sh
-set -x
-
-source /root/anaconda3/bin/activate py37
-
-export SPARK_HOME=/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8
-export PATH=$SPARK_HOME/bin:$PATH
-export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
-export JAVA_HOME=/usr/lib/jvm/java-1.8.0
-
-today="$(date -d '3 days ago' +%Y%m%d)"
-model_name=model_nba8
-PREDICT_PATH=/root/joe/recommend-emr-dataprocess/predict
-fileName1=${PREDICT_PATH}/${model_name}_${today}_online.txt
-fileName2=${PREDICT_PATH}/${model_name}_${today}_new.txt
-
-
-if [ -f "${fileName1}" ]; then
-    echo "文件 ${fileName1} 存在,正在删除..."
-    # 使用rm命令删除文件
-    rm "${fileName1}"
-#    mv "${fileName1}"  ${PREDICT_PATH}/xxx_online.txt
-    if [ $? -eq 0 ]; then
-        echo "文件 ${fileName1} 已成功删除。"
-    else
-        echo "删除文件 ${fileName1} 时出错。"
-    fi
-else
-    echo "文件 ${fileName1} 不存在。"
-fi
-
-
-if [ -f "${fileName2}" ]; then
-    echo "文件 ${fileName2} 存在,正在删除..."
-    # 使用rm命令删除文件
-    rm "${fileName2}"
-#    mv "${fileName2}"  ${PREDICT_PATH}/xxx_new.txt
-    if [ $? -eq 0 ]; then
-        echo "文件 ${fileName2} 已成功删除。"
-    else
-        echo "删除文件 ${fileName2} 时出错。"
-    fi
-else
-    echo "文件 ${fileName2} 不存在。"
-fi
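The two blocks differ only in the file name; the same logic fits in a small function (an assumed refactor sketch, not part of the deleted script):

    del_file() {
        # $1: path of the prediction file to delete
        if [ -f "$1" ]; then
            rm "$1" && echo "deleted $1" || echo "error deleting $1"
        else
            echo "$1 does not exist"
        fi
    }
    del_file "${fileName1}"
    del_file "${fileName2}"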

+ 0 - 65
recommend/handle_0724.sh

@@ -1,65 +0,0 @@
-#!/bin/sh
-set -x
-
-source /root/anaconda3/bin/activate py37
-
-export SPARK_HOME=/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8
-export PATH=$SPARK_HOME/bin:$PATH
-export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
-export JAVA_HOME=/usr/lib/jvm/java-1.8.0
-
-#  nohup sh handle_rov.sh > "$(date +%Y%m%d_%H%M%S)_handle_rov.log" 2>&1 &
-
-# Source data table name
-table='alg_recsys_sample_all'
-today="$(date +%Y%m%d)"
-today_early_3="$(date -d '3 days ago' +%Y%m%d)"
-#table='alg_recsys_sample_all_test'
-# Partition config: recommendation data is produced with a one-day lag, so at 00:00 on day 5 the new model data is built from day 3's hour 00-23 data
-begin_early_2_Str="$(date -d '2 days ago' +%Y%m%d)"
-end_early_2_Str="$(date -d '2 days ago' +%Y%m%d)"
-beginHhStr=00
-endHhStr=23
-max_hour=05
-max_minute=00
-# Absolute HDFS paths produced by each stage
-# Source data files
-originDataPath=/dw/recommend/model/41_recsys_sample_data/
-# Feature values
-valueDataPath=/dw/recommend/model/14_feature_data/
-# Feature buckets
-bucketDataPath=/dw/recommend/model/43_recsys_train_data/
-# Model data path
-MODEL_PATH=/root/joe/recommend-emr-dataprocess/model
-# Prediction output path
-PREDICT_PATH=/root/joe/recommend-emr-dataprocess/predict
-# Path of the model currently serving online
-LAST_MODEL_HOME=/root/joe/model_online
-# Model file name prefix
-model_name=model_nba8
-# alphaFM binaries
-FM_HOME=/root/sunmingze/alphaFM/bin
-# hadoop
-HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
-OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/zhangbo/
-
-
-echo "$(date +%Y-%m-%d_%H-%M-%S)----------step2------------根据特征分桶生产重打分特征数据"
-/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
---class com.aliyun.odps.spark.examples.makedata_recsys.makedata_recsys_43_bucketData_20240709 \
---master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
-../target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-readPath:${originDataPath} \
-savePath:${bucketDataPath} \
-beginStr:20240716 endStr:20240722 repartition:500 \
-filterNames:XXXXXXXXX \
-fileName:20240609_bucket_314.txt \
-whatLabel:is_return whatApps:0,4,21,17
-if [ $? -ne 0 ]; then
-   echo "Spark特征分桶处理任务执行失败"
-   /root/anaconda3/bin/python monitor_util.py --level error --msg "荐模型数据更新 \n【任务名称】:step3训练数据产出\n【是否成功】:error\n【信息】:Spark特征分桶处理任务执行失败"
-   exit 1
-else
-   echo "spark特征分桶处理执行成功"
-fi
-

+ 0 - 233
recommend/handle_rov.sh

@@ -1,233 +0,0 @@
-#!/bin/sh
-set -x
-
-source /root/anaconda3/bin/activate py37
-
-export SPARK_HOME=/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8
-export PATH=$SPARK_HOME/bin:$PATH
-export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
-export JAVA_HOME=/usr/lib/jvm/java-1.8.0
-
-#  nohup sh handle_rov.sh > "$(date +%Y%m%d_%H%M%S)_handle_rov.log" 2>&1 &
-
-# Source data table name
-table='alg_recsys_sample_all'
-today="$(date +%Y%m%d)"
-today_early_3="$(date -d '3 days ago' +%Y%m%d)"
-#table='alg_recsys_sample_all_test'
-# Partition config: recommendation data is produced with a one-day lag, so at 00:00 on day 5 the new model data is built from day 3's hour 00-23 data
-begin_early_2_Str="$(date -d '2 days ago' +%Y%m%d)"
-end_early_2_Str="$(date -d '2 days ago' +%Y%m%d)"
-beginHhStr=00
-endHhStr=23
-max_hour=05
-max_minute=00
-# Absolute HDFS paths produced by each stage
-# Source data files
-originDataPath=/dw/recommend/model/41_recsys_sample_data/
-# Feature values
-valueDataPath=/dw/recommend/model/14_feature_data/
-# Feature buckets
-bucketDataPath=/dw/recommend/model/43_recsys_train_data/
-# Model data path
-MODEL_PATH=/root/joe/recommend-emr-dataprocess/model
-# Prediction output path
-PREDICT_PATH=/root/joe/recommend-emr-dataprocess/predict
-# Path of the model currently serving online
-LAST_MODEL_HOME=/root/joe/model_online
-# Model file name prefix
-model_name=model_nba8
-# alphaFM binaries
-FM_HOME=/root/sunmingze/alphaFM/bin
-# hadoop
-HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
-OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/zhangbo/
-#OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/qiaojialiang/
-
-# 0 Wait until the upstream table has finished producing, at most until max_hour
-# shellcheck disable=SC2154
-echo "$(date +%Y-%m-%d_%H-%M-%S)----------step0------------start checking whether the data is fully produced, partitions: beginStr:${begin_early_2_Str}${beginHhStr}, endStr:${end_early_2_Str}${endHhStr}"
-while true; do
-  python_return_code=$(python /root/joe/recommend-emr-dataprocess/qiaojialiang/checkHiveDataUtil.py --table ${table} --beginStr ${begin_early_2_Str}${beginHhStr} --endStr ${end_early_2_Str}${endHhStr})
-  echo "python 返回值:${python_return_code}"
-  if [ $python_return_code -eq 0 ]; then
-    echo "Python程序返回0,校验存在数据,退出循环。"
-    break
-  fi
-  echo "Python程序返回非0值,不存在数据,等待五分钟后再次调用。"
-  sleep 300
-  current_hour=$(date +%H)
-  current_minute=$(date +%M)
-  # shellcheck disable=SC2039
-  if (( current_hour > max_hour || (current_hour == max_hour && current_minute >= max_minute) )); then
-    echo "最长等待时间已到,失败:${current_hour}-${current_minute}"
-    /root/anaconda3/bin/python monitor_util.py --level error --msg "荐模型数据更新 \n【任务名称】:step0校验是否生产完数据\n【是否成功】:error\n【信息】:最长等待时间已到,失败:${current_hour}-${current_minute}"
-    exit 1
-  fi
-done
-
-# 1 Produce the source data
-echo "$(date +%Y-%m-%d_%H-%M-%S)----------step1------------start producing source data from ${table}"
-#/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
-#--class com.aliyun.odps.spark.examples.makedata_recsys.makedata_recsys_41_originData_20240709 \
-#--master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
-#../target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-#tablePart:64 repartition:32 \
-#beginStr:${begin_early_2_Str}${beginHhStr} endStr:${end_early_2_Str}${endHhStr} \
-#savePath:${originDataPath} \
-#table:${table}
-#if [ $? -ne 0 ]; then
-#   echo "Spark原始样本生产任务执行失败"
-#   /root/anaconda3/bin/python monitor_util.py --level error --msg "荐模型数据更新 \n【任务名称】:step1根据${table}生产原始数据\n【是否成功】:error\n【信息】:Spark原始样本生产任务执行失败"
-#   exit 1
-#else
-#   echo "spark原始样本生产执行成功"
-#fi
-/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
---class com.aliyun.odps.spark.examples.makedata_recsys.makedata_recsys_41_originData_20240709 \
---master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
-../target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-tablePart:64 repartition:32 \
-beginStr:${begin_early_2_Str}00 endStr:${end_early_2_Str}09 \
-savePath:${originDataPath} \
-table:${table} &
-
-/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
---class com.aliyun.odps.spark.examples.makedata_recsys.makedata_recsys_41_originData_20240709 \
---master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
-../target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-tablePart:64 repartition:32 \
-beginStr:${begin_early_2_Str}10 endStr:${end_early_2_Str}15 \
-savePath:${originDataPath} \
-table:${table} &
-
-/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
---class com.aliyun.odps.spark.examples.makedata_recsys.makedata_recsys_41_originData_20240709 \
---master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
-../target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-tablePart:64 repartition:32 \
-beginStr:${begin_early_2_Str}16 endStr:${end_early_2_Str}23 \
-savePath:${originDataPath} \
-table:${table} &
-
-wait
-if [ $? -ne 0 ]; then
-   echo "Spark原始样本生产任务执行失败"
-   /root/anaconda3/bin/python monitor_util.py --level error --msg "荐模型数据更新 \n【任务名称】:step1根据${table}生产原始数据\n【是否成功】:error\n【信息】:Spark原始样本生产任务执行失败"
-   exit 1
-else
-   echo "spark原始样本生产执行成功"
-fi
-
-# 2 Feature bucketing
-echo "$(date +%Y-%m-%d_%H-%M-%S)----------step2------------produce re-scoring feature data via feature bucketing"
-/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
---class com.aliyun.odps.spark.examples.makedata_recsys.makedata_recsys_43_bucketData_20240709 \
---master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
-../target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-readPath:${originDataPath} \
-savePath:${bucketDataPath} \
-beginStr:${begin_early_2_Str} endStr:${end_early_2_Str} repartition:500 \
-filterNames:XXXXXXXXX \
-fileName:20240609_bucket_314.txt \
-whatLabel:is_return whatApps:0,4,21,17
-if [ $? -ne 0 ]; then
-   echo "Spark特征分桶处理任务执行失败"
-   /root/anaconda3/bin/python monitor_util.py --level error --msg "荐模型数据更新 \n【任务名称】:step3训练数据产出\n【是否成功】:error\n【信息】:Spark特征分桶处理任务执行失败"
-   exit 1
-else
-   echo "spark特征分桶处理执行成功"
-fi
-echo "$(date +%Y-%m-%d_%H-%M-%S)----------step5------------spark特征分桶处理执行成功:${begin_early_2_Str}"
-
-# 3 Compare AUC: evaluate the 3-days-ago model against the online model; if it wins, update the online model
-#echo "$(date +%Y-%m-%d_%H-%M-%S)----------step3------------start comparing AUC: new model ${MODEL_PATH}/${model_name}_${today_early_3}.txt vs the online model"
-#$HADOOP fs -text ${bucketDataPath}/${begin_early_2_Str}/* | ${FM_HOME}/fm_predict -m ${LAST_MODEL_HOME}/model_online.txt -dim 8 -core 8 -out ${PREDICT_PATH}/${model_name}_${today}_online.txt
-#if [ $? -ne 0 ]; then
-#  echo "推荐线上模型AUC计算失败"
-#  /root/anaconda3/bin/python monitor_util.py --level error --msg "荐模型数据更新 \n【任务名称】:step4新旧模型AUC对比\n【是否成功】:error\n【信息】:推荐线上模型AUC计算失败"
-#else
-#  $HADOOP fs -text ${bucketDataPath}/${begin_early_2_Str}/* | ${FM_HOME}/fm_predict -m ${MODEL_PATH}/${model_name}_${today_early_3}.txt -dim 8 -core 8 -out ${PREDICT_PATH}/${model_name}_${today}_new.txt
-#  if [ $? -ne 0 ]; then
-#     echo "推荐新模型AUC计算失败"
-#     /root/anaconda3/bin/python monitor_util.py --level error --msg "荐模型数据更新 \n【任务名称】:step4新旧模型AUC对比\n【是否成功】:error\n【信息】:推荐新模型AUC计算失败${PREDICT_PATH}/${model_name}_${today}_new.txt"
-#  else
-#    online_auc=`cat ${PREDICT_PATH}/${model_name}_${today}_online.txt | /root/sunmingze/AUC/AUC`
-#    if [ $? -ne 0 ]; then
-#       echo "推荐线上模型AUC计算失败"
-#       /root/anaconda3/bin/python monitor_util.py --level error --msg "荐模型数据更新 \n【任务名称】:step4新旧模型AUC对比\n【是否成功】:error\n【信息】:推荐线上模型AUC计算失败"
-#    else
-#      new_auc=`cat ${PREDICT_PATH}/${model_name}_${today}_new.txt | /root/sunmingze/AUC/AUC`
-#      if [ $? -ne 0 ]; then
-#         echo "推荐新模型AUC计算失败"
-#         /root/anaconda3/bin/python monitor_util.py --level error --msg "荐模型数据更新 \n【任务名称】:step4新旧模型AUC对比\n【是否成功】:error\n【信息】:推荐新模型AUC计算失败${PREDICT_PATH}/${model_name}_${today}_new.txt"
-#      else
-#        # 4.1 Compare the AUCs to decide whether to update the online model
-#        if [ "$online_auc" \< "$new_auc" ]; then
-#            echo "新模型优于线上模型: 线上模型AUC: ${online_auc}, 新模型AUC: ${new_auc}"
-#            # 4.1.1 模型格式转换
-#            cat ${MODEL_PATH}/${model_name}_${today_early_3}.txt |
-#            awk -F " " '{
-#                if (NR == 1) {
-#                    print $1"\t"$2
-#                } else {
-#                    split($0, fields, " ");
-#                    OFS="\t";
-#                    line=""
-#                    for (i = 1; i <= 10 && i <= length(fields); i++) {
-#                        line = (line ? line "\t" : "") fields[i];
-#                    }
-#                    print line
-#                }
-#            }' > ${MODEL_PATH}/${model_name}_${today_early_3}_change.txt
-#            if [ $? -ne 0 ]; then
-#               echo "新模型文件格式转换失败"
-#               /root/anaconda3/bin/python monitor_util.py --level error --msg "荐模型数据更新 \n【任务名称】:step4模型格式转换\n【是否成功】:error\n【信息】:新模型文件格式转换失败${MODEL_PATH}/${model_name}_${today_early_3}.txt"
-#            else
-#              # 4.1.2 Upload the model file to OSS
-#              online_model_path=${OSS_PATH}/${model_name}.txt
-#              $HADOOP fs -test -e ${online_model_path}
-#              if [ $? -eq 0 ]; then
-#                  echo "数据存在, 先删除。"
-#                  $HADOOP fs -rm -r -skipTrash ${online_model_path}
-#              else
-#                  echo "数据不存在"
-#              fi
-#              $HADOOP fs -put ${MODEL_PATH}/${model_name}_${today_early_3}_change.txt ${online_model_path}
-#              if [ $? -eq 0 ]; then
-#                 echo "推荐模型文件至OSS成功"
-#                  # 4.1.3 本地保存最新的线上使用的模型,用于下一次的AUC验证
-#                 cp -f ${LAST_MODEL_HOME}/model_online.txt ${LAST_MODEL_HOME}/model_online_$(date +\%Y\%m\%d).txt
-#                 cp -f ${MODEL_PATH}/${model_name}_${today_early_3}.txt ${LAST_MODEL_HOME}/model_online.txt
-#                 if [ $? -ne 0 ]; then
-#                     echo "模型备份失败"
-#                 fi
-#                 /root/anaconda3/bin/python monitor_util.py --level info --msg "Recommendation model update \n[Task]: step4 model update\n[Status]: success\n[Info]: new model beats the online model: online AUC: ${online_auc}, new AUC: ${new_auc}; updated model ${model_name}_${today_early_3}.txt"
-#              else
-#                 echo "推荐模型文件至OSS失败"
-#                 /root/anaconda3/bin/python monitor_util.py --level error --msg "荐模型数据更新 \n【任务名称】:step4模型推送oss\n【是否成功】:error\n【信息】:推荐模型文件至OSS失败${MODEL_PATH}/${model_name}_${today_early_3}_change.txt --- ${online_model_path}"
-#              fi
-#            fi
-#        else
-#            echo "新模型不如线上模型: 线上模型AUC: ${online_auc}, 新模型AUC: ${new_auc}"
-#            /root/anaconda3/bin/python monitor_util.py --level info --msg "荐模型数据更新 \n【任务名称】:step4模型更新\n【是否成功】:success\n【信息】:新模型不如线上模型: 线上模型AUC: ${online_auc}, 新模型AUC: ${new_auc},${MODEL_PATH}/${model_name}_${today_early_3}.txt"
-#        fi
-#      fi
-#    fi
-#  fi
-#fi
-
-# 4 Model training
-#echo "$(date +%Y-%m-%d_%H-%M-%S)----------step4------------start model training, incremental on: ${MODEL_PATH}/${model_name}_${today_early_3}.txt"
-##$HADOOP fs -text ${bucketDataPath}/${begin_early_2_Str}/* | ${FM_HOME}/fm_train -m ${MODEL_PATH}/${model_name}_${begin_early_2_Str}.txt -dim 1,1,8 -im ${LAST_MODEL_HOME}/model_online.txt -core 8
-#$HADOOP fs -text ${bucketDataPath}/${begin_early_2_Str}/* | ${FM_HOME}/fm_train -m ${MODEL_PATH}/${model_name}_${begin_early_2_Str}.txt -dim 1,1,8 -im ${MODEL_PATH}/${model_name}_${today_early_3}.txt -core 8
-#if [ $? -ne 0 ]; then
-#   echo "模型训练失败"
-#   /root/anaconda3/bin/python monitor_util.py --level error --msg "荐模型数据更新 \n【任务名称】:step5模型训练\n【是否成功】:error\n【信息】:${bucketDataPath}/${begin_early_2_Str}训练失败"
-#fi
-
-#echo "$(date +%Y-%m-%d_%H-%M-%S)----------step5------------模型训练完成:${MODEL_PATH}/${model_name}_${begin_early_2_Str}.txt"
-
-
-
-

+ 0 - 104
recommend/monitor_util.py

@@ -1,104 +0,0 @@
-# -*- coding: utf-8 -*-
-import argparse
-import datetime
-import json
-
-import requests
-
-server_robot = {
-    'webhook': 'https://open.feishu.cn/open-apis/bot/v2/hook/cbf0bc0f-994a-489b-9f77-de0c5be107cd',
-}
-
-level_header_template_map = {
-    "info": "turquoise",
-    "error": "red",
-    "warn": "yellow"
-}
-
-level_header_title_content_map = {
-    "info": "推荐模型自动更新通知",
-    "error": "推荐模型自动更新告警",
-    "warn": "推荐模型自动更新告警"
-}
-
-
-def send_card_msg_to_feishu(webhook, card_json):
-    """发送消息到飞书"""
-    headers = {'Content-Type': 'application/json'}
-    payload_message = {
-        "msg_type": "interactive",
-        "card": card_json
-    }
-    print(f"推送飞书消息内容: {json.dumps(payload_message)}")
-    response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
-    print(response.text)
-
-
-def seconds_convert(seconds):
-    hours = seconds // 3600
-    minutes = (seconds % 3600) // 60
-    seconds = seconds % 60
-    return f"{hours}小时 {minutes}分钟 {seconds}秒"
-
-
-def _monitor(level, msg: str, start, elapsed):
-    """消息推送"""
-    now = datetime.datetime.now()
-    msg = msg.replace("\\n", "\n").replace("\\t", "\t")
-    mgs_text = f"- 当前时间: {now.strftime('%Y-%m-%d %H:%M:%S')}" \
-               f"\n- 任务描述: {msg}"
-    # f"\n- 任务开始时间: {start}" \
-    # f"\n- 任务耗时: {seconds_convert(elapsed)}" \
-    card_json = {
-        "config": {},
-        "i18n_elements": {
-            "zh_cn": [
-                {
-                    "tag": "markdown",
-                    "content": "",
-                    "text_align": "left",
-                    "text_size": "normal"
-                },
-                {
-                    "tag": "markdown",
-                    "content": mgs_text,
-                    "text_align": "left",
-                    "text_size": "normal"
-                }
-            ]
-        },
-        "i18n_header": {
-            "zh_cn": {
-                "title": {
-                    "tag": "plain_text",
-                    "content": level_header_title_content_map[level]
-                },
-                "subtitle": {
-                    "tag": "plain_text",
-                    "content": ""
-                },
-                "template": level_header_template_map[level]
-            }
-        }
-    }
-
-    send_card_msg_to_feishu(
-        webhook=server_robot.get('webhook'),
-        card_json=card_json
-    )
-
-
-if __name__ == '__main__':
-    parser = argparse.ArgumentParser(description='Alerting utils')
-    parser.add_argument('--level', type=str, help='notification level: info, warn, error', required=True)
-    parser.add_argument('--msg', type=str, help='message', required=True)
-    # parser.add_argument('--start', type=str, help='task start time', required=True)
-    # parser.add_argument('--elapsed', type=int, help='task elapsed time in seconds', required=True)
-    args = parser.parse_args()
-
-    _monitor(
-        level=args.level,
-        msg=args.msg,
-        start="",
-        elapsed=0
-    )
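The shell scripts in this commit invoked the utility as a one-shot command; literal \n and \t in --msg are expanded into real line breaks before the card is built. A usage sketch (the message text is illustrative):

    /root/anaconda3/bin/python monitor_util.py --level info --msg "Recommendation model update \n[Task]: step4 model update\n[Status]: success"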

+ 0 - 53
recommend/xunlian.sh

@@ -1,53 +0,0 @@
-#!/bin/sh
-set -x
-
-source /root/anaconda3/bin/activate py37
-
-export SPARK_HOME=/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8
-export PATH=$SPARK_HOME/bin:$PATH
-export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
-export JAVA_HOME=/usr/lib/jvm/java-1.8.0
-
-#  nohup sh handle_rov.sh > "$(date +%Y%m%d_%H%M%S)_handle_rov.log" 2>&1 &
-
-# Source data table name
-table='alg_recsys_sample_all'
-today="$(date +%Y%m%d)"
-today_early_3="$(date -d '3 days ago' +%Y%m%d)"
-#table='alg_recsys_sample_all_test'
-# Partition config: recommendation data is produced with a one-day lag, so at 00:00 on day 5 the new model data is built from day 3's hour 00-23 data
-begin_early_2_Str="$(date -d '2 days ago' +%Y%m%d)"
-end_early_2_Str="$(date -d '2 days ago' +%Y%m%d)"
-beginHhStr=00
-endHhStr=23
-max_hour=05
-max_minute=00
-# Absolute HDFS paths produced by each stage
-# Source data files
-originDataPath=/dw/recommend/model/41_recsys_sample_data/
-# Feature values
-valueDataPath=/dw/recommend/model/14_feature_data/
-# Feature buckets
-bucketDataPath=/dw/recommend/model/43_recsys_train_data/
-# Model data path
-MODEL_PATH=/root/joe/recommend-emr-dataprocess/model
-# Prediction output path
-PREDICT_PATH=/root/joe/recommend-emr-dataprocess/predict
-# Path of the model currently serving online
-LAST_MODEL_HOME=/root/joe/model_online
-# Model file name prefix
-model_name=model_nba8
-# alphaFM binaries
-FM_HOME=/root/sunmingze/alphaFM/bin
-# hadoop
-HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
-OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/zhangbo/
-
-
-$HADOOP fs -text ${bucketDataPath}/20240713/* | ${FM_HOME}/fm_train -m ${MODEL_PATH}/${model_name}_0709_0713.txt -dim 1,1,8 -im /root/joe/recommend-emr-dataprocess/model/model_nba8_0709_0712.txt -core 8
-
-$HADOOP fs -text ${bucketDataPath}/20240714/* | ${FM_HOME}/fm_predict -m ${MODEL_PATH}/${model_name}_0709_0713.txt -dim 8 -core 8 -out ${PREDICT_PATH}/${model_name}_0709_0713_new.txt
-
-new_auc=`cat ${PREDICT_PATH}/${model_name}_0709_0713_new.txt | /root/sunmingze/AUC/AUC`
-
-echo "0709-0713 auc:${new_auc}"

+ 0 - 130
recommend/xunlian_0724.sh

@@ -1,130 +0,0 @@
-#!/bin/sh
-set -ex
-
-
-# Source data table name
-table='alg_recsys_sample_all'
-today="$(date +%Y%m%d)"
-today_early_3="$(date -d '3 days ago' +%Y%m%d)"
-#table='alg_recsys_sample_all_test'
-# Partition config: recommendation data is produced with a one-day lag, so at 00:00 on day 5 the new model data is built from day 3's hour 00-23 data
-begin_early_2_Str="$(date -d '2 days ago' +%Y%m%d)"
-end_early_2_Str="$(date -d '2 days ago' +%Y%m%d)"
-beginHhStr=00
-endHhStr=23
-max_hour=05
-max_minute=00
-# Absolute HDFS paths produced by each stage
-# Source data files
-originDataPath=/dw/recommend/model/41_recsys_sample_data/
-# Feature values
-valueDataPath=/dw/recommend/model/14_feature_data/
-# Feature buckets
-bucketDataPath=/dw/recommend/model/43_recsys_train_data/
-# Model data path
-MODEL_PATH=/root/joe/recommend-emr-dataprocess/model
-# Prediction output path
-PREDICT_PATH=/root/joe/recommend-emr-dataprocess/predict
-# Path of the model currently serving online
-LAST_MODEL_HOME=/root/joe/model_online
-# Model file name prefix
-model_name=model_nba8
-# alphaFM binaries
-FM_HOME=/root/sunmingze/alphaFM/bin
-# hadoop
-HADOOP=/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop
-OSS_PATH=oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/zhangbo/
-FM_TRAIN="/root/sunmingze/alphaFM/bin/fm_train"
-
-#echo "$(date +%Y-%m-%d_%H-%M-%S)----------step2------------根据特征分桶生产重打分特征数据20240717"
-#/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
-#--class com.aliyun.odps.spark.examples.makedata_recsys.makedata_recsys_43_bucketData_20240709 \
-#--master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
-#../target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-#readPath:${originDataPath} \
-#savePath:${bucketDataPath} \
-#beginStr:20240717 endStr:20240717 repartition:500 \
-#filterNames:XXXXXXXXX \
-#fileName:20240609_bucket_314.txt \
-#whatLabel:is_return whatApps:0,4,21,17
-#
-#echo "$(date +%Y-%m-%d_%H-%M-%S)----------step2------------根据特征分桶生产重打分特征数据20240718"
-#/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
-#--class com.aliyun.odps.spark.examples.makedata_recsys.makedata_recsys_43_bucketData_20240709 \
-#--master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
-#../target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-#readPath:${originDataPath} \
-#savePath:${bucketDataPath} \
-#beginStr:20240718 endStr:20240718 repartition:500 \
-#filterNames:XXXXXXXXX \
-#fileName:20240609_bucket_314.txt \
-#whatLabel:is_return whatApps:0,4,21,17
-#
-#echo "$(date +%Y-%m-%d_%H-%M-%S)----------step2------------根据特征分桶生产重打分特征数据20240719"
-#/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
-#--class com.aliyun.odps.spark.examples.makedata_recsys.makedata_recsys_43_bucketData_20240709 \
-#--master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
-#../target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-#readPath:${originDataPath} \
-#savePath:${bucketDataPath} \
-#beginStr:20240719 endStr:20240719 repartition:500 \
-#filterNames:XXXXXXXXX \
-#fileName:20240609_bucket_314.txt \
-#whatLabel:is_return whatApps:0,4,21,17
-#
-#echo "$(date +%Y-%m-%d_%H-%M-%S)----------step2------------根据特征分桶生产重打分特征数据20240720"
-#/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
-#--class com.aliyun.odps.spark.examples.makedata_recsys.makedata_recsys_43_bucketData_20240709 \
-#--master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
-#../target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-#readPath:${originDataPath} \
-#savePath:${bucketDataPath} \
-#beginStr:20240720 endStr:20240720 repartition:500 \
-#filterNames:XXXXXXXXX \
-#fileName:20240609_bucket_314.txt \
-#whatLabel:is_return whatApps:0,4,21,17
-#
-#echo "$(date +%Y-%m-%d_%H-%M-%S)----------step2------------根据特征分桶生产重打分特征数据20240721"
-#/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
-#--class com.aliyun.odps.spark.examples.makedata_recsys.makedata_recsys_43_bucketData_20240709 \
-#--master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
-#../target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-#readPath:${originDataPath} \
-#savePath:${bucketDataPath} \
-#beginStr:20240721 endStr:20240721 repartition:500 \
-#filterNames:XXXXXXXXX \
-#fileName:20240609_bucket_314.txt \
-#whatLabel:is_return whatApps:0,4,21,17
-#
-#echo "$(date +%Y-%m-%d_%H-%M-%S)----------step2------------根据特征分桶生产重打分特征数据20240722"
-#/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
-#--class com.aliyun.odps.spark.examples.makedata_recsys.makedata_recsys_43_bucketData_20240709 \
-#--master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
-#../target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-#readPath:${originDataPath} \
-#savePath:${bucketDataPath} \
-#beginStr:20240722 endStr:20240722 repartition:500 \
-#filterNames:XXXXXXXXX \
-#fileName:20240609_bucket_314.txt \
-#whatLabel:is_return whatApps:0,4,21,17
-
-
-
-$HADOOP fs -text /dw/recommend/model/43_recsys_train_data/20240717/* | ${FM_TRAIN} -m /root/joe/recommend-emr-dataprocess/model/model_nba8_all_9_17.txt -dim 1,1,8 -core 8 -im /root/joe/recommend-emr-dataprocess/model/model_nba8_20240716.txt
-echo "$(date +%Y-%m-%d_%H-%M-%S)----step------out model 0709~0722: /root/joe/recommend-emr-dataprocess/model/model_nba8_all_9_17.txt"
-
-$HADOOP fs -text /dw/recommend/model/43_recsys_train_data/20240718/* | ${FM_TRAIN} -m /root/joe/recommend-emr-dataprocess/model/model_nba8_all_9_18.txt -dim 1,1,8 -core 8 -im /root/joe/recommend-emr-dataprocess/model/model_nba8_all_9_17.txt
-echo "$(date +%Y-%m-%d_%H-%M-%S)----step------out model 0709~0722: /root/joe/recommend-emr-dataprocess/model/model_nba8_all_9_18.txt"
-
-$HADOOP fs -text /dw/recommend/model/43_recsys_train_data/20240719/* | ${FM_TRAIN} -m /root/joe/recommend-emr-dataprocess/model/model_nba8_all_9_19.txt -dim 1,1,8 -core 8 -im /root/joe/recommend-emr-dataprocess/model/model_nba8_all_9_18.txt
-echo "$(date +%Y-%m-%d_%H-%M-%S)----step------out model 0709~0722: /root/joe/recommend-emr-dataprocess/model/model_nba8_all_9_19.txt"
-
-$HADOOP fs -text /dw/recommend/model/43_recsys_train_data/20240720/* | ${FM_TRAIN} -m /root/joe/recommend-emr-dataprocess/model/model_nba8_all_9_20.txt -dim 1,1,8 -core 8 -im /root/joe/recommend-emr-dataprocess/model/model_nba8_all_9_19.txt
-echo "$(date +%Y-%m-%d_%H-%M-%S)----step------out model 0709~0722: /root/joe/recommend-emr-dataprocess/model/model_nba8_all_9_20.txt"
-
-$HADOOP fs -text /dw/recommend/model/43_recsys_train_data/20240721/* | ${FM_TRAIN} -m /root/joe/recommend-emr-dataprocess/model/model_nba8_all_9_21.txt -dim 1,1,8 -core 8 -im /root/joe/recommend-emr-dataprocess/model/model_nba8_all_9_20.txt
-echo "$(date +%Y-%m-%d_%H-%M-%S)----step------out model 0709~0722: /root/joe/recommend-emr-dataprocess/model/model_nba8_all_9_21.txt"
-
-$HADOOP fs -text /dw/recommend/model/43_recsys_train_data/20240722/* | ${FM_TRAIN} -m /root/joe/recommend-emr-dataprocess/model/model_nba8_all_9_22.txt -dim 1,1,8 -core 8 -im /root/joe/recommend-emr-dataprocess/model/model_nba8_all_9_21.txt
-echo "$(date +%Y-%m-%d_%H-%M-%S)----step------out model 0709~0722: /root/joe/recommend-emr-dataprocess/model/model_nba8_all_9_22.txt"
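The six invocations above chain daily models by hand: each day's -im input is the previous day's -m output. A condensed sketch of the same chain as a loop, which is essentially what the new 02_train_go.sh automates (model file names shortened for illustration):

    prev=${MODEL_PATH}/model_nba8_20240716.txt
    for d in 20240717 20240718 20240719 20240720 20240721 20240722; do
        out=${MODEL_PATH}/model_nba8_all_$d.txt
        $HADOOP fs -text /dw/recommend/model/43_recsys_train_data/$d/* | ${FM_TRAIN} -m $out -dim 1,1,8 -core 8 -im $prev
        prev=$out
    done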
-