丁云鹏 11 kuukautta sitten
vanhempi
commit
d98ce20c69

+ 1 - 0
pom.xml

@@ -18,6 +18,7 @@
     <modules>
         <module>recommend-feature-service</module>
         <module>recommend-feature-client</module>
+        <module>recommend-feature-produce</module>
     </modules>
 
     <dependencies>

+ 55 - 0
recommend-feature-produce/pom.xml

@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent> <!-- Parent coordinates this module inherits from. -->
+        <artifactId>recommend-feature</artifactId>
+        <groupId>com.tzld.piaoquan</groupId>
+        <version>1.0.0</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>recommend-feature-produce</artifactId>
+
+    <properties>
+        <spark.version>3.5.1</spark.version> <!-- Referenced by the spark-core dependency below. -->
+        <odps.version>0.48.3-public</odps.version> <!-- Shared by both odps-sdk dependencies below. -->
+
+        <maven.compiler.source>8</maven.compiler.source> <!-- Source and target are both pinned to Java 8. -->
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+    <dependencies> <!-- NOTE(review): the Java sources of this module also import commons-lang3, commons-collections4, guava, gson and slf4j, none of which is declared here; presumably they arrive from the parent POM or transitively - verify. -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.12</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>redis.clients</groupId>
+            <artifactId>jedis</artifactId>
+            <version>5.1.3</version> <!-- NOTE(review): spark-redis below presumably brings its own jedis version; confirm the two are compatible. -->
+        </dependency>
+        <dependency>
+            <groupId>com.redislabs</groupId>
+            <artifactId>spark-redis_2.12</artifactId>
+            <version>3.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-sdk-core</artifactId>
+            <version>${odps.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-sdk-commons</artifactId>
+            <version>${odps.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.24</version> <!-- NOTE(review): no scope is set, so this defaults to compile; Lombok is normally declared with provided scope - confirm. -->
+        </dependency>
+    </dependencies>
+
+</project>

+ 110 - 0
recommend-feature-produce/src/main/java/feature/Demo.java

@@ -0,0 +1,110 @@
+package feature;
+
+import com.aliyun.odps.Instance;
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.account.Account;
+import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.task.SQLTask;
+import feature.util.CommonCollectionUtils;
+import feature.util.JSONUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Pipeline;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Stand-alone demo: runs one fixed SQL query against ODPS, turns every (vid, feature) row into
+ * a JSON string inside a local Spark job, and writes the results to Redis with a 7200-second
+ * expiry.
+ *
+ * NOTE(review): the ODPS access id/key and the Redis password are hard-coded string literals in
+ * main; they should be supplied through configuration or the environment instead of source code.
+ *
+ * @author dyp
+ */
+public class Demo {
+    public static void main(String[] args) { // Entry point; args is never read.
+        String accessId = "LTAIWYUujJAm7CbH"; // NOTE(review): hard-coded credential.
+        String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"; // NOTE(review): hard-coded credential.
+        String odpsUrl = "http://service.odps.aliyun.com/api";
+//        String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
+        String project = "loghubods";
+//        String table = "alg_vid_feature_share2return";
+
+        // ODPS: build a client bound to the project, then run the query and wait for it.
+        Account account = new AliyunAccount(accessId, accessKey);
+        Odps odps = new Odps(account);
+        odps.setEndpoint(odpsUrl);
+        odps.setDefaultProject(project);
+        System.out.println("Read odps table...");
+        Instance i;
+        String sql = "select vid,feature from alg_vid_feature_share2return where dt = '20240522' and hh = '15' limit " +
+                "10000;";
+        List<Record> records = Collections.emptyList(); // Stays empty if the query below throws.
+        try {
+            i = SQLTask.run(odps, sql);
+            i.waitForSuccess();
+            records = SQLTask.getResult(i);
+        } catch (OdpsException e) {
+            e.printStackTrace(); // NOTE(review): the failure is only printed; the job carries on with zero records.
+        }
+
+        List<Pair<String, String>> features = CommonCollectionUtils.toList(records, r -> Pair.of(r.getString(0), // Column 0 is vid and column 1 is feature, following the select list above.
+                r.getString(1)));
+
+        // RDD: distribute the pairs over a local Spark context and build the Redis key/value pairs.
+        SparkConf sparkConf = new SparkConf()
+                .setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)")
+                .setMaster("local[4]");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf); // NOTE(review): jsc is never stopped or closed in this method.
+        JavaRDD<Pair<String, String>> readData = jsc.parallelize(features);
+        JavaRDD<Pair<String, String>> json = readData.map(f -> {
+            Map<String, String> map = new HashMap<>();
+            // r.getColumns(); set the column values only when the column names line up
+            map.put("vid", f.getLeft());
+            map.put("feature", f.getRight());
+
+            return Pair.of("alg_vid_feature_share2return_test:" + f.getLeft(), JSONUtils.toJson(map)); // Left = Redis key (fixed prefix + vid), right = JSON payload.
+
+        });
+
+        // Redis: connection settings captured by the partition lambda below.
+        int port = 6379;
+        String password = "Wqsd@2019"; // NOTE(review): hard-coded credential.
+        String hostName = "r-bp1wwqqkjqwwkxgbup.redis.rds.aliyuncs.com";
+
+
+        int partitionNum = records.size() / 1000 + 1; // One partition per 1000 records, always at least one.
+        System.out.println(records.size());
+        json.repartition(partitionNum).foreachPartition(iterator -> {
+
+            if (!iterator.hasNext()) { // Empty partition: return before opening a connection.
+                return;
+            }
+            Jedis jedis = new Jedis(hostName, port); // One connection per non-empty partition.
+            jedis.auth(password);
+            Pipeline pipeline = jedis.pipelined();
+            int j = 0; // Number of entries queued on the pipeline, printed below.
+            while (iterator.hasNext()) {
+                j++;
+                Pair<String, String> pair = iterator.next();
+                pipeline.setex(pair.getLeft(), 7200, pair.getRight()); // Queue a SETEX with a 7200-second expiry.
+            }
+            System.out.println(j);
+            pipeline.sync();
+            jedis.close(); // NOTE(review): not in a finally/try-with-resources, so an exception above leaves the connection open.
+        });
+    }
+
+
+//        System.out.println("counts: ");
+//        System.out.println(readData.count());
+
+    private String redisKey(Record r) { // Stub: ignores r and always returns the empty string; not called anywhere in this class.
+        // TODO generate the key from configuration
+        return "";
+    }
+
+}

+ 8 - 0
recommend-feature-produce/src/main/java/feature/ODPSToRedis.java

@@ -0,0 +1,8 @@
+package feature;
+
+/**
+ * Empty placeholder class; it declares no fields or methods yet.
+ *
+ * @author dyp
+ */
+public class ODPSToRedis {
+
+}

+ 61 - 0
recommend-feature-produce/src/main/java/feature/service/ODPSService.java

@@ -0,0 +1,61 @@
+package feature.service;
+
+import com.aliyun.odps.Instance;
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.account.Account;
+import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.task.SQLTask;
+import com.google.common.base.Joiner;
+import feature.util.CommonCollectionUtils;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Reads rows from an ODPS table by running a SQL task and returns them as
+ * column-name to string-value maps.
+ *
+ * SDK guide:
+ * https://help.aliyun.com/zh/maxcompute/user-guide/java-sdk-1/?spm=a2c4g.11174283.0.0.6d0111c1E15lI3
+ *
+ * NOTE(review): the access id and key are hard-coded field initialisers; they should be
+ * supplied through configuration or the environment instead of source code.
+ *
+ * @author dyp
+ */
+public class ODPSService {
+    private final String accessId = "LTAIWYUujJAm7CbH"; // NOTE(review): hard-coded credential.
+    private final String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"; // NOTE(review): hard-coded credential.
+    private final String odpsUrl = "http://service.odps.aliyun.com/api";
+    private final String sqlFormat = "select %s from %s where %s ;"; // Placeholders, in order: column list, table, condition.
+
+    /**
+     * Runs {@code select <colNames> from <table> where <condition>} with the given project as
+     * the client's default project and waits for the task to finish.
+     *
+     * NOTE(review): table, colNames and condition are spliced into the SQL text with
+     * String.format and no escaping, so they must only ever come from trusted input.
+     *
+     * @param project   ODPS project set as the default project of the client
+     * @param table     table name placed after "from"
+     * @param colNames  column names, joined with "," to form the select list
+     * @param condition text placed after "where"
+     * @return one map per result record, keyed by column name, every column read via
+     *         getString; an empty list when the query throws OdpsException or yields no records
+     */
+    public List<Map<String, String>> read(String project,
+                                          String table,
+                                          List<String> colNames,
+                                          String condition) {
+        // ODPS: a new client is built on every call.
+        Account account = new AliyunAccount(accessId, accessKey);
+        Odps odps = new Odps(account);
+        odps.setEndpoint(odpsUrl);
+        odps.setDefaultProject(project);
+
+        String sql = String.format(sqlFormat, Joiner.on(",").join(colNames), table, condition);
+
+        List<Record> records = Collections.emptyList(); // Stays empty if the query below throws.
+        try {
+            Instance i = SQLTask.run(odps, sql);
+            i.waitForSuccess();
+            records = SQLTask.getResult(i);
+        } catch (OdpsException e) {
+            e.printStackTrace(); // NOTE(review): the failure is only printed, so callers cannot tell an error from an empty result.
+        }
+
+        List<Map<String, String>> fieldValues = CommonCollectionUtils.toList(records, r -> {
+            Map<String, String> map = new HashMap<>();
+            for (int i = 0; i < r.getColumnCount(); i++) {
+                map.put(r.getColumns()[i].getName(), r.getString(i)); // NOTE(review): every column is read with getString - confirm this is right for non-string columns.
+            }
+            return map;
+        });
+
+        return fieldValues;
+    }
+}

+ 29 - 0
recommend-feature-produce/src/main/java/feature/service/RedisService.java

@@ -0,0 +1,29 @@
+package feature.service;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Pipeline;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Writes batches of string key/value pairs to Redis with an expiry, using one pipelined
+ * connection per call.
+ *
+ * NOTE(review): the link below points at the MaxCompute Java SDK guide and looks copied from
+ * ODPSService; it does not describe Redis.
+ * https://help.aliyun.com/zh/maxcompute/user-guide/java-sdk-1/?spm=a2c4g.11174283.0.0.6d0111c1E15lI3
+ *
+ * @author dyp
+ */
+public class RedisService {
+    private int port = 6379;
+    private String password = "Wqsd@2019"; // NOTE(review): hard-coded credential; move to configuration.
+    private String hostName = "r-bp1wwqqkjqwwkxgbup.redis.rds.aliyuncs.com";
+    /**
+     * Opens a connection, authenticates, queues one SETEX per map entry on a pipeline, syncs
+     * the pipeline and closes the connection.
+     *
+     * NOTE(review): keyValues is dereferenced without a null check, and the connection is not
+     * closed if any call before jedis.close() throws.
+     *
+     * @param keyValues entries to write; key and value are passed to setex unchanged
+     * @param expire    time-to-live, expressed in timeUnit
+     * @param timeUnit  unit of expire; the value is converted to whole seconds
+     */
+    public void mSetEx(Map<String, String> keyValues, long expire, TimeUnit timeUnit) {
+        Jedis jedis = new Jedis(hostName, port); // A new connection is created on every call.
+        jedis.auth(password);
+        Pipeline pipeline = jedis.pipelined();
+        keyValues.entrySet().forEach(e -> {
+            pipeline.setex(e.getKey(), timeUnit.toSeconds(expire), e.getValue()); // NOTE(review): durations under one second convert to 0 - confirm callers never pass those.
+        });
+        pipeline.sync();
+        jedis.close();
+    }
+}

+ 41 - 0
recommend-feature-produce/src/main/java/feature/util/CommonCollectionUtils.java

@@ -0,0 +1,41 @@
+package feature.util;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Small static helpers for mapping collections to lists or maps and for merging two maps.
+ * Empty input is answered with the shared immutable Collections.emptyList() /
+ * Collections.emptyMap() instances, so callers must not try to modify those results.
+ *
+ * @author dyp
+ */
+public class CommonCollectionUtils {
+    public static <T, R> List<R> toList(Collection<T> list, Function<T, R> map) { // Applies map to every element, in stream order.
+        if (CollectionUtils.isEmpty(list)) { // Presumably also true for null (commons-collections helper) - verify.
+            return Collections.emptyList();
+        }
+        return list.stream().map(map).collect(Collectors.toList());
+    }
+
+    public static <T, K, V> Map<K, V> toMap(List<T> list, Function<T, K> keyFunc, Function<T, V> valueFunc) { // Builds a map from each element's derived key and value.
+        if (CollectionUtils.isEmpty(list)) {
+            return Collections.emptyMap();
+        }
+        return list.stream().collect(Collectors.toMap(keyFunc::apply, valueFunc::apply)); // NOTE(review): no merge function is given, so Collectors.toMap throws on duplicate keys and on null values.
+    }
+
+    public static <K, V> Map<K, V> merge(Map<K, V> map1, Map<K, V> map2) { // Combines two maps; entries of map2 win on key clashes.
+        if (MapUtils.isEmpty(map1)) {
+            return map2; // Returns the caller's own map2 instance (not a copy); may be null if map2 is null.
+        }
+        if (MapUtils.isEmpty(map2)) {
+            return map1; // Returns the caller's own map1 instance (not a copy).
+        }
+
+        Map<K, V> map = new HashMap<>();
+        map.putAll(map1);
+        map.putAll(map2); // Added second, so map2 overwrites map1 for shared keys.
+        return map;
+    }
+}

+ 30 - 0
recommend-feature-produce/src/main/java/feature/util/DateUtils.java

@@ -0,0 +1,30 @@
+package feature.util;
+
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+
+/**
+ * Static date helpers built on java.util.Date, Calendar and SimpleDateFormat. Every call
+ * creates its own formatter and calendar, so no formatter instance is shared between calls.
+ *
+ * NOTE(review): new SimpleDateFormat(format) and Calendar.getInstance() rely on the JVM default
+ * locale and time zone - confirm that this is what callers expect.
+ *
+ * @author dyp
+ */
+public final class DateUtils {
+    public static String getCurrentDateStr(String format) { // Formats the current time with the given SimpleDateFormat pattern.
+        SimpleDateFormat dateFormat = new SimpleDateFormat(format);
+        Date currentDate = new Date();
+        return dateFormat.format(currentDate);
+    }
+
+    public static int getCurrentHour() { // Current hour on the 24-hour clock (Calendar.HOUR_OF_DAY).
+        Calendar calendar = Calendar.getInstance();
+        return calendar.get(Calendar.HOUR_OF_DAY);
+    }
+
+    public static String getBeforeDaysDateStr(String format, int d) { // Formats the time d days before now with the given pattern.
+        SimpleDateFormat dateFormat = new SimpleDateFormat(format);
+        Calendar calendar = Calendar.getInstance();
+        calendar.setTime(new Date());
+        calendar.add(Calendar.DAY_OF_MONTH, -d); // Steps back d days; a negative d therefore moves forward.
+        Date previousDate = calendar.getTime();
+        return dateFormat.format(previousDate);
+    }
+}

+ 22 - 0
recommend-feature-produce/src/main/java/feature/util/JSONUtils.java

@@ -0,0 +1,22 @@
+package feature.util;
+
+import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class JSONUtils { // Static JSON serialisation helper backed by Gson.
+
+    /**
+     * Serialises obj to JSON with a new Gson instance created on each call.
+     *
+     * @param obj value to serialise; may be null
+     * @return the JSON text, or the empty string when obj is null or serialisation throws
+     *         (the exception is logged and not rethrown)
+     */
+    public static String toJson(Object obj) {
+        if (obj == null) {
+            return "";
+        }
+        try {
+            return new Gson().toJson(obj);
+        } catch (Exception e) {
+            log.error("toJson exception", e);
+            return ""; // Same fallback as the null case, so callers cannot tell the two apart.
+        }
+    }
+
+}

+ 1 - 1
recommend-feature-service/src/main/resources/application-dev.yml

@@ -29,7 +29,7 @@ spring:
         max-idle: 8
         min-idle: 0
   tair:
-    hostName: r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com
+    hostName: r-bp1wwqqkjqwwkxgbup.redis.rds.aliyuncs.com
     port: 6379
     password: Wqsd@2019
     timeout: 1000

+ 1 - 1
recommend-feature-service/src/main/resources/application-test.yml

@@ -25,7 +25,7 @@ spring:
         max-idle: 8
         min-idle: 0
   tair:
-    hostName: r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com
+    hostName: r-bp1wwqqkjqwwkxgbup.redis.rds.aliyuncs.com
     port: 6379
     password: Wqsd@2019
     timeout: 1000