zhangbo 1 рік тому
батько
коміт
e10562c553

+ 45 - 8
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_09_user2redis.scala

@@ -2,11 +2,13 @@ package com.aliyun.odps.spark.examples.makedata
 
 import com.aliyun.odps.TableSchema
 import com.aliyun.odps.data.Record
-import com.aliyun.odps.spark.examples.myUtils.{MyHdfsUtils, ParamUtils, env}
+import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils, env}
 import com.google.gson.GsonBuilder
 import examples.dataloader.RequestContextOffline
 import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.spark.sql.SparkSession
+import com.aliyun.odps.spark.examples.makedata.makedata_06_originData.getFeatureFromSet
+import com.alibaba.fastjson.JSONObject
 
 import java.util
 import java.util.concurrent.TimeUnit
@@ -48,14 +50,49 @@ object makedata_09_user2redis {
 
 
 
-    // 3 用户测特征处理
+    // 3-1 User-side feature processing
     if (ifUser){
       println("user特征处理")
-      var userData = odpsOps.readTable(project = project, table = tableUser, partition = partition, transfer = handleUser, numPartition = tablePart)
-        .filter {
-          case (mid, fea, feaSize) =>
-            mid.nonEmpty && fea.nonEmpty && feaSize > 0
-        }
+
+      // 3-2 Read the set of mids (machinecode) with play activity from 30 days before `date` through `date`
+      var midRdd = sc.emptyRDD[String]
+      MyDateUtils.getDateRange(MyDateUtils.getNumDaysBefore(date, 30), date).foreach(d => {
+        println("-----------读取播放信息={}------------".format(d))
+        var partitionMid = "dt=" + d
+        val data = odpsOps.readTable(project = "loghubods", table = "play_action_log",
+            partition = partitionMid, transfer = func, numPartition = tablePart)
+          .map(r => {
+            if (r.isNull("machinecode")) "" else r.getString("machinecode")
+          }).filter(_.nonEmpty)
+        midRdd = midRdd.union(data).distinct()
+      })
+      println("------------mid处理完毕:" + midRdd.count() + "------------------")
+
+      var userData = odpsOps.readTable(project = project, table = tableUser, partition = partition,
+        transfer = func, numPartition = tablePart)
+        .map(record =>{
+          val userKey = "mids"
+          val mid = record.getString(userKey)
+          val originFeatureName = Set(
+            "gender", "machineinfo_brand", "machineinfo_model", "machineinfo_platform", "machineinfo_sdkversion",
+            "machineinfo_system", "machineinfo_wechatversion", "gmt_create_user",
+            "u_1day_exp_cnt", "u_1day_click_cnt", "u_1day_share_cnt", "u_1day_return_cnt",
+            "u_3day_exp_cnt", "u_3day_click_cnt", "u_3day_share_cnt", "u_3day_return_cnt",
+            "u_7day_exp_cnt", "u_7day_click_cnt", "u_7day_share_cnt", "u_7day_return_cnt",
+            "u_3month_exp_cnt", "u_3month_click_cnt", "u_3month_share_cnt", "u_3month_return_cnt"
+          )
+          val originFeatureMap = getFeatureFromSet(originFeatureName, record)
+          val resultNew = new JSONObject
+          originFeatureName.foreach(r => {
+            if (originFeatureMap.containsKey(r)) {
+              val v = originFeatureMap.get(r).get
+              resultNew.put(r, v)
+            }
+          })
+          (mid, resultNew.toString())
+        })
+      userData = userData.leftOuterJoin(midRdd.map(r=> (r, 1))).map(r=> (r._1, r._2._1))
+
       if (userSampleIDs.nonEmpty){
         val IDs = userSampleIDs.split(",").filter(_.nonEmpty).map(_.toInt).toList
         userData = userData.filter(r => IDs.contains(r._1.hashCode % 10))
@@ -63,7 +100,7 @@ object makedata_09_user2redis {
       if (ifDebug){
         println("user特征处理-debug开启-只保留5条数据-特征数量大于1")
         val userDataTake = userData.take(5)
-        userDataTake.foreach(r=> println(r._1 + "\t" + r._2 + "\t" + r._3))
+        userDataTake.foreach(r=> println(r._1 + "\t" + r._2))
         userData = sc.parallelize(userDataTake)
       }
       if (savePathUser.nonEmpty && savePathUser.startsWith("/dw/recommend/model/")) {