// SparkAdCVRSampleTester.java (2.2 KB)
  1. package examples.sparksql;
  2. import com.aliyun.odps.TableSchema;
  3. import com.aliyun.odps.data.Record;
  4. import org.apache.spark.SparkConf;
  5. import org.apache.spark.aliyun.odps.OdpsOps;
  6. import org.apache.spark.api.java.JavaRDD;
  7. import org.apache.spark.api.java.JavaSparkContext;
  8. import org.apache.spark.api.java.function.Function2;
  9. import java.util.ArrayList;
  10. public class SparkAdCVRSampleTester {
  11. public static void main(String[] args) {
  12. String partition = args[0];
  13. String accessId = "LTAIWYUujJAm7CbH";
  14. String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
  15. String odpsUrl = "http://service.odps.aliyun.com/api";
  16. String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
  17. String project = "loghubods";
  18. String table = "alg_ad_view_sample";
  19. String hdfsPath = "/dw/recommend/model/ad_cvr_samples_test/" + partition;
  20. SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)");
  21. JavaSparkContext jsc = new JavaSparkContext(sparkConf);
  22. OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
  23. System.out.println("Read odps table...");
  24. JavaRDD<Record> readData = odpsOps.readTableWithJava(project, table, partition, new RecordsToSamples(), Integer.valueOf(30));
  25. readData.filter(row -> row.get("type") != null)
  26. .filter(row -> row.get("lrsample") != null)
  27. .filter(row -> row.getString("adclick_ornot").equals("0"))
  28. .map(line -> singleParse(line))
  29. .saveAsTextFile(hdfsPath);
  30. }
  31. static class RecordsToSamples implements Function2<Record, TableSchema, Record> {
  32. @Override
  33. public Record call(Record record, TableSchema schema) throws Exception {
  34. return record;
  35. }
  36. }
  37. public static String singleParse(Record record) {
  38. // 数据解析
  39. String label = record.getString("adinvert_ornot");
  40. if (label == null || label.equals("1")) {
  41. label = "0";
  42. } else {
  43. label = "1";
  44. }
  45. String samples = record.getString("lrsample").replaceAll("\\\\t","\t");
  46. return label + "\t" + samples;
  47. }
  48. }