|
@@ -11,6 +11,7 @@ import org.apache.spark.sql.SparkSession
|
|
|
|
|
|
import scala.collection.JavaConversions._
|
|
import scala.collection.JavaConversions._
|
|
import scala.collection.mutable.ArrayBuffer
|
|
import scala.collection.mutable.ArrayBuffer
|
|
|
|
+import scala.util.Random
|
|
|
|
|
|
/*
|
|
/*
|
|
20241211 提取特征
|
|
20241211 提取特征
|
|
@@ -34,6 +35,8 @@ object makedata_recsys_61_str2ros_originData_20241209 {
|
|
val tablePart = param.getOrElse("tablePart", "64").toInt
|
|
val tablePart = param.getOrElse("tablePart", "64").toInt
|
|
val savePath = param.getOrElse("savePath", "/dw/recommend/model/61_origin_data/")
|
|
val savePath = param.getOrElse("savePath", "/dw/recommend/model/61_origin_data/")
|
|
val repartition = param.getOrElse("repartition", "32").toInt
|
|
val repartition = param.getOrElse("repartition", "32").toInt
|
|
|
|
+ val whatLabel = param.getOrElse("whatLabel", "is_share")
|
|
|
|
+ val fuSampleRate = param.getOrElse("fuSampleRate", "0.1").toDouble
|
|
|
|
|
|
// 2 odps
|
|
// 2 odps
|
|
val odpsOps = env.getODPS(sc)
|
|
val odpsOps = env.getODPS(sc)
|
|
@@ -50,6 +53,10 @@ object makedata_recsys_61_str2ros_originData_20241209 {
|
|
partition = partition,
|
|
partition = partition,
|
|
transfer = func,
|
|
transfer = func,
|
|
numPartition = tablePart)
|
|
numPartition = tablePart)
|
|
|
|
+ .filter(record => {
|
|
|
|
+ val label = if (record.isNull(whatLabel)) "0" else record.getString(whatLabel)
|
|
|
|
+ "1".equals(label) || new Random().nextDouble() <= fuSampleRate
|
|
|
|
+ })
|
|
.map(record => {
|
|
.map(record => {
|
|
val featureMap = new JSONObject()
|
|
val featureMap = new JSONObject()
|
|
|
|
|