
test cvr cvr

sunmingze, 1 year ago
parent
commit
7bbca94649

+ 96 - 0
src/main/java/examples/sparksql/SparkAdCTRSampleTester.java

@@ -0,0 +1,96 @@
+package examples.sparksql;
+
+import com.aliyun.odps.TableSchema;
+import com.aliyun.odps.data.Record;
+import com.tzld.piaoquan.recommend.feature.domain.ad.base.*;
+import com.tzld.piaoquan.recommend.feature.domain.ad.feature.VlogAdCtrLRFeatureExtractor;
+import com.tzld.piaoquan.recommend.feature.model.sample.BaseFeature;
+import com.tzld.piaoquan.recommend.feature.model.sample.GroupedFeature;
+import com.tzld.piaoquan.recommend.feature.model.sample.LRSamples;
+import examples.dataloader.AdSampleConstructor;
+import org.apache.spark.SparkConf;
+import org.apache.spark.aliyun.odps.OdpsOps;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function2;
+
+import java.util.ArrayList;
+
+
+public class SparkAdCTRSampleTester {
+
+    public static void main(String[] args) {
+
+        String partition = args[0];
+        String accessId = "LTAIWYUujJAm7CbH";
+        String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
+        String odpsUrl = "http://service.odps.aliyun.com/api";
+        String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
+        String project = "loghubods";
+        String table = "alg_ad_view_sample";
+        String hdfsPath = "/dw/recommend/model/ad_ctr_samples_test/" + partition;
+
+        SparkConf sparkConf = new SparkConf().setAppName("SparkAdCTRSampleTester");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
+        System.out.println("Read odps table...");
+
+        JavaRDD<Record> readData = odpsOps.readTableWithJava(project, table, partition, new RecordsToSamples(), 30);
+        readData.filter(row -> "VlogAdCtrLRScorer".equals(row.getString("type")))
+                .map(SparkAdCTRSampleTester::singleParse)
+                .saveAsTextFile(hdfsPath);
+    }
+
+
+    static class RecordsToSamples implements Function2<Record, TableSchema, Record> {
+        @Override
+        public Record call(Record record, TableSchema schema) throws Exception {
+            return record;
+        }
+    }
+
+
+    // Parsing logic for a single log record
+    public static String singleParse(Record record) {
+        // Parse the label (the pctr column)
+        String label = record.getString("pctr");
+
+        // Build the feature objects from the SQL record
+        AdRequestContext requestContext = AdSampleConstructor.constructRequestContext(record);
+        UserAdFeature userFeature = AdSampleConstructor.constructUserFeature(record);
+        AdItemFeature itemFeature = AdSampleConstructor.constructItemFeature(record);
+
+        // Convert to byte-encoded features
+        AdRequestContextBytesFeature adRequestContextBytesFeature = new AdRequestContextBytesFeature(requestContext);
+        UserAdBytesFeature userBytesFeature = new UserAdBytesFeature(userFeature);
+        AdItemBytesFeature adItemBytesFeature = new AdItemBytesFeature(itemFeature);
+
+        // Feature extraction
+        VlogAdCtrLRFeatureExtractor bytesFeatureExtractor = new VlogAdCtrLRFeatureExtractor();
+
+        LRSamples lrSamples = bytesFeatureExtractor.single(userBytesFeature, adItemBytesFeature, adRequestContextBytesFeature);
+
+        return parseSamplesToString2(label, lrSamples);
+    }
+
+
+    // Serialize the sample as a string: label, then tab-separated feature identifiers
+    public static String parseSamplesToString2(String label, LRSamples lrSamples) {
+        ArrayList<String> featureList = new ArrayList<>();
+        for (int i = 0; i < lrSamples.getFeaturesCount(); i++) {
+            GroupedFeature groupedFeature = lrSamples.getFeatures(i);
+            if (groupedFeature != null && groupedFeature.getFeaturesCount() != 0) {
+                for (int j = 0; j < groupedFeature.getFeaturesCount(); j++) {
+                    BaseFeature baseFeature = groupedFeature.getFeatures(j);
+                    if (baseFeature != null) {
+                        featureList.add(String.valueOf(baseFeature.getIdentifier()));
+                    }
+                }
+            }
+        }
+        return label + "\t" + String.join("\t", featureList);
+    }
+
+
+}
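For reference, each line this job writes to HDFS is the label followed by tab-separated feature identifiers. A minimal, self-contained sketch of that line format (the label and identifier values below are made up for illustration, not taken from real data):

import java.util.ArrayList;
import java.util.List;

public class SampleLineFormatDemo {
    public static void main(String[] args) {
        String label = "0.031";              // hypothetical pctr value
        List<String> featureIds = new ArrayList<>();
        featureIds.add("10234");             // hypothetical BaseFeature identifiers
        featureIds.add("88871");
        featureIds.add("4452");
        // Same join logic as parseSamplesToString2: label \t id1 \t id2 ...
        System.out.println(label + "\t" + String.join("\t", featureIds));
    }
}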

+ 96 - 0
src/main/java/examples/sparksql/SparkAdCVRSampleTester.java

@@ -0,0 +1,96 @@
+package examples.sparksql;
+
+import com.aliyun.odps.TableSchema;
+import com.aliyun.odps.data.Record;
+import com.tzld.piaoquan.recommend.feature.domain.ad.base.*;
+import com.tzld.piaoquan.recommend.feature.domain.ad.feature.VlogAdCtrLRFeatureExtractor;
+import com.tzld.piaoquan.recommend.feature.model.sample.BaseFeature;
+import com.tzld.piaoquan.recommend.feature.model.sample.GroupedFeature;
+import com.tzld.piaoquan.recommend.feature.model.sample.LRSamples;
+import examples.dataloader.AdSampleConstructor;
+import org.apache.spark.SparkConf;
+import org.apache.spark.aliyun.odps.OdpsOps;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function2;
+
+import java.util.ArrayList;
+
+
+public class SparkAdCVRSampleTester {
+
+    public static void main(String[] args) {
+
+        String partition = args[0];
+        String accessId = "LTAIWYUujJAm7CbH";
+        String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
+        String odpsUrl = "http://service.odps.aliyun.com/api";
+        String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
+        String project = "loghubods";
+        String table = "alg_ad_view_sample";
+        String hdfsPath = "/dw/recommend/model/ad_cvr_samples_test/" + partition;
+
+        SparkConf sparkConf = new SparkConf().setAppName("SparkAdCVRSampleTester");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
+        System.out.println("Read odps table...");
+
+        JavaRDD<Record> readData = odpsOps.readTableWithJava(project, table, partition, new RecordsToSamples(), 30);
+        // NOTE: this CVR tester reuses the CTR scorer filter and feature extractor;
+        // only the label column (pcvr) and the output path differ from the CTR job.
+        readData.filter(row -> "VlogAdCtrLRScorer".equals(row.getString("type")))
+                .map(SparkAdCVRSampleTester::singleParse)
+                .saveAsTextFile(hdfsPath);
+    }
+
+
+    static class RecordsToSamples implements Function2<Record, TableSchema, Record> {
+        @Override
+        public Record call(Record record, TableSchema schema) throws Exception {
+            return record;
+        }
+    }
+
+
+    // Parsing logic for a single log record
+    public static String singleParse(Record record) {
+        // Parse the label (the pcvr column)
+        String label = record.getString("pcvr");
+
+        // Build the feature objects from the SQL record
+        AdRequestContext requestContext = AdSampleConstructor.constructRequestContext(record);
+        UserAdFeature userFeature = AdSampleConstructor.constructUserFeature(record);
+        AdItemFeature itemFeature = AdSampleConstructor.constructItemFeature(record);
+
+        // Convert to byte-encoded features
+        AdRequestContextBytesFeature adRequestContextBytesFeature = new AdRequestContextBytesFeature(requestContext);
+        UserAdBytesFeature userBytesFeature = new UserAdBytesFeature(userFeature);
+        AdItemBytesFeature adItemBytesFeature = new AdItemBytesFeature(itemFeature);
+
+        // Feature extraction
+        VlogAdCtrLRFeatureExtractor bytesFeatureExtractor = new VlogAdCtrLRFeatureExtractor();
+
+        LRSamples lrSamples = bytesFeatureExtractor.single(userBytesFeature, adItemBytesFeature, adRequestContextBytesFeature);
+
+        return parseSamplesToString2(label, lrSamples);
+    }
+
+
+    // Serialize the sample as a string: label, then tab-separated feature identifiers
+    public static String parseSamplesToString2(String label, LRSamples lrSamples) {
+        ArrayList<String> featureList = new ArrayList<>();
+        for (int i = 0; i < lrSamples.getFeaturesCount(); i++) {
+            GroupedFeature groupedFeature = lrSamples.getFeatures(i);
+            if (groupedFeature != null && groupedFeature.getFeaturesCount() != 0) {
+                for (int j = 0; j < groupedFeature.getFeaturesCount(); j++) {
+                    BaseFeature baseFeature = groupedFeature.getFeatures(j);
+                    if (baseFeature != null) {
+                        featureList.add(String.valueOf(baseFeature.getIdentifier()));
+                    }
+                }
+            }
+        }
+        return label + "\t" + String.join("\t", featureList);
+    }
+
+
+}
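The CTR and CVR testers are identical except for the label column and the output directory. A minimal sketch, not part of this commit and with all names assumed for illustration, of how the two jobs could share one parameterized code path:

import java.util.Arrays;
import java.util.List;

public class AdSampleJobConfig {
    final String scorerType;    // value matched against the "type" column
    final String labelColumn;   // "pctr" for CTR, "pcvr" for CVR
    final String outputRoot;    // HDFS directory prefix for the samples

    AdSampleJobConfig(String scorerType, String labelColumn, String outputRoot) {
        this.scorerType = scorerType;
        this.labelColumn = labelColumn;
        this.outputRoot = outputRoot;
    }

    public static void main(String[] args) {
        List<AdSampleJobConfig> jobs = Arrays.asList(
                new AdSampleJobConfig("VlogAdCtrLRScorer", "pctr", "/dw/recommend/model/ad_ctr_samples_test/"),
                new AdSampleJobConfig("VlogAdCtrLRScorer", "pcvr", "/dw/recommend/model/ad_cvr_samples_test/"));
        for (AdSampleJobConfig job : jobs) {
            // A single runner would read the table once per config, filter on
            // scorerType, extract job.labelColumn as the label, and write to outputRoot.
            System.out.println(job.labelColumn + " -> " + job.outputRoot);
        }
    }
}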

+ 95 - 0
src/main/java/examples/sparksql/SparkAdFeaToRedisHourLoader.java

@@ -0,0 +1,95 @@
+package examples.sparksql;
+
+import com.aliyun.odps.TableSchema;
+import com.aliyun.odps.data.Record;
+import com.tzld.piaoquan.recommend.feature.domain.ad.base.AdItemFeature;
+import com.tzld.piaoquan.recommend.feature.domain.ad.base.UserAdFeature;
+import examples.dataloader.AdRedisFeatureConstructor;
+import org.apache.spark.SparkConf;
+import org.apache.spark.aliyun.odps.OdpsOps;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function2;
+import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
+import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.serializer.StringRedisSerializer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class SparkAdFeaToRedisHourLoader {
+
+    private static final String adKeyFormat = "ad:%s";
+
+
+    // Build a string-serialized RedisTemplate against the standalone Redis instance
+    public static RedisTemplate<String, String> buildRedisTemplate() {
+        RedisStandaloneConfiguration rsc = new RedisStandaloneConfiguration();
+        rsc.setPort(6379);
+        rsc.setPassword("Wqsd@2019");
+        rsc.setHostName("r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com");
+        RedisTemplate<String, String> template = new RedisTemplate<>();
+        JedisConnectionFactory fac = new JedisConnectionFactory(rsc);
+        fac.afterPropertiesSet();
+        template.setDefaultSerializer(new StringRedisSerializer());
+        template.setConnectionFactory(fac);
+        template.afterPropertiesSet();
+        return template;
+    }
+
+
+    // Write a single (key, value) pair; note that multiSet is invoked once per record here
+    public static void loadFeatureToRedis(RedisTemplate<String, String> redisTemplate, List<String> line) {
+        Map<String, String> redisFormat = new HashMap<>();
+        String key = line.get(0);
+        String value = line.get(1);
+        redisFormat.put(key, value);
+        redisTemplate.opsForValue().multiSet(redisFormat);
+    }
+
+
+    static class RecordsToAdRedisKV implements Function2<Record, TableSchema, List<String>> {
+        @Override
+        public List<String> call(Record record, TableSchema schema) throws Exception {
+            AdItemFeature adItemFeature = AdRedisFeatureConstructor.constructItemFeature(record);
+            // Ad feature keys are built from the creative ID
+            String key = String.format(adKeyFormat, adItemFeature.getCreativeId());
+            String value = adItemFeature.getValue();
+            List<String> kv = new ArrayList<String>();
+            kv.add(key);
+            kv.add(value);
+            return kv;
+        }
+    }
+
+
+
+    public static void main(String[] args) {
+
+        String partition = args[0];
+        String accessId = "LTAIWYUujJAm7CbH";
+        String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
+        String odpsUrl = "http://service.odps.aliyun.com/api";
+        String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
+        String project = "loghubods";
+        String tableAdInfo = "alg_ad_item_info";
+
+        SparkConf sparkConf = new SparkConf().setAppName("SparkAdFeaToRedisHourLoader");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
+        System.out.println("Read odps table...");
+
+
+        // Load ad features and write them to Redis, one connection per partition
+        JavaRDD<List<String>> readAdData = odpsOps.readTableWithJava(project, tableAdInfo, partition, new RecordsToAdRedisKV(), 10);
+        readAdData.foreachPartition(
+                rowIterator -> {
+                    RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
+                    rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line));
+                }
+        );
+    }
+
+}
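A possible refinement, sketched here under assumptions and not part of this commit: loadFeatureToRedis calls multiSet with a single-entry map, so every record costs one round trip to Redis. Collecting each partition's pairs into one map before writing batches those calls; the class and method names below are hypothetical.

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.springframework.data.redis.core.RedisTemplate;

public class BatchedRedisWriter {
    // Collect every (key, value) pair in the partition, then write them in one multiSet call.
    public static void writePartition(RedisTemplate<String, String> template,
                                      Iterator<List<String>> rows) {
        Map<String, String> batch = new HashMap<>();
        while (rows.hasNext()) {
            List<String> kv = rows.next();
            batch.put(kv.get(0), kv.get(1));   // kv = [key, value] as built by RecordsToAdRedisKV
        }
        if (!batch.isEmpty()) {
            template.opsForValue().multiSet(batch);
        }
    }
}

This would drop in where the current loader calls loadFeatureToRedis per record inside foreachPartition; for very large partitions the map could be flushed in fixed-size chunks to bound memory.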