sunmingze committed 1 year ago
commit 22abbb11b8
28 changed files with 2576 additions and 0 deletions
  1. pom.xml (+203, -0)
  2. src/main/java/com/aliyun/odps/spark/examples/sparksql/JavaSparkSQL.java (+87, -0)
  3. src/main/java/com/aliyun/odps/spark/examples/sparksql/SparkEMRReadMaxCompute.java (+51, -0)
  4. src/main/java/com/tzld/piaoquan/data/base/Constant.java (+14, -0)
  5. src/main/java/com/tzld/piaoquan/data/base/ItemFeature.java (+138, -0)
  6. src/main/java/com/tzld/piaoquan/data/base/RequestContext.java (+122, -0)
  7. src/main/java/com/tzld/piaoquan/data/base/RequestContextBytesFeature.java (+44, -0)
  8. src/main/java/com/tzld/piaoquan/data/base/UserActionFeature.java (+108, -0)
  9. src/main/java/com/tzld/piaoquan/data/base/UserBytesFeature.java (+88, -0)
  10. src/main/java/com/tzld/piaoquan/data/base/UserFeature.java (+85, -0)
  11. src/main/java/com/tzld/piaoquan/data/base/VideoBytesFeature.java (+93, -0)
  12. src/main/java/com/tzld/piaoquan/data/base/VlogFeatureGroup.java (+131, -0)
  13. src/main/java/com/tzld/piaoquan/data/dataloader/FeatureConstructor.java (+165, -0)
  14. src/main/java/com/tzld/piaoquan/data/dataloader/UserFeatureSparkLoaderMaxcompute.java (+78, -0)
  15. src/main/java/com/tzld/piaoquan/data/score/feature/BytesGroup.java (+35, -0)
  16. src/main/java/com/tzld/piaoquan/data/score/feature/BytesUtils.java (+192, -0)
  17. src/main/java/com/tzld/piaoquan/data/score/feature/FeatureHash.java (+230, -0)
  18. src/main/java/com/tzld/piaoquan/data/score/feature/LRBytesFeatureExtractorBase.java (+67, -0)
  19. src/main/java/com/tzld/piaoquan/data/score/feature/VlogShareLRFeatureExtractor.java (+152, -0)
  20. src/main/python/spark_oss.py (+26, -0)
  21. src/main/python/spark_sql.py (+55, -0)
  22. src/main/scala/com/aliyun/odps/spark/examples/SparkPi.scala (+46, -0)
  23. src/main/scala/com/aliyun/odps/spark/examples/WordCount.scala (+37, -0)
  24. src/main/scala/com/aliyun/odps/spark/examples/graphx/PageRank.scala (+73, -0)
  25. src/main/scala/com/aliyun/odps/spark/examples/mllib/KmeansModelSaveToOss.scala (+62, -0)
  26. src/main/scala/com/aliyun/odps/spark/examples/oss/JindoFsDemo.scala (+69, -0)
  27. src/main/scala/com/aliyun/odps/spark/examples/oss/SparkUnstructuredDataCompute.scala (+43, -0)
  28. src/main/scala/com/aliyun/odps/spark/examples/sparksql/SparkSQL.scala (+82, -0)

+ 203 - 0
pom.xml

@@ -0,0 +1,203 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <properties>
+        <spark.version>3.1.1</spark.version>
+        <oss.sdk.version>3.0.0</oss.sdk.version>
+        <cupid.sdk.version>3.3.8-public</cupid.sdk.version>
+        <scala.version>2.12.10</scala.version>
+        <scala.binary.version>2.12</scala.binary.version>
+        <odps.version>0.28.4-public</odps.version>
+        <emr.version>2.0.0</emr.version>
+    </properties>
+
+    <groupId>com.aliyun.odps</groupId>
+    <artifactId>spark-examples_${scala.binary.version}</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-mllib_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>cupid-sdk</artifactId>
+            <version>${cupid.sdk.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>hadoop-fs-oss</artifactId>
+            <version>${cupid.sdk.version}</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>com.tzld.piaoquan</groupId>
+            <artifactId>recommend-server-client</artifactId>
+            <version>1.0.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>3.12.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-redis</artifactId>
+            <version>2.4.2</version>
+        </dependency>
+        <dependency>
+            <groupId>redis.clients</groupId>
+            <artifactId>jedis</artifactId>
+            <version>3.3.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.24</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-sdk-commons</artifactId>
+            <version>${odps.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.aliyun.emr</groupId>
+            <artifactId>emr-mns_2.11</artifactId>
+            <version>${emr.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.aliyun.mns</groupId>
+                    <artifactId>aliyun-sdk-mns</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>com.aliyun.emr</groupId>
+            <artifactId>emr-maxcompute_2.11</artifactId>
+            <version>${emr.version}</version>
+        </dependency>
+
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>2.4.3</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <minimizeJar>false</minimizeJar>
+                            <shadedArtifactAttached>true</shadedArtifactAttached>
+                            <artifactSet>
+                                <includes>
+                                    <!-- Include here the dependencies you
+                                        want to be packed in your fat jar -->
+                                    <include>*:*</include>
+                                </includes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                        <exclude>**/log4j.properties</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                                    <resource>reference.conf</resource>
+                                </transformer>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                                    <resource>
+                                        META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+                                    </resource>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>3.3.2</version>
+                <executions>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile-first</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

+ 87 - 0
src/main/java/com/aliyun/odps/spark/examples/sparksql/JavaSparkSQL.java

@@ -0,0 +1,87 @@
+
+
+package com.aliyun.odps.spark.examples.sparksql;
+
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.cupid.CupidSession;
+import com.tzld.piaoquan.data.base.UserFeature;
+import com.tzld.piaoquan.data.dataloader.FeatureConstructor;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.types.*;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.spark.sql.types.StructField;
+import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
+import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.serializer.StringRedisSerializer;
+
+public class JavaSparkSQL {
+
+    private static final String userKeyFormat = "user:%s";
+
+    public static RedisTemplate<String, String> buildRedisTemplate() {
+        RedisStandaloneConfiguration rsc = new RedisStandaloneConfiguration();
+        rsc.setPort(6379);
+        rsc.setPassword("Wqsd@2019");
+        rsc.setHostName("r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com");
+        RedisTemplate<String, String> template = new RedisTemplate<>();
+        JedisConnectionFactory fac = new JedisConnectionFactory(rsc);
+        fac.afterPropertiesSet();
+        template.setDefaultSerializer(new StringRedisSerializer());
+        template.setConnectionFactory(fac);
+        template.afterPropertiesSet();
+        return template;
+    }
+
+    public static void loadFeatureToRedis(RedisTemplate<String, String> redisTemplate, Row line) {
+        Map<String, String> userFeaRedisFormat = new HashMap<String, String>();
+        UserFeature userFeature = FeatureConstructor.constructUserFeature(line);
+        String key = String.format(userKeyFormat, userFeature.getKey());
+        String value = userFeature.getValue();
+        userFeaRedisFormat.put(key, value);
+        redisTemplate.opsForValue().multiSet(userFeaRedisFormat);
+    }
+
+
+
+    public static void main(String[] args) throws Exception {
+        SparkSession spark = SparkSession
+                .builder()
+                .appName("SparkSQL-on-MaxCompute")
+                .config("spark.sql.defaultCatalog","odps")
+                .config("spark.sql.catalog.odps", "org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog")
+                .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
+                .config("spark.sql.extensions", "org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions")
+                .config("spark.sql.catalogImplementation","hive")
+                .getOrCreate();
+        JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
+
+
+
+        // Read the partitioned table
+        Dataset<Row> rptdf = spark.sql("select * from loghubods.alg_recsys_user_info where dt='20231210' limit 1000");
+        System.out.println("rptdf count: " + rptdf.count());
+        rptdf.printSchema();
+
+
+        rptdf.toJavaRDD().foreachPartition(
+                rowIterator -> {
+                    RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
+                    rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line));
+                }
+        );
+
+
+    }
+}
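
For reference, loadFeatureToRedis above writes one key per user in the form user:<uid>, with the Gson JSON from UserFeature.getValue() as the value. A minimal read-back sketch, illustrative only: it reuses buildRedisTemplate() from the class above, and the uid "123456" is a made-up placeholder.

public class UserFeatureReadBackSketch {
    public static void main(String[] args) {
        // Reuse the connection setup from JavaSparkSQL above.
        org.springframework.data.redis.core.RedisTemplate<String, String> template =
                com.aliyun.odps.spark.examples.sparksql.JavaSparkSQL.buildRedisTemplate();

        // Keys follow the "user:%s" format used by loadFeatureToRedis; "123456" is a placeholder uid.
        String key = String.format("user:%s", "123456");
        String json = template.opsForValue().get(key);

        // The value is the Gson JSON produced by UserFeature.getValue(); null means this uid was not loaded.
        System.out.println(key + " -> " + json);
    }
}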

+ 51 - 0
src/main/java/com/aliyun/odps/spark/examples/sparksql/SparkEMRReadMaxCompute.java

@@ -0,0 +1,51 @@
+package com.aliyun.odps.spark.examples.sparksql;
+
+import com.aliyun.odps.TableSchema;
+import com.aliyun.odps.data.Record;
+import org.apache.spark.SparkConf;
+import org.apache.spark.aliyun.odps.OdpsOps;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function2;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class SparkEMRReadMaxCompute {
+
+    public static void main(String[] args) {
+
+        String partition = "dt='20231210'";
+        String accessId = "LTAIWYUujJAm7CbH";
+        String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
+        String odpsUrl = "http://service.cn.maxcompute.aliyun.com/api";
+        String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
+        String project = "loghubods";
+        String table = "alg_recsys_video_info";
+
+        SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+        OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
+
+        System.out.println("Read odps table...");
+        JavaRDD<List<Long>> readData = odpsOps.readTableWithJava(project, table, partition, new RecordToLongs(), Integer.valueOf(10));
+
+        System.out.println("counts: ");
+        System.out.println(readData.count());
+    }
+
+    static class RecordToLongs implements Function2<Record, TableSchema, List<Long>> {
+        @Override
+        public List<Long> call(Record record, TableSchema schema) throws Exception {
+            List<Long> ret = new ArrayList<Long>();
+            for (int i = 0; i < schema.getColumns().size(); i++) {
+                ret.add(record.getBigint(i));
+            }
+            return ret;
+        }
+    }
+
+
+}
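
RecordToLongs above reads every column with record.getBigint(i), which assumes all columns are BIGINT; for a table with mixed column types, a string-based transfer function is a safer starting point. The following drop-in variant is an illustrative sketch, not part of this commit; it assumes Record.get(int) returning the raw column value as an Object, which the ODPS SDK provides.

import com.aliyun.odps.TableSchema;
import com.aliyun.odps.data.Record;
import org.apache.spark.api.java.function.Function2;

import java.util.ArrayList;
import java.util.List;

// Illustrative alternative to RecordToLongs for tables with mixed column types.
class RecordToStrings implements Function2<Record, TableSchema, List<String>> {
    @Override
    public List<String> call(Record record, TableSchema schema) throws Exception {
        List<String> ret = new ArrayList<String>();
        for (int i = 0; i < schema.getColumns().size(); i++) {
            Object value = record.get(i);                   // raw column value, regardless of ODPS type
            ret.add(value == null ? null : value.toString());
        }
        return ret;
    }
}

It would be used the same way as RecordToLongs in the read call above, just with List<String> as the element type of the returned JavaRDD.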

+ 14 - 0
src/main/java/com/tzld/piaoquan/data/base/Constant.java

@@ -0,0 +1,14 @@
+package com.tzld.piaoquan.data.base;
+
+/**
+ * Constants.
+ *
+ * @author supeng
+ * @date 2020/08/19
+ */
+public class Constant {
+    /**
+     * traceID
+     */
+    public static final String LOG_TRACE_ID = "logTraceId";
+}

+ 138 - 0
src/main/java/com/tzld/piaoquan/data/base/ItemFeature.java

@@ -0,0 +1,138 @@
+package com.tzld.piaoquan.data.base;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+@Getter
+@NoArgsConstructor
+public class ItemFeature {
+    private String videoId;
+
+    private String upId;
+
+    private String tags;
+
+    /**
+     * There can be multiple titles; this field is not used yet, so it is left unprocessed for now.
+     * @since 2023-12-05
+     */
+    private String title;
+
+    private String titleLength;
+
+    private String playLength;
+
+    private String totalTime;
+
+    private String daysSinceUpload;
+
+
+    // same-day statistics
+    private UserActionFeature day1_cnt_features;
+    // 3-day statistics
+    private UserActionFeature day3_cnt_features;
+    // 7-day statistics
+    private UserActionFeature day7_cnt_features;
+    // 3-month statistics
+    private UserActionFeature month3_cnt_features;
+
+
+    public void setVideoId(String key){
+        if(key == null){
+            this.videoId = "0";
+        } else {
+            this.videoId = key;
+        }
+    }
+
+    public void setUpId(String key){
+        if(key == null){
+            this.upId = "0";
+        } else {
+            this.upId = key;
+        }
+    }
+
+    public void setTags(String key){
+        if(key == null){
+            this.tags = "0";
+        } else {
+            this.tags = key;
+        }
+    }
+
+    public void setTitle(String key){
+        if(key == null){
+            this.title = "0";
+        } else {
+            this.title = key;
+        }
+    }
+
+
+    public void setDay1_cnt_features(UserActionFeature feature){
+        this.day1_cnt_features = feature;
+    }
+
+
+    public void setDay3_cnt_features(UserActionFeature feature){
+        this.day3_cnt_features = feature;
+
+    }
+
+    public void setDay7_cnt_features(UserActionFeature feature){
+        this.day7_cnt_features = feature;
+
+    }
+
+    public void setMonth3_cnt_features(UserActionFeature feature){
+        this.month3_cnt_features= feature;
+
+    }
+
+    public void setTitleLength(String key) {
+        if(key == null){
+            this.titleLength = "0";
+        } else {
+            this.titleLength = key;
+        }
+    }
+
+
+    public void setDaysSinceUpload(String key) {
+        if(key == null){
+            this.daysSinceUpload = "0";
+        } else {
+            this.daysSinceUpload = key;
+        }
+    }
+
+    public void setPlayLength(String key) {
+        if(key == null){
+            this.playLength = "0";
+        } else {
+            this.playLength = key;
+        }
+    }
+
+    public void setTotalTime(String key) {
+        if(key == null){
+            this.totalTime = "0";
+        } else {
+            this.totalTime = key;
+        }
+    }
+
+    public String getKey() {
+        return this.videoId;
+    }
+
+    public String getValue(){
+        Gson gson = new GsonBuilder().serializeSpecialFloatingPointValues().create();
+        return gson.toJson(this);
+    }
+
+
+}
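
All setters above share one convention: a null input falls back to the string "0", and getValue() serializes the whole object to JSON via Gson (with special floating-point values allowed). A minimal usage sketch, illustrative only, with made-up values:

public class ItemFeatureSketch {
    public static void main(String[] args) {
        com.tzld.piaoquan.data.base.ItemFeature item = new com.tzld.piaoquan.data.base.ItemFeature();
        item.setVideoId("9527");
        item.setUpId(null);            // null falls back to "0"
        item.setTitleLength("12");

        // getKey() returns the videoId; getValue() is the Gson JSON used as the Redis value.
        System.out.println(item.getKey());     // 9527
        // Prints something like {"videoId":"9527","upId":"0","titleLength":"12"}
        // (Gson omits the still-null, unset fields by default).
        System.out.println(item.getValue());
    }
}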

+ 122 - 0
src/main/java/com/tzld/piaoquan/data/base/RequestContext.java

@@ -0,0 +1,122 @@
+package com.tzld.piaoquan.data.base;
+
+
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+@Getter
+@NoArgsConstructor
+public class RequestContext {
+
+    private String request_id;
+    // device and model information
+    private String apptype;
+    private String machineinfo_brand;
+    private String machineinfo_model;
+    private String machineinfo_platform;
+    private String machineinfo_sdkversion;
+    private String machineinfo_system;
+    private String machineinfo_wechatversion;
+
+    // time and location information
+    private String day;
+    private String week;
+    private String hour;
+    private String region;
+    private String city;
+
+    public void setApptype(String apptype) {
+        this.apptype = apptype;
+        if(apptype == null)
+            this.apptype = "-1";
+    }
+
+    public void setMachineinfo_brand(String machineinfo_brand) {
+        this.machineinfo_brand = machineinfo_brand;
+        if(machineinfo_brand == null)
+            this.machineinfo_brand = "-1";
+    }
+
+    public void setMachineinfo_model(String machineinfo_model) {
+        this.machineinfo_model = machineinfo_model;
+        if(machineinfo_model == null)
+            this.machineinfo_model = "-1";
+    }
+
+
+    public void setMachineinfo_wechatversion(String machineinfo_wechatversion) {
+        this.machineinfo_wechatversion = machineinfo_wechatversion;
+        if(machineinfo_wechatversion == null)
+            this.machineinfo_wechatversion = "-1";
+    }
+
+
+    public void setMachineinfo_sdkversion(String machineinfo_sdkversion) {
+        this.machineinfo_sdkversion = machineinfo_sdkversion;
+        if(machineinfo_sdkversion == null)
+            this.machineinfo_sdkversion = "-1";
+    }
+
+    public void setMachineinfo_platform(String machineinfo_platform) {
+        this.machineinfo_platform = machineinfo_platform;
+        if(machineinfo_platform == null)
+            this.machineinfo_platform = "-1";
+    }
+
+    public void setMachineinfo_system(String machineinfo_system) {
+        this.machineinfo_system = machineinfo_system;
+        if(machineinfo_system == null)
+            this.machineinfo_system = "-1";
+    }
+
+
+
+    public void setHour(String hour) {
+        this.hour = hour;
+        if(hour == null)
+            this.hour = "-1";
+    }
+
+
+    public void setDay(String day) {
+        this.day = day;
+        if(day == null)
+            this.day = "-1";
+    }
+
+    public void setWeek(String week) {
+        this.week = week;
+        if(week == null)
+            this.week = "-1";
+    }
+
+
+    public void setRegion(String region) {
+        this.region = region;
+        if(region == null)
+            this.region = "-1";
+    }
+
+
+    public void setCity(String city) {
+        this.city = city;
+        if(city == null)
+            this.city = "-1";
+    }
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+}

+ 44 - 0
src/main/java/com/tzld/piaoquan/data/base/RequestContextBytesFeature.java

@@ -0,0 +1,44 @@
+package com.tzld.piaoquan.data.base;
+
+
+import lombok.Data;
+
+@Data
+public class RequestContextBytesFeature {
+    private final byte[] apptype;
+    private final byte[]  machineinfo_brand;
+    private final byte[]  machineinfo_model;
+    private final byte[]  machineinfo_platform;
+    private final byte[]  machineinfo_sdkversion;
+    private final byte[]  machineinfo_system;
+    private final byte[]  machineinfo_wechatversion;
+
+    // time and location information
+    private final byte[]  day;
+    private final byte[]  week;
+    private final byte[]  hour;
+    private final byte[]  region;
+    private final byte[]  city;
+
+
+    public RequestContextBytesFeature(RequestContext requestContext){
+        apptype = requestContext.getApptype().getBytes();
+        machineinfo_brand = requestContext.getMachineinfo_brand().getBytes();
+        machineinfo_model = requestContext.getMachineinfo_model().getBytes();
+        machineinfo_platform = requestContext.getMachineinfo_platform().getBytes();
+        machineinfo_sdkversion = requestContext.getMachineinfo_sdkversion().getBytes();
+        machineinfo_system = requestContext.getMachineinfo_system().getBytes();
+        machineinfo_wechatversion = requestContext.getMachineinfo_wechatversion().getBytes();
+
+        day = requestContext.getDay().getBytes();
+        week = requestContext.getWeek().getBytes();
+        hour = requestContext.getHour().getBytes();
+        region = requestContext.getRegion().getBytes();
+        city = requestContext.getCity().getBytes();
+    }
+
+
+
+
+
+}

+ 108 - 0
src/main/java/com/tzld/piaoquan/data/base/UserActionFeature.java

@@ -0,0 +1,108 @@
+package com.tzld.piaoquan.data.base;
+
+import lombok.Data;
+
+@Data
+public class UserActionFeature {
+    private double exp_cnt;
+    private double click_cnt;
+    private double share_cnt;
+    private double return_cnt;
+
+    private double ctr;
+    private double str;
+    private double rov;
+    private double ros;
+
+    private double ceilLog(Double key) {
+        return Math.ceil(Math.log(key));
+    }
+
+    private double bucketRatioFeature(Double key) {
+        long bucket = Math.round(Math.log(key * 100));
+        if( bucket > 100)
+            bucket = 100;
+        return (double) bucket;
+    }
+
+
+    public void setExp_cnt(Object key){
+        if(key == null ) {
+            this.exp_cnt = 0.0;
+        } else {
+            String formateKey = key.toString().replace("\\N", "-1");
+            this.exp_cnt = ceilLog(Double.valueOf(formateKey));
+        }
+    }
+
+    public void setClick_cnt(Object key){
+        if(key == null ){
+            this.click_cnt = 0.0 ;
+        } else {
+            String formateKey = key.toString().replace("\\N", "-1");
+            this.click_cnt = ceilLog(Double.valueOf(formateKey));
+        }
+    }
+    public void setShare_cnt(Object key){
+        if(key == null ){
+            this.share_cnt = 0.0 ;
+        } else {
+            String formateKey = key.toString().replace("\\N", "-1");
+            this.share_cnt = ceilLog(Double.valueOf(formateKey));
+        }
+    }
+    public void setReturn_cnt(Object key){
+        if(key == null ){
+            this.return_cnt = 0.0 ;
+        } else {
+            String formateKey = key.toString().replace("\\N", "-1");
+            this.return_cnt = ceilLog(Double.valueOf(formateKey));
+        }
+    }
+
+    public void setCtr(Object key){
+        if(key == null ){
+            this.ctr = 0.0 ;
+        } else {
+            String formateKey = key.toString().replace("\\N", "-1");
+            this.ctr = bucketRatioFeature(Double.valueOf(formateKey));
+        }
+    }
+
+    public void setStr(Object key){
+        if(key == null ){
+            this.str = 0.0 ;
+        } else {
+            String formateKey = key.toString().replace("\\N", "-1");
+            this.str = bucketRatioFeature(Double.valueOf(formateKey));
+        }
+    }
+
+    public void setRov(Object key){
+        if(key == null ){
+            this.rov = 0.0 ;
+        } else {
+            String formateKey = key.toString().replace("\\N", "-1");
+            this.rov = bucketRatioFeature(Double.valueOf(formateKey));
+        }
+    }
+
+    public void setRos(Object key){
+        if(key == null ){
+            this.ros = 0.0 ;
+        } else {
+            String formateKey = key.toString().replace("\\N", "-1");
+            this.ros = bucketRatioFeature(Double.valueOf(formateKey));
+        }
+    }
+
+
+
+
+
+
+
+
+
+
+}
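
To make the bucketing above concrete: the count setters go through ceilLog (ceiling of the natural log), and the ratio setters go through bucketRatioFeature (round(ln(ratio * 100)), capped at 100). A small worked example, illustrative only (the input values are made up):

public class UserActionFeatureSketch {
    public static void main(String[] args) {
        com.tzld.piaoquan.data.base.UserActionFeature f =
                new com.tzld.piaoquan.data.base.UserActionFeature();

        f.setExp_cnt("100");   // count feature: ceil(ln(100)) = ceil(4.605...) = 5.0
        f.setCtr("0.25");      // ratio feature: round(ln(0.25 * 100)) = round(3.218...) = 3.0

        System.out.println(f.getExp_cnt());   // 5.0
        System.out.println(f.getClick_cnt()); // 0.0 -- unset counters keep the double default
        System.out.println(f.getCtr());       // 3.0
    }
}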

+ 88 - 0
src/main/java/com/tzld/piaoquan/data/base/UserBytesFeature.java

@@ -0,0 +1,88 @@
+package com.tzld.piaoquan.data.base;
+
+import lombok.Data;
+
+import java.util.HashMap;
+import java.util.Map;
+import com.tzld.piaoquan.data.base.UserFeature;
+
+@Data
+public class UserBytesFeature {
+
+    private final byte[]  uid;
+
+    // same-day statistics
+    private Map<String, byte[]> day1_cnt_features;
+    // 3-day statistics
+    private Map<String, byte[]> day3_cnt_features;
+    // 7-day statistics
+    private Map<String, byte[]> day7_cnt_features;
+    // 3-month statistics
+    private Map<String, byte[]> month3_cnt_features;
+    // user activity cycle buckets
+    private final byte[]  user_cycle_bucket_7days;
+    private final byte[]  user_cycle_bucket_30days;
+    private final byte[]  user_share_bucket_30days;
+
+
+    public UserBytesFeature(UserFeature feature) {
+        this.uid = feature.getUid().getBytes();
+        this.user_cycle_bucket_7days = feature.getUser_cycle_bucket_7days().getBytes();
+        this.user_cycle_bucket_30days = feature.getUser_cycle_bucket_30days().getBytes();
+        this.user_share_bucket_30days = feature.getUser_share_bucket_30days().getBytes();
+
+        this.day1_cnt_features = new HashMap<String, byte[]>();
+        // 1 day statistic
+        this.day1_cnt_features.put("exp", String.valueOf(feature.getDay1_cnt_features().getExp_cnt()).getBytes());
+        this.day1_cnt_features.put("click", String.valueOf(feature.getDay1_cnt_features().getClick_cnt()).getBytes());
+        this.day1_cnt_features.put("share", String.valueOf(feature.getDay1_cnt_features().getShare_cnt()).getBytes());
+        this.day1_cnt_features.put("return", String.valueOf(feature.getDay1_cnt_features().getReturn_cnt()).getBytes());
+        this.day1_cnt_features.put("ctr", String.valueOf(feature.getDay1_cnt_features().getCtr()).getBytes());
+        this.day1_cnt_features.put("str", String.valueOf(feature.getDay1_cnt_features().getStr()).getBytes());
+        this.day1_cnt_features.put("rov", String.valueOf(feature.getDay1_cnt_features().getRov()).getBytes());
+        this.day1_cnt_features.put("ros", String.valueOf(feature.getDay1_cnt_features().getRos()).getBytes());
+
+
+
+        // 3 day statistic
+        this.day3_cnt_features = new HashMap<String, byte[]>();
+        day3_cnt_features.put("exp", String.valueOf(feature.getDay3_cnt_features().getExp_cnt()).getBytes());
+        day3_cnt_features.put("click", String.valueOf(feature.getDay3_cnt_features().getClick_cnt()).getBytes());
+        day3_cnt_features.put("share", String.valueOf(feature.getDay3_cnt_features().getShare_cnt()).getBytes());
+        day3_cnt_features.put("return", String.valueOf(feature.getDay3_cnt_features().getReturn_cnt()).getBytes());
+        day3_cnt_features.put("ctr", String.valueOf(feature.getDay3_cnt_features().getCtr()).getBytes());
+        day3_cnt_features.put("str", String.valueOf(feature.getDay3_cnt_features().getStr()).getBytes());
+        day3_cnt_features.put("rov", String.valueOf(feature.getDay3_cnt_features().getRov()).getBytes());
+        day3_cnt_features.put("ros", String.valueOf(feature.getDay3_cnt_features().getRos()).getBytes());
+
+
+        // 7 day statistic
+        this.day7_cnt_features = new HashMap<String, byte[]>();
+        day7_cnt_features.put("exp", String.valueOf(feature.getDay7_cnt_features().getExp_cnt()).getBytes());
+        day7_cnt_features.put("click", String.valueOf(feature.getDay7_cnt_features().getClick_cnt()).getBytes());
+        day7_cnt_features.put("share", String.valueOf(feature.getDay7_cnt_features().getShare_cnt()).getBytes());
+        day7_cnt_features.put("return", String.valueOf(feature.getDay7_cnt_features().getReturn_cnt()).getBytes());
+        day7_cnt_features.put("ctr", String.valueOf(feature.getDay7_cnt_features().getCtr()).getBytes());
+        day7_cnt_features.put("str", String.valueOf(feature.getDay7_cnt_features().getStr()).getBytes());
+        day7_cnt_features.put("rov", String.valueOf(feature.getDay7_cnt_features().getRov()).getBytes());
+        day7_cnt_features.put("ros", String.valueOf(feature.getDay7_cnt_features().getRos()).getBytes());
+
+
+
+        // 3 month statistics
+        this.month3_cnt_features = new HashMap<String, byte[]>();
+        month3_cnt_features.put("exp", String.valueOf(feature.getMonth3_cnt_features().getExp_cnt()).getBytes());
+        month3_cnt_features.put("click", String.valueOf(feature.getMonth3_cnt_features().getClick_cnt()).getBytes());
+        month3_cnt_features.put("share", String.valueOf(feature.getMonth3_cnt_features().getShare_cnt()).getBytes());
+        month3_cnt_features.put("return", String.valueOf(feature.getMonth3_cnt_features().getReturn_cnt()).getBytes());
+        month3_cnt_features.put("ctr", String.valueOf(feature.getMonth3_cnt_features().getCtr()).getBytes());
+        month3_cnt_features.put("str", String.valueOf(feature.getMonth3_cnt_features().getStr()).getBytes());
+        month3_cnt_features.put("rov", String.valueOf(feature.getMonth3_cnt_features().getRov()).getBytes());
+        month3_cnt_features.put("ros", String.valueOf(feature.getMonth3_cnt_features().getRos()).getBytes());
+
+
+    }
+
+
+
+}
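
The four map-building blocks in the constructor above differ only in which UserActionFeature window they read, and the same pattern repeats in VideoBytesFeature below. A small helper along these lines would remove the duplication; this is an illustrative refactoring, not part of this commit, and the class name ActionFeatureBytes is made up:

import com.tzld.piaoquan.data.base.UserActionFeature;

import java.util.HashMap;
import java.util.Map;

final class ActionFeatureBytes {
    private ActionFeatureBytes() {}

    // Builds the same "exp"/"click"/.../"ros" byte map that UserBytesFeature and
    // VideoBytesFeature construct by hand for each time window.
    static Map<String, byte[]> toByteMap(UserActionFeature feature) {
        Map<String, byte[]> m = new HashMap<String, byte[]>();
        m.put("exp", String.valueOf(feature.getExp_cnt()).getBytes());
        m.put("click", String.valueOf(feature.getClick_cnt()).getBytes());
        m.put("share", String.valueOf(feature.getShare_cnt()).getBytes());
        m.put("return", String.valueOf(feature.getReturn_cnt()).getBytes());
        m.put("ctr", String.valueOf(feature.getCtr()).getBytes());
        m.put("str", String.valueOf(feature.getStr()).getBytes());
        m.put("rov", String.valueOf(feature.getRov()).getBytes());
        m.put("ros", String.valueOf(feature.getRos()).getBytes());
        return m;
    }
}

The constructor body would then reduce to calls such as this.day1_cnt_features = ActionFeatureBytes.toByteMap(feature.getDay1_cnt_features()); for each window.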

+ 85 - 0
src/main/java/com/tzld/piaoquan/data/base/UserFeature.java

@@ -0,0 +1,85 @@
+package com.tzld.piaoquan.data.base;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+@Getter
+@NoArgsConstructor
+public class UserFeature {
+    private String uid;
+    // same-day statistics
+    private UserActionFeature day1_cnt_features;
+    // 3-day statistics
+    private UserActionFeature day3_cnt_features;
+    // 7-day statistics
+    private UserActionFeature day7_cnt_features;
+    // 3-month statistics
+    private UserActionFeature month3_cnt_features;
+    // user activity cycle buckets
+    private String user_cycle_bucket_7days;
+    private String user_cycle_bucket_30days;
+    private String user_share_bucket_30days;
+
+    public void setUid(String key){
+        this.uid = key;
+        if(key == null)
+            this.uid = "0";
+    }
+
+
+    public void setDay1_cnt_features(UserActionFeature key){
+        this.day1_cnt_features = key;
+        if(key == null)
+            this.day1_cnt_features = new UserActionFeature();
+    }
+
+    public void setDay3_cnt_features(UserActionFeature key){
+        this.day3_cnt_features = key;
+        if(key == null)
+            this.day3_cnt_features = new UserActionFeature();
+    }
+
+    public void setDay7_cnt_features(UserActionFeature key){
+        this.day7_cnt_features = key;
+        if(key == null)
+            this.day7_cnt_features = new UserActionFeature();
+    }
+
+    public void setMonth3_cnt_features(UserActionFeature key) {
+        this.month3_cnt_features = key;
+        if(key == null)
+            this.month3_cnt_features = new UserActionFeature();
+    }
+
+
+    public void setUser_cycle_bucket_7days(String key){
+        this.user_cycle_bucket_7days = key;
+        if(key == null)
+            this.user_cycle_bucket_7days = "0";
+    }
+
+    public void setUser_cycle_bucket_30days(String key){
+        this.user_cycle_bucket_30days = key;
+        if(key == null)
+            this.user_cycle_bucket_30days = "0";
+    }
+
+    public void setUser_share_bucket_30days(String key){
+        this.user_share_bucket_30days = key;
+        if(key == null)
+            this.user_share_bucket_30days = "0";
+    }
+
+
+    public String getKey() {
+        return this.uid;
+    }
+
+    public String getValue(){
+        Gson gson = new GsonBuilder().serializeSpecialFloatingPointValues().create();
+        return gson.toJson(this);
+    }
+
+}

+ 93 - 0
src/main/java/com/tzld/piaoquan/data/base/VideoBytesFeature.java

@@ -0,0 +1,93 @@
+package com.tzld.piaoquan.data.base;
+
+import lombok.Data;
+
+import java.util.HashMap;
+import java.util.Map;
+@Data
+public class VideoBytesFeature {
+    private final byte[] videoId;
+
+    private final byte[] upId;
+
+    private final byte[] titleLength;
+
+    private final byte[] playLength;
+
+    private final byte[] totalTime;
+
+    private final byte[] daysSinceUpload;
+
+
+    // same-day statistics
+    private Map<String, byte[]> item_day1_cnt_features;
+    // 3-day statistics
+    private Map<String, byte[]> item_day3_cnt_features;
+    // 7-day statistics
+    private Map<String, byte[]> item_day7_cnt_features;
+    // 3-month statistics
+    private Map<String, byte[]> item_month3_cnt_features;
+
+    public VideoBytesFeature(ItemFeature feature) {
+        videoId  = feature.getVideoId().getBytes();
+        upId  = feature.getUpId().getBytes();
+
+        titleLength  = feature.getTitleLength().getBytes();
+        playLength  = feature.getPlayLength().getBytes();
+        totalTime = feature.getTotalTime().getBytes();
+        daysSinceUpload  = feature.getDaysSinceUpload().getBytes();
+        // 1day
+
+
+        // 1 day statistic
+        item_day1_cnt_features = new HashMap<String, byte[]>();
+        item_day1_cnt_features.put("exp", String.valueOf(feature.getDay1_cnt_features().getExp_cnt()).getBytes());
+        item_day1_cnt_features.put("click", String.valueOf(feature.getDay1_cnt_features().getClick_cnt()).getBytes());
+        item_day1_cnt_features.put("share", String.valueOf(feature.getDay1_cnt_features().getShare_cnt()).getBytes());
+        item_day1_cnt_features.put("return", String.valueOf(feature.getDay1_cnt_features().getReturn_cnt()).getBytes());
+        item_day1_cnt_features.put("ctr", String.valueOf(feature.getDay1_cnt_features().getCtr()).getBytes());
+        item_day1_cnt_features.put("str", String.valueOf(feature.getDay1_cnt_features().getStr()).getBytes());
+        item_day1_cnt_features.put("rov", String.valueOf(feature.getDay1_cnt_features().getRov()).getBytes());
+        item_day1_cnt_features.put("ros", String.valueOf(feature.getDay1_cnt_features().getRos()).getBytes());
+
+
+
+        // 3 day statistic
+        item_day3_cnt_features = new HashMap<String, byte[]>();
+        item_day3_cnt_features.put("exp", String.valueOf(feature.getDay3_cnt_features().getExp_cnt()).getBytes());
+        item_day3_cnt_features.put("click", String.valueOf(feature.getDay3_cnt_features().getClick_cnt()).getBytes());
+        item_day3_cnt_features.put("share", String.valueOf(feature.getDay3_cnt_features().getShare_cnt()).getBytes());
+        item_day3_cnt_features.put("return", String.valueOf(feature.getDay3_cnt_features().getReturn_cnt()).getBytes());
+        item_day3_cnt_features.put("ctr", String.valueOf(feature.getDay3_cnt_features().getCtr()).getBytes());
+        item_day3_cnt_features.put("str", String.valueOf(feature.getDay3_cnt_features().getStr()).getBytes());
+        item_day3_cnt_features.put("rov", String.valueOf(feature.getDay3_cnt_features().getRov()).getBytes());
+        item_day3_cnt_features.put("ros", String.valueOf(feature.getDay3_cnt_features().getRos()).getBytes());
+
+
+        // 7 day statistic
+        item_day7_cnt_features = new HashMap<String, byte[]>();
+        item_day7_cnt_features.put("exp", String.valueOf(feature.getDay7_cnt_features().getExp_cnt()).getBytes());
+        item_day7_cnt_features.put("click", String.valueOf(feature.getDay7_cnt_features().getClick_cnt()).getBytes());
+        item_day7_cnt_features.put("share", String.valueOf(feature.getDay7_cnt_features().getShare_cnt()).getBytes());
+        item_day7_cnt_features.put("return", String.valueOf(feature.getDay7_cnt_features().getReturn_cnt()).getBytes());
+        item_day7_cnt_features.put("ctr", String.valueOf(feature.getDay7_cnt_features().getCtr()).getBytes());
+        item_day7_cnt_features.put("str", String.valueOf(feature.getDay7_cnt_features().getStr()).getBytes());
+        item_day7_cnt_features.put("rov", String.valueOf(feature.getDay7_cnt_features().getRov()).getBytes());
+        item_day7_cnt_features.put("ros", String.valueOf(feature.getDay7_cnt_features().getRos()).getBytes());
+
+
+
+        // 3 month statistics
+        item_month3_cnt_features = new HashMap<String, byte[]>();
+        item_month3_cnt_features.put("exp", String.valueOf(feature.getMonth3_cnt_features().getExp_cnt()).getBytes());
+        item_month3_cnt_features.put("click", String.valueOf(feature.getMonth3_cnt_features().getClick_cnt()).getBytes());
+        item_month3_cnt_features.put("share", String.valueOf(feature.getMonth3_cnt_features().getShare_cnt()).getBytes());
+        item_month3_cnt_features.put("return", String.valueOf(feature.getMonth3_cnt_features().getReturn_cnt()).getBytes());
+        item_month3_cnt_features.put("ctr", String.valueOf(feature.getMonth3_cnt_features().getCtr()).getBytes());
+        item_month3_cnt_features.put("str", String.valueOf(feature.getMonth3_cnt_features().getStr()).getBytes());
+        item_month3_cnt_features.put("rov", String.valueOf(feature.getMonth3_cnt_features().getRov()).getBytes());
+        item_month3_cnt_features.put("ros", String.valueOf(feature.getMonth3_cnt_features().getRos()).getBytes());
+
+    }
+
+}

+ 131 - 0
src/main/java/com/tzld/piaoquan/data/base/VlogFeatureGroup.java

@@ -0,0 +1,131 @@
+package com.tzld.piaoquan.data.base;
+
+public enum VlogFeatureGroup {
+
+    // video
+    APPTYP,
+    VIDEOID,
+    MID,
+    UID,
+    MACHINEINFO_BRAND,
+    MACHINEINFO_MODEL,
+    MACHINEINFO_PLATFORM,
+    MACHINEINFO_SDKVERSION,
+    MACHINEINFO_SYSTEM,
+    MACHINEINFO_WECHATVERSION,
+    UP_ID,
+    TITLE_LEN,
+    PLAY_LEN,
+    TOTAL_TIME,
+    DAYS_SINCE_UPLOAD,
+    DAY,
+    WEEK,
+    HOUR,
+    REGION,
+    CITY,
+
+    USER_1DAY_EXP,
+    USER_1DAY_CLICK,
+    USER_1DAY_SHARE,
+    USER_1DAY_RETURN,
+    USER_1DAY_CTR,
+    USER_1DAY_STR,
+    USER_1DAY_ROV,
+    USER_1DAY_ROS,
+
+    USER_3DAY_EXP,
+    USER_3DAY_CLICK,
+    USER_3DAY_SHARE,
+    USER_3DAY_RETURN,
+    USER_3DAY_CTR,
+    USER_3DAY_STR,
+    USER_3DAY_ROV,
+    USER_3DAY_ROS,
+
+    USER_7DAY_EXP,
+    USER_7DAY_CLICK,
+    USER_7DAY_SHARE,
+    USER_7DAY_RETURN,
+    USER_7DAY_CTR,
+    USER_7DAY_STR,
+    USER_7DAY_ROV,
+    USER_7DAY_ROS,
+
+    USER_3MONTH_EXP,
+    USER_3MONTH_CLICK,
+    USER_3MONTH_SHARE,
+    USER_3MONTH_RETURN,
+    USER_3MONTH_CTR,
+    USER_3MONTH_STR,
+    USER_3MONTH_ROV,
+    USER_3MONTH_ROS,
+
+
+    ITEM_1DAY_EXP,
+    ITEM_1DAY_CLICK,
+    ITEM_1DAY_SHARE,
+    ITEM_1DAY_RETURN,
+    ITEM_1DAY_CTR,
+    ITEM_1DAY_STR,
+    ITEM_1DAY_ROV,
+    ITEM_1DAY_ROS,
+
+    ITEM_3DAY_EXP,
+    ITEM_3DAY_CLICK,
+    ITEM_3DAY_SHARE,
+    ITEM_3DAY_RETURN,
+    ITEM_3DAY_CTR,
+    ITEM_3DAY_STR,
+    ITEM_3DAY_ROV,
+    ITEM_3DAY_ROS,
+
+    ITEM_7DAY_EXP,
+    ITEM_7DAY_CLICK,
+    ITEM_7DAY_SHARE,
+    ITEM_7DAY_RETURN,
+    ITEM_7DAY_CTR,
+    ITEM_7DAY_STR,
+    ITEM_7DAY_ROV,
+    ITEM_7DAY_ROS,
+
+    ITEM_3MONTH_EXP,
+    ITEM_3MONTH_CLICK,
+    ITEM_3MONTH_SHARE,
+    ITEM_3MONTH_RETURN,
+    ITEM_3MONTH_CTR,
+    ITEM_3MONTH_STR,
+    ITEM_3MONTH_ROV,
+    ITEM_3MONTH_ROS,
+
+
+    USER_CYCLE_BUCKET_7DAY,
+    USER_CYCLE_BUCKET_30DAY,
+    USER_SHARE_BUCKET_30DAY,
+    ;
+
+
+    private final byte[] idBytes;
+    private final byte[] nameBytes;
+
+    VlogFeatureGroup() {
+        this.nameBytes = name().toLowerCase().getBytes();
+        this.idBytes = String.valueOf(ordinal()).getBytes();
+    }
+
+    public final int getId() {
+        return ordinal();
+    }
+
+    public final String getGroupName() {
+        return name().toLowerCase();
+    }
+
+    public final byte[] getGroupNameBytes() {
+        return getGroupName().getBytes();
+    }
+
+    public final byte[] getIdBytes() {
+        return idBytes;
+    }
+
+}
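
VlogFeatureGroup looks like the feature-group registry consumed by BytesGroup and BytesUtils further down in this commit: each constant supplies an id, a lowercase group name, and the name bytes used to prefix feature values before hashing. A minimal wiring sketch, illustrative only (the real wiring presumably lives in LRBytesFeatureExtractorBase, which is listed in this commit but not shown here):

import com.tzld.piaoquan.data.base.VlogFeatureGroup;
import com.tzld.piaoquan.data.score.feature.BytesGroup;
import com.tzld.piaoquan.data.score.feature.BytesUtils;

public class FeatureGroupWiringSketch {
    public static void main(String[] args) {
        VlogFeatureGroup[] groups = VlogFeatureGroup.values();

        // One BytesGroup per enum constant, indexed by ordinal, as BytesUtils expects.
        BytesGroup[] bytesGroups = new BytesGroup[groups.length];
        for (VlogFeatureGroup g : groups) {
            bytesGroups[g.getId()] = new BytesGroup(g.getId(), g.getGroupName(), g.getGroupNameBytes());
        }

        // BytesUtils pre-builds a "<groupname>#" buffer for each group and, in makeFea(...),
        // hashes "<groupname>#<value>" with MurmurHash64 into a BaseFeature identifier.
        BytesUtils utils = new BytesUtils(bytesGroups);
        System.out.println("wired " + bytesGroups.length + " feature groups");
    }
}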

+ 165 - 0
src/main/java/com/tzld/piaoquan/data/dataloader/FeatureConstructor.java

@@ -0,0 +1,165 @@
+package com.tzld.piaoquan.data.dataloader;
+
+
+import com.aliyun.odps.account.Account;
+import com.aliyun.odps.account.AliyunAccount;
+import com.tzld.piaoquan.data.base.ItemFeature;
+import com.tzld.piaoquan.data.base.RequestContext;
+import com.tzld.piaoquan.data.base.UserActionFeature;
+import com.tzld.piaoquan.data.base.UserFeature;
+import org.apache.spark.sql.Row;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class FeatureConstructor {
+
+    private static final String BUCKET_NAME = "ali-recommend";
+    private static final Map<String, String> ODPS_CONFIG = new HashMap<String, String>();
+
+    static {
+        ODPS_CONFIG.put("ENDPOINT", "http://service.cn.maxcompute.aliyun.com/api");
+        ODPS_CONFIG.put("ACCESSID", "LTAIWYUujJAm7CbH");
+        ODPS_CONFIG.put("ACCESSKEY", "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P");
+    }
+
+    ;
+
+    private static final Account account = new AliyunAccount(ODPS_CONFIG.get("ACCESSID"), ODPS_CONFIG.get("ACCESSKEY"));
+
+
+    public static RequestContext constructRequestContext(Row record) {
+        RequestContext requestContext = new RequestContext();
+        requestContext.setApptype(record.getAs("apptype"));
+        requestContext.setMachineinfo_brand(record.getAs("machineinfo_brand"));
+        requestContext.setMachineinfo_model(record.getAs("machineinfo_model"));
+        requestContext.setMachineinfo_platform(record.getAs("machineinfo_platform"));
+        requestContext.setMachineinfo_sdkversion(record.getAs("machineinfo_sdkversion"));
+        requestContext.setMachineinfo_system(record.getAs("machineinfo_system"));
+        requestContext.setMachineinfo_wechatversion(record.getAs("machineinfo_wechatversion"));
+        requestContext.setDay(record.getAs("ctx_day"));
+        requestContext.setWeek(record.getAs("ctx_week"));
+        requestContext.setHour(record.getAs("ctx_hour"));
+        requestContext.setRegion(record.getAs("ctx_region"));
+        requestContext.setCity(record.getAs("ctx_city"));
+        return requestContext;
+    }
+
+
+    public static UserFeature constructUserFeature(Row record) {
+        UserFeature userFeature = new UserFeature();
+        userFeature.setUid(record.getAs("uid").toString());
+        userFeature.setUser_cycle_bucket_7days(record.getAs("u_cycle_bucket_7days"));
+        userFeature.setUser_cycle_bucket_30days(record.getAs("u_cycle_bucket_30days"));
+        userFeature.setUser_share_bucket_30days(record.getAs("u_share_bucket_30days"));
+
+
+        // 1day features
+        UserActionFeature user1dayActionFeature = new UserActionFeature();
+        user1dayActionFeature.setExp_cnt(record.getAs("u_1day_exp_cnt"));
+        user1dayActionFeature.setClick_cnt(record.getAs("u_1day_click_cnt"));
+        user1dayActionFeature.setShare_cnt(record.getAs("u_1day_share_cnt"));
+        user1dayActionFeature.setReturn_cnt(record.getAs("u_1day_return_cnt"));
+        user1dayActionFeature.setCtr(record.getAs("u_ctr_1day"));
+        user1dayActionFeature.setStr(record.getAs("u_str_1day"));
+        user1dayActionFeature.setRov(record.getAs("u_rov_1day"));
+        user1dayActionFeature.setRos(record.getAs("u_ros_1day"));
+        userFeature.setDay1_cnt_features(user1dayActionFeature);
+
+        // 3day features
+        UserActionFeature user3dayActionFeature = new UserActionFeature();
+        user3dayActionFeature.setExp_cnt(record.getAs("u_3day_exp_cnt"));
+        user3dayActionFeature.setClick_cnt(record.getAs("u_3day_click_cnt"));
+        user3dayActionFeature.setShare_cnt(record.getAs("u_3day_share_cnt"));
+        user3dayActionFeature.setReturn_cnt(record.getAs("u_3day_return_cnt"));
+        user3dayActionFeature.setCtr(record.getAs("u_ctr_3day"));
+        user3dayActionFeature.setStr(record.getAs("u_str_3day"));
+        user3dayActionFeature.setRov(record.getAs("u_rov_3day"));
+        user3dayActionFeature.setRos(record.getAs("u_ros_3day"));
+        userFeature.setDay3_cnt_features(user3dayActionFeature);
+
+        // 7day features
+        UserActionFeature user7dayActionFeature = new UserActionFeature();
+        user7dayActionFeature.setExp_cnt(record.getAs("u_7day_exp_cnt"));
+        user7dayActionFeature.setClick_cnt(record.getAs("u_7day_click_cnt"));
+        user7dayActionFeature.setShare_cnt(record.getAs("u_7day_share_cnt"));
+        user7dayActionFeature.setReturn_cnt(record.getAs("u_7day_return_cnt"));
+        user7dayActionFeature.setCtr(record.getAs("u_ctr_7day"));
+        user7dayActionFeature.setStr(record.getAs("u_str_7day"));
+        user7dayActionFeature.setRov(record.getAs("u_rov_7day"));
+        user7dayActionFeature.setRos(record.getAs("u_ros_7day"));
+        userFeature.setDay7_cnt_features(user7dayActionFeature);
+
+        // 3month features
+        UserActionFeature user3monthActionFeature = new UserActionFeature();
+        user3monthActionFeature.setExp_cnt(record.getAs("u_3month_exp_cnt"));
+        user3monthActionFeature.setClick_cnt(record.getAs("u_3month_click_cnt"));
+        user3monthActionFeature.setShare_cnt(record.getAs("u_3month_share_cnt"));
+        user3monthActionFeature.setReturn_cnt(record.getAs("u_3month_return_cnt"));
+        user3monthActionFeature.setCtr(record.getAs("u_ctr_3month"));
+        user3monthActionFeature.setStr(record.getAs("u_str_3month"));
+        user3monthActionFeature.setRov(record.getAs("u_rov_3month"));
+        user3monthActionFeature.setRos(record.getAs("u_ros_3month"));
+        userFeature.setMonth3_cnt_features(user3monthActionFeature);
+
+        return userFeature;
+    }
+
+
+    public static ItemFeature constructItemFeature(Row record) {
+        ItemFeature itemFeature = new ItemFeature();
+        itemFeature.setVideoId(record.getAs("videoid").toString());
+        itemFeature.setUpId(record.getAs("uid").toString());
+        itemFeature.setTitleLength(record.getAs("play_count").toString());
+        itemFeature.setPlayLength(record.getAs("total_time").toString());
+        itemFeature.setTotalTime(record.getAs("total_time").toString());
+        itemFeature.setDaysSinceUpload(record.getAs("existence_days").toString());
+
+        UserActionFeature user1dayActionFeature = new UserActionFeature();
+        user1dayActionFeature.setExp_cnt(record.getAs("i_1day_exp_cnt"));
+        user1dayActionFeature.setClick_cnt(record.getAs("i_1day_click_cnt"));
+        user1dayActionFeature.setShare_cnt(record.getAs("i_1day_share_cnt"));
+        user1dayActionFeature.setReturn_cnt(record.getAs("i_1day_return_cnt"));
+        user1dayActionFeature.setCtr(record.getAs("i_ctr_1day"));
+        user1dayActionFeature.setStr(record.getAs("i_str_1day"));
+        user1dayActionFeature.setRov(record.getAs("i_rov_1day"));
+        user1dayActionFeature.setRos(record.getAs("i_ros_1day"));
+        itemFeature.setDay1_cnt_features(user1dayActionFeature);
+
+        UserActionFeature user3dayActionFeature = new UserActionFeature();
+        user3dayActionFeature.setExp_cnt(record.getAs("i_3day_exp_cnt"));
+        user3dayActionFeature.setClick_cnt(record.getAs("i_3day_click_cnt"));
+        user3dayActionFeature.setShare_cnt(record.getAs("i_3day_share_cnt"));
+        user3dayActionFeature.setReturn_cnt(record.getAs("i_3day_return_cnt"));
+        user3dayActionFeature.setCtr(record.getAs("i_ctr_3day"));
+        user3dayActionFeature.setStr(record.getAs("i_str_3day"));
+        user3dayActionFeature.setRov(record.getAs("i_rov_3day"));
+        user3dayActionFeature.setRos(record.getAs("i_ros_3day"));
+        itemFeature.setDay3_cnt_features(user3dayActionFeature);
+
+        UserActionFeature user7dayActionFeature = new UserActionFeature();
+        user7dayActionFeature.setExp_cnt(record.getAs("i_7day_exp_cnt"));
+        user7dayActionFeature.setClick_cnt(record.getAs("i_7day_click_cnt"));
+        user7dayActionFeature.setShare_cnt(record.getAs("i_7day_share_cnt"));
+        user7dayActionFeature.setReturn_cnt(record.getAs("i_7day_return_cnt"));
+        user7dayActionFeature.setCtr(record.getAs("i_ctr_7day"));
+        user7dayActionFeature.setStr(record.getAs("i_str_7day"));
+        user7dayActionFeature.setRov(record.getAs("i_rov_7day"));
+        user7dayActionFeature.setRos(record.getAs("i_ros_7day"));
+        itemFeature.setDay7_cnt_features(user7dayActionFeature);
+
+        UserActionFeature user3monthActionFeature = new UserActionFeature();
+        user3monthActionFeature.setExp_cnt(record.getAs("i_3month_exp_cnt"));
+        user3monthActionFeature.setClick_cnt(record.getAs("i_3month_click_cnt"));
+        user3monthActionFeature.setShare_cnt(record.getAs("i_3month_share_cnt"));
+        user3monthActionFeature.setReturn_cnt(record.getAs("i_3month_return_cnt"));
+        user3monthActionFeature.setCtr(record.getAs("i_ctr_3month"));
+        user3monthActionFeature.setStr(record.getAs("i_str_3month"));
+        user3monthActionFeature.setRov(record.getAs("i_rov_3month"));
+        user3monthActionFeature.setRos(record.getAs("i_ros_3month"));
+        itemFeature.setMonth3_cnt_features(user3monthActionFeature);
+        return itemFeature;
+    }
+
+
+}

+ 78 - 0
src/main/java/com/tzld/piaoquan/data/dataloader/UserFeatureSparkLoaderMaxcompute.java

@@ -0,0 +1,78 @@
+package com.tzld.piaoquan.data.dataloader;
+
+
+import com.tzld.piaoquan.data.base.UserFeature;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
+import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.serializer.StringRedisSerializer;
+
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class UserFeatureSparkLoaderMaxcompute {
+
+    private static final String userKeyFormat = "user:%s";
+
+    public static RedisTemplate<String, String> buildRedisTemplate() {
+        RedisStandaloneConfiguration rsc = new RedisStandaloneConfiguration();
+        rsc.setPort(6379);
+        rsc.setPassword("Wqsd@2019");
+        rsc.setHostName("r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com");
+        RedisTemplate<String, String> template = new RedisTemplate<>();
+        JedisConnectionFactory fac = new JedisConnectionFactory(rsc);
+        fac.afterPropertiesSet();
+        template.setDefaultSerializer(new StringRedisSerializer());
+        template.setConnectionFactory(fac);
+        template.afterPropertiesSet();
+        return template;
+    }
+
+
+    public static void loadFeatureToRedis(RedisTemplate<String, String> redisTemplate, Row line) {
+        Map<String, String> userFeaRedisFormat = new HashMap<String, String>();
+        UserFeature userFeature = FeatureConstructor.constructUserFeature(line);
+        String key = String.format(userKeyFormat, userFeature.getKey());
+        String value = userFeature.getValue();
+        userFeaRedisFormat.put(key, value);
+        redisTemplate.opsForValue().multiSet(userFeaRedisFormat);
+    }
+
+
+    public static Dataset<Row> readTable(SparkSession spark,
+                                         String ptTableName,
+                                         String dt) {
+        // Read the partitioned table
+        Dataset<Row> rptdf = spark.sql("select * from " + ptTableName + " where dt = '" + dt + "' limit 1000");
+        System.out.println("rptdf count: " + rptdf.count());
+        rptdf.printSchema();
+        return rptdf;
+    }
+
+
+    public static void main(String[] args) {
+        SparkSession spark = SparkSession
+                .builder()
+                .appName("SparkSQL-on-MaxCompute")
+                .config("spark.sql.broadcastTimeout", 20 * 60)
+                .config("spark.sql.crossJoin.enabled", true)
+                .config("odps.exec.dynamic.partition.mode", "nonstrict")
+                .getOrCreate();
+        JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
+
+        Dataset<Row> rptdf = readTable(spark, "loghubods.alg_recsys_video_info", "20231207");
+
+        rptdf.toJavaRDD().foreachPartition(
+                rowIterator -> {
+                    RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
+                    rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line));
+                }
+        );
+    }
+
+}

+ 35 - 0
src/main/java/com/tzld/piaoquan/data/score/feature/BytesGroup.java

@@ -0,0 +1,35 @@
+package com.tzld.piaoquan.data.score.feature;
+
+
+public class BytesGroup {
+    private int id;
+    private String name;
+    private byte[] nameBytes;
+    private byte[] buffer;
+
+    public BytesGroup(int id, String name, byte[] nameBytes) {
+        this.id = id;
+        this.name = name;
+        this.nameBytes = nameBytes;
+    }
+
+    public int getId() {
+        return id;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public byte[] getNameBytes() {
+        return nameBytes;
+    }
+
+    public byte[] getBuffer() {
+        return buffer;
+    }
+
+    public void setBuffer(byte[] buffer) {
+        this.buffer = buffer;
+    }
+}

+ 192 - 0
src/main/java/com/tzld/piaoquan/data/score/feature/BytesUtils.java

@@ -0,0 +1,192 @@
+package com.tzld.piaoquan.data.score.feature;
+
+
+import com.tzld.piaoquan.recommend.server.gen.recommend.BaseFeature;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Extract features from user, item and context info. Returns the 64-bit MurmurHash of the feature string as the result.
+ */
+public class BytesUtils {
+    private static final byte[] SEPARATOR = "_".getBytes();
+    private static final byte[] FEATURE_SEPARATOR = "#".getBytes();
+    private static final int MAX_FEATURE_BYTES_LENGTH = 512;
+    private static final long SEED = 11L;
+    private BytesGroup[] groups;
+
+    /**
+     * A special List that silently drops null elements when they are added.
+     * @param <E> the element type of the list.
+     */
+    public static class NullRejectingArrayList<E> extends ArrayList<E> {
+        public NullRejectingArrayList(int capacity) {
+            super(capacity);
+        }
+
+        public NullRejectingArrayList() {
+            super();
+        }
+
+        @Override
+        public boolean add(E e) {
+            return e != null && super.add(e);
+        }
+    }
+
+    public BytesUtils(BytesGroup[] groups) {
+        this.groups = groups;
+        for (BytesGroup g : groups) {
+            byte[] buffer = prepareBuffer(g.getName(), g.getNameBytes());
+            groups[g.getId()].setBuffer(buffer);
+        }
+    }
+
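+    /**
+     * Pre-fills a reusable buffer with "groupName#" so feature values can later be appended
+     * after the fixed prefix without re-copying the group name on every call.
+     */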
+    public byte[] prepareBuffer(String name, byte[] nameBytes) {
+
+        byte[] buffer = new byte[MAX_FEATURE_BYTES_LENGTH];
+        System.arraycopy(nameBytes, 0, buffer, 0, nameBytes.length);
+        System.arraycopy(FEATURE_SEPARATOR, 0, buffer, nameBytes.length, 1);
+        return buffer;
+    }
+
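+    /** Hashes the first {@code length} bytes of the buffer (seed 11) into a BaseFeature identifier. */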
+    public BaseFeature baseFea(byte[] buffer, int length) {
+        long hash = FeatureHash.MurmurHash64(buffer, 0, length, SEED);
+
+        // For debugging: String fea = new String(buffer, 0, length);
+        // Build the protobuf and set the hashed identifier
+        BaseFeature.Builder tmp = BaseFeature.newBuilder();
+        tmp.setIdentifier(hash);
+        return tmp.build();
+    }
+
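+    /** Builds the bytes "groupName#value", hashes them and returns the feature; null inputs yield null. */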
+    public BaseFeature makeFea(int id, byte[] value) {
+        byte[] buffer = groups[id].getBuffer();
+        if (buffer == null || value == null) {
+            return null;
+        }
+
+        final int nameLength = groups[id].getNameBytes().length + 1;
+        final int length = nameLength + value.length;
+        System.arraycopy(value, 0, buffer, nameLength, value.length);
+        return baseFea(buffer, length);
+    }
+
+    public BaseFeature makeFea(int id, final byte[] p1, final byte[] p2) {
+        byte[] buffer = groups[id].getBuffer();
+        if (buffer == null || p1 == null || p2 == null) {
+            return null;
+        }
+
+        final int nameLength = groups[id].getNameBytes().length + 1;
+        final int length = nameLength + p1.length + 1 + p2.length;
+
+        System.arraycopy(p1, 0, buffer, nameLength, p1.length);
+        System.arraycopy(SEPARATOR, 0, buffer, nameLength + p1.length, 1);
+        System.arraycopy(p2, 0, buffer, nameLength + p1.length + 1, p2.length);
+        return baseFea(buffer, length);
+    }
+
+    public BaseFeature makeFea(int id, final byte[] p1, final byte[] p2, final byte[] p3) {
+        byte[] buffer = groups[id].getBuffer();
+        if (buffer == null || p1 == null || p2 == null || p3 == null) {
+            return null;
+        }
+
+        final int nameLength = groups[id].getNameBytes().length + 1;
+        final int length = nameLength + p1.length + 1 + p2.length + 1 + p3.length;
+        System.arraycopy(p1, 0, buffer, nameLength, p1.length);
+        System.arraycopy(SEPARATOR, 0, buffer, nameLength + p1.length, 1);
+        System.arraycopy(p2, 0, buffer, nameLength + p1.length + 1, p2.length);
+        System.arraycopy(SEPARATOR, 0, buffer, nameLength + p1.length + 1 + p2.length, 1);
+        System.arraycopy(p3, 0, buffer, nameLength + p1.length + 1 + p2.length + 1, p3.length);
+
+        return baseFea(buffer, length);
+    }
+
+    public BaseFeature makeFea(int id, final byte[] p1, final byte[] p2, final byte[] p3, final byte[] p4) {
+        byte[] buffer = groups[id].getBuffer();
+        if (buffer == null || p1 == null || p2 == null || p3 == null || p4 == null) {
+            return null;
+        }
+
+        final int nameLength = groups[id].getNameBytes().length + 1;
+        final int length = nameLength + p1.length + 1 + p2.length + 1 + p3.length + 1 + p4.length;
+        System.arraycopy(p1, 0, buffer, nameLength, p1.length);
+        System.arraycopy(SEPARATOR, 0, buffer, nameLength + p1.length, 1);
+        System.arraycopy(p2, 0, buffer, nameLength + p1.length + 1, p2.length);
+        System.arraycopy(SEPARATOR, 0, buffer, nameLength + p1.length + 1 + p2.length, 1);
+        System.arraycopy(p3, 0, buffer, nameLength + p1.length + 1 + p2.length + 1, p3.length);
+        System.arraycopy(SEPARATOR, 0, buffer, nameLength + p1.length + 1 + p2.length + 1 + p3.length, 1);
+        System.arraycopy(p4, 0, buffer, nameLength + p1.length + 1 + p2.length + 1 + p3.length + 1, p4.length);
+
+        return baseFea(buffer, length);
+    }
+
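+    /** Vectorized variants: one feature per element (or per combination); null results are dropped by NullRejectingArrayList. */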
+    public List<BaseFeature> makeFea(int id, byte[][] list) {
+        List<BaseFeature> result = new NullRejectingArrayList<BaseFeature>(list.length);
+        for (byte[] t: list) {
+            result.add(makeFea(id, t));
+        }
+        return result;
+    }
+
+    public List<BaseFeature> makeFea(int id, byte[][] left, byte[] right) {
+        List<BaseFeature> result = new NullRejectingArrayList<BaseFeature>(left.length);
+        for (byte[] l: left) {
+            result.add(makeFea(id, l, right));
+        }
+        return result;
+    }
+
+    public List<BaseFeature> makeFea(int id, byte[][] left, byte[] right1, byte[] right2) {
+        List<BaseFeature> result = new NullRejectingArrayList<BaseFeature>(left.length);
+        for (byte[] l: left) {
+            result.add(makeFea(id, l, right1, right2));
+        }
+        return result;
+    }
+
+    public List<BaseFeature> makeFea(int id, byte[][] left, byte[] right1, byte[] right2, byte[] right3) {
+        List<BaseFeature> result = new NullRejectingArrayList<BaseFeature>(left.length);
+        for (byte[] l: left) {
+            result.add(makeFea(id, l, right1, right2, right3));
+        }
+        return result;
+    }
+
+    public List<BaseFeature> makeFea(int id, byte[] left, byte[][] right) {
+        List<BaseFeature> result = new NullRejectingArrayList<BaseFeature>(right.length);
+        for (byte[] r : right) {
+            result.add(makeFea(id, left, r));
+        }
+        return result;
+    }
+
+    public List<BaseFeature> makeFea(int id, byte[] left1, byte[] left2, byte[][] right) {
+        List<BaseFeature> result = new NullRejectingArrayList<BaseFeature>(right.length);
+        for (byte[] r : right) {
+            result.add(makeFea(id, left1, left2, r));
+        }
+        return result;
+    }
+
+    public List<BaseFeature> makeFea(int id, byte[] left1, byte[] left2, byte[] left3, byte[][] right) {
+        List<BaseFeature> result = new NullRejectingArrayList<BaseFeature>(right.length);
+        for (byte[] r : right) {
+            result.add(makeFea(id, left1, left2, left3, r));
+        }
+        return result;
+    }
+
+    public List<BaseFeature> makeFea(int id, byte[][] left, byte[][] right) {
+        List<BaseFeature> result = new NullRejectingArrayList<BaseFeature>(left.length * right.length);
+        for (byte[] l: left) {
+            for (byte[] r: right) {
+                result.add(makeFea(id, l, r));
+            }
+        }
+        return result;
+    }
+}

+ 230 - 0
src/main/java/com/tzld/piaoquan/data/score/feature/FeatureHash.java

@@ -0,0 +1,230 @@
+package com.tzld.piaoquan.data.score.feature;
+
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+
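+/**
+ * MurmurHash implementations (32- and 64-bit) used to turn feature strings and byte buffers
+ * into stable numeric identifiers; feature hashing elsewhere in this repo uses seed 11.
+ */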
+public class FeatureHash {
+    public static Charset CharSetUTF8 = Charset.forName("UTF-8");
+
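+    /** Packs four bytes (little-endian) into a long; callers mask the mixed result back to 32 bits. */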
+    public static long getUInt32(byte a, byte b, byte c, byte d) {
+        return (d << 24 | (c & 0xFF) << 16 | (b & 0xFF) << 8 | (a & 0xFF));
+    }
+
+    public static long hash64(byte[] data) {
+        return MurmurHash64A(ByteBuffer.wrap(data), 0, data.length, 11L);
+    }
+
+    public static long MurmurHash64A(ByteBuffer buffer, int from, int len, long seed) {
+        final long m = 0xc6a4a7935bd1e995L;
+        final int r = 47;
+
+        long h = (seed) ^ (len * m);
+        int longLength = len / 8;
+
+        for (int i = 0; i < longLength; ++i) {
+            final int bytePos = from + i * 8;
+            long k = buffer.getLong(bytePos);
+
+            k *= m;
+            k ^= k >> r;
+            k *= m;
+            h ^= k;
+            h *= m;
+        }
+
+        final int remainingPos = len & ~7;
+        switch (len % 8) {
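+            // intentional fall-through: each case folds in one more trailing byte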
+            case 7: h ^= (long)(buffer.get(remainingPos + 6) & 0xFF) << 48;
+            case 6: h ^= (long)(buffer.get(remainingPos + 5) & 0xFF) << 40;
+            case 5: h ^= (long)(buffer.get(remainingPos + 4) & 0xFF) << 32;
+            case 4: h ^= (long)(buffer.get(remainingPos + 3) & 0xFF) << 24;
+            case 3: h ^= (long)(buffer.get(remainingPos + 2) & 0xFF) << 16;
+            case 2: h ^= (long)(buffer.get(remainingPos + 1) & 0xFF) << 8;
+            case 1:
+                h ^= (long)(buffer.get(remainingPos) & 0xFF);
+                h *= m;
+        }
+
+        h ^= h >>> r;
+        h *= m;
+        h ^= h >>> r;
+        return h;
+    }
+
+    public static long MurmurHash32(byte data[], int len, long seed) {
+        long m = 0x5bd1e995L;
+        int r = 24;
+
+        long h = seed ^ len;
+
+        int offset = 0;
+        while (len >= 4) {
+            long k = getUInt32(data[offset], data[offset + 1], data[offset + 2], data[offset + 3]);
+
+            k *= m;
+            k &= 0xFFFFFFFFL;
+            k ^= k >> r;
+            k *= m;
+            k &= 0xFFFFFFFFL;
+
+            h *= m;
+            h &= 0xFFFFFFFFL;
+            h ^= k;
+
+            offset += 4;
+            len -= 4;
+        }
+
+        // Handle the last few bytes of the input array
+        switch (len) {
+            case 3: h ^= data[offset + 2] << 16;
+            case 2: h ^= data[offset + 1] << 8;
+            case 1: h ^= data[offset];
+                h *= m;
+                h &= 0xFFFFFFFFL;
+        }
+
+        // Do a few final mixes of the hash to ensure the last few
+        // bytes are well-incorporated.
+
+        h ^= h >> 13;
+        h *= m;
+        h &= 0xFFFFFFFFL;
+        h ^= h >> 15;
+
+        return h;
+    }
+
+    // 64-bit hash for 32-bit platforms
+    public static long MurmurHash64(byte[] buffer, int start, int len, long seed) {
+        final long m = 0x5bd1e995L;
+        final int r = 24;
+        final int original = len;
+
+        long h1 = (seed) ^ len;
+        long h2 = (seed >> 32);
+
+        int offset = start;
+        while (len >= 8) {
+            long k1 = getUInt32(buffer[offset], buffer[offset + 1], buffer[offset + 2], buffer[offset + 3]);
+            // long k1 = buffer.getInt(offset);
+
+            k1 *= m; k1 &= 0xFFFFFFFFL; k1 ^= k1 >> r; k1 *= m; k1 &= 0xFFFFFFFFL;
+            h1 *= m; h1 &= 0xFFFFFFFFL; h1 ^= k1;
+            offset += 4;
+
+            long k2 = getUInt32(buffer[offset], buffer[offset + 1], buffer[offset + 2], buffer[offset + 3]);
+            // long k2 = buffer.getInt(offset);
+            k2 *= m; k2 &= 0xFFFFFFFFL; k2 ^= k2 >> r; k2 *= m; k2 &= 0xFFFFFFFFL;
+            h2 *= m; h2 &= 0xFFFFFFFFL; h2 ^= k2;
+
+            offset += 4;
+            len -= 8;
+        }
+
+        if (len >= 4) {
+            long k1 = getUInt32(buffer[offset], buffer[offset + 1], buffer[offset + 2], buffer[offset + 3]);
+            // long k1 = buffer.getInt(offset);
+            k1 *= m; k1 &= 0xFFFFFFFFL; k1 ^= k1 >> r; k1 *= m; k1 &= 0xFFFFFFFFL;
+            h1 *= m; h1 &= 0xFFFFFFFFL; h1 ^= k1;
+            offset += 4;
+            len -= 4;
+        }
+
+        switch (len) {
+            case 3: h2 ^= (buffer[offset + 2] & 0xFF) << 16;
+            case 2: h2 ^= (buffer[offset + 1] & 0xFF) << 8;
+            case 1: h2 ^= (buffer[offset] & 0xFF);
+                h2 *= m;
+                h2 &= 0xFFFFFFFFL;
+        }
+
+        h1 ^= h2 >> 18;
+        h1 *= m; h1 &= 0xFFFFFFFFL;
+        h2 ^= h1 >> 22;
+        h2 *= m; h2 &= 0xFFFFFFFFL;
+        h1 ^= h2 >> 17;
+        h1 *= m; h1 &= 0xFFFFFFFFL;
+        h2 ^= h1 >> 19;
+        h2 *= m; h2 &= 0xFFFFFFFFL;
+
+        /*BigInteger ans = BigInteger.valueOf(h1).shiftLeft(32).or(BigInteger.valueOf(h2));
+        return ans.longValue();*/
+        //System.err.println("feature: " + new String(buffer, 0, original) + " length: " + original + " hash: " + (h1 << 32 | h2) + " daze");
+        return h1 << 32 | h2;
+    }
+
+    // 64-bit hash for 32-bit platforms
+    public static BigInteger MurmurHash64(byte data[], int len, long seed) {
+        long m = 0x5bd1e995L;
+        int r = 24;
+
+        long h1 = (seed) ^ len;
+        long h2 = (seed >> 32);
+
+        int offset = 0;
+        while (len >= 8) {
+            long k1 = getUInt32(data[offset], data[offset + 1], data[offset + 2], data[offset + 3]);
+            k1 *= m; k1 &= 0xFFFFFFFFL; k1 ^= k1 >> r; k1 *= m; k1 &= 0xFFFFFFFFL;
+            h1 *= m; h1 &= 0xFFFFFFFFL; h1 ^= k1;
+
+            long k2 = getUInt32(data[offset + 4], data[offset + 5], data[offset + 6], data[offset + 7]);
+            k2 *= m; k2 &= 0xFFFFFFFFL; k2 ^= k2 >> r; k2 *= m; k2 &= 0xFFFFFFFFL;
+            h2 *= m; h2 &= 0xFFFFFFFFL; h2 ^= k2;
+
+            offset += 8;
+            len -= 8;
+        }
+
+        if (len >= 4) {
+            long k1 = getUInt32(data[offset], data[offset + 1], data[offset + 2], data[offset + 3]);
+            k1 *= m; k1 &= 0xFFFFFFFFL; k1 ^= k1 >> r; k1 *= m; k1 &= 0xFFFFFFFFL;
+            h1 *= m; h1 &= 0xFFFFFFFFL; h1 ^= k1;
+            offset += 4;
+            len -= 4;
+        }
+
+        switch (len) {
+            case 3: h2 ^= (data[offset + 2] & 0xFF) << 16;
+            case 2: h2 ^= (data[offset + 1] & 0xFF) << 8;
+            case 1: h2 ^= (data[offset] & 0xFF);
+                h2 *= m;
+                h2 &= 0xFFFFFFFFL;
+        }
+
+        h1 ^= h2 >> 18;
+        h1 *= m; h1 &= 0xFFFFFFFFL;
+        h2 ^= h1 >> 22;
+        h2 *= m; h2 &= 0xFFFFFFFFL;
+        h1 ^= h2 >> 17;
+        h1 *= m; h1 &= 0xFFFFFFFFL;
+        h2 ^= h1 >> 19;
+        h2 *= m; h2 &= 0xFFFFFFFFL;
+
+        BigInteger ans = BigInteger.valueOf(h1).shiftLeft(32).or(BigInteger.valueOf(h2));
+        return ans;
+    }
+
+    public static String hash(String input) {
+        byte[] tt = input.getBytes(CharSetUTF8);
+        return MurmurHash64(tt, tt.length, 11L).toString();
+    }
+
+    public static Long hashToLong(String input) {
+        byte[] tt = input.getBytes(CharSetUTF8);
+        return MurmurHash64(tt, tt.length, 11L).longValue();
+    }
+
+    /** the constant 2^64 */
+    private static final BigInteger TWO_64 = BigInteger.ONE.shiftLeft(64);
+
+    public static String asUnsignedLongString(long l) {
+        BigInteger b = BigInteger.valueOf(l);
+        if (b.signum() < 0) {
+            b = b.add(TWO_64);
+        }
+        return b.toString();
+    }
+}

+ 67 - 0
src/main/java/com/tzld/piaoquan/data/score/feature/LRBytesFeatureExtractorBase.java

@@ -0,0 +1,67 @@
+package com.tzld.piaoquan.data.score.feature;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.tzld.piaoquan.data.base.RequestContextBytesFeature;
+import com.tzld.piaoquan.data.base.UserBytesFeature;
+import com.tzld.piaoquan.data.base.VideoBytesFeature;
+import com.tzld.piaoquan.data.base.VlogFeatureGroup;
+import com.tzld.piaoquan.recommend.server.gen.recommend.BaseFeature;
+import com.tzld.piaoquan.recommend.server.gen.recommend.FeatureGroup;
+import com.tzld.piaoquan.recommend.server.gen.recommend.LRSamples;
+
+import java.util.List;
+
+
+public abstract class LRBytesFeatureExtractorBase {
+    private static final double DEFAULT_USER_CTR_GROUP = 10.0;
+    private static final double DEFAULT_ARTICLE_CTR_GROUP = 100.0;
+
+
+    private BytesUtils utils;
+    //Feature Group & Features
+    ListMultimap<FeatureGroup, BaseFeature> features = ArrayListMultimap.create();
+    int groupCount;
+
+
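+    /** Creates one BytesGroup per VlogFeatureGroup so buffer ids line up with the enum ordinals. */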
+    LRBytesFeatureExtractorBase() {
+        groupCount = VlogFeatureGroup.values().length;
+        BytesGroup[] groups = new BytesGroup[groupCount];
+        for (VlogFeatureGroup g: VlogFeatureGroup.values()) {
+            groups[g.ordinal()] = new BytesGroup(g.ordinal(),
+                    g.getGroupName(), g.getGroupNameBytes());
+        }
+        utils = new BytesUtils(groups);
+    }
+
+    private FeatureGroup makeGroup(VlogFeatureGroup group){
+        FeatureGroup.Builder g = FeatureGroup.newBuilder();
+        g.setType("1");
+        g.setName(group.getGroupName());
+        g.setId(group.ordinal());
+        return g.build();
+    }
+
+
+    void makeFea(VlogFeatureGroup group, byte[] value) {
+        FeatureGroup featureGroup = makeGroup(group);
+        BaseFeature feature = utils.makeFea(group.ordinal(), value);
+        features.put(featureGroup, feature);
+    }
+
+    void makeFea(VlogFeatureGroup group, byte[][] list) {
+        FeatureGroup g = makeGroup(group);
+        List<BaseFeature> featureList = utils.makeFea(group.ordinal(), list);
+        features.putAll(g, featureList);
+    }
+
+    public ListMultimap<FeatureGroup, BaseFeature> getFeatures() {
+        return features;
+    }
+
+    public abstract LRSamples single(UserBytesFeature userBytesFeature,
+                                     VideoBytesFeature videoBytesFeature,
+                                     RequestContextBytesFeature requestContextBytesFeature);
+
+
+}

+ 152 - 0
src/main/java/com/tzld/piaoquan/data/score/feature/VlogShareLRFeatureExtractor.java

@@ -0,0 +1,152 @@
+package com.tzld.piaoquan.data.score.feature;
+
+import com.tzld.piaoquan.data.base.*;
+import com.tzld.piaoquan.recommend.server.gen.recommend.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class VlogShareLRFeatureExtractor extends LRBytesFeatureExtractorBase {
+
+    public VlogShareLRFeatureExtractor() {
+        super();
+    }
+
+    // TODO: add the remaining context features to extract
+    public void getContextFeatures(RequestContextBytesFeature requestContextBytes) {
+        makeFea(VlogFeatureGroup.MACHINEINFO_BRAND, requestContextBytes.getMachineinfo_brand());
+        makeFea(VlogFeatureGroup.MACHINEINFO_MODEL, requestContextBytes.getMachineinfo_model());
+        makeFea(VlogFeatureGroup.MACHINEINFO_PLATFORM, requestContextBytes.getMachineinfo_platform());
+        makeFea(VlogFeatureGroup.MACHINEINFO_SDKVERSION, requestContextBytes.getMachineinfo_sdkversion());
+        makeFea(VlogFeatureGroup.MACHINEINFO_SYSTEM, requestContextBytes.getMachineinfo_system());
+        // assumes a getMachineinfo_wechatversion() accessor mirroring the other machineinfo_* getters
+        makeFea(VlogFeatureGroup.MACHINEINFO_WECHATVERSION, requestContextBytes.getMachineinfo_wechatversion());
+
+        makeFea(VlogFeatureGroup.DAY, requestContextBytes.getWeek());
+        makeFea(VlogFeatureGroup.WEEK, requestContextBytes.getWeek());
+        makeFea(VlogFeatureGroup.HOUR, requestContextBytes.getHour());
+
+    }
+
+    // TODO: add the remaining user features to extract
+    public void getUserFeatures(UserBytesFeature user) {
+        makeFea(VlogFeatureGroup.USER_CYCLE_BUCKET_7DAY, user.getUser_cycle_bucket_7days());
+        makeFea(VlogFeatureGroup.USER_SHARE_BUCKET_30DAY, user.getUser_share_bucket_30days());
+        // assumes a USER_CYCLE_BUCKET_30DAY group mirroring USER_CYCLE_BUCKET_7DAY
+        makeFea(VlogFeatureGroup.USER_CYCLE_BUCKET_30DAY, user.getUser_cycle_bucket_30days());
+
+        // 1day features
+        makeFea(VlogFeatureGroup.USER_1DAY_EXP, user.getDay1_cnt_features().get("exp"));
+        makeFea(VlogFeatureGroup.USER_1DAY_CLICK, user.getDay1_cnt_features().get("click"));
+        makeFea(VlogFeatureGroup.USER_1DAY_SHARE, user.getDay1_cnt_features().get("share"));
+        makeFea(VlogFeatureGroup.USER_1DAY_RETURN, user.getDay1_cnt_features().get("return"));
+        makeFea(VlogFeatureGroup.USER_1DAY_CTR, user.getDay1_cnt_features().get("ctr"));
+        makeFea(VlogFeatureGroup.USER_1DAY_STR, user.getDay1_cnt_features().get("str"));
+        makeFea(VlogFeatureGroup.USER_1DAY_ROV, user.getDay1_cnt_features().get("rov"));
+        makeFea(VlogFeatureGroup.USER_1DAY_ROS, user.getDay1_cnt_features().get("ros"));
+
+        // 3day features
+        makeFea(VlogFeatureGroup.USER_3DAY_EXP, user.getDay3_cnt_features().get("exp"));
+        makeFea(VlogFeatureGroup.USER_3DAY_CLICK, user.getDay3_cnt_features().get("click"));
+        makeFea(VlogFeatureGroup.USER_3DAY_SHARE, user.getDay3_cnt_features().get("share"));
+        makeFea(VlogFeatureGroup.USER_3DAY_RETURN, user.getDay3_cnt_features().get("return"));
+        makeFea(VlogFeatureGroup.USER_3DAY_CTR, user.getDay3_cnt_features().get("ctr"));
+        makeFea(VlogFeatureGroup.USER_3DAY_STR, user.getDay3_cnt_features().get("str"));
+        makeFea(VlogFeatureGroup.USER_3DAY_ROV, user.getDay3_cnt_features().get("rov"));
+        makeFea(VlogFeatureGroup.USER_3DAY_ROS, user.getDay3_cnt_features().get("ros"));
+
+        // 7day features
+        makeFea(VlogFeatureGroup.USER_7DAY_EXP, user.getDay7_cnt_features().get("exp"));
+        makeFea(VlogFeatureGroup.USER_7DAY_CLICK, user.getDay7_cnt_features().get("click"));
+        makeFea(VlogFeatureGroup.USER_7DAY_SHARE, user.getDay7_cnt_features().get("share"));
+        makeFea(VlogFeatureGroup.USER_7DAY_RETURN, user.getDay7_cnt_features().get("return"));
+        makeFea(VlogFeatureGroup.USER_7DAY_CTR, user.getDay7_cnt_features().get("ctr"));
+        makeFea(VlogFeatureGroup.USER_7DAY_STR, user.getDay7_cnt_features().get("str"));
+        makeFea(VlogFeatureGroup.USER_7DAY_ROV, user.getDay7_cnt_features().get("rov"));
+        makeFea(VlogFeatureGroup.USER_7DAY_ROS, user.getDay7_cnt_features().get("ros"));
+
+        // 3month features
+        makeFea(VlogFeatureGroup.USER_3MONTH_EXP, user.getMonth3_cnt_features().get("exp"));
+        makeFea(VlogFeatureGroup.USER_3MONTH_CLICK, user.getMonth3_cnt_features().get("click"));
+        makeFea(VlogFeatureGroup.USER_3MONTH_SHARE, user.getMonth3_cnt_features().get("share"));
+        makeFea(VlogFeatureGroup.USER_3MONTH_RETURN, user.getMonth3_cnt_features().get("return"));
+        makeFea(VlogFeatureGroup.USER_3MONTH_CTR, user.getMonth3_cnt_features().get("ctr"));
+        makeFea(VlogFeatureGroup.USER_3MONTH_STR, user.getMonth3_cnt_features().get("str"));
+        makeFea(VlogFeatureGroup.USER_3MONTH_ROV, user.getMonth3_cnt_features().get("rov"));
+        makeFea(VlogFeatureGroup.USER_3MONTH_ROS, user.getMonth3_cnt_features().get("ros"));
+
+    }
+
+    public void getItemFeature(VideoBytesFeature item) {
+        makeFea(VlogFeatureGroup.VIDEOID, item.getVideoId());
+        makeFea(VlogFeatureGroup.UP_ID, item.getUpId());
+        // 1day features
+        makeFea(VlogFeatureGroup.ITEM_1DAY_EXP, item.getItem_day1_cnt_features().get("exp"));
+        makeFea(VlogFeatureGroup.ITEM_1DAY_CLICK, item.getItem_day1_cnt_features().get("click"));
+        makeFea(VlogFeatureGroup.ITEM_1DAY_SHARE, item.getItem_day1_cnt_features().get("share"));
+        makeFea(VlogFeatureGroup.ITEM_1DAY_RETURN, item.getItem_day1_cnt_features().get("return"));
+        makeFea(VlogFeatureGroup.ITEM_1DAY_CTR, item.getItem_day1_cnt_features().get("ctr"));
+        makeFea(VlogFeatureGroup.ITEM_1DAY_STR, item.getItem_day1_cnt_features().get("str"));
+        makeFea(VlogFeatureGroup.ITEM_1DAY_ROV, item.getItem_day1_cnt_features().get("rov"));
+        makeFea(VlogFeatureGroup.ITEM_1DAY_ROS, item.getItem_day1_cnt_features().get("ros"));
+
+        // 3day features
+        // assumes getItem_day3_cnt_features(), mirroring the user-side getDay3_cnt_features()
+        makeFea(VlogFeatureGroup.ITEM_3DAY_EXP, item.getItem_day3_cnt_features().get("exp"));
+        makeFea(VlogFeatureGroup.ITEM_3DAY_CLICK, item.getItem_day3_cnt_features().get("click"));
+        makeFea(VlogFeatureGroup.ITEM_3DAY_SHARE, item.getItem_day3_cnt_features().get("share"));
+        makeFea(VlogFeatureGroup.ITEM_3DAY_RETURN, item.getItem_day3_cnt_features().get("return"));
+        makeFea(VlogFeatureGroup.ITEM_3DAY_CTR, item.getItem_day3_cnt_features().get("ctr"));
+        makeFea(VlogFeatureGroup.ITEM_3DAY_STR, item.getItem_day3_cnt_features().get("str"));
+        makeFea(VlogFeatureGroup.ITEM_3DAY_ROV, item.getItem_day3_cnt_features().get("rov"));
+        makeFea(VlogFeatureGroup.ITEM_3DAY_ROS, item.getItem_day3_cnt_features().get("ros"));
+
+        // 7day features
+        makeFea(VlogFeatureGroup.ITEM_7DAY_EXP, item.getItem_day7_cnt_features().get("exp"));
+        makeFea(VlogFeatureGroup.ITEM_7DAY_CLICK, item.getItem_day7_cnt_features().get("click"));
+        makeFea(VlogFeatureGroup.ITEM_7DAY_SHARE, item.getItem_day7_cnt_features().get("share"));
+        makeFea(VlogFeatureGroup.ITEM_7DAY_RETURN, item.getItem_day7_cnt_features().get("return"));
+        makeFea(VlogFeatureGroup.ITEM_7DAY_CTR, item.getItem_day7_cnt_features().get("ctr"));
+        makeFea(VlogFeatureGroup.ITEM_7DAY_STR, item.getItem_day7_cnt_features().get("str"));
+        makeFea(VlogFeatureGroup.ITEM_7DAY_ROV, item.getItem_day7_cnt_features().get("rov"));
+        makeFea(VlogFeatureGroup.ITEM_7DAY_ROS, item.getItem_day7_cnt_features().get("ros"));
+
+        // 3month features
+        makeFea(VlogFeatureGroup.ITEM_3MONTH_EXP, item.getItem_month3_cnt_features().get("exp"));
+        makeFea(VlogFeatureGroup.ITEM_3MONTH_CLICK, item.getItem_month3_cnt_features().get("click"));
+        makeFea(VlogFeatureGroup.ITEM_3MONTH_SHARE, item.getItem_month3_cnt_features().get("share"));
+        makeFea(VlogFeatureGroup.ITEM_3MONTH_RETURN, item.getItem_month3_cnt_features().get("return"));
+        makeFea(VlogFeatureGroup.ITEM_3MONTH_CTR, item.getItem_month3_cnt_features().get("ctr"));
+        makeFea(VlogFeatureGroup.ITEM_3MONTH_STR, item.getItem_month3_cnt_features().get("str"));
+        makeFea(VlogFeatureGroup.ITEM_3MONTH_ROV, item.getItem_month3_cnt_features().get("rov"));
+        makeFea(VlogFeatureGroup.ITEM_3MONTH_ROS, item.getItem_month3_cnt_features().get("ros"));
+
+    }
+
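+    // Builds one LRSamples protobuf from all feature groups extracted for a single (user, video, context) triple.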
+    @Override
+    public synchronized LRSamples single(UserBytesFeature userBytesFeature,
+                                         VideoBytesFeature videoBytesFeature,
+                                         RequestContextBytesFeature requestContextBytesFeature) {
+        features.clear();
+        // extract features
+        getUserFeatures(userBytesFeature);
+        getContextFeatures(requestContextBytesFeature);
+        getItemFeature(videoBytesFeature);
+
+        LRSamples.Builder lr = LRSamples.newBuilder();
+        lr.setGroupNum(groupCount);
+        List<FeatureGroup> keys = new ArrayList<>(features.keySet());
+        int count = 0;
+        for(FeatureGroup group : keys) {
+            List<BaseFeature> fea = features.get(group);
+            GroupedFeature.Builder gf = GroupedFeature.newBuilder();
+            gf.setGroup(group);
+            gf.setCount(fea.size());
+            gf.addAllFeatures(fea);
+            count += fea.size();
+            lr.addFeatures(gf);
+        }
+        lr.setCount(count);
+        return lr.build();
+    }
+
+
+}

+ 26 - 0
src/main/python/spark_oss.py

@@ -0,0 +1,26 @@
+# -*- coding: utf-8 -*-
+import sys
+from pyspark.sql import SparkSession
+
+try:
+    # for python 2
+    reload(sys)
+    sys.setdefaultencoding('utf8')
+except:
+    # python 3 not needed
+    pass
+
+if __name__ == '__main__':
+    spark = SparkSession.builder\
+        .appName("spark write df to oss")\
+        .getOrCreate()
+
+    data = [i for i in range(0, 100)]
+
+    df = spark.sparkContext.parallelize(data, 2).map(lambda s: ("name-%s" % s, s)).toDF("name: string, num: int")
+
+    df.show(n=10)
+
+    # write to oss
+    pathout = 'oss://yeshan01/test.csv'
+    df.write.csv(pathout)

+ 55 - 0
src/main/python/spark_sql.py

@@ -0,0 +1,55 @@
+# -*- coding: utf-8 -*-
+import sys
+from pyspark.sql import SparkSession
+
+try:
+    # for python 2
+    reload(sys)
+    sys.setdefaultencoding('utf8')
+except:
+    # python 3 not needed
+    pass
+
+if __name__ == '__main__':
+    spark = SparkSession.builder\
+        .appName("spark sql")\
+        .config("spark.sql.broadcastTimeout", 20 * 60)\
+        .config("spark.sql.crossJoin.enabled", True)\
+        .getOrCreate()
+
+    tableName = "mc_test_table"
+    ptTableName = "mc_test_pt_table"
+    data = [i for i in range(0, 100)]
+
+    # Drop Create
+    spark.sql("DROP TABLE IF EXISTS %s" % tableName)
+    spark.sql("DROP TABLE IF EXISTS %s" % ptTableName)
+
+    spark.sql("CREATE TABLE %s (name STRING, num BIGINT)" % tableName)
+    spark.sql("CREATE TABLE %s (name STRING, num BIGINT) PARTITIONED BY (pt1 STRING, pt2 STRING)" % ptTableName)
+
+    df = spark.sparkContext.parallelize(data, 2).map(lambda s: ("name-%s" % s, s)).toDF("name: string, num: int")
+    pt_df = spark.sparkContext.parallelize(data, 2).map(lambda s: ("name-%s" % s, s, "2018", "0601")).toDF("name: string, num: int, pt1: string, pt2: string")
+
+    # write to the non-partitioned table
+    df.write.insertInto(tableName) # insertInto semantics
+    df.writeTo(tableName).overwritePartitions() # insertOverwrite via DataSource V2
+
+    # write to the partitioned table
+    # DataFrameWriter cannot target a specific partition; write through a temp view and SQL instead
+    df.createOrReplaceTempView("%s_tmp_view" % ptTableName)
+    spark.sql("insert into table %s partition (pt1='2018', pt2='0601') select * from %s_tmp_view" % (ptTableName, ptTableName))
+    spark.sql("insert overwrite table %s partition (pt1='2018', pt2='0601') select * from %s_tmp_view" % (ptTableName, ptTableName))
+
+    pt_df.write.insertInto(ptTableName) # dynamic partitions, insertInto semantics
+    pt_df.write.insertInto(ptTableName, True) # dynamic partitions, insertOverwrite semantics
+
+    # read the non-partitioned table
+    rdf = spark.sql("select name, num from %s" % tableName)
+    print("rdf count, %s\n" % rdf.count())
+    rdf.printSchema()
+
+    # read the partitioned table
+    rptdf = spark.sql("select name, num, pt1, pt2 from %s where pt1 = '2018' and pt2 = '0601'" % ptTableName)
+    print("rptdf count, %s" % (rptdf.count()))
+    rptdf.printSchema()

+ 46 - 0
src/main/scala/com/aliyun/odps/spark/examples/SparkPi.scala

@@ -0,0 +1,46 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  * <p>
+  * http://www.apache.org/licenses/LICENSE-2.0
+  * <p>
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package com.aliyun.odps.spark.examples
+
+import org.apache.spark.sql.SparkSession
+
+import scala.math.random
+
+object SparkPi {
+  def main(args: Array[String]) {
+    val spark = SparkSession
+      .builder()
+      .appName("SparkPi")
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    try {
+      val slices = if (args.length > 0) args(0).toInt else 2
+      val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
+      val count = sc.parallelize(1 until n, slices).map { i =>
+        val x = random * 2 - 1
+        val y = random * 2 - 1
+        if (x * x + y * y < 1) 1 else 0
+      }.reduce(_ + _)
+      println("Pi is roughly " + 4.0 * count / n)
+    } finally {
+      sc.stop()
+    }
+  }
+}

+ 37 - 0
src/main/scala/com/aliyun/odps/spark/examples/WordCount.scala

@@ -0,0 +1,37 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  * <p>
+  * http://www.apache.org/licenses/LICENSE-2.0
+  * <p>
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package com.aliyun.odps.spark.examples
+
+import org.apache.spark.sql.SparkSession
+
+object WordCount {
+  def main(args: Array[String]) {
+    val spark = SparkSession
+      .builder()
+      .appName("WordCount")
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    try {
+      sc.parallelize(1 to 100, 10).map(word => (word, 1)).reduceByKey(_ + _, 10).take(100).foreach(println)
+    } finally {
+      sc.stop()
+    }
+  }
+}

+ 73 - 0
src/main/scala/com/aliyun/odps/spark/examples/graphx/PageRank.scala

@@ -0,0 +1,73 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  * <p>
+  * http://www.apache.org/licenses/LICENSE-2.0
+  * <p>
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package com.aliyun.odps.spark.examples.graphx
+
+import org.apache.spark.graphx.{Edge, Graph, VertexId}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+
+object PageRank {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder()
+      .appName("PageRank")
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // build vertices
+    val users: RDD[(VertexId, Array[String])] = sc.parallelize(List(
+      "1,BarackObama,Barack Obama",
+      "2,ladygaga,Goddess of Love",
+      "3,jeresig,John Resig",
+      "4,justinbieber,Justin Bieber",
+      "6,matei_zaharia,Matei Zaharia",
+      "7,odersky,Martin Odersky",
+      "8,anonsys"
+    ).map(line => line.split(",")).map(parts => (parts.head.toLong, parts.tail)))
+
+    // build edges
+    val followers: RDD[Edge[Double]] = sc.parallelize(Array(
+      Edge(2L, 1L, 1.0),
+      Edge(4L, 1L, 1.0),
+      Edge(1L, 2L, 1.0),
+      Edge(6L, 3L, 1.0),
+      Edge(7L, 3L, 1.0),
+      Edge(7L, 6L, 1.0),
+      Edge(6L, 7L, 1.0),
+      Edge(3L, 7L, 1.0)
+    ))
+
+    // build graph
+    val followerGraph: Graph[Array[String], Double] = Graph(users, followers)
+
+    // restrict the graph to users with usernames and names
+    val subgraph = followerGraph.subgraph(vpred = (vid, attr) => attr.size == 2)
+
+    // compute PageRank
+    val pageRankGraph = subgraph.pageRank(0.001)
+
+    // get attributes of the top pagerank users
+    val userInfoWithPageRank = subgraph.outerJoinVertices(pageRankGraph.vertices) {
+      case (uid, attrList, Some(pr)) => (pr, attrList.toList)
+      case (uid, attrList, None) => (0.0, attrList.toList)
+    }
+
+    println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
+  }
+}

+ 62 - 0
src/main/scala/com/aliyun/odps/spark/examples/mllib/KmeansModelSaveToOss.scala

@@ -0,0 +1,62 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  * <p>
+  * http://www.apache.org/licenses/LICENSE-2.0
+  * <p>
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package com.aliyun.odps.spark.examples.mllib
+
+import org.apache.spark.mllib.clustering.KMeans._
+import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.sql.SparkSession
+
+object KmeansModelSaveToOss {
+  val modelOssDir = "oss://bucket/kmeans-model"
+
+  def main(args: Array[String]) {
+
+    //1. train and save the model
+    val spark = SparkSession
+      .builder()
+      .config("spark.hadoop.fs.oss.credentials.provider", "org.apache.hadoop.fs.aliyun.oss.AliyunStsTokenCredentialsProvider")
+      .config("spark.hadoop.fs.oss.ststoken.roleArn", "acs:ram::****:role/aliyunodpsdefaultrole")
+      .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com")
+      .appName("KmeansModelSaveToOss")
+      .getOrCreate()
+
+    val sc = spark.sparkContext
+    val points = Seq(
+      Vectors.dense(0.0, 0.0),
+      Vectors.dense(0.0, 0.1),
+      Vectors.dense(0.1, 0.0),
+      Vectors.dense(9.0, 0.0),
+      Vectors.dense(9.0, 0.2),
+      Vectors.dense(9.2, 0.0)
+    )
+    val rdd = sc.parallelize(points, 3)
+    val initMode = K_MEANS_PARALLEL
+    val model = KMeans.train(rdd, k = 2, maxIterations = 2, initMode)
+    val predictResult1 = rdd.map(feature => "cluster id: " + model.predict(feature) + " feature:" + feature.toArray.mkString(",")).collect
+    println("modelOssDir=" + modelOssDir)
+    model.save(sc, modelOssDir)
+
+    //2. predict from the oss model
+    val modelLoadOss = KMeansModel.load(sc, modelOssDir)
+    val predictResult2 = rdd.map(feature => "cluster id: " + modelLoadOss.predict(feature) + " feature:" + feature.toArray.mkString(",")).collect
+    assert(predictResult1.size == predictResult2.size)
+    predictResult2.foreach(result2 => assert(predictResult1.contains(result2)))
+  }
+}

+ 69 - 0
src/main/scala/com/aliyun/odps/spark/examples/oss/JindoFsDemo.scala

@@ -0,0 +1,69 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  * <p>
+  * http://www.apache.org/licenses/LICENSE-2.0
+  * <p>
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package com.aliyun.odps.spark.examples.oss
+
+import org.apache.spark.{SparkConf, SparkContext}
+
+object JindoFsDemo {
+  def main(args: Array[String]): Unit = {
+    val aliyunUid : String = args(0)
+    val role : String = args(1)
+    val bucket : String = args(2)
+    val ossPath : String = args(3)
+
+    // using ram-role assume
+    val conf = new SparkConf()
+      .setAppName("jindo-fs-demo")
+      .set("spark.hadoop.fs.AbstractFileSystem.oss.impl", "com.aliyun.emr.fs.oss.OSS")
+      .set("spark.hadoop.fs.oss.impl", "com.aliyun.emr.fs.oss.JindoOssFileSystem")
+      .set("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-internal.aliyuncs.com")
+      .set("spark.hadoop.fs.jfs.cache.oss.credentials.provider", "com.aliyun.emr.fs.auth.CustomCredentialsProvider")
+      .set("spark.hadoop.aliyun.oss.provider.url", s"http://localhost:10011/sts-token-info?user_id=${aliyunUid}&role=${role}")
+
+
+    //using access-key-id/access-key-secret
+//    val conf = new SparkConf()
+//      .setAppName("jindo-fs-demo")
+//      .set("spark.hadoop.fs.AbstractFileSystem.oss.impl", "com.aliyun.emr.fs.oss.OSS")
+//      .set("spark.hadoop.fs.oss.impl", "com.aliyun.emr.fs.oss.JindoOssFileSystem")
+//      .set("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-internal.aliyuncs.com")
+//      .set("spark.hadoop.fs.oss.accessKeyId", "xxx")
+//      .set("spark.hadoop.fs.oss.accessKeySecret", "xxx")
+
+    val sc = new SparkContext(conf)
+
+    try {
+      read_oss_dir(sc, "demo", s"oss://${bucket}/${ossPath}")
+    } finally {
+      sc.stop()
+    }
+  }
+
+  /**
+    * compute cost time using jindo sdk
+    */
+  def read_oss_dir(sc: SparkContext, job_des:String, ossPath: String): Unit = {
+    val startTime: Long = System.currentTimeMillis()
+    val inputData = sc.textFile(ossPath, 20)
+    val cnt = inputData.count
+    val endTime:Long = System.currentTimeMillis()
+    val cost:Long = endTime - startTime
+    println(s"job:$job_des, count:$cnt, consume:$cost")
+  }
+}

+ 43 - 0
src/main/scala/com/aliyun/odps/spark/examples/oss/SparkUnstructuredDataCompute.scala

@@ -0,0 +1,43 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  * <p>
+  * http://www.apache.org/licenses/LICENSE-2.0
+  * <p>
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package com.aliyun.odps.spark.examples.oss
+
+import org.apache.spark.sql.SparkSession
+
+object SparkUnstructuredDataCompute {
+  def main(args: Array[String]) {
+    val spark = SparkSession
+      .builder()
+      .config("spark.hadoop.fs.oss.credentials.provider", "org.apache.hadoop.fs.aliyun.oss.AliyunStsTokenCredentialsProvider")
+      .config("spark.hadoop.fs.oss.ststoken.roleArn", "acs:ram::****:role/aliyunodpsdefaultrole")
+      .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com")
+      .appName("SparkUnstructuredDataCompute")
+      .getOrCreate()
+
+    val sc = spark.sparkContext
+    try {
+      val pathIn = "oss://bucket/inputdata/"
+      val inputData = sc.textFile(pathIn, 5)
+      val cnt = inputData.count
+      println(s"count: $cnt")
+    } finally {
+      sc.stop()
+    }
+  }
+}

+ 82 - 0
src/main/scala/com/aliyun/odps/spark/examples/sparksql/SparkSQL.scala

@@ -0,0 +1,82 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  * <p>
+  * http://www.apache.org/licenses/LICENSE-2.0
+  * <p>
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package com.aliyun.odps.spark.examples.sparksql
+
+import org.apache.spark.sql.{SaveMode, SparkSession}
+
+object SparkSQL {
+  def main(args: Array[String]) {
+    val spark = SparkSession
+      .builder()
+      .appName("SparkSQL-on-MaxCompute")
+      .config("spark.sql.broadcastTimeout", 20 * 60)
+      .config("spark.sql.crossJoin.enabled", true)
+      .config("spark.sql.defaultCatalog","odps")
+      .config("spark.sql.catalog.odps", "org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog")
+      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
+      .config("spark.sql.extensions", "org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions")
+      .config("spark.sql.catalogImplementation","hive")
+      .getOrCreate()
+
+    import spark._
+    import sqlContext.implicits._
+    val tableName = "mc_test_table"
+    val ptTableName = "mc_test_pt_table"
+    // Drop Create
+    sql(s"DROP TABLE IF EXISTS ${tableName}")
+    sql(s"DROP TABLE IF EXISTS ${ptTableName}")
+
+    sql(s"CREATE TABLE ${tableName} (name STRING, num BIGINT)")
+    sql(s"CREATE TABLE ${ptTableName} (name STRING, num BIGINT) PARTITIONED BY (pt1 STRING, pt2 STRING)")
+
+    val df = spark.sparkContext.parallelize(0 to 99, 2).map(f => {
+      (s"name-$f", f)
+    }).toDF("name", "num")
+
+    val ptDf = spark.sparkContext.parallelize(0 to 99, 2).map(f => {
+      (s"name-$f", f, "2018", "0601")
+    }).toDF("name", "num", "pt1", "pt2")
+
+    // write to the non-partitioned table
+    df.write.insertInto(tableName) // insertInto semantics
+    df.writeTo(tableName).overwritePartitions() // insertOverwrite via DataSource V2
+
+    // write to the partitioned table
+    // DataFrameWriter cannot target a specific partition; write through a temp view and SQL instead
+    df.createOrReplaceTempView(s"${ptTableName}_tmp_view")
+    sql(s"insert into table ${ptTableName} partition (pt1='2018', pt2='0601') select * from ${ptTableName}_tmp_view")
+    sql(s"insert overwrite table ${ptTableName} partition (pt1='2018', pt2='0601') select * from ${ptTableName}_tmp_view")
+
+    ptDf.write.insertInto(ptTableName) // dynamic partitions, insertInto semantics
+    ptDf.write.mode("overwrite").insertInto(ptTableName) // dynamic partitions, insertOverwrite semantics
+
+    // read the non-partitioned table
+    val rdf = sql(s"select name, num from $tableName")
+    println(s"rdf show, ${rdf.count()}")
+    rdf.show()
+    rdf.printSchema()
+
+    // read the partitioned table
+    val rptdf = sql(s"select name, num, pt1, pt2 from $ptTableName where pt1 = '2018' and pt2 = '0601'")
+    println(s"rptdf show, ${rptdf.count()}")
+    rptdf.show()
+    rptdf.printSchema()
+  }
+}
+