|
@@ -44,9 +44,8 @@ object makedata_02_writeredis {
|
|
userDataTake.foreach(r=>{
|
|
userDataTake.foreach(r=>{
|
|
println(r.get(0) + "\t" + r.get(1))
|
|
println(r.get(0) + "\t" + r.get(1))
|
|
})
|
|
})
|
|
- val userDataTakeRdd = sc.parallelize(userDataTake, 50)
|
|
|
|
|
|
|
|
- val userDataTakeRddRun = userDataTakeRdd.mapPartitions(row=>{
|
|
|
|
|
|
+ val userDataTakeRddRun = userData.sample(false, 0.1).mapPartitions(row=>{
|
|
val redisTemplate = this.getRedisTemplate()
|
|
val redisTemplate = this.getRedisTemplate()
|
|
val redisFormat = new util.HashMap[String, String]
|
|
val redisFormat = new util.HashMap[String, String]
|
|
row.foreach(r =>{
|
|
row.foreach(r =>{
|
|
@@ -61,27 +60,27 @@ object makedata_02_writeredis {
|
|
println("user.action.count="+userDataTakeRddRun.count())
|
|
println("user.action.count="+userDataTakeRddRun.count())
|
|
|
|
|
|
//video测特征处理
|
|
//video测特征处理
|
|
- println("video测特征处理")
|
|
|
|
- val itemData = odpsOps.readTable(project = project, table = tableItem, partition = partition, transfer = handleItem, numPartition = 100)
|
|
|
|
- val itemDataTake = itemData.take(10)
|
|
|
|
- itemDataTake.foreach(r => {
|
|
|
|
- println(r.get(0) + "\t" + r.get(1))
|
|
|
|
- })
|
|
|
|
- val itemDataTakeRdd = sc.parallelize(itemDataTake, 50)
|
|
|
|
-
|
|
|
|
- val itemDataTakeRddRun = itemDataTakeRdd.mapPartitions(row => {
|
|
|
|
- val redisTemplate = this.getRedisTemplate()
|
|
|
|
- val redisFormat = new util.HashMap[String, String]
|
|
|
|
- for (r <- row) {
|
|
|
|
- val key = r.get(0)
|
|
|
|
- val value = r.get(1)
|
|
|
|
- redisFormat.put(key, value)
|
|
|
|
- }
|
|
|
|
- redisTemplate.opsForValue.multiSet(redisFormat)
|
|
|
|
- redisFormat.keySet.foreach(key => redisTemplate.expire(key, 24*7, TimeUnit.HOURS))
|
|
|
|
- redisFormat.iterator
|
|
|
|
- })
|
|
|
|
- println("item.action.count="+itemDataTakeRddRun.count())
|
|
|
|
|
|
+// println("video测特征处理")
|
|
|
|
+// val itemData = odpsOps.readTable(project = project, table = tableItem, partition = partition, transfer = handleItem, numPartition = 100)
|
|
|
|
+// val itemDataTake = itemData.take(10)
|
|
|
|
+// itemDataTake.foreach(r => {
|
|
|
|
+// println(r.get(0) + "\t" + r.get(1))
|
|
|
|
+// })
|
|
|
|
+// val itemDataTakeRdd = sc.parallelize(itemDataTake, 50)
|
|
|
|
+//
|
|
|
|
+// val itemDataTakeRddRun = itemDataTakeRdd.mapPartitions(row => {
|
|
|
|
+// val redisTemplate = this.getRedisTemplate()
|
|
|
|
+// val redisFormat = new util.HashMap[String, String]
|
|
|
|
+// for (r <- row) {
|
|
|
|
+// val key = r.get(0)
|
|
|
|
+// val value = r.get(1)
|
|
|
|
+// redisFormat.put(key, value)
|
|
|
|
+// }
|
|
|
|
+// redisTemplate.opsForValue.multiSet(redisFormat)
|
|
|
|
+// redisFormat.keySet.foreach(key => redisTemplate.expire(key, 24*7, TimeUnit.HOURS))
|
|
|
|
+// redisFormat.iterator
|
|
|
|
+// })
|
|
|
|
+// println("item.action.count="+itemDataTakeRddRun.count())
|
|
}
|
|
}
|
|
|
|
|
|
def handleUser(record: Record, schema: TableSchema): util.ArrayList[String] = {
|
|
def handleUser(record: Record, schema: TableSchema): util.ArrayList[String] = {
|