zhangbo 1 year ago
Parent commit 33bae8e6b8

+ 1 - 1
pom.xml

@@ -46,7 +46,7 @@
         <dependency>
             <groupId>com.tzld.piaoquan</groupId>
             <artifactId>recommend-feature-client</artifactId>
-            <version>1.0.1</version>
+            <version>1.0.2</version>
         </dependency>
 
         <dependency>

+ 89 - 0
src/main/java/examples/dataloader/RequestContextOffline.java

@@ -0,0 +1,89 @@
+package examples.dataloader;
+
+import com.tzld.piaoquan.recommend.feature.domain.video.base.RequestContext;
+import com.aliyun.odps.data.Record;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class RequestContextOffline extends RequestContext {
+    public Map<String, Object> featureMap = new HashMap<>();
+
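+    // Copies selected ODPS columns into featureMap: device info is kept as raw
+    // strings, 1-day counters are log-bucketed, and 1-day rates are ceil-log scaled.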
+    public void putUserFeature(Record record){
+        setKVinMap(record, "machineinfo_brand", "string", "");
+        setKVinMap(record, "machineinfo_model", "string", "");
+        setKVinMap(record, "machineinfo_platform", "string","");
+        setKVinMap(record, "machineinfo_system", "string","");
+
+        setKVinMap(record, "u_1day_exp_cnt", "double", "cnt");
+        setKVinMap(record, "u_1day_click_cnt", "double", "cnt");
+        setKVinMap(record, "u_1day_share_cnt", "double", "cnt");
+        setKVinMap(record, "u_1day_return_cnt", "double", "cnt");
+
+        setKVinMap(record, "u_ctr_1day", "double", "rate");
+        setKVinMap(record, "u_str_1day", "double", "rate");
+        setKVinMap(record, "u_rov_1day", "double", "rate");
+        setKVinMap(record, "u_ros_1day", "double", "rate");
+    }
+
+    public void putItemFeature(Record record){
+        setKVinMap(record, "i_title_len", "double", "cnt");
+        setKVinMap(record, "total_time", "double", "cnt");
+        setKVinMap(record, "i_days_since_upload", "double", "cnt");
+        setKVinMap(record, "play_count_total", "double", "cnt");
+
+        setKVinMap(record, "i_1day_exp_cnt", "double", "cnt");
+        setKVinMap(record, "i_1day_click_cnt", "double", "cnt");
+        setKVinMap(record, "i_1day_share_cnt", "double", "cnt");
+        setKVinMap(record, "i_1day_return_cnt", "double", "cnt");
+
+        setKVinMap(record, "i_ctr_1day", "double", "rate");
+        setKVinMap(record, "i_str_1day", "double", "rate");
+        setKVinMap(record, "i_rov_1day", "double", "rate");
+        setKVinMap(record, "i_ros_1day", "double", "rate");
+    }
+
+    public void putSceneFeature(Record record){
+        setKVinMap(record, "ctx_week", "string", "");
+        setKVinMap(record, "ctx_hour", "string", "");
+        setKVinMap(record, "ctx_region", "string","");
+        setKVinMap(record, "ctx_city", "string","");
+    }
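+    // Reads one column as a string and stores it under its declared type:
+    // "string" values go in verbatim; "double" values are bucketed as
+    // counts ("cnt") or rates ("rate"); other types are skipped.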
+    private void setKVinMap(Record record, String key, String valueType, String cntOrRate){
+        if (record.getString(key) == null){
+            return;
+        }
+        switch (valueType.toLowerCase()){
+            case "string":
+                featureMap.put(key, record.getString(key));
+                return;
+            case "double":
+                if ("cnt".equals(cntOrRate)){
+                    featureMap.put(key, this.bucketRatioFeature(Double.valueOf(record.getString(key))));
+                }else if ("rate".equals(cntOrRate)){
+                    featureMap.put(key, this.ceilLog(Double.valueOf(record.getString(key))));
+                }
+                return;
+            case "int":
+                return;
+            case "long":
+                return;
+            default:
+                return;
+        }
+    }
+
+
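+    // e.g. ceilLog(0.25) = ceil(ln(1.25)) = 1.0, so any rate in (0, 1] lands in bucket 1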
+    private double ceilLog(Double key) {
+        return Math.ceil(Math.log(key + 1.0));
+    }
+
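+    // Log-buckets a count and caps the bucket at 50:
+    // e.g. bucketRatioFeature(0) = round(ln(50)) = 4, bucketRatioFeature(99) = round(ln(5000)) = 9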
+    private double bucketRatioFeature(Double key) {
+        long bucket = Math.round(Math.log((key + 1.0) * 50.0));
+        if (bucket > 50L) {
+            bucket = 50L;
+        }
+
+        return (double)bucket;
+    }
+}

+ 45 - 22
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_01_readhdfs.scala

@@ -3,49 +3,72 @@ package com.aliyun.odps.spark.examples
 import org.apache.spark.sql.SparkSession
 import com.aliyun.odps.TableSchema
 import com.aliyun.odps.data.Record
-import org.apache.spark.aliyun.odps.OdpsOps
-import examples.sparksql.SparkShareRatioSampleLoader.singleParse
+import com.aliyun.odps.spark.examples.myUtils.ParamUtils
+import com.aliyun.odps.spark.examples.myUtils.env
+import examples.dataloader.RequestContextOffline
 
-import java.util
-import java.util.ArrayList
+import scala.collection.JavaConversions._
 
 object makedata_01_readhdfs {
   def main(args: Array[String]) {
     val spark = SparkSession
       .builder()
-      .appName("WordCount")
+      .appName(this.getClass.getName)
       .getOrCreate()
     val sc = spark.sparkContext
 
-    val accessKeyId = "LTAIWYUujJAm7CbH"
-    val accessKeySecret = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
-    val odpsUrl = "http://service.odps.aliyun.com/api"
-    val tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"
-    println("Read odps table...")
+    // 1 Read job parameters
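+    // partition: ODPS partition spec of the sample table;
+    // tablePart: number of read splits, passed to readTable as numPartition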
+    val param = ParamUtils.parseArgs(args)
+    val partition = param.getOrElse("partition", "dt=20231220")
+    val tablePart = param.getOrElse("tablePart", "16").toInt
 
+
+    // 2 Read the training samples from ODPS
+    val odpsOps = env.getODPS(sc)
     val project = "loghubods"
     val table = "alg_recsys_view_sample"
-    val partition = "dt=20231218"
-
-
-    val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, odpsUrl, tunnelUrl)
-    val odpsData = odpsOps.readTable(project = project, table = table, partition = partition, transfer = read, numPartition = 5)
-    println(s"Count (odpsData): ${odpsData.count()}")
-
-    val hdfsPath = "/dw/recommend/model/share_ratio_samples/" + partition;
-    odpsData.saveAsTextFile(hdfsPath);
-
+    val odpsData = odpsOps.readTable(project = project,
+      table = table,
+      partition = partition,
+      transfer = read,
+      numPartition = tablePart)
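+
+    // 3 Write the samples to HDFS, one line each: "label \t name:value \t name:value ..."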
+    val hdfsPath = "/dw/recommend/model/share_ratio_samples/" + partition
+    odpsData.saveAsTextFile(hdfsPath)
   }
 
   def read(record: Record, schema: TableSchema): String = {
     val labelName = "share_ornot"
     val label = record.getString(labelName)
-    val label_new = if (label == null || label == "1")
+    val newLabel = if (label == null || label == "1")
       "0"
     else
       "1"
-    val result = singleParse(record, labelName)
+    val result = singleParse(record, newLabel)
     result
   }
 
+  def singleParse(record: Record, newLabel: String): String = {
+    // 1 Build the offline feature map (user / item / scene) from the ODPS record
+    val reqContext: RequestContextOffline = new RequestContextOffline()
+    reqContext.putUserFeature(record)
+    reqContext.putItemFeature(record)
+    reqContext.putSceneFeature(record)
+    // 2 Serialize as sparse text: the label first, then tab-separated name:value pairs
+    newLabel + "\t" + reqContext.featureMap.map { case (k, v) => k + ":" + v }.mkString("\t")
+  }
+
 }

+ 1 - 1
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_02_writeredis.scala

@@ -20,7 +20,7 @@ object makedata_02_writeredis {
   def main(args: Array[String]) {
     val spark = SparkSession
       .builder()
-      .appName("WordCount")
+      .appName(this.getClass.getName)
       .getOrCreate()
     val sc = spark.sparkContext
 

+ 1 - 1
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_03_deleteredis.scala

@@ -19,7 +19,7 @@ object makedata_03_deleteredis {
   def main(args: Array[String]) {
     val spark = SparkSession
       .builder()
-      .appName("WordCount")
+      .appName(this.getClass.getName)
       .getOrCreate()
     val sc = spark.sparkContext
 

+ 15 - 0
src/main/scala/com/aliyun/odps/spark/examples/myUtils/env.scala

@@ -0,0 +1,15 @@
+package com.aliyun.odps.spark.examples.myUtils
+
+import org.apache.spark.SparkContext
+import org.apache.spark.aliyun.odps.OdpsOps
+
+object env {
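+  // Builds an OdpsOps handle with fixed MaxCompute endpoints; the makedata jobs
+  // obtain it via env.getODPS(sc) instead of constructing credentials inline.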
+  def getODPS(sparkContext: SparkContext): OdpsOps = {
+    val accessKeyId = "LTAIWYUujJAm7CbH"
+    val accessKeySecret = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
+    val odpsUrl = "http://service.odps.aliyun.com/api"
+    val tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"
+
+    OdpsOps(sparkContext, accessKeyId, accessKeySecret, odpsUrl, tunnelUrl)
+  }
+}