zhangbo 1 jaar geleden
bovenliggende
commit
8aa10fb8e8
2 gewijzigde bestanden met toevoegingen van 10 en 38 verwijderingen
  1. +1 -1
      pom.xml
  2. +9 -37
      src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_01_readhdfs.scala

+ 1 - 1
pom.xml

@@ -46,7 +46,7 @@
         <dependency>
             <groupId>com.tzld.piaoquan</groupId>
             <artifactId>recommend-feature-client</artifactId>
-            <version>1.0.0</version>
+            <version>1.0.1</version>
         </dependency>
 
         <dependency>

+ 9 - 37
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_01_readhdfs.scala

@@ -3,9 +3,8 @@ package com.aliyun.odps.spark.examples
 import org.apache.spark.sql.SparkSession
 import com.aliyun.odps.TableSchema
 import com.aliyun.odps.data.Record
-import com.google.common.collect.ListMultimap
-import examples.dataloader.RecommendSampleConstructor
 import org.apache.spark.aliyun.odps.OdpsOps
+import examples.sparksql.SparkShareRatioSampleLoader.singleParse
 
 import java.util
 import java.util.ArrayList
@@ -33,47 +32,20 @@ object makedata_01_readhdfs {
     val odpsData = odpsOps.readTable(project = project, table = table, partition = partition, transfer = read, numPartition = 5)
     println(s"Count (odpsData): ${odpsData.count()}")
 
+    val hdfsPath = "/dw/recommend/model/share_ratio_samples/" + partition;
+    odpsData.saveAsTextFile(hdfsPath);
+
   }
 
-  def read(record: Record, schema: TableSchema): Long = {
+  def read(record: Record, schema: TableSchema): String = {
     val labelName = "share_ornot"
     val label = record.getString(labelName)
     val label_new = if (label == null || label == "1")
-      0L
+      "0"
     else
-      1L
-
-    // 从sql的 record中 初始化对象内容
-    val requestContext = RecommendSampleConstructor.constructRequestContext(record)
-    val userFeature = RecommendSampleConstructor.constructUserFeature(record)
-    val itemFeature = RecommendSampleConstructor.constructItemFeature(record)
-
-    // 转化成bytes// 转化成bytes
-    val requestContextBytesFeature = new Nothing(requestContext)
-    val userBytesFeature = new Nothing(userFeature)
-    val videoBytesFeature = new Nothing(itemFeature)
-
-    // 特征抽取// 特征抽取
-    var bytesFeatureExtractor: Nothing = null
-    bytesFeatureExtractor = new Nothing
-
-    bytesFeatureExtractor.getUserFeatures(userBytesFeature)
-    bytesFeatureExtractor.getItemFeature(videoBytesFeature)
-    bytesFeatureExtractor.getContextFeatures(requestContextBytesFeature)
-
-    val featureMap = bytesFeatureExtractor.getFeatures
-    return parseSamplesToString(label, featureMap)
+      "1"
+    val result = singleParse(record, label_new)
+    result
   }
 
-  def parseSamplesToString(label: String, featureMap: ListMultimap[FeatureGroup, BaseFeature]): String = {
-    val featureList: util.ArrayList[String] = new util.ArrayList[String]
-    import scala.collection.JavaConversions._
-    for (entry <- featureMap.entries) {
-      val groupedFeature: Nothing = entry.getKey
-      val baseFeature: Nothing = entry.getValue
-      val featureIdentifier: Long = baseFeature.getIdentifier
-      featureList.add(String.valueOf(featureIdentifier) + ":1")
-    }
-    label + "\t" + String.join("\t", featureList)
-  }
 }