SparkAdCTRSampleLoader.java 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package examples.sparksql;
  2. import com.aliyun.odps.TableSchema;
  3. import com.aliyun.odps.data.Record;
  4. import com.google.common.collect.ListMultimap;
  5. import com.tzld.piaoquan.recommend.feature.domain.ad.base.*;
  6. import com.tzld.piaoquan.recommend.feature.domain.ad.feature.VlogAdCtrLRFeatureExtractor;
  7. import com.tzld.piaoquan.recommend.feature.model.sample.BaseFeature;
  8. import com.tzld.piaoquan.recommend.feature.model.sample.FeatureGroup;
  9. import com.tzld.piaoquan.recommend.feature.model.sample.GroupedFeature;
  10. import com.tzld.piaoquan.recommend.feature.model.sample.LRSamples;
  11. import examples.dataloader.AdSampleConstructor;
  12. import org.apache.spark.SparkConf;
  13. import org.apache.spark.aliyun.odps.OdpsOps;
  14. import org.apache.spark.api.java.JavaRDD;
  15. import org.apache.spark.api.java.JavaSparkContext;
  16. import org.apache.spark.api.java.function.Function2;
  17. import java.util.ArrayList;
  18. import java.util.List;
  19. import java.util.Map;
  20. public class SparkAdCTRSampleLoader {
  21. public static void main(String[] args) {
  22. String partition = args[0];
  23. String accessId = "LTAIWYUujJAm7CbH";
  24. String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
  25. String odpsUrl = "http://service.odps.aliyun.com/api";
  26. String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
  27. String project = "loghubods";
  28. String table = "alg_ad_view_sample";
  29. String hdfsPath = "/dw/recommend/model/ad_ctr_samples/" + partition;
  30. SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)");
  31. JavaSparkContext jsc = new JavaSparkContext(sparkConf);
  32. OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
  33. System.out.println("Read odps table...");
  34. JavaRDD<String> readData = odpsOps.readTableWithJava(project, table, partition, new RecordsToSamples(), Integer.valueOf(30));
  35. readData.saveAsTextFile(hdfsPath);
  36. }
  37. static class RecordsToSamples implements Function2<Record, TableSchema, String> {
  38. @Override
  39. public String call(Record record, TableSchema schema) throws Exception {
  40. String labelName = "adclick_ornot";
  41. String ret = singleParse(record, labelName);
  42. return ret;
  43. }
  44. }
  45. // 单条日志处理逻辑
  46. public static String singleParse(Record record, String labelName) {
  47. // 数据解析
  48. String label = record.getString(labelName);
  49. if (label == null || label.equals("1")) {
  50. label = "0";
  51. } else {
  52. label = "1";
  53. }
  54. // 从sql的 record中 初始化对象内容
  55. AdRequestContext requestContext = AdSampleConstructor.constructRequestContext(record);
  56. UserAdFeature userFeature = AdSampleConstructor.constructUserFeature(record);
  57. AdItemFeature itemFeature = AdSampleConstructor.constructItemFeature(record);
  58. // 转化成bytes
  59. AdRequestContextBytesFeature adRequestContextBytesFeature = new AdRequestContextBytesFeature(requestContext);
  60. UserAdBytesFeature userBytesFeature = new UserAdBytesFeature(userFeature);
  61. AdItemBytesFeature adItemBytesFeature = new AdItemBytesFeature(itemFeature);
  62. // 特征抽取
  63. VlogAdCtrLRFeatureExtractor bytesFeatureExtractor;
  64. bytesFeatureExtractor = new VlogAdCtrLRFeatureExtractor();
  65. LRSamples lrSamples = bytesFeatureExtractor.single(userBytesFeature, adItemBytesFeature, adRequestContextBytesFeature);
  66. return parseSamplesToString2(label, lrSamples);
  67. }
  68. // 构建样本的字符串
  69. public static String parseSamplesToString2(String label, LRSamples lrSamples) {
  70. ArrayList<String> featureList = new ArrayList<String>();
  71. for (int i = 0; i < lrSamples.getFeaturesCount(); i++) {
  72. GroupedFeature groupedFeature = lrSamples.getFeatures(i);
  73. if (groupedFeature != null && groupedFeature.getFeaturesCount() != 0) {
  74. for (int j = 0; j < groupedFeature.getFeaturesCount(); j++) {
  75. BaseFeature baseFeature = groupedFeature.getFeatures(j);
  76. if (baseFeature != null) {
  77. featureList.add(String.valueOf(baseFeature.getIdentifier()) + ":1" );
  78. }
  79. }
  80. }
  81. }
  82. return label + "\t" + String.join("\t", featureList);
  83. }
  84. }