فهرست منبع

feature produce

丁云鹏 11 ماه پیش
والد
کامیت
05bbea7f22

+ 10 - 0
recommend-feature-produce/pom.xml

@@ -50,6 +50,16 @@
             <artifactId>lombok</artifactId>
             <version>1.18.24</version>
         </dependency>
+        <dependency>
+            <groupId>com.ctrip.framework.apollo</groupId>
+            <artifactId>apollo-client</artifactId>
+            <version>1.8.0</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+            <version>1.8.0</version>
+        </dependency>
     </dependencies>
 
 </project>

+ 0 - 8
recommend-feature-produce/src/main/java/feature/ODPSToRedis.java

@@ -1,8 +0,0 @@
-package feature;
-
-/**
- * @author dyp
- */
-public class ODPSToRedis {
-
-}

+ 3 - 3
recommend-feature-produce/src/main/java/feature/Demo.java → recommend-feature-produce/src/main/java/feature/produce/Demo.java

@@ -1,4 +1,4 @@
-package feature;
+package feature.produce;
 
 import com.aliyun.odps.Instance;
 import com.aliyun.odps.Odps;
@@ -7,8 +7,8 @@ import com.aliyun.odps.account.Account;
 import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.data.Record;
 import com.aliyun.odps.task.SQLTask;
-import feature.util.CommonCollectionUtils;
-import feature.util.JSONUtils;
+import feature.produce.util.CommonCollectionUtils;
+import feature.produce.util.JSONUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;

+ 64 - 0
recommend-feature-produce/src/main/java/feature/produce/ODPSToRedis.java

@@ -0,0 +1,64 @@
+package feature.produce;
+
+import feature.produce.model.DTSConfig;
+import feature.produce.service.CMDService;
+import feature.produce.service.DTSConfigService;
+import feature.produce.service.ODPSService;
+import feature.produce.service.RedisService;
+import feature.produce.util.JSONUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Spark job entry point that copies rows of one ODPS table into Redis.
+ *
+ * <p>Flow: parse the command-line arguments, look up the {@link DTSConfig} for the requested
+ * table, read the configured columns from ODPS on the driver, then write one JSON value per row
+ * to Redis from the Spark partitions.
+ *
+ * @author dyp
+ */
+@Slf4j
+public class ODPSToRedis {
+    // Static on purpose: the lambdas in main() reach these through the class, so the service
+    // objects themselves are not captured by the Spark closures.
+    private static final ODPSService odpsService = new ODPSService();
+    private static final RedisService redisService = new RedisService();
+    private static final CMDService cmdService = new CMDService();
+    private static final DTSConfigService dtsConfigService = new DTSConfigService();
+
+    /**
+     * Runs the sync once and returns; problems are logged rather than thrown.
+     *
+     * @param args command-line arguments as understood by {@link CMDService#parse(String[])}
+     */
+    public static void main(String[] args) {
+        Map<String, String> argMap = cmdService.parse(args);
+        if (MapUtils.isEmpty(argMap)) {
+            log.error("args is empty");
+            return;
+        }
+
+        // getDTSConfig returns null when nothing is configured for the table, so the null
+        // check has to come before selfCheck().
+        DTSConfig config = dtsConfigService.getDTSConfig(argMap);
+        if (config == null || !config.selfCheck()) {
+            log.error("dts config error");
+            return;
+        }
+
+        // Rows are read on the driver: k = column name, v = value.
+        List<Map<String, String>> fieldValues = odpsService.read(config, argMap);
+        if (CollectionUtils.isEmpty(fieldValues)) {
+            log.warn("no data read from odps");
+            return;
+        }
+
+        SparkConf sparkConf = new SparkConf().setAppName("odps sync to redis");
+        // JavaSparkContext is Closeable; closing it stops the context even when a stage fails.
+        try (JavaSparkContext jsc = new JavaSparkContext(sparkConf)) {
+            JavaRDD<Map<String, String>> readData = jsc.parallelize(fieldValues);
+            JavaRDD<Pair<String, String>> json = readData.map(
+                    f -> Pair.of(redisService.redisKey(f, config), JSONUtils.toJson(f))
+            );
+
+            // Roughly 1000 rows per partition, always at least one partition.
+            int partitionNum = fieldValues.size() / 1000 + 1;
+            json.repartition(partitionNum)
+                    .foreachPartition(iterator -> redisService.mSet(iterator, config));
+        }
+    }
+
+}

+ 42 - 0
recommend-feature-produce/src/main/java/feature/produce/model/DTSConfig.java

@@ -0,0 +1,42 @@
+package feature.produce.model;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+
+/**
+ * Configuration of one ODPS-to-Redis sync: which table and columns to read, and how the Redis
+ * keys are built.
+ *
+ * <p>Serializable because instances are captured by the Spark lambdas in {@code ODPSToRedis}
+ * and therefore shipped to the executors together with the closure.
+ *
+ * @author dyp
+ */
+@Data
+@Slf4j
+public class DTSConfig implements java.io.Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private ODPS odps;
+    private Redis redis;
+
+    /** Source side: table name, columns to select and partition column names. */
+    @Data
+    public static class ODPS implements java.io.Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private String table;
+        private List<String> cols;
+        private List<String> partition;
+    }
+
+    /** Target side: key prefix, the fields whose values form the key, and the expiry. */
+    @Data
+    public static class Redis implements java.io.Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private String prefix;
+        private List<String> key;
+        private long expire;
+    }
+
+    /**
+     * Checks that both the ODPS and the Redis section are present. Individual fields inside the
+     * sections are not validated here.
+     *
+     * @return {@code true} when both sections are set, {@code false} (after logging) otherwise
+     */
+    public boolean selfCheck() {
+        if (odps == null) {
+            log.error("odps not config");
+            return false;
+        }
+        if (redis == null) {
+            log.error("redis not config");
+            return false;
+        }
+        return true;
+    }
+}

+ 20 - 0
recommend-feature-produce/src/main/java/feature/produce/service/CMDService.java

@@ -0,0 +1,20 @@
+package feature.produce.service;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Minimal command-line parser for arguments given as alternating key/value tokens.
+ *
+ * @author dyp
+ */
+public class CMDService {
+
+    /**
+     * Turns {@code [key1, value1, key2, value2, ...]} into a map.
+     *
+     * <p>The first character of every key token is dropped (e.g. a leading {@code -}), matching
+     * the original behaviour. Tokens are consumed in pairs, so a value is never mistaken for the
+     * next key. A trailing key without a value is ignored, as is a pair whose key token is null
+     * or empty.
+     *
+     * @param args raw command-line arguments; may be null
+     * @return mutable map from key (without its first character) to value; empty when there is
+     *         nothing to parse
+     */
+    public Map<String, String> parse(String[] args) {
+        Map<String, String> map = new HashMap<>();
+        if (args == null) {
+            return map;
+        }
+        // Step by two: args[i] is the key token, args[i + 1] its value.
+        for (int i = 0; i + 1 < args.length; i += 2) {
+            String keyToken = args[i];
+            if (keyToken == null || keyToken.isEmpty()) {
+                // substring(1) would throw on an empty token; skip the malformed pair.
+                continue;
+            }
+            map.put(keyToken.substring(1), args[i + 1]);
+        }
+        return map;
+    }
+}

+ 43 - 0
recommend-feature-produce/src/main/java/feature/produce/service/DTSConfigService.java

@@ -0,0 +1,43 @@
+package feature.produce.service;
+
+import com.aliyun.core.utils.StringUtils;
+import com.ctrip.framework.apollo.ConfigService;
+import com.google.common.reflect.TypeToken;
+import feature.produce.model.DTSConfig;
+import feature.produce.util.JSONUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * https://help.aliyun.com/zh/maxcompute/user-guide/java-sdk-1/?spm=a2c4g.11174283.0.0.6d0111c1E15lI3
+ *
+ * @author dyp
+ */
+@Slf4j
+public class DTSConfigService {
+
+    public DTSConfig getDTSConfig(Map<String, String> argMap) {
+        List<DTSConfig> dtsConfigs = JSONUtils.fromJson(
+                ConfigService.getAppConfig().getProperty("dts.config.v2", ""),
+                new TypeToken<List<DTSConfig>>() {
+                },
+                Collections.emptyList());
+        if (CollectionUtils.isEmpty(dtsConfigs)) {
+            log.error("DTSConfig not config");
+            return null;
+        }
+        Optional<DTSConfig> optional = dtsConfigs.stream()
+                .filter(c -> c.getOdps() != null && StringUtils.equals(c.getOdps().getTable(), argMap.get("table")))
+                .findFirst();
+        if (optional.isPresent()) {
+            log.error("table {} not config", argMap.get("table"));
+            return null;
+        }
+        return optional.get();
+    }
+}

+ 40 - 10
recommend-feature-produce/src/main/java/feature/service/ODPSService.java → recommend-feature-produce/src/main/java/feature/produce/service/ODPSService.java

@@ -1,4 +1,4 @@
-package feature.service;
+package feature.produce.service;
 
 import com.aliyun.odps.Instance;
 import com.aliyun.odps.Odps;
@@ -7,8 +7,11 @@ import com.aliyun.odps.account.Account;
 import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.data.Record;
 import com.aliyun.odps.task.SQLTask;
+import com.aliyun.odps.utils.StringUtils;
 import com.google.common.base.Joiner;
-import feature.util.CommonCollectionUtils;
+import feature.produce.model.DTSConfig;
+import feature.produce.util.CommonCollectionUtils;
+import lombok.extern.slf4j.Slf4j;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -20,18 +23,44 @@ import java.util.Map;
  *
  * @author dyp
  */
+@Slf4j
 public class ODPSService {
     private final String accessId = "LTAIWYUujJAm7CbH";
     private final String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
     private final String odpsUrl = "http://service.odps.aliyun.com/api";
-    private final String sqlFormat = "select %s from %s where %s ;";
+    private final String sqlFormat = "select %s from %s where 1=1 %s ;";
 
 
-    public List<Map<String, String>> read(String project,
-                                          String table,
-                                          List<String> colNames,
-                                          String condition) {
-        // ODPS
+    /**
+     * @return k: 列名 v:值
+     */
+    public List<Map<String, String>> read(DTSConfig config, Map<String, String> argMap) {
+
+        String project = argMap.get("project");
+        if (StringUtils.isBlank(project)) {
+            return Collections.emptyList();
+        }
+
+        String table = argMap.get("table");
+        if (StringUtils.isBlank(table)) {
+            return Collections.emptyList();
+        }
+
+        StringBuilder sb = new StringBuilder();
+        for (String partition : config.getOdps().getPartition()) {
+            sb.append(" and ");
+            sb.append(partition);
+            sb.append(" = ");
+            sb.append(argMap.get(partition));
+        }
+
+        return read(project, table, config.getOdps().getCols(), sb.toString());
+    }
+
+    private List<Map<String, String>> read(String project,
+                                           String table,
+                                           List<String> colNames,
+                                           String condition) {
         Account account = new AliyunAccount(accessId, accessKey);
         Odps odps = new Odps(account);
         odps.setEndpoint(odpsUrl);
@@ -39,13 +68,14 @@ public class ODPSService {
 
         String sql = String.format(sqlFormat, Joiner.on(",").join(colNames), table, condition);
 
-        List<Record> records = Collections.emptyList();
+        List<Record> records;
         try {
             Instance i = SQLTask.run(odps, sql);
             i.waitForSuccess();
             records = SQLTask.getResult(i);
         } catch (OdpsException e) {
-            e.printStackTrace();
+            log.error("request odps error", e);
+            return Collections.emptyList();
         }
 
         List<Map<String, String>> fieldValues = CommonCollectionUtils.toList(records, r -> {

+ 58 - 0
recommend-feature-produce/src/main/java/feature/produce/service/RedisService.java

@@ -0,0 +1,58 @@
+package feature.produce.service;
+
+import feature.produce.model.DTSConfig;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Pipeline;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Writes key/value pairs to Redis and builds the Redis key for a row.
+ *
+ * @author dyp
+ */
+public class RedisService {
+    // NOTE(review): connection settings and the password are hard-coded; consider moving them
+    // to external configuration.
+    private final int port = 6379;
+    private final String password = "Wqsd@2019";
+    private final String hostName = "r-bp1wwqqkjqwwkxgbup.redis.rds.aliyuncs.com";
+
+    /**
+     * Sends one SETEX per pair through a single pipeline on a fresh connection.
+     *
+     * @param keyValues pairs of (key, value) to write
+     * @param expire    time to live, expressed in {@code timeUnit}
+     * @param timeUnit  unit of {@code expire}; converted to seconds for SETEX
+     */
+    private void mSetEx(Iterator<Pair<String, String>> keyValues, long expire, TimeUnit timeUnit) {
+        long seconds = timeUnit.toSeconds(expire);
+        // try-with-resources: the connection is closed even when auth or a pipeline call throws.
+        try (Jedis jedis = new Jedis(hostName, port)) {
+            jedis.auth(password);
+            Pipeline pipeline = jedis.pipelined();
+            while (keyValues.hasNext()) {
+                Pair<String, String> p = keyValues.next();
+                pipeline.setex(p.getLeft(), seconds, p.getRight());
+            }
+            pipeline.sync();
+        }
+    }
+
+    /**
+     * Writes all pairs with the expiry taken from the Redis section of {@code config}; the
+     * configured value is interpreted as seconds.
+     *
+     * @param keyValues pairs of (key, value) to write
+     * @param config    sync configuration providing the expiry
+     */
+    public void mSet(Iterator<Pair<String, String>> keyValues, DTSConfig config) {
+        long expire = config.getRedis().getExpire();
+        mSetEx(keyValues, expire, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Builds the Redis key of one row: the prefix (when not blank) followed by {@code ":" + value}
+     * for every configured key field, in configuration order. When no key fields are configured
+     * the prefix alone is returned. With a blank prefix the key therefore starts with ":".
+     *
+     * @param fieldValueMap row data, k: column name, v: value
+     * @param config        sync configuration providing prefix and key fields
+     * @return the Redis key for the row
+     */
+    public String redisKey(Map<String, String> fieldValueMap, DTSConfig config) {
+        // Note: the key generation rule used for writing and for reading must stay identical.
+        List<String> fields = config.getRedis().getKey();
+        if (CollectionUtils.isEmpty(fields)) {
+            return config.getRedis().getPrefix();
+        }
+        StringBuilder sb = new StringBuilder();
+        if (StringUtils.isNotBlank(config.getRedis().getPrefix())) {
+            sb.append(config.getRedis().getPrefix());
+        }
+        for (String field : fields) {
+            sb.append(":");
+            sb.append(fieldValueMap.get(field));
+        }
+        return sb.toString();
+    }
+}

+ 9 - 1
recommend-feature-produce/src/main/java/feature/util/CommonCollectionUtils.java → recommend-feature-produce/src/main/java/feature/produce/util/CommonCollectionUtils.java

@@ -1,4 +1,4 @@
-package feature.util;
+package feature.produce.util;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
@@ -25,6 +25,14 @@ public class CommonCollectionUtils {
         return list.stream().collect(Collectors.toMap(keyFunc::apply, valueFunc::apply));
     }
 
+    /**
+     * Array variant of {@code toMap}: derives one map entry per array element.
+     *
+     * @param list      source array; null or empty yields an empty map
+     * @param keyFunc   computes the key of an element
+     * @param valueFunc computes the value of an element
+     * @return map built with {@link Collectors#toMap}, or an empty map when there are no elements
+     */
+    public static <T, K, V> Map<K, V> toMap(T[] list, Function<T, K> keyFunc, Function<T, V> valueFunc) {
+        if (list != null && list.length > 0) {
+            return Arrays.stream(list).collect(Collectors.toMap(keyFunc::apply, valueFunc::apply));
+        }
+        return Collections.emptyMap();
+    }
+
     public static <K, V> Map<K, V> merge(Map<K, V> map1, Map<K, V> map2) {
         if (MapUtils.isEmpty(map1)) {
             return map2;

+ 1 - 1
recommend-feature-produce/src/main/java/feature/util/DateUtils.java → recommend-feature-produce/src/main/java/feature/produce/util/DateUtils.java

@@ -1,4 +1,4 @@
-package feature.util;
+package feature.produce.util;
 
 import java.text.SimpleDateFormat;
 import java.util.Calendar;

+ 37 - 0
recommend-feature-produce/src/main/java/feature/produce/util/JSONUtils.java

@@ -0,0 +1,37 @@
+package feature.produce.util;
+
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+@Slf4j
+public class JSONUtils {
+
+
+    public static String toJson(Object obj) {
+        if (obj == null) {
+            return "";
+        }
+        try {
+            return new Gson().toJson(obj);
+        } catch (Exception e) {
+            log.error("toJson exception", e);
+            return "";
+        }
+    }
+
+    public static <T> T fromJson(String value, TypeToken<T> typeToken, T defaultValue) {
+
+        if (StringUtils.isBlank(value)) {
+            return defaultValue;
+        }
+        try {
+            return new Gson().fromJson(value, typeToken.getType());
+        } catch (Exception e) {
+            log.error("fromJson error! value=[{}]", value, e);
+        }
+        return defaultValue;
+    }
+
+}

+ 0 - 29
recommend-feature-produce/src/main/java/feature/service/RedisService.java

@@ -1,29 +0,0 @@
-package feature.service;
-
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.Pipeline;
-
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-/**
- * https://help.aliyun.com/zh/maxcompute/user-guide/java-sdk-1/?spm=a2c4g.11174283.0.0.6d0111c1E15lI3
- *
- * @author dyp
- */
-public class RedisService {
-    private int port = 6379;
-    private String password = "Wqsd@2019";
-    private String hostName = "r-bp1wwqqkjqwwkxgbup.redis.rds.aliyuncs.com";
-
-    public void mSetEx(Map<String, String> keyValues, long expire, TimeUnit timeUnit) {
-        Jedis jedis = new Jedis(hostName, port);
-        jedis.auth(password);
-        Pipeline pipeline = jedis.pipelined();
-        keyValues.entrySet().forEach(e -> {
-            pipeline.setex(e.getKey(), timeUnit.toSeconds(expire), e.getValue());
-        });
-        pipeline.sync();
-        jedis.close();
-    }
-}

+ 0 - 22
recommend-feature-produce/src/main/java/feature/util/JSONUtils.java

@@ -1,22 +0,0 @@
-package feature.util;
-
-import com.google.gson.Gson;
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class JSONUtils {
-
-
-    public static String toJson(Object obj) {
-        if (obj == null) {
-            return "";
-        }
-        try {
-            return new Gson().toJson(obj);
-        } catch (Exception e) {
-            log.error("toJson exception", e);
-            return "";
-        }
-    }
-
-}