
uv fission rate

jch 1 day ago
parent
commit
df1b950c37

+ 6 - 0
ad_display/README.md

@@ -0,0 +1,6 @@
+# 1. make_data.sh  (generate feature data)
+# 2. stat_freq.sh (count feature frequencies and filter low-frequency features)
+# 3. pipline/make_train_sample.sh (generate training samples, using the filtered feature file)
+# 4. pipline/train_xgb_model.sh (train the model)
+# 5. pipline/xgb_model_eval.sh (evaluate the model)
+# 6. pipline/eval_qq.sh (generate qq)
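
A minimal end-to-end sketch of how these six steps might be chained, inferred from the `$#` argument checks in the scripts below; the dates, the ODPS table name, and the local feature/bucket file names are placeholders, not part of this commit:

```bash
# hypothetical invocation order; adjust dates, table and file names to your run
bash ad_display/make_data.sh 20251210 20251216 alg_ad_display_fission_rate_20251218
bash ad_display/stat_freq.sh 20251210 20251216 feat_freq
# build a local feature-name file from the frequency output (see the note after stat_freq.sh), then:
bash ad_display/pipline/make_train_sample.sh feature_names.txt feature_buckets.txt 20251210 20251216
bash ad_display/pipline/train_xgb_model.sh 20251210 20251214 feature_names.txt
bash ad_display/pipline/xgb_model_eval.sh 20251215 20251216 feature_names.txt
bash ad_display/pipline/eval_qq.sh
```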

+ 38 - 0
ad_display/make_data.sh

@@ -0,0 +1,38 @@
+#!/bin/bash
+
+start_date=""
+end_date=""
+table=""
+if(($#==3))
+then
+    start_date=$1
+    end_date=$2
+    table=$3
+else
+    start_date=20251216
+    end_date=20251216
+    table=alg_ad_display_fission_rate_20251218
+fi
+
+export SPARK_HOME=/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8
+export PATH=$SPARK_HOME/bin:$PATH
+export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
+export JAVA_HOME=/usr/lib/jvm/java-1.8.0
+
+# params
+savePath=/dw/recommend/model/ad_display/data
+
+# 1 generate raw data
+echo "$(date +%Y-%m-%d_%H-%M-%S)----------step1------------start generating raw data from ${table}"
+set -x
+/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+--class com.aliyun.odps.spark.examples.makedata_recsys_r_rate.makedata_display_ad_20251218 \
+--master yarn --driver-memory 4G --executor-memory 6G --executor-cores 1 --num-executors 8 \
+--conf spark.yarn.executor.memoryOverhead=2048 \
+/mnt/disk1/jch/recommend-emr-dataprocess/target/spark-examples-1.0.0-SNAPSHOT-jar-with-dependencies.jar \
+table:${table} \
+tablePart:64 \
+beginStr:${start_date} \
+endStr:${end_date} \
+repartition:32 \
+savePath:${savePath} \

+ 30 - 0
ad_display/pipline/eval_qq.sh

@@ -0,0 +1,30 @@
+#!/bin/sh
+
+# env
+export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
+export JAVA_HOME=/usr/lib/jvm/java-1.8.0
+
+# params
+predictPath=/dw/recommend/model/ad_display/eval
+#bucketNum=200
+bucketNum=2000
+savePath=/dw/recommend/model/ad_display/qq
+
+echo `date` "stat qq"
+set -x
+/opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit \
+--class com.tzld.piaoquan.recommend.model.stat_qq \
+--master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 4 \
+--conf spark.yarn.executor.memoryOverhead=1024 \
+--conf spark.shuffle.service.enabled=true \
+--conf spark.shuffle.service.port=7337 \
+--conf spark.shuffle.consolidateFiles=true \
+--conf spark.shuffle.manager=sort \
+--conf spark.storage.memoryFraction=0.4 \
+--conf spark.shuffle.memoryFraction=0.5 \
+--conf spark.default.parallelism=200 \
+--conf spark.sql.debug.maxToStringFields=100 \
+/mnt/disk1/jch/recommend-model/recommend-model-produce/target/recommend-model-produce-jar-with-dependencies.jar \
+predictPath:${predictPath} \
+bucketNum:${bucketNum} \
+savePath:${savePath} \

+ 49 - 0
ad_display/pipline/make_train_sample.sh

@@ -0,0 +1,49 @@
+#!/bin/bash
+
+feature_file=""
+bucket_file=""
+beginStr=""
+endStr=""
+if(($#==4))
+then
+    feature_file=$1
+    bucket_file=$2
+    beginStr=$3
+    endStr=$4
+else
+    exit -1
+fi
+
+export SPARK_HOME=/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8
+export PATH=$SPARK_HOME/bin:$PATH
+export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
+export JAVA_HOME=/usr/lib/jvm/java-1.8.0
+
+# params
+readPath=/dw/recommend/model/ad_display/data
+whatLabel=r1_uv
+fuSampleRate=0.12
+#fuSampleRate=0.6
+notUseBucket=1
+repartition=10
+savePath=/dw/recommend/model/ad_display/sample
+
+# 1 generate training samples
+echo "$(date +%Y-%m-%d_%H-%M-%S)----------step1------------start generating training samples from ${readPath}"
+set -x
+/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+--class com.aliyun.odps.spark.examples.makedata_recsys_r_rate.makedata_display_ad_sample_20251218 \
+--master yarn --driver-memory 4G --executor-memory 6G --executor-cores 1 --num-executors 8 \
+--conf spark.yarn.executor.memoryOverhead=2048 \
+--files ${feature_file},${bucket_file} \
+/mnt/disk1/jch/recommend-emr-dataprocess/target/spark-examples-1.0.0-SNAPSHOT-jar-with-dependencies.jar \
+readPath:${readPath} \
+beginStr:${beginStr} \
+endStr:${endStr} \
+whatLabel:${whatLabel} \
+fuSampleRate:${fuSampleRate} \
+notUseBucket:${notUseBucket} \
+featureFile:${feature_file} \
+featureBucket:${bucket_file} \
+repartition:${repartition} \
+savePath:${savePath} \

+ 62 - 0
ad_display/pipline/train_xgb_model.sh

@@ -0,0 +1,62 @@
+#!/bin/sh
+
+start_date=""
+end_date=""
+feature_file=""
+if(($#==3))
+then
+    start_date=$1
+    end_date=$2
+    feature_file=$3
+else
+    exit -1
+fi
+
+# env
+export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
+export JAVA_HOME=/usr/lib/jvm/java-1.8.0
+
+# params
+workers=16
+minCnt=0
+BASE_TRAIN_DATA_PATH=/dw/recommend/model/ad_display/sample
+MODEL_SAVE_PATH=/dw/recommend/model/ad_display/model/model_xgb
+
+# params
+train_data_path=""
+for((i=0; i<=21; i++))
+do
+  data_date=$(date -d "$start_date $i day" +"%Y%m%d")
+  if [ "$data_date" -le "$end_date" ]
+  then
+    one_day_data_path="${BASE_TRAIN_DATA_PATH}/${data_date}"
+    if [[ -z $train_data_path ]]
+    then
+        train_data_path=$one_day_data_path
+    else
+        train_data_path="$train_data_path,$one_day_data_path"
+    fi
+  fi
+done
+
+## ******* train *******
+set -x
+/opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit \
+--class com.tzld.piaoquan.recommend.model.train_profile_gender_xgb_20251114 \
+--master yarn --driver-memory 6G --executor-memory 10G --executor-cores 1 --num-executors ${workers} \
+--conf spark.yarn.executor.memoryOverhead=2048 \
+--conf spark.shuffle.service.enabled=true \
+--conf spark.shuffle.service.port=7337 \
+--conf spark.shuffle.consolidateFiles=true \
+--conf spark.shuffle.manager=sort \
+--conf spark.storage.memoryFraction=0.4 \
+--conf spark.shuffle.memoryFraction=0.5 \
+--conf spark.default.parallelism=200 \
+--conf spark.sql.debug.maxToStringFields=100 \
+--files ${feature_file} \
+/mnt/disk1/jch/recommend-model/recommend-model-produce/target/recommend-model-produce-jar-with-dependencies.jar \
+trainPath:${train_data_path} \
+featureFile:${feature_file} \
+minCnt:${minCnt} \
+modelPath:${MODEL_SAVE_PATH} \
+eta:0.06 gamma:0.0 max_depth:4 num_round:1000 num_worker:${workers}

+ 64 - 0
ad_display/pipline/xgb_model_eval.sh

@@ -0,0 +1,64 @@
+#!/bin/sh
+
+start_date=""
+end_date=""
+feature_file=""
+if(($#==3))
+then
+    start_date=$1
+    end_date=$2
+    feature_file=$3
+else
+    exit -1
+fi
+
+# env
+export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
+export JAVA_HOME=/usr/lib/jvm/java-1.8.0
+
+# params
+base_data_path=/dw/recommend/model/ad_display/sample
+model_path=/dw/recommend/model/ad_display/model/model_xgb
+minCnt=0
+repartition=10
+save_path=/dw/recommend/model/ad_display/eval
+
+# params
+test_data_path=""
+for((i=0; i<=21; i++))
+do
+  data_date=$(date -d "$start_date $i day" +"%Y%m%d")
+  if [ "$data_date" -le "$end_date" ]
+  then
+    one_day_data_path="${base_data_path}/${data_date}"
+    if [[ -z $test_data_path ]]
+    then
+        test_data_path=$one_day_data_path
+    else
+        test_data_path="$test_data_path,$one_day_data_path"
+    fi
+  fi
+done
+
+echo `date` "predict ad_display eval samples"
+set -x
+/opt/apps/SPARK3/spark-3.3.1-hadoop3.2-1.0.5/bin/spark-class org.apache.spark.deploy.SparkSubmit \
+--class com.tzld.piaoquan.recommend.model.pred_profile_gender_xgb_20251114 \
+--master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 8 \
+--conf spark.yarn.executor.memoryOverhead=1024 \
+--conf spark.shuffle.service.enabled=true \
+--conf spark.shuffle.service.port=7337 \
+--conf spark.shuffle.consolidateFiles=true \
+--conf spark.shuffle.manager=sort \
+--conf spark.storage.memoryFraction=0.4 \
+--conf spark.shuffle.memoryFraction=0.5 \
+--conf spark.default.parallelism=200 \
+--conf spark.sql.debug.maxToStringFields=100 \
+--files ${feature_file} \
+/mnt/disk1/jch/recommend-model/recommend-model-produce/target/recommend-model-produce-jar-with-dependencies.jar \
+modelPath:${model_path} \
+testPath:${test_data_path} \
+featureFile:${feature_file} \
+minCnt:${minCnt} \
+savePath:${save_path} \
+repartition:${repartition} \

+ 53 - 0
ad_display/stat_freq.sh

@@ -0,0 +1,53 @@
+#!/bin/bash
+
+start_date=""
+end_date=""
+sub_path="feat_freq"
+if(($#==3))
+then
+    start_date=$1
+    end_date=$2
+    sub_path=$3
+else
+    exit -1
+fi
+
+export SPARK_HOME=/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8
+export PATH=$SPARK_HOME/bin:$PATH
+export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
+export JAVA_HOME=/usr/lib/jvm/java-1.8.0
+
+# params
+BASE_DATA_PATH=/dw/recommend/model/ad_display/data
+data_path=""
+for((i=0; i<=21; i++))
+do
+  data_date=$(date -d "$start_date $i day" +"%Y%m%d")
+  if [ "$data_date" -le "$end_date" ]
+  then
+    one_day_data_path="${BASE_DATA_PATH}/${data_date}"
+    if [[ -z $data_path ]]
+    then
+        data_path=$one_day_data_path
+    else
+        data_path="$data_path,$one_day_data_path"
+    fi
+  fi
+done
+
+featureIndex=2
+repartition=1
+savePath=/dw/recommend/model/832_recsys_analysis_data/${sub_path}
+
+# 1 count feature frequencies
+echo "$(date +%Y-%m-%d_%H-%M-%S)----------step1------------start counting feature frequencies from ${data_path}"
+set -x
+/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+--class com.aliyun.odps.spark.examples.makedata_recsys_r_rate.stat_feature \
+--master yarn --driver-memory 4G --executor-memory 6G --executor-cores 1 --num-executors 16 \
+--conf spark.yarn.executor.memoryOverhead=2048 \
+/mnt/disk1/jch/recommend-emr-dataprocess/target/spark-examples-1.0.0-SNAPSHOT-jar-with-dependencies.jar \
+dataPath:${data_path} \
+featureIndex:${featureIndex} \
+repartition:${repartition} \
+savePath:${savePath} \
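
The frequency job writes one `feature<TAB>count` line per feature (see stat_feature.scala below). A minimal sketch of how the local feature-name file consumed by make_train_sample.sh and train_xgb_model.sh could be derived from it; the count threshold of 10 and the output file name are assumptions for illustration:

```bash
# read the gzipped frequency output and keep features seen at least 10 times (threshold is an assumption)
hdfs dfs -text /dw/recommend/model/832_recsys_analysis_data/feat_freq/part-* \
  | awk -F '\t' '$2 >= 10 {print $1}' > feature_names.txt
```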

+ 68 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys_r_rate/makedata_display_ad_20251218.scala

@@ -0,0 +1,68 @@
+package com.aliyun.odps.spark.examples.makedata_recsys_r_rate
+
+import com.aliyun.odps.spark.examples.myUtils._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.JavaConversions._
+
+object makedata_display_ad_20251218 {
+  private val o2oMap = Map(
+    "creative" -> "creativeInfo"
+  )
+
+  private def getFeature(dt: String, rdd: RDD[java.util.Map[String, String]]): RDD[String] = {
+    rdd.mapPartitions(partition => {
+        partition.map(raw => {
+          val mid = raw.getOrElse("mid", "")
+          val labels = raw.getOrElse("labels", "")
+          val features = AdDisplayConvert.getFeature(dt, raw, 6).toString
+          if (mid.nonEmpty && labels.nonEmpty && features.nonEmpty) {
+            mid + "\t" + labels + "\t" + features
+          } else {
+            ""
+          }
+        })
+      })
+      .filter(_.nonEmpty)
+  }
+
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // 1. parse arguments
+    val param = ParamUtils.parseArgs(args)
+    val project = param.getOrElse("project", "loghubods")
+    val table = param.getOrElse("table", "alg_ad_display_ad_sample_20251218")
+    val beginStr = param.getOrElse("beginStr", "20251216")
+    val endStr = param.getOrElse("endStr", "20251216")
+    val tablePart = param.getOrElse("tablePart", "64").toInt
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/ad_display/data")
+    val repartition = param.getOrElse("repartition", "64").toInt
+    val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
+
+    // 2. process data
+    for (dt <- dateRange) {
+      // 2.1 partition
+      val partition = "dt=%s".format(dt)
+      println("processing partition: " + partition)
+
+      // 2.2 load sample data
+      val odpsData = DataUtils.getODPSData(sc, project, table, partition, tablePart)
+        .map(record => {
+          OnlineLogUtils.log2Map(record, o2oMap)
+        })
+
+      // 2.3 feature conversion
+      val featureData = getFeature(dt, odpsData)
+
+      // 2.4 save data
+      val hdfsPath = "%s/%s".format(savePath, dt)
+      DataUtils.saveData(featureData, hdfsPath, repartition)
+    }
+  }
+}

+ 123 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys_r_rate/makedata_display_ad_sample_20251218.scala

@@ -0,0 +1,123 @@
+package com.aliyun.odps.spark.examples.makedata_recsys_r_rate
+
+import com.aliyun.odps.spark.examples.myUtils.{DataUtils, MyDateUtils, MyHdfsUtils, ParamUtils}
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.io.Source
+import scala.util.Random
+
+
+object makedata_display_ad_sample_20251218 {
+  def main(args: Array[String]): Unit = {
+    // 1. read arguments
+    val param = ParamUtils.parseArgs(args)
+    val readPath = param.getOrElse("readPath", "/dw/recommend/model/ad_display/data")
+    val beginStr = param.getOrElse("beginStr", "20251216")
+    val endStr = param.getOrElse("endStr", "20251216")
+    val whatLabel = param.getOrElse("whatLabel", "r1_uv")
+    val fuSampleRate = param.getOrElse("fuSampleRate", "0.1").toDouble
+    val notUseBucket = param.getOrElse("notUseBucket", "1").toInt
+    val featureFile = param.getOrElse("featureFile", "20241209_recsys_nor_name.txt")
+    val featureBucketFile = param.getOrElse("featureBucket", "20241209_recsys_nor_bucket.txt")
+    val repartition = param.getOrElse("repartition", "100").toInt
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/ad_display/sample")
+
+    // 2. context
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // 3. process data
+    val featureSet = loadFeatureNames(featureFile)
+    val featureBucketMap = loadUseFeatureBuckets(notUseBucket, featureBucketFile)
+    val bucketsMap_br = sc.broadcast(featureBucketMap)
+    val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
+    for (dt <- dateRange) {
+      val partition = "%s".format(dt)
+      println("processing: " + partition)
+      val data = sc.textFile(readPath + "/" + partition + "*").map(row => {
+          val cells = row.split("\t")
+          val mid = cells(0)
+          val labels = cells(1)
+          val featData = cells(2)
+          (mid, labels, featData)
+        })
+        .filter {
+          case (mid, labels, featData) =>
+            val label = DataUtils.parseLabel(labels, whatLabel).toInt
+            label > 0 || new Random().nextDouble() <= fuSampleRate
+        }
+        .map {
+          case (mid, labels, featData) =>
+            val label = DataUtils.parseLabel(labels, whatLabel).toInt
+            val features = DataUtils.parseFeature(featData)
+            (mid, label, features)
+        }
+        .mapPartitions(row => {
+          val result = new ArrayBuffer[String]()
+          row.foreach {
+            case (mid, label, features) =>
+              val bucketsMap = bucketsMap_br.value
+              val featuresBucket = DataUtils.bucketFeature(featureSet, bucketsMap, features)
+              if (0 == label) {
+                result.add(mid + "\t" + 0 + "\t" + featuresBucket.mkString("\t"))
+              } else {
+                result.add(mid + "\t" + 0 + "\t" + featuresBucket.mkString("\t"))
+                for (_ <- 1 to label) {
+                  result.add(mid + "\t" + 1 + "\t" + featuresBucket.mkString("\t"))
+                }
+              }
+          }
+          result.iterator
+        })
+
+      // 4. save data to HDFS
+      val hdfsPath = savePath + "/" + partition
+      if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
+        println("deleting path and writing data: " + hdfsPath)
+        MyHdfsUtils.delete_hdfs_path(hdfsPath)
+        data.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+      } else {
+        println("invalid path, refusing to write: " + hdfsPath)
+      }
+    }
+  }
+
+  def loadFeatureNames(nameFile: String): Set[String] = {
+    val buffer = Source.fromFile(nameFile)
+    val names = buffer.getLines().mkString("\n")
+    buffer.close()
+    val featSet = names.split("\n")
+      .map(r => r.replace(" ", "").replaceAll("\n", ""))
+      .filter(r => r.nonEmpty)
+      .toSet
+    println("featSet.size=" + featSet.size)
+    println(featSet)
+    featSet
+  }
+
+  def loadUseFeatureBuckets(notUseBucket: Int, bucketFile: String): Map[String, (Double, Array[Double])] = {
+    if (notUseBucket > 0) {
+      return Map[String, (Double, Array[Double])]()
+    }
+
+    val buffer = Source.fromFile(bucketFile)
+    val lines = buffer.getLines().mkString("\n")
+    buffer.close()
+    val bucketMap = lines.split("\n")
+      .map(r => r.replace(" ", "").replaceAll("\n", ""))
+      .filter(r => r.nonEmpty)
+      .map(r => {
+        val rList = r.split("\t")
+        (rList(0), (rList(1).toDouble, rList(2).split(",").map(_.toDouble)))
+      }).toMap
+    println("bucketMap.size=" + bucketMap.size)
+    println(bucketMap)
+    bucketMap
+  }
+}

+ 75 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys_r_rate/stat_feature.scala

@@ -0,0 +1,75 @@
+package com.aliyun.odps.spark.examples.makedata_recsys_r_rate
+
+import com.alibaba.fastjson.JSON
+import com.aliyun.odps.spark.examples.myUtils.{MyHdfsUtils, ParamUtils}
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+object stat_feature {
+  def main(args: Array[String]): Unit = {
+    // 1. read arguments
+    val param = ParamUtils.parseArgs(args)
+    val dataPath = param.getOrElse("dataPath", "/dw/recommend/model/user_profile/data/")
+    val featureIndex = param.getOrElse("featureIndex", "2").toInt
+    val repartition = param.getOrElse("repartition", "2").toInt
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/832_recsys_analysis_data")
+
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // 2. process data
+    val data = sc.textFile(dataPath)
+      .map(r => {
+        // logKey + "\t" + labelKey  + "\t" + featureKey
+        val rList = r.split("\t")
+        val featData = rList(featureIndex)
+        parseFeature(featData)
+      })
+      .flatMap(features => {
+        features.map {
+          case (key, value) =>
+            (key, 1)
+        }
+      })
+      .reduceByKey((a, b) => a + b)
+      .map(raw => {
+        raw._1 + "\t" + raw._2.toString
+      })
+
+    // 3. save data to HDFS
+    val hdfsPath = savePath
+    if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
+      println("deleting path and writing data: " + hdfsPath)
+      MyHdfsUtils.delete_hdfs_path(hdfsPath)
+      data.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+    } else {
+      println("invalid path, refusing to write: " + hdfsPath)
+    }
+  }
+
+  private def parseFeature(data: String): scala.collection.mutable.Map[String, Double] = {
+    val features = scala.collection.mutable.Map[String, Double]()
+    if (data.nonEmpty) {
+      val obj = JSON.parseObject(data)
+      obj.foreach(r => {
+        features.put(r._1, obj.getDoubleValue(r._1))
+      })
+    }
+    features
+  }
+
+  def getPathArray(basePath: String, year: String, suffixSet: Set[String]): ArrayBuffer[String] = {
+    val pathArray = new ArrayBuffer[String]
+    for (suffix <- suffixSet) {
+      val path = "%s/%s_%s/*".format(basePath, year, suffix)
+      pathArray += path
+    }
+    pathArray
+  }
+}

+ 24 - 0
src/main/scala/com/aliyun/odps/spark/examples/myUtils/AdDisplayConvert.java

@@ -0,0 +1,24 @@
+package com.aliyun.odps.spark.examples.myUtils;
+
+import com.alibaba.fastjson.JSONObject;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AdDisplayConvert {
+    public static JSONObject getFeature(String dt, Map<String, String> record, int accurate) {
+        Map<String, Double> featMap = new HashMap<>();
+        try {
+            String dtKey = String.format("dt@%s", dt);
+            featMap.put(dtKey, 1.0);
+            Map<String, String> feature = ConvertUtils.getRecordCol(record, "feature");
+            for (Map.Entry<String, String> entry : feature.entrySet()) {
+                String key = String.format("%s@%s", entry.getKey(), entry.getValue());
+                featMap.put(key, 1.0);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return ConvertUtils.filterAndTruncate(featMap, accurate);
+    }
+}