Explorar o código

LR、FM模型

丁云鹏 hai 11 meses
pai
achega
1601ac6d77

+ 57 - 10
recommend-feature-produce/pom.xml

@@ -12,34 +12,82 @@
     <artifactId>recommend-feature-produce</artifactId>
 
     <properties>
-        <spark.version>3.5.1</spark.version>
-        <odps.version>0.48.3-public</odps.version>
-
+        <spark.version>2.3.1</spark.version>
+        <emr.version>2.0.0</emr.version>
+        <java.version>1.8</java.version>
+        <odps.version>0.48.4-public</odps.version>
+        <fastjson.version>1.2.45</fastjson.version>
         <maven.compiler.source>8</maven.compiler.source>
         <maven.compiler.target>8</maven.compiler.target>
     </properties>
 
     <dependencies>
+
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_2.12</artifactId>
+            <artifactId>spark-core_2.11</artifactId>
             <version>${spark.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty-all</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+            <version>4.1.17.Final</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>${fastjson.version}</version>
+        </dependency>
+        <!--        <dependency>-->
+        <!--            <groupId>org.apache.spark</groupId>-->
+        <!--            <artifactId>spark-mllib_2.11</artifactId>-->
+        <!--            <version>${spark.version}</version>-->
+        <!--        </dependency>-->
+
+        <!--        <dependency>-->
+        <!--            <groupId>org.apache.spark</groupId>-->
+        <!--            <artifactId>spark-sql_2.11</artifactId>-->
+        <!--            <version>${spark.version}</version>-->
+        <!--        </dependency>-->
+
+        <!--        <dependency>-->
+        <!--            <groupId>org.apache.spark</groupId>-->
+        <!--            <artifactId>spark-streaming_2.11</artifactId>-->
+        <!--            <version>${spark.version}</version>-->
+        <!--        </dependency>-->
+        <dependency>
+            <groupId>com.aliyun.emr</groupId>
+            <artifactId>emr-maxcompute_2.11</artifactId>
+            <version>${emr.version}</version>
         </dependency>
         <dependency>
             <groupId>redis.clients</groupId>
             <artifactId>jedis</artifactId>
             <version>5.1.3</version>
         </dependency>
-        <dependency>
-            <groupId>com.redislabs</groupId>
-            <artifactId>spark-redis_2.12</artifactId>
-            <version>3.1.0</version>
-        </dependency>
         <dependency>
             <groupId>com.aliyun.odps</groupId>
             <artifactId>odps-sdk-core</artifactId>
             <version>${odps.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.codehaus.jackson</groupId>
+                    <artifactId>jackson-mapper-asl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.codehaus.jackson</groupId>
+                    <artifactId>jackson-core-asl</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
+
         <dependency>
             <groupId>com.aliyun.odps</groupId>
             <artifactId>odps-sdk-commons</artifactId>
@@ -60,7 +108,6 @@
             <artifactId>slf4j-simple</artifactId>
             <version>1.7.28</version>
         </dependency>
-
     </dependencies>
     <build>
         <plugins>

+ 38 - 19
recommend-feature-produce/src/main/java/com/tzld/piaoquan/recommend/feature/produce/ODPSToRedis.java

@@ -7,14 +7,12 @@ import com.tzld.piaoquan.recommend.feature.produce.service.ODPSService;
 import com.tzld.piaoquan.recommend.feature.produce.service.RedisService;
 import com.tzld.piaoquan.recommend.feature.produce.util.JSONUtils;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.collections4.MapUtils;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -34,9 +32,18 @@ public class ODPSToRedis {
             log.error("args is empty");
             return;
         }
-        String env = argMap.get("env");
+
+        SparkConf sparkConf = new SparkConf()
+                .setMaster("local")
+                .setAppName("odps sync to redis");
+        for (Map.Entry<String, String> e : argMap.entrySet()) {
+            sparkConf.set(e.getKey(), e.getValue());
+        }
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
         // ODPS
         log.info("read odps");
+        String env = argMap.get("env");
         DTSConfigService dtsConfigService = new DTSConfigService(env);
         DTSConfig config = dtsConfigService.getDTSConfig(argMap);
         if (config == null || !config.selfCheck()) {
@@ -44,27 +51,39 @@ public class ODPSToRedis {
             return;
         }
 
-        List<Map<String, String>> fieldValues = odpsService.read(config, argMap);
-        log.info("odps count {}", fieldValues.size());
-        if (CollectionUtils.isEmpty(fieldValues)) {
+        long count = 0;
+        int retry = 50;
+        while (count <= 0 && retry-- > 0) {
+            count = odpsService.count(config, argMap);
+            if (count <= 0) {
+                if (retry <= 0) {
+                    // TODO 报警
+                    return;
+                } else {
+                    try {
+                        Thread.sleep(60);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        }
+
+        log.info("odps count {}", count);
+
+        JavaRDD<Map<String, String>> fieldValues = odpsService.read(jsc, config, argMap);
+        if (fieldValues == null) {
             log.info("odps empty");
             return;
         }
+        //log.info("odps count {}", fieldValues.count());
 
         // RDD
         log.info("sync redis");
         RedisService redisService = new RedisService(env);
-        SparkConf sparkConf = new SparkConf()
-                .setAppName("odps sync to redis");
-        // .setMaster("local");
-        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
-        JavaRDD<Map<String, String>> readData = jsc.parallelize(fieldValues);
-        JavaRDD<Pair<String, String>> json = readData.map(
-                f -> Pair.of(redisService.redisKey(f, config), JSONUtils.toJson(f))
-        );
-        int partitionNum = fieldValues.size() / 1000 + 1;
-        json.repartition(partitionNum).foreachPartition(iterator -> {
-            redisService.mSet(iterator, config);
+        int partitionNum = NumberUtils.toInt(argMap.get("partitionNum"), 10);
+        fieldValues.repartition(partitionNum).foreachPartition(iterator -> {
+            redisService.mSetV2(iterator, config);
         });
         jsc.stop();
     }

+ 3 - 3
recommend-feature-produce/src/main/java/com/tzld/piaoquan/recommend/feature/produce/Test.java

@@ -10,11 +10,11 @@ public class Test {
         args[0] = "-project";
         args[1] = "loghubods";
         args[2] = "-table";
-        args[3] = "alg_vid_feature_feed_apptype_root_return";
+        args[3] = "alg_vid_feature_all_exp";
         args[4] = "-dt";
-        args[5] = "20240522";
+        args[5] = "20240611";
         args[6] = "-hh";
-        args[7] = "15";
+        args[7] = "01";
         args[8] = "-env";
         args[9] = "test";
         ODPSToRedis.main(args);

+ 1 - 1
recommend-feature-produce/src/main/java/com/tzld/piaoquan/recommend/feature/produce/service/DTSConfigService.java

@@ -5,7 +5,7 @@ import com.google.common.reflect.TypeToken;
 import com.tzld.piaoquan.recommend.feature.produce.model.DTSConfig;
 import com.tzld.piaoquan.recommend.feature.produce.util.JSONUtils;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 
 import java.util.Collections;

+ 97 - 1
recommend-feature-produce/src/main/java/com/tzld/piaoquan/recommend/feature/produce/service/ODPSService.java

@@ -3,15 +3,22 @@ package com.tzld.piaoquan.recommend.feature.produce.service;
 import com.aliyun.odps.Instance;
 import com.aliyun.odps.Odps;
 import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.TableSchema;
 import com.aliyun.odps.account.Account;
 import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.data.Record;
+import com.aliyun.odps.data.SimpleJsonValue;
 import com.aliyun.odps.task.SQLTask;
 import com.aliyun.odps.utils.StringUtils;
 import com.google.common.base.Joiner;
-import com.tzld.piaoquan.recommend.feature.produce.util.CommonCollectionUtils;
 import com.tzld.piaoquan.recommend.feature.produce.model.DTSConfig;
+import com.tzld.piaoquan.recommend.feature.produce.util.CommonCollectionUtils;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.spark.aliyun.odps.OdpsOps;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function2;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -28,7 +35,96 @@ public class ODPSService {
     private final String accessId = "LTAIWYUujJAm7CbH";
     private final String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
     private final String odpsUrl = "http://service.odps.aliyun.com/api";
+    private final String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun.com";
     private final String sqlFormat = "select %s from %s where 1=1 %s ;";
+    private final String countSqlFormat = "select count(1) as count from %s where 1=1 %s ;";
+
+
+    /**
+     * Reads one ODPS table partition into a Spark RDD of column-name -> string-value maps.
+     *
+     * @param jsc    Spark context used to build the underlying OdpsOps reader
+     * @param config DTS config supplying the partition key names and the column whitelist
+     * @param argMap CLI arguments: "project", "table", one value per partition key,
+     *               and optional "partitionNum" (RDD partition count, default 10)
+     * @return RDD of per-row maps restricted to the configured columns,
+     *         or null when "project" or "table" is missing (callers must null-check)
+     */
+    public JavaRDD<Map<String, String>> read(JavaSparkContext jsc, DTSConfig config, Map<String, String> argMap) {
+        String project = argMap.get("project");
+        if (StringUtils.isBlank(project)) {
+            return null;
+        }
+
+        String table = argMap.get("table");
+        if (StringUtils.isBlank(table)) {
+            return null;
+        }
+
+        // Build the partition spec "k1 = v1,k2 = v2" expected by the tunnel reader.
+        // NOTE(review): partition values come from argMap unvalidated — confirm they are
+        // always simple date/hour tokens. If getPartition() is ever empty, the
+        // deleteCharAt below throws StringIndexOutOfBoundsException — verify config check.
+        StringBuilder sb = new StringBuilder();
+        for (String p : config.getOdps().getPartition()) {
+            sb.append(p);
+            sb.append(" = ");
+            sb.append(argMap.get(p));
+            sb.append(",");
+        }
+        // Drop the trailing comma left by the loop.
+        String partition = sb.deleteCharAt(sb.length() - 1).toString();
+        int partitionNum = NumberUtils.toInt(argMap.get("partitionNum"), 10);
+
+
+        OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
+
+        // RecordToMap projects each tunnel Record down to the whitelisted columns.
+        JavaRDD<Map<String, String>> readData = odpsOps.readTableWithJava(project, table, partition,
+                new RecordToMap(config.getOdps().getCols()), partitionNum);
+        return readData;
+    }
+
+    /**
+     * Spark-serializable converter: (Record, TableSchema) -> map of column name to
+     * string value, keeping only the columns listed in {@code cols}.
+     */
+    static class RecordToMap implements Function2<Record, TableSchema, Map<String, String>> {
+        // Column whitelist; membership is checked per column with List.contains
+        // (linear scan — fine for the small column counts used here).
+        private List<String> cols;
+
+        public RecordToMap(List<String> cols) {
+            this.cols = cols;
+        }
+
+        @Override
+        public Map<String, String> call(Record r, TableSchema schema) {
+            Map<String, String> map = new HashMap<>();
+            for (int i = 0; i < schema.getColumns().size(); i++) {
+                if (cols.contains(r.getColumns()[i].getName())) {
+                    Object obj = r.get(i);
+                    // JSON-typed columns are rendered via toString(); presumably
+                    // r.getString(i) cannot serialize SimpleJsonValue — TODO confirm.
+                    if (obj instanceof SimpleJsonValue) {
+                        map.put(r.getColumns()[i].getName(), ((SimpleJsonValue) obj).toString());
+                    } else {
+                        map.put(r.getColumns()[i].getName(), r.getString(i));
+                    }
+                }
+            }
+            return map;
+        }
+    }
+
+
+    /**
+     * Counts the rows of one table partition via a blocking SQLTask
+     * ("select count(1) ... where 1=1 and <partition conditions>").
+     *
+     * @param config DTS config supplying the partition key names
+     * @param argMap CLI arguments: "project", "table", one value per partition key
+     * @return row count, or 0 when the ODPS request fails — callers cannot
+     *         distinguish "empty partition" from "query error"
+     */
+    public long count(DTSConfig config, Map<String, String> argMap) {
+        String project = argMap.get("project");
+        String table = argMap.get("table");
+
+        // Build " and k1 = v1 and k2 = v2" from the configured partition keys.
+        // NOTE(review): values are concatenated into SQL unquoted/unescaped —
+        // confirm they are always trusted numeric tokens (dt/hh).
+        StringBuilder sb = new StringBuilder();
+        for (String partition : config.getOdps().getPartition()) {
+            sb.append(" and ");
+            sb.append(partition);
+            sb.append(" = ");
+            sb.append(argMap.get(partition));
+        }
+        String condition = sb.toString();
+
+        Account account = new AliyunAccount(accessId, accessKey);
+        Odps odps = new Odps(account);
+        odps.setEndpoint(odpsUrl);
+        odps.setDefaultProject(project);
+
+        String sql = String.format(countSqlFormat, table, condition);
+        List<Record> records;
+        try {
+            // Synchronous: submits the task and blocks until it succeeds.
+            Instance i = SQLTask.run(odps, sql);
+            i.waitForSuccess();
+            records = SQLTask.getResult(i);
+        } catch (OdpsException e) {
+            log.error("request odps error", e);
+            return 0;
+        }
+
+        // NOTE(review): parses as int despite the long return type — overflows for
+        // counts > Integer.MAX_VALUE; Long.parseLong would be safer. Also throws
+        // NumberFormatException if the result row is malformed.
+        return Integer.valueOf(records.get(0).getString(0));
+    }
 
 
     /**

+ 35 - 1
recommend-feature-produce/src/main/java/com/tzld/piaoquan/recommend/feature/produce/service/RedisService.java

@@ -1,13 +1,16 @@
 package com.tzld.piaoquan.recommend.feature.produce.service;
 
 import com.tzld.piaoquan.recommend.feature.produce.model.DTSConfig;
-import org.apache.commons.collections4.CollectionUtils;
+import com.tzld.piaoquan.recommend.feature.produce.util.JSONUtils;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.Pipeline;
 
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -50,6 +53,37 @@ public class RedisService implements Serializable {
         mSetEx(keyValues, expire, TimeUnit.SECONDS);
     }
 
+
+    /**
+     * Drains a Spark partition iterator and writes each record to Redis as JSON,
+     * keyed by {@link #redisKey}, in pipelined batches with the configured TTL.
+     *
+     * @param dataIte iterator over per-row column maps (one Spark partition)
+     * @param config  supplies the key-field list and the expire time (seconds)
+     */
+    public void mSetV2(Iterator<Map<String, String>> dataIte, DTSConfig config) {
+        Map<String, String> batch = new HashMap<>();
+        while (dataIte.hasNext()) {
+            Map<String, String> record = dataIte.next();
+            String redisKey = redisKey(record, config);
+            String value = JSONUtils.toJson(record);
+            // Records sharing a redis key overwrite each other within the batch.
+            batch.put(redisKey, value);
+            // Flush once the batch exceeds 1000 entries (i.e. at 1001 — the
+            // strict '>' makes the effective batch size 1001, not 1000).
+            if (batch.size() > 1000) {
+                mSet(batch, config.getRedis().getExpire(), TimeUnit.SECONDS);
+                batch.clear();
+            }
+        }
+
+        // Flush the final partial batch.
+        if (MapUtils.isNotEmpty(batch)) {
+            mSet(batch, config.getRedis().getExpire(), TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Writes one batch to Redis as pipelined SETEX commands over a fresh,
+     * short-lived connection (hostName/port/password are instance fields).
+     *
+     * NOTE(review): jedis is not closed in a finally / try-with-resources —
+     * the connection leaks if auth, the pipeline, or sync throws.
+     *
+     * @param batch    key -> JSON value pairs to write
+     * @param expire   TTL magnitude
+     * @param timeUnit unit of {@code expire}; converted to seconds for SETEX
+     */
+    private void mSet(Map<String, String> batch, long expire, TimeUnit timeUnit) {
+        long expireSeconds = timeUnit.toSeconds(expire);
+        Jedis jedis = new Jedis(hostName, port);
+        jedis.auth(password);
+        Pipeline pipeline = jedis.pipelined();
+        for (Map.Entry<String, String> e : batch.entrySet()) {
+            pipeline.setex(e.getKey(), expireSeconds, e.getValue());
+        }
+        // Single round-trip flush of all queued SETEX commands.
+        pipeline.sync();
+        jedis.close();
+    }
+
     public String redisKey(Map<String, String> fieldValueMap, DTSConfig config) {
         // Note:写入和读取的key生成规则应保持一致
         List<String> fields = config.getRedis().getKey();

+ 2 - 2
recommend-feature-produce/src/main/java/com/tzld/piaoquan/recommend/feature/produce/util/CommonCollectionUtils.java

@@ -1,7 +1,7 @@
 package com.tzld.piaoquan.recommend.feature.produce.util;
 
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 
 import java.util.*;
 import java.util.function.Function;