
Check allfeaturemap data

zhangbo, 10 months ago
parent
commit
b733c41fd0

+ 108 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_17_bucketDataPrint_20240617.scala

@@ -0,0 +1,108 @@
+package com.aliyun.odps.spark.examples.makedata
+
+import com.alibaba.fastjson.{JSON, JSONObject}
+import com.aliyun.odps.TableSchema
+import com.aliyun.odps.data.Record
+import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils, env}
+import examples.extractor.RankExtractorFeature_20240530
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.sql.SparkSession
+import org.xm.Similarity
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+/*
+   20240608 Extract features
+ */
+
+object makedata_17_bucketDataPrint_20240617 {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // 1 Read parameters
+    val param = ParamUtils.parseArgs(args)
+    val tablePart = param.getOrElse("tablePart", "64").toInt
+    val beginStr = param.getOrElse("beginStr", "2024061500")
+    val endStr = param.getOrElse("endStr", "2024061523")
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/16_train_data_print_online/")
+    val project = param.getOrElse("project", "loghubods")
+    val table = param.getOrElse("table", "alg_recsys_sample_all_new")
+    val repartition = param.getOrElse("repartition", "32").toInt
+
+    // 2 Read ODPS table info
+    val odpsOps = env.getODPS(sc)
+
+    // 3 Loop over each hour and produce the data
+    val timeRange = MyDateUtils.getDateHourRange(beginStr, endStr)
+    for (dt_hh <- timeRange) {
+      val dt = dt_hh.substring(0, 8)
+      val hh = dt_hh.substring(8, 10)
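+      // builds a partition spec like "dt=20240615,hh=00"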
+      val partition = s"dt=$dt,hh=$hh"
+      println("开始执行partiton:" + partition)
+      val odpsData = odpsOps.readTable(project = project,
+        table = table,
+        partition = partition,
+        transfer = func,
+        numPartition = tablePart)
+        .map(record_ => {
+          val record = if (record_.isNull("allfeaturemap")) new JSONObject() else
+            JSON.parseObject(record_.getString("allfeaturemap"))
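+          // flatten the feature map into "name:value" strings, e.g. {"f1":"0.5"} -> Seq("f1:0.5") (hypothetical feature name)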
+          val featuresStr = record.toMap.map(r => r._1 + ":" + r._2.toString).toSeq
+          val apptype = record_.getString("apptype")
+          val pagesource = record_.getString("pagesource")
+          val abcode = record_.getString("abcode")
+          val level = if (record_.isNull("level")) "0" else record_.getString("level")
+          val label = record_.getString("is_return")
+          (apptype, pagesource, level, label, abcode, featuresStr)
+        }).filter{
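+          // keep only: apptype 3, page sources ending in "recommend", AB buckets ab0-ab3, level 0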
+          case (apptype, pagesource, level, label, abcode, featuresStr) =>
+            apptype.equals("3") && pagesource.endsWith("recommend") &&
+            Set("ab0", "ab1", "ab2", "ab3").contains(abcode) && level.equals("0")
+        }.map{
+          case (apptype, pagesource, level, label, abcode, featuresStr) =>
+            label + "\t" + featuresStr.mkString("\t")
+        }
+
+      // 4 Save data to HDFS
+      val savePartition = dt + hh
+      val hdfsPath = savePath + "/" + savePartition
+      if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")){
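+        // safety guard: only delete and write under the expected /dw/recommend/model/ root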
+        println("删除路径并开始数据写入:" + hdfsPath)
+        MyHdfsUtils.delete_hdfs_path(hdfsPath)
+        odpsData.coalesce(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+      }else{
+        println("路径不合法,无法写入:" + hdfsPath)
+      }
+    }
+    // TODO: merge the full day's data (not implemented here)
+
+
+  }
+
+  def func(record: Record, schema: TableSchema): Record = {
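+    // identity transfer: return the ODPS record unchanged (parsing happens downstream)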
+    record
+  }
+  def funcC34567ForTags(tags: String, title: String): Tuple4[Double, String, Double, Double] = {
+    // Returns: (match count, matched terms, max semantic similarity, mean semantic similarity)
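+    // e.g. funcC34567ForTags("美食,旅行", "一个美食视频") -> d1=1.0, d2="美食"; d3/d4 depend on Similarity.conceptSimilarity (illustrative only)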
+    val tagsList = tags.split(",")
+    var d1 = 0.0
+    val d2 = new ArrayBuffer[String]()
+    var d3 = 0.0
+    var d4 = 0.0
+    for (tag <- tagsList){
+      if (title.contains(tag)){
+        d1 = d1 + 1.0
+        d2 += tag
+      }
+      val score = Similarity.conceptSimilarity(tag, title)
+      d3 = if (score > d3) score else d3
+      d4 = d4 + score
+    }
+    d4 = if (tagsList.nonEmpty) d4 / tagsList.size else d4
+    (d1, d2.mkString(","), d3, d4)
+  }
+}

+ 13 - 8
src/main/scala/com/aliyun/odps/spark/examples/临时记录的脚本

@@ -82,8 +82,8 @@ nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.s
 --class com.aliyun.odps.spark.examples.makedata.makedata_14_valueData_20240608 \
 --master yarn --driver-memory 1G --executor-memory 3G --executor-cores 1 --num-executors 32 \
 ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-readPath:/dw/recommend/model/13_sample_data_check/ \
-savePath:/dw/recommend/model/14_feature_data_check/ \
+readPath:/dw/recommend/model/13_sample_data_check_print/ \
+savePath:/dw/recommend/model/14_feature_data_check_print/ \
 beginStr:20240615 endStr:20240615 repartition:1000 \
 > p14_data_check.log 2>&1 &
 
@@ -110,8 +110,8 @@ beginStr:20240614 endStr:20240614 repartition:1000 \
 /dw/recommend/model/14_feature_data/
 /dw/recommend/model/16_train_data/
 
-
-
+-----
+Run only this one: execute it only when using the online-printed features
 nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
 --class com.aliyun.odps.spark.examples.makedata.makedata_13_originData_20240529_check \
 --master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
@@ -122,14 +122,19 @@ savePath:/dw/recommend/model/13_sample_data_check_print/ \
 table:alg_recsys_sample_all_new \
 > p13_2024061500_check.log 2>&1 &
 
+Run both of these: they filter out unneeded samples
 nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
---class com.aliyun.odps.spark.examples.makedata.makedata_16_bucketData_20240609 \
+--class com.aliyun.odps.spark.examples.makedata.makedata_16_bucketData_20240609_check \
 --master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
 ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-readPath:/dw/recommend/model/14_feature_data_check/ \
-savePath:/dw/recommend/model/16_train_data_check/ \
+readPath:/dw/recommend/model/14_feature_data_check_print/ \
+savePath:/dw/recommend/model/16_train_data_check_print/ \
 beginStr:20240615 endStr:20240615 repartition:1000 \
 > p16_data_check.log 2>&1 &
 
 /dw/recommend/model/13_sample_data_check/
-/dw/recommend/model/13_sample_data_check_print/
+/dw/recommend/model/13_sample_data_check_print/
+/dw/recommend/model/14_feature_data_check/
+/dw/recommend/model/14_feature_data_check_print/
+/dw/recommend/model/16_train_data_check/
+/dw/recommend/model/16_train_data_check_print/

+ 3 - 0
zhangbo/03_predict.sh

@@ -17,6 +17,9 @@ cat predict/${output_file}_$day.txt | /root/sunmingze/AUC/AUC
 # nohup sh 03_predict.sh 20240613 /dw/recommend/model/16_train_data/ model_aka8_20240612.txt model_aka8_20240612 8 >p3_model_aka8_12.log 2>&1 &
 
 
+# nohup sh 03_predict.sh 20240615 /dw/recommend/model/16_train_data_check_print/ model_aka8_20240608.txt model_aka8_20240608 8 >p3_model_aka8_on.log 2>&1 &
+
+
 
 
 # cat tmpfile | /root/sunmingze/alphaFM/bin/fm_predict -m model/model_aka8_20240608.txt -dim 8 -core 1 -out tmpfile_out.txt