// SparkAdCVRSampleTester.java (4.0 KB)
  1. package examples.sparksql;
  2. import com.aliyun.odps.TableSchema;
  3. import com.aliyun.odps.data.Record;
  4. import com.tzld.piaoquan.recommend.feature.domain.ad.base.*;
  5. import com.tzld.piaoquan.recommend.feature.domain.ad.feature.VlogAdCtrLRFeatureExtractor;
  6. import com.tzld.piaoquan.recommend.feature.model.sample.BaseFeature;
  7. import com.tzld.piaoquan.recommend.feature.model.sample.GroupedFeature;
  8. import com.tzld.piaoquan.recommend.feature.model.sample.LRSamples;
  9. import examples.dataloader.AdSampleConstructor;
  10. import org.apache.spark.SparkConf;
  11. import org.apache.spark.aliyun.odps.OdpsOps;
  12. import org.apache.spark.api.java.JavaRDD;
  13. import org.apache.spark.api.java.JavaSparkContext;
  14. import org.apache.spark.api.java.function.Function2;
  15. import java.util.ArrayList;
  16. public class SparkAdCVRSampleTester {
  17. public static void main(String[] args) {
  18. String partition = args[0];
  19. String accessId = "LTAIWYUujJAm7CbH";
  20. String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
  21. String odpsUrl = "http://service.odps.aliyun.com/api";
  22. String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
  23. String project = "loghubods";
  24. String table = "alg_ad_view_sample";
  25. String hdfsPath = "/dw/recommend/model/ad_cvr_samples_test/" + partition;
  26. SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)");
  27. JavaSparkContext jsc = new JavaSparkContext(sparkConf);
  28. OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
  29. System.out.println("Read odps table...");
  30. JavaRDD<Record> readData = odpsOps.readTableWithJava(project, table, partition, new RecordsToSamples(), Integer.valueOf(30));
  31. readData.filter(row -> row.getString("type").equals("VlogAdCtrLRScorer"))
  32. .map(line -> singleParse(line))
  33. .saveAsTextFile(hdfsPath);
  34. }
  35. static class RecordsToSamples implements Function2<Record, TableSchema, Record> {
  36. @Override
  37. public Record call(Record record, TableSchema schema) throws Exception {
  38. return record;
  39. }
  40. }
  41. // 单条日志处理逻辑
  42. public static String singleParse(Record record) {
  43. // 数据解析
  44. String label = record.getString("pcvr");
  45. // 从sql的 record中 初始化对象内容
  46. AdRequestContext requestContext = AdSampleConstructor.constructRequestContext(record);
  47. UserAdFeature userFeature = AdSampleConstructor.constructUserFeature(record);
  48. AdItemFeature itemFeature = AdSampleConstructor.constructItemFeature(record);
  49. // 转化成bytes
  50. AdRequestContextBytesFeature adRequestContextBytesFeature = new AdRequestContextBytesFeature(requestContext);
  51. UserAdBytesFeature userBytesFeature = new UserAdBytesFeature(userFeature);
  52. AdItemBytesFeature adItemBytesFeature = new AdItemBytesFeature(itemFeature);
  53. // 特征抽取
  54. VlogAdCtrLRFeatureExtractor bytesFeatureExtractor;
  55. bytesFeatureExtractor = new VlogAdCtrLRFeatureExtractor();
  56. LRSamples lrSamples = bytesFeatureExtractor.single(userBytesFeature, adItemBytesFeature, adRequestContextBytesFeature);
  57. return parseSamplesToString2(label, lrSamples);
  58. }
  59. // 构建样本的字符串
  60. public static String parseSamplesToString2(String label, LRSamples lrSamples) {
  61. ArrayList<String> featureList = new ArrayList<String>();
  62. for (int i = 0; i < lrSamples.getFeaturesCount(); i++) {
  63. GroupedFeature groupedFeature = lrSamples.getFeatures(i);
  64. if (groupedFeature != null && groupedFeature.getFeaturesCount() != 0) {
  65. for (int j = 0; j < groupedFeature.getFeaturesCount(); j++) {
  66. BaseFeature baseFeature = groupedFeature.getFeatures(j);
  67. if (baseFeature != null) {
  68. featureList.add(String.valueOf(baseFeature.getIdentifier()) + ":1");
  69. }
  70. }
  71. }
  72. }
  73. return label + "\t" + String.join("\t", featureList);
  74. }
  75. }