|
@@ -27,7 +27,10 @@ object diff_data_20240718 {
|
|
|
|
|
|
val readPath = param.getOrElse("readPath", "/test/33_ad_train_data/20250101")
|
|
|
val data = sc.textFile(readPath)
|
|
|
-
|
|
|
+ val hdfsData = data.map(r => {
|
|
|
+ val map = processString(r)
|
|
|
+ map
|
|
|
+ })
|
|
|
|
|
|
|
|
|
// 2 读取odps+表信息
|
|
@@ -37,6 +40,9 @@ object diff_data_20240718 {
|
|
|
partition = partition,
|
|
|
transfer = func,
|
|
|
numPartition = 64)
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
}
|
|
|
|
|
|
def func(record: Record, schema: TableSchema): Map[String, String] = {
|
|
@@ -51,5 +57,26 @@ object diff_data_20240718 {
|
|
|
map
|
|
|
}
|
|
|
|
|
|
+ private def processString(input: String): Map[String, String] = {
|
|
|
+ // 去除多余空格并按空格分割成键值对数组
|
|
|
+ val parts = input.trim.split("\t")
|
|
|
+ var resultMap = Map[String, String]()
|
|
|
+ // 处理第一个元素,假设为特殊标识
|
|
|
+ resultMap += ("has_conversion" -> parts(0).toLong)
|
|
|
+ // 处理后续的键值对
|
|
|
+ parts.drop(1).foreach { part =>
|
|
|
+ part.split(":", 2) match {
|
|
|
+ case Array(keyStr, value) =>
|
|
|
+ var key = keyStr.replace("*", "_x_").replace("(view)", "_view")
|
|
|
+ if (key == "ad_is_click") {
|
|
|
+ key = "has_click"
|
|
|
+ }
|
|
|
+ resultMap += (key -> value)
|
|
|
+ case _ => // 若无法解析成键值对则丢弃
|
|
|
+ }
|
|
|
+ }
|
|
|
+ resultMap
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
}
|