sunmingze 1 rok pred
rodič
commit
1ef9d047a3

+ 7 - 9
src/main/java/examples/sparksql/SparkAdCVRSampleLoader.java

@@ -36,25 +36,23 @@ public class SparkAdCVRSampleLoader {
         OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
         System.out.println("Read odps table...");
 
-        JavaRDD<String> readData = odpsOps.readTableWithJava(project, table, partition, new RecordsToSamples(), Integer.valueOf(30));
-        readData.saveAsTextFile(hdfsPath);
+        JavaRDD<Record> readData = odpsOps.readTableWithJava(project, table, partition, new RecordsToSamples(), Integer.valueOf(30));
+        readData.filter(row -> row.getString("adclick_ornot").equals("0")).map(line -> singleParse(line)).saveAsTextFile(hdfsPath);
     }
 
 
-    static class RecordsToSamples implements Function2<Record, TableSchema, String> {
+    static class RecordsToSamples implements Function2<Record, TableSchema, Record> {
         @Override
-        public String call(Record record, TableSchema schema) throws Exception {
-            String labelName = "adinvert_ornot";
-            String ret = singleParse(record, labelName);
-            return ret;
+        public Record call(Record record, TableSchema schema) throws Exception {
+            return record;
         }
     }
 
 
     // 单条日志处理逻辑
-    public static String singleParse(Record record, String labelName) {
+    public static String singleParse(Record record) {
         // 数据解析
-        String label = record.getString(labelName);
+        String label = record.getString("adinvert_ornot");
         if (label == null || label.equals("1")) {
             label = "0";
         } else {