Browse Source

特征生产

丁云鹏 11 months ago
parent
commit
66f7ce219d

+ 12 - 99
recommend-feature-produce/src/main/java/feature/produce/Demo.java

@@ -1,110 +1,23 @@
 package feature.produce;
 
-import com.aliyun.odps.Instance;
-import com.aliyun.odps.Odps;
-import com.aliyun.odps.OdpsException;
-import com.aliyun.odps.account.Account;
-import com.aliyun.odps.account.AliyunAccount;
-import com.aliyun.odps.data.Record;
-import com.aliyun.odps.task.SQLTask;
-import feature.produce.util.CommonCollectionUtils;
-import feature.produce.util.JSONUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.Pipeline;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 /**
  * @author dyp
  */
 public class Demo {
     public static void main(String[] args) {
-        String accessId = "LTAIWYUujJAm7CbH";
-        String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
-        String odpsUrl = "http://service.odps.aliyun.com/api";
-//        String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
-        String project = "loghubods";
-//        String table = "alg_vid_feature_share2return";
-
-        // ODPS
-        Account account = new AliyunAccount(accessId, accessKey);
-        Odps odps = new Odps(account);
-        odps.setEndpoint(odpsUrl);
-        odps.setDefaultProject(project);
-        System.out.println("Read odps table...");
-        Instance i;
-        String sql = "select vid,feature from alg_vid_feature_share2return where dt = '20240522' and hh = '15' limit " +
-                "10000;";
-        List<Record> records = Collections.emptyList();
-        try {
-            i = SQLTask.run(odps, sql);
-            i.waitForSuccess();
-            records = SQLTask.getResult(i);
-        } catch (OdpsException e) {
-            e.printStackTrace();
-        }
-
-        List<Pair<String, String>> features = CommonCollectionUtils.toList(records, r -> Pair.of(r.getString(0),
-                r.getString(1)));
-
-        // RDD
-        SparkConf sparkConf = new SparkConf()
-                .setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)")
-                .setMaster("local[4]");
-        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
-        JavaRDD<Pair<String, String>> readData = jsc.parallelize(features);
-        JavaRDD<Pair<String, String>> json = readData.map(f -> {
-            Map<String, String> map = new HashMap<>();
-            // r.getColumns(); 如果列名对齐,才设置列值
-            map.put("vid", f.getLeft());
-            map.put("feature", f.getRight());
-
-            return Pair.of("alg_vid_feature_share2return_test:" + f.getLeft(), JSONUtils.toJson(map));
-
-        });
-
-        // Redis
-        int port = 6379;
-        String password = "Wqsd@2019";
-        String hostName = "r-bp1wwqqkjqwwkxgbup.redis.rds.aliyuncs.com";
-
-
-        int partitionNum = records.size() / 1000 + 1;
-        System.out.println(records.size());
-        json.repartition(partitionNum).foreachPartition(iterator -> {
-
-            if (!iterator.hasNext()) {
-                return;
-            }
-            Jedis jedis = new Jedis(hostName, port);
-            jedis.auth(password);
-            Pipeline pipeline = jedis.pipelined();
-            int j = 0;
-            while (iterator.hasNext()) {
-                j++;
-                Pair<String, String> pair = iterator.next();
-                pipeline.setex(pair.getLeft(), 7200, pair.getRight());
-            }
-            System.out.println(j);
-            pipeline.sync();
-            jedis.close();
-        });
-    }
-
-
-//        System.out.println("counts: ");
-//        System.out.println(readData.count());
 
-    private String redisKey(Record r) {
-        // TODO 根据配置生成Key
-        return "";
+        args = new String[10];
+        args[0] = "-project";
+        args[1] = "loghubods";
+        args[2] = "-table";
+        args[3] = "alg_vid_feature_all_exp";
+        args[4] = "-dt";
+        args[5] = "20240522";
+        args[6] = "-hh";
+        args[7] = "15";
+        args[8] = "-env";
+        args[9] = "test";
+        ODPSToRedis.main(args);
     }
 
 }

+ 4 - 2
recommend-feature-produce/src/main/java/feature/produce/ODPSToRedis.java

@@ -36,7 +36,7 @@ public class ODPSToRedis {
         }
         // ODPS
         DTSConfig config = dtsConfigService.getDTSConfig(argMap);
-        if (!config.selfCheck()) {
+        if (config == null || !config.selfCheck()) {
             log.error("dts config error");
             return;
         }
@@ -48,7 +48,9 @@ public class ODPSToRedis {
         }
 
         // RDD
-        SparkConf sparkConf = new SparkConf().setAppName("odps sync to redis");
+        SparkConf sparkConf = new SparkConf()
+                .setAppName("odps sync to redis");
+        // .setMaster("local");
         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
         JavaRDD<Map<String, String>> readData = jsc.parallelize(fieldValues);
         JavaRDD<Pair<String, String>> json = readData.map(

+ 4 - 3
recommend-feature-produce/src/main/java/feature/produce/model/DTSConfig.java

@@ -3,6 +3,7 @@ package feature.produce.model;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 
+import java.io.Serializable;
 import java.util.List;
 
 /**
@@ -10,19 +11,19 @@ import java.util.List;
  */
 @Data
 @Slf4j
-public class DTSConfig {
+public class DTSConfig implements Serializable {
     private ODPS odps;
     private Redis redis;
 
     @Data
-    public static class ODPS {
+    public static class ODPS implements Serializable {
         private String table;
         private List<String> cols;
         private List<String> partition;
     }
 
     @Data
-    public static class Redis {
+    public static class Redis implements Serializable {
         private String prefix;
         private List<String> key;
         private long expire;

+ 14 - 1
recommend-feature-produce/src/main/java/feature/produce/service/DTSConfigService.java

@@ -21,7 +21,20 @@ import java.util.Optional;
 @Slf4j
 public class DTSConfigService {
 
+    public void init(String env) {
+        System.setProperty("app.id", "recommend-feature");
+
+        if (StringUtils.equals(env, "prod")) {
+            System.setProperty("apollo.meta", "http://apolloconfig-internal.piaoquantv.com");
+        } else {
+            System.setProperty("apollo.meta", "http://devapolloconfig-internal.piaoquantv.com");
+        }
+    }
+
     public DTSConfig getDTSConfig(Map<String, String> argMap) {
+
+        init(argMap.get("env"));
+
         List<DTSConfig> dtsConfigs = JSONUtils.fromJson(
                 ConfigService.getAppConfig().getProperty("dts.config.v2", ""),
                 new TypeToken<List<DTSConfig>>() {
@@ -34,7 +47,7 @@ public class DTSConfigService {
         Optional<DTSConfig> optional = dtsConfigs.stream()
                 .filter(c -> c.getOdps() != null && StringUtils.equals(c.getOdps().getTable(), argMap.get("table")))
                 .findFirst();
-        if (optional.isPresent()) {
+        if (!optional.isPresent()) {
             log.error("table {} not config", argMap.get("table"));
             return null;
         }