// SparkShareRatioSampleLoader.java
  1. package examples.sparksql;
  2. import com.aliyun.odps.TableSchema;
  3. import com.aliyun.odps.data.Record;
  4. import com.google.common.collect.ListMultimap;
  5. import com.tzld.piaoquan.data.base.*;
  6. import examples.dataloader.RecommendSampleConstructor;
  7. import com.tzld.piaoquan.data.score.feature.VlogShareLRFeatureExtractor;
  8. import com.tzld.piaoquan.recommend.server.gen.recommend.BaseFeature;
  9. import com.tzld.piaoquan.recommend.server.gen.recommend.FeatureGroup;
  10. import org.apache.spark.SparkConf;
  11. import org.apache.spark.aliyun.odps.OdpsOps;
  12. import org.apache.spark.api.java.JavaRDD;
  13. import org.apache.spark.api.java.JavaSparkContext;
  14. import org.apache.spark.api.java.function.Function2;
  15. import java.util.ArrayList;
  16. import java.util.Map;
  17. public class SparkShareRatioSampleLoader {
  18. public static void main(String[] args) {
  19. String partition = args[0];
  20. String accessId = "LTAIWYUujJAm7CbH";
  21. String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
  22. String odpsUrl = "http://service.odps.aliyun.com/api";
  23. String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
  24. String project = "loghubods";
  25. String table = "alg_recsys_view_sample";
  26. String hdfsPath = "/dw/recommend/model/share_ratio_samples/" + partition;
  27. SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)");
  28. JavaSparkContext jsc = new JavaSparkContext(sparkConf);
  29. OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
  30. System.out.println("Read odps table...");
  31. JavaRDD<String> readData = odpsOps.readTableWithJava(project, table, partition, new RecordsToSamples(), Integer.valueOf(50));
  32. readData.saveAsTextFile(hdfsPath);
  33. }
  34. static class RecordsToSamples implements Function2<Record, TableSchema, String> {
  35. @Override
  36. public String call(Record record, TableSchema schema) throws Exception {
  37. String labelName = "share_ornot";
  38. String ret = singleParse(record, labelName);
  39. return ret;
  40. }
  41. }
  42. // 单条日志处理逻辑
  43. public static String singleParse(Record record, String labelName) {
  44. // 数据解析
  45. String label = record.getString(labelName);
  46. if (label == null || label.equals("1")) {
  47. label = "0";
  48. } else {
  49. label = "1";
  50. }
  51. // 从sql的 record中 初始化对象内容
  52. RequestContext requestContext = RecommendSampleConstructor.constructRequestContext(record);
  53. UserFeature userFeature = RecommendSampleConstructor.constructUserFeature(record);
  54. ItemFeature itemFeature = RecommendSampleConstructor.constructItemFeature(record);
  55. // 转化成bytes
  56. RequestContextBytesFeature requestContextBytesFeature = new RequestContextBytesFeature(requestContext);
  57. UserBytesFeature userBytesFeature = new UserBytesFeature(userFeature);
  58. VideoBytesFeature videoBytesFeature = new VideoBytesFeature(itemFeature);
  59. // 特征抽取
  60. VlogShareLRFeatureExtractor bytesFeatureExtractor;
  61. bytesFeatureExtractor = new VlogShareLRFeatureExtractor();
  62. bytesFeatureExtractor.getUserFeatures(userBytesFeature);
  63. bytesFeatureExtractor.getItemFeature(videoBytesFeature);
  64. bytesFeatureExtractor.getContextFeatures(requestContextBytesFeature);
  65. ListMultimap<FeatureGroup, BaseFeature> featureMap = bytesFeatureExtractor.getFeatures();
  66. return parseSamplesToString(label, featureMap);
  67. }
  68. // 构建样本的字符串
  69. public static String parseSamplesToString(String label, ListMultimap<FeatureGroup, BaseFeature> featureMap) {
  70. ArrayList<String> featureList = new ArrayList<String>();
  71. for (Map.Entry<FeatureGroup, BaseFeature> entry : featureMap.entries()) {
  72. FeatureGroup groupedFeature = entry.getKey();
  73. BaseFeature baseFeature = entry.getValue();
  74. Long featureIdentifier = baseFeature.getIdentifier();
  75. featureList.add(String.valueOf(featureIdentifier) + ":1");
  76. }
  77. return label + "\t" + String.join("\t", featureList);
  78. }
  79. }