zhangbo 1 year ago
parent
commit
5d79b7384e

+ 43 - 68
pom.xml

@@ -17,27 +17,25 @@
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
+
     <properties>
-        <spark.version>3.1.1</spark.version>
-        <oss.sdk.version>3.0.0</oss.sdk.version>
+        <spark.version>2.3.0</spark.version>
         <cupid.sdk.version>3.3.8-public</cupid.sdk.version>
-        <scala.version>2.12.10</scala.version>
-        <scala.binary.version>2.12</scala.binary.version>
-        <odps.version>0.28.4-public</odps.version>
+        <scala.version>2.11.8</scala.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <java.version>1.8</java.version>
+        <maven.compiler.source>${java.version}</maven.compiler.source>
+        <maven.compiler.target>${java.version}</maven.compiler.target>
         <emr.version>2.0.0</emr.version>
+        <odps.version>0.28.4-public</odps.version>
     </properties>
 
     <groupId>com.aliyun.odps</groupId>
-    <artifactId>spark-examples_${scala.binary.version}</artifactId>
+    <artifactId>spark-examples</artifactId>
     <version>1.0.0-SNAPSHOT</version>
     <packaging>jar</packaging>
 
     <dependencies>
-        <dependency>
-            <groupId>com.google.protobuf</groupId>
-            <artifactId>protobuf-java</artifactId>
-            <version>3.12.0</version>
-        </dependency>
 
         <dependency>
             <groupId>com.tzld.piaoquan</groupId>
@@ -51,14 +49,12 @@
             <version>1.0.0</version>
         </dependency>
 
-
         <dependency>
             <groupId>com.tzld.piaoquan</groupId>
             <artifactId>ad-engine-commons</artifactId>
             <version>1.0.0</version>
         </dependency>
 
-
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_${scala.binary.version}</artifactId>
@@ -66,36 +62,32 @@
             <scope>provided</scope>
             <exclusions>
                 <exclusion>
-                    <artifactId>protobuf-java</artifactId>
-                    <groupId>com.google.protobuf</groupId>
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scala-library</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scalap</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
-
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
             <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <artifactId>protobuf-java</artifactId>
-                    <groupId>com.google.protobuf</groupId>
-                </exclusion>
-            </exclusions>
         </dependency>
-
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-mllib_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
             <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <artifactId>protobuf-java</artifactId>
-                    <groupId>com.google.protobuf</groupId>
-                </exclusion>
-            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>
@@ -103,65 +95,37 @@
             <artifactId>cupid-sdk</artifactId>
             <version>${cupid.sdk.version}</version>
             <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <artifactId>protobuf-java</artifactId>
-                    <groupId>com.google.protobuf</groupId>
-                </exclusion>
-            </exclusions>
         </dependency>
 
         <dependency>
             <groupId>com.aliyun.odps</groupId>
             <artifactId>hadoop-fs-oss</artifactId>
             <version>${cupid.sdk.version}</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>protobuf-java</artifactId>
-                    <groupId>com.google.protobuf</groupId>
-                </exclusion>
-            </exclusions>
         </dependency>
 
-
         <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-data-redis</artifactId>
-            <version>2.4.2</version>
-        </dependency>
-        <dependency>
-            <groupId>redis.clients</groupId>
-            <artifactId>jedis</artifactId>
-            <version>3.3.0</version>
-        </dependency>
-        <dependency>
-            <groupId>org.projectlombok</groupId>
-            <artifactId>lombok</artifactId>
-            <version>1.18.24</version>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-spark-datasource_${scala.binary.version}</artifactId>
+            <version>${cupid.sdk.version}</version>
         </dependency>
 
         <dependency>
-            <groupId>com.aliyun.odps</groupId>
-            <artifactId>odps-sdk-commons</artifactId>
-            <version>${odps.version}</version>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
         </dependency>
-
         <dependency>
-            <groupId>com.aliyun.emr</groupId>
-            <artifactId>emr-mns_2.11</artifactId>
-            <version>${emr.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.aliyun.mns</groupId>
-                    <artifactId>aliyun-sdk-mns</artifactId>
-                </exclusion>
-            </exclusions>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-actors</artifactId>
+            <version>${scala.version}</version>
         </dependency>
+
         <dependency>
             <groupId>com.aliyun.emr</groupId>
             <artifactId>emr-maxcompute_2.11</artifactId>
             <version>${emr.version}</version>
         </dependency>
+
     </dependencies>
 
     <build>
@@ -240,6 +204,17 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.8.1</version>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                    <!--<compilerId>scala</compilerId>-->
+                    <!-- <compilerVersion>2.12.10</compilerVersion>-->
+                </configuration>
+            </plugin>
         </plugins>
     </build>
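
The pom changes above downgrade the build from Spark 3.1.1 / Scala 2.12 to Spark 2.3.0 / Scala 2.11 on Java 1.8, matching the MaxCompute (Cupid) runtime. A minimal sanity check for the new toolchain -- a hypothetical class, not part of this commit -- is to print the versions the cluster actually runs with:

    object VersionCheck {
      def main(args: Array[String]): Unit = {
        val spark = org.apache.spark.sql.SparkSession.builder().appName("VersionCheck").getOrCreate()
        // Both printouts should report the downgraded toolchain (Scala 2.11.x, Spark 2.3.0)
        println(s"Scala: ${scala.util.Properties.versionString}, Spark: ${spark.version}")
        spark.stop()
      }
    }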
 

+ 79 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_01_readhdfs.scala

@@ -0,0 +1,79 @@
+package com.aliyun.odps.spark.examples
+
+import org.apache.spark.sql.SparkSession
+import com.aliyun.odps.TableSchema
+import com.aliyun.odps.data.Record
+import com.google.common.collect.ListMultimap
+import examples.dataloader.RecommendSampleConstructor
+import org.apache.spark.aliyun.odps.OdpsOps
+
+import java.util
+// NOTE: assumed import locations -- these classes are referenced below but the
+// Java->Scala conversion dropped their types and imports; adjust the package to
+// wherever they actually live in this repo.
+import examples.dataloader.{BaseFeature, BytesFeatureExtractor, FeatureGroup, RequestContextBytesFeature, UserBytesFeature, VideoBytesFeature}
+
+object makedata_01_readhdfs {
+  def main(args: Array[String]) {
+    val spark = SparkSession
+      .builder()
+      .appName("WordCount")
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    val accessKeyId = "LTAIWYUujJAm7CbH"
+    val accessKeySecret = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
+    val odpsUrl = "http://service.odps.aliyun.com/api"
+    val tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"
+    println("Read odps table...")
+
+    val project = "loghubods"
+    val table = "alg_recsys_view_sample"
+    val partition = "dt=20231218"
+
+    val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, odpsUrl, tunnelUrl)
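+    // readTable applies the `read` transfer function to each (Record, TableSchema)
+    // pair and returns the results as an RDD, read here across 5 partitions.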
+    val odpsData = odpsOps.readTable(project = project, table = table, partition = partition, transfer = read, numPartition = 5)
+    println(s"Count (odpsData): ${odpsData.count()}")
+
+  }
+
+  // Transfer function for OdpsOps.readTable: one table record -> one sample line.
+  // (Originally declared to return Long, but the body returns a String.)
+  def read(record: Record, schema: TableSchema): String = {
+    val labelName = "share_ornot"
+    val label = record.getString(labelName)
+    val label_new = if (label == null || label == "1")
+      0L
+    else
+      1L
+
+    // Initialize the domain objects from the SQL record
+    val requestContext = RecommendSampleConstructor.constructRequestContext(record)
+    val userFeature = RecommendSampleConstructor.constructUserFeature(record)
+    val itemFeature = RecommendSampleConstructor.constructItemFeature(record)
+
+    // Convert to byte-level features. The converter left these as `Nothing`;
+    // the wrapper class names are assumed to sit next to
+    // RecommendSampleConstructor in examples.dataloader -- adjust as needed.
+    val requestContextBytesFeature = new RequestContextBytesFeature(requestContext)
+    val userBytesFeature = new UserBytesFeature(userFeature)
+    val videoBytesFeature = new VideoBytesFeature(itemFeature)
+
+    // Feature extraction (extractor class name likewise assumed)
+    val bytesFeatureExtractor = new BytesFeatureExtractor
+    bytesFeatureExtractor.getUserFeatures(userBytesFeature)
+    bytesFeatureExtractor.getItemFeature(videoBytesFeature)
+    bytesFeatureExtractor.getContextFeatures(requestContextBytesFeature)
+
+    val featureMap = bytesFeatureExtractor.getFeatures
+    // Use the binarized label; the original passed the raw (possibly null)
+    // string and left label_new unused.
+    parseSamplesToString(label_new.toString, featureMap)
+  }
+
+  // Flatten the feature multimap into a "label \t id:1 \t id:1 ..." sample line.
+  def parseSamplesToString(label: String, featureMap: ListMultimap[FeatureGroup, BaseFeature]): String = {
+    val featureList: util.ArrayList[String] = new util.ArrayList[String]
+    import scala.collection.JavaConversions._
+    for (entry <- featureMap.entries) {
+      val baseFeature: BaseFeature = entry.getValue // the FeatureGroup key is not needed here
+      val featureIdentifier: Long = baseFeature.getIdentifier
+      featureList.add(String.valueOf(featureIdentifier) + ":1")
+    }
+    label + "\t" + String.join("\t", featureList)
+  }
+}
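
The job above only counts the generated samples. A natural next step -- sketched here under the assumption that an output path like `hdfs:///dw/recsys/samples/20231218` exists and is writable, neither of which this commit sets up -- is to persist the sample lines instead of just counting them:

    // odpsData is an RDD[String] of "label \t id:1 ..." lines produced by read()
    odpsData.saveAsTextFile("hdfs:///dw/recsys/samples/20231218")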

+ 1 - 1
src/main/scala/com/aliyun/odps/spark/examples/sparksql/SparkSQL.scala

@@ -55,7 +55,7 @@ object SparkSQL {
 
     // Write a non-partitioned table
     df.write.insertInto(tableName) // insertInto semantics
-    df.writeTo(tableName).overwritePartitions() // insertOverwrite use datasourceV2
+//    df.writeTo(tableName).overwritePartitions() // insertOverwrite via DataSourceV2; Spark 3.x only, unavailable after the downgrade to 2.3
 
     // Write a partitioned table
     // DataFrameWriter cannot target a specific partition; register a temp view
     // and write into the partition with SQL instead
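
A minimal sketch of the temp-view workaround the comment above describes, assuming the surrounding example's `spark` session plus a partitioned table `tableNamePt` with a string partition column `pt` (both hypothetical, not part of this commit):

    df.createOrReplaceTempView("tmp_df")
    spark.sql(s"INSERT OVERWRITE TABLE $tableNamePt PARTITION (pt = '20231218') SELECT * FROM tmp_df")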