|
@@ -3,16 +3,15 @@ package com.aliyun.odps.spark.examples.makedata
|
|
|
import com.alibaba.fastjson.JSONObject
|
|
|
import com.aliyun.odps.TableSchema
|
|
|
import com.aliyun.odps.data.Record
|
|
|
-import com.aliyun.odps.spark.examples.makedata.makedata_06_originData.getFeatureFromSet
|
|
|
import com.aliyun.odps.spark.examples.myUtils.{MyHdfsUtils, ParamUtils, env}
|
|
|
-import com.google.gson.GsonBuilder
|
|
|
-import examples.dataloader.RequestContextOffline
|
|
|
import org.apache.hadoop.io.compress.GzipCodec
|
|
|
import org.apache.spark.sql.SparkSession
|
|
|
|
|
|
import java.util
|
|
|
+import java.util.Date
|
|
|
import java.util.concurrent.TimeUnit
|
|
|
import scala.collection.JavaConversions._
|
|
|
+import scala.collection.mutable
|
|
|
|
|
|
|
|
|
object makedata_08_item2redis {
|
|
@@ -55,13 +54,15 @@ object makedata_08_item2redis {
|
|
|
|
|
|
val itemDataTakeRddRun = itemData.map(record =>{
|
|
|
val originFeatureName = Set(
|
|
|
+ "gmt_create", "existence_days",
|
|
|
"title", "tags", "total_time", "play_count_total",
|
|
|
"i_1day_exp_cnt", "i_1day_click_cnt", "i_1day_share_cnt", "i_1day_return_cnt",
|
|
|
"i_3day_exp_cnt", "i_3day_click_cnt", "i_3day_share_cnt", "i_3day_return_cnt",
|
|
|
"i_7day_exp_cnt", "i_7day_click_cnt", "i_7day_share_cnt", "i_7day_return_cnt",
|
|
|
"i_3month_exp_cnt", "i_3month_click_cnt", "i_3month_share_cnt", "i_3month_return_cnt"
|
|
|
)
|
|
|
- val originFeatureMap = getFeatureFromSet(originFeatureName, record)
|
|
|
+// val myList: List[(String, String)] = List(("value1", "value2"), ("value3", "value4"))
|
|
|
+ val originFeatureMap = getFeatureFromRecord(originFeatureName, record)
|
|
|
val videoid = record.getBigint("videoid").toString
|
|
|
val resultNew = new JSONObject
|
|
|
originFeatureName.foreach(r => {
|
|
@@ -111,4 +112,25 @@ object makedata_08_item2redis {
|
|
|
record
|
|
|
}
|
|
|
|
|
|
+ def getFeatureFromRecord(set: Set[String], record: Record): mutable.HashMap[String, String] = {
|
|
|
+ val result = mutable.HashMap[String, String]()
|
|
|
+ set.foreach(r => {
|
|
|
+ if (!record.isNull(r)) {
|
|
|
+ val obj = record.get(r)
|
|
|
+ if (obj.isInstanceOf[String]){
|
|
|
+ result.put(r, record.getString(r))
|
|
|
+ } else if (obj.isInstanceOf[BigInt]){
|
|
|
+ result.put(r, String.valueOf(record.getBigint(r)))
|
|
|
+ } else if (obj.isInstanceOf[Double]) {
|
|
|
+ result.put(r, String.valueOf(record.getDouble(r)))
|
|
|
+ } else if (obj.isInstanceOf[Date]) {
|
|
|
+ result.put(r, String.valueOf(record.getDatetime(r)))
|
|
|
+ } else {
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+ })
|
|
|
+ result
|
|
|
+ }
|
|
|
+
|
|
|
}
|