@@ -2,12 +2,13 @@ package com.aliyun.odps.spark.examples.makedata
 
import com.aliyun.odps.TableSchema
import com.aliyun.odps.data.Record
-import com.aliyun.odps.spark.examples.myUtils.{MyHdfsUtils, ParamUtils, env}
+import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils, env}
import com.google.gson.GsonBuilder
import examples.dataloader.RequestContextOffline
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.SparkSession
 
+import java.text.SimpleDateFormat
import java.util.concurrent.TimeUnit
import java.util
import scala.collection.JavaConversions._
@@ -82,7 +83,8 @@ object makedata_02_writeredis {
    // 4 Video-side feature processing
if (ifVideo){
      println("Video feature processing")
- var itemData = odpsOps.readTable(project = project, table = tableItem, partition = partition, transfer = handleItem, numPartition = tablePart)
+ val handleItemFunction: (Record, TableSchema) => Tuple3[String, String, Int] = handleItem(_, _, date)
+ var itemData = odpsOps.readTable(project = project, table = tableItem, partition = partition, transfer = handleItemFunction, numPartition = tablePart)
if (ifDebug) {
        println("Video feature processing - debug on - keep only 5 records with feature count > 1")
val itemDataTake = itemData.filter(_._3 > 1).take(5)
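
Note on the transfer argument above: odpsOps.readTable is invoked with a two-argument transfer function of shape (Record, TableSchema) => T, so the new three-argument handleItem is partially applied with the run date to fit that shape. A minimal standalone sketch of the same pattern, with an illustrative object name, placeholder body and sample date (none of these are part of this change):

    import com.aliyun.odps.TableSchema
    import com.aliyun.odps.data.Record

    object TransferSketch {
      // Same shape as the new handleItem: the extra date parameter is fixed up front.
      def handleItem(record: Record, schema: TableSchema, date: String): (String, String, Int) =
        ("0", "{}", 0) // placeholder body; the real method builds the feature JSON

      val date: String = "20240101" // illustrative run date
      // Placeholders leave (Record, TableSchema) open and close over date, matching
      // the transfer signature readTable expects as it is called in the hunk above.
      val transferFn: (Record, TableSchema) => (String, String, Int) = handleItem(_, _, date)
    }
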
@@ -126,12 +128,21 @@ object makedata_02_writeredis {
(mid, value, reqContext.featureMap.size())
}
 
- def handleItem(record: Record, schema: TableSchema): Tuple3[String, String, Int] = {
+  def handleItem(record: Record, schema: TableSchema, date: String): Tuple3[String, String, Int] = {
val videoKey = "videoid"
val videoid = record.getBigint(videoKey).toString
val reqContext: RequestContextOffline = new RequestContextOffline()
+
+    // TODO: some features are not in the table yet; temporary fix here
+ record.set("i_title_len", record.getString("title").length.toString)
+ val format = new SimpleDateFormat("yyyyMMdd")
+ val dateOld = format.format(record.getDatetime("gmt_create"))
+ val dayDiff = MyDateUtils.calculateDateDifference(dateOld, date)
+ record.set("i_days_since_upload", dayDiff.toString)
+
reqContext.putItemFeature(record)
reqContext.featureMap.put("videoid", videoid)
+
val gson = (new GsonBuilder).serializeSpecialFloatingPointValues.create
val value = gson.toJson(reqContext.featureMap)
(videoid, value, reqContext.featureMap.size())
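
Note on the new i_days_since_upload feature: it relies on MyDateUtils.calculateDateDifference, whose implementation is not part of this diff. Assuming it returns the number of whole days between two yyyyMMdd strings (the formatted gmt_create date and the partition date passed into handleItem), the computation is equivalent to this java.time sketch; the object name and sample dates are illustrative only:

    import java.time.LocalDate
    import java.time.format.DateTimeFormatter
    import java.time.temporal.ChronoUnit

    object DaysSinceUploadSketch {
      private val fmt = DateTimeFormatter.ofPattern("yyyyMMdd")

      // Assumed behaviour of MyDateUtils.calculateDateDifference: whole days from
      // oldDate to newDate, both given as yyyyMMdd strings.
      def daysBetween(oldDate: String, newDate: String): Long =
        ChronoUnit.DAYS.between(LocalDate.parse(oldDate, fmt), LocalDate.parse(newDate, fmt))

      def main(args: Array[String]): Unit = {
        val dateOld = "20240105" // stands in for format.format(record.getDatetime("gmt_create"))
        val date    = "20240110" // stands in for the date argument of handleItem
        println(daysBetween(dateOld, date)) // prints 5; stored as the i_days_since_upload string
      }
    }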