@@ -1,100 +1,100 @@
-package examples.sparksql;
-
-import com.aliyun.odps.TableSchema;
-import com.aliyun.odps.data.Record;
-import com.google.common.collect.ListMultimap;
-import com.tzld.piaoquan.ad.engine.commons.base.*;
-import com.tzld.piaoquan.ad.engine.commons.score.feature.VlogAdCtrLRFeatureExtractor;
-import com.tzld.piaoquan.recommend.server.gen.recommend.BaseFeature;
-import com.tzld.piaoquan.recommend.server.gen.recommend.FeatureGroup;
-import examples.dataloader.AdSampleConstructor;
-import examples.dataloader.RecommendSampleConstructor;
-import org.apache.spark.SparkConf;
-import org.apache.spark.aliyun.odps.OdpsOps;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function2;
-
-import java.util.ArrayList;
-import java.util.Map;
-
-
-public class SparkAdCTRSampleLoader {
-
-    public static void main(String[] args) {
-
-        String partition = args[0];
-        String accessId = "LTAIWYUujJAm7CbH";
-        String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
-        String odpsUrl = "http://service.odps.aliyun.com/api";
-        String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
-        String project = "loghubods";
-        String table = "alg_ad_view_sample";
-        String hdfsPath = "/dw/recommend/model/ad_ctr_samples/" + partition;
-
-        SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)");
-        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
-        OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
-        System.out.println("Read odps table...");
-
-        JavaRDD<String> readData = odpsOps.readTableWithJava(project, table, partition, new RecordsToSamples(), Integer.valueOf(30));
-        readData.saveAsTextFile(hdfsPath);
-    }
-
-
-    static class RecordsToSamples implements Function2<Record, TableSchema, String> {
-        @Override
-        public String call(Record record, TableSchema schema) throws Exception {
-            String labelName = "adclick_ornot";
-            String ret = singleParse(record, labelName);
-            return ret;
-        }
-    }
-
-
-    // Processing logic for a single log record
-    public static String singleParse(Record record, String labelName) {
-        // Parse the data
-        String label = record.getString(labelName);
-        if (label == null || label.equals("0")) {
-            label = "0";
-        } else {
-            label = "1";
-        }
-
-        // Initialize feature objects from the SQL record
-        AdRequestContext requestContext = AdSampleConstructor.constructRequestContext(record);
-        UserAdFeature userFeature = AdSampleConstructor.constructUserFeature(record);
-        AdItemFeature itemFeature = AdSampleConstructor.constructItemFeature(record);
-
-        // Convert to bytes features
-        AdRequestContextBytesFeature adRequestContextBytesFeature = new AdRequestContextBytesFeature(requestContext);
-        UserAdBytesFeature userBytesFeature = new UserAdBytesFeature(userFeature);
-        AdItemBytesFeature adItemBytesFeature = new AdItemBytesFeature(itemFeature);
-
-        // Feature extraction
-        VlogAdCtrLRFeatureExtractor bytesFeatureExtractor;
-        bytesFeatureExtractor = new VlogAdCtrLRFeatureExtractor();
-
-        bytesFeatureExtractor.getUserFeatures(userBytesFeature);
-        bytesFeatureExtractor.getItemFeature(adItemBytesFeature);
-        bytesFeatureExtractor.getContextFeatures(adRequestContextBytesFeature);
-        bytesFeatureExtractor.getCrossFeature(adItemBytesFeature, adRequestContextBytesFeature, userBytesFeature);
-
-        ListMultimap<FeatureGroup, BaseFeature> featureMap = bytesFeatureExtractor.getFeatures();
-        return parseSamplesToString(label, featureMap);
-    }
-
-    // Build the sample string
-    public static String parseSamplesToString(String label, ListMultimap<FeatureGroup, BaseFeature> featureMap) {
-        ArrayList<String> featureList = new ArrayList<String>();
-        for (Map.Entry<FeatureGroup, BaseFeature> entry : featureMap.entries()) {
-            FeatureGroup groupedFeature = entry.getKey();
-            BaseFeature baseFeature = entry.getValue();
-            Long featureIdentifier = baseFeature.getIdentifier();
-            featureList.add(String.valueOf(featureIdentifier) + ":1");
-        }
-        return label + "\t" + String.join("\t", featureList);
-    }
-
-}
+//package examples.sparksql;
+//
+//import com.aliyun.odps.TableSchema;
+//import com.aliyun.odps.data.Record;
+//import com.google.common.collect.ListMultimap;
+//import com.tzld.piaoquan.ad.engine.commons.base.*;
+//import com.tzld.piaoquan.ad.engine.commons.score.feature.VlogAdCtrLRFeatureExtractor;
+//import com.tzld.piaoquan.recommend.server.gen.recommend.BaseFeature;
+//import com.tzld.piaoquan.recommend.server.gen.recommend.FeatureGroup;
+//import examples.dataloader.AdSampleConstructor;
+//import examples.dataloader.RecommendSampleConstructor;
+//import org.apache.spark.SparkConf;
+//import org.apache.spark.aliyun.odps.OdpsOps;
+//import org.apache.spark.api.java.JavaRDD;
+//import org.apache.spark.api.java.JavaSparkContext;
+//import org.apache.spark.api.java.function.Function2;
+//
+//import java.util.ArrayList;
+//import java.util.Map;
+//
+//
+//public class SparkAdCTRSampleLoader {
+//
+//    public static void main(String[] args) {
+//
+//        String partition = args[0];
+//        String accessId = "LTAIWYUujJAm7CbH";
+//        String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
+//        String odpsUrl = "http://service.odps.aliyun.com/api";
+//        String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
+//        String project = "loghubods";
+//        String table = "alg_ad_view_sample";
+//        String hdfsPath = "/dw/recommend/model/ad_ctr_samples/" + partition;
+//
+//        SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)");
+//        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+//        OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
+//        System.out.println("Read odps table...");
+//
+//        JavaRDD<String> readData = odpsOps.readTableWithJava(project, table, partition, new RecordsToSamples(), Integer.valueOf(30));
+//        readData.saveAsTextFile(hdfsPath);
+//    }
+//
+//
+//    static class RecordsToSamples implements Function2<Record, TableSchema, String> {
+//        @Override
+//        public String call(Record record, TableSchema schema) throws Exception {
+//            String labelName = "adclick_ornot";
+//            String ret = singleParse(record, labelName);
+//            return ret;
+//        }
+//    }
+//
+//
+//    // Processing logic for a single log record
+//    public static String singleParse(Record record, String labelName) {
+//        // Parse the data
+//        String label = record.getString(labelName);
+//        if (label == null || label.equals("0")) {
+//            label = "0";
+//        } else {
+//            label = "1";
+//        }
+//
+//        // Initialize feature objects from the SQL record
+//        AdRequestContext requestContext = AdSampleConstructor.constructRequestContext(record);
+//        UserAdFeature userFeature = AdSampleConstructor.constructUserFeature(record);
+//        AdItemFeature itemFeature = AdSampleConstructor.constructItemFeature(record);
+//
+//        // Convert to bytes features
+//        AdRequestContextBytesFeature adRequestContextBytesFeature = new AdRequestContextBytesFeature(requestContext);
+//        UserAdBytesFeature userBytesFeature = new UserAdBytesFeature(userFeature);
+//        AdItemBytesFeature adItemBytesFeature = new AdItemBytesFeature(itemFeature);
+//
+//        // Feature extraction
+//        VlogAdCtrLRFeatureExtractor bytesFeatureExtractor;
+//        bytesFeatureExtractor = new VlogAdCtrLRFeatureExtractor();
+//
+//        bytesFeatureExtractor.getUserFeatures(userBytesFeature);
+//        bytesFeatureExtractor.getItemFeature(adItemBytesFeature);
+//        bytesFeatureExtractor.getContextFeatures(adRequestContextBytesFeature);
+//        bytesFeatureExtractor.getCrossFeature(adItemBytesFeature, adRequestContextBytesFeature, userBytesFeature);
+//
+//        ListMultimap<FeatureGroup, BaseFeature> featureMap = bytesFeatureExtractor.getFeatures();
+//        return parseSamplesToString(label, featureMap);
+//    }
+//
+//    // Build the sample string
+//    public static String parseSamplesToString(String label, ListMultimap<FeatureGroup, BaseFeature> featureMap) {
+//        ArrayList<String> featureList = new ArrayList<String>();
+//        for (Map.Entry<FeatureGroup, BaseFeature> entry : featureMap.entries()) {
+//            FeatureGroup groupedFeature = entry.getKey();
+//            BaseFeature baseFeature = entry.getValue();
+//            Long featureIdentifier = baseFeature.getIdentifier();
+//            featureList.add(String.valueOf(featureIdentifier) + ":1");
+//        }
+//        return label + "\t" + String.join("\t", featureList);
+//    }
+//
+//}