package examples.sparksql; import com.aliyun.odps.TableSchema; import com.aliyun.odps.data.Record; import org.apache.spark.SparkConf; import org.apache.spark.aliyun.odps.OdpsOps; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import java.util.ArrayList; public class SparkAdCVRSampleTester { public static void main(String[] args) { String partition = args[0]; String accessId = "LTAIWYUujJAm7CbH"; String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"; String odpsUrl = "http://service.odps.aliyun.com/api"; String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"; String project = "loghubods"; String table = "alg_ad_view_sample"; String hdfsPath = "/dw/recommend/model/ad_cvr_samples_test/" + partition; SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)"); JavaSparkContext jsc = new JavaSparkContext(sparkConf); OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl); System.out.println("Read odps table..."); JavaRDD readData = odpsOps.readTableWithJava(project, table, partition, new RecordsToSamples(), Integer.valueOf(30)); readData.filter(row -> row.get("type") != null) .filter(row -> row.get("lrsample") != null) .filter(row -> row.getString("adclick_ornot").equals("0")) .map(line -> singleParse(line)) .saveAsTextFile(hdfsPath); } static class RecordsToSamples implements Function2 { @Override public Record call(Record record, TableSchema schema) throws Exception { return record; } } public static String singleParse(Record record) { // 数据解析 String label = record.getString("adinvert_ornot"); if (label == null || label.equals("1")) { label = "0"; } else { label = "1"; } String samples = record.getString("lrsample").replaceAll("\\\\t","\t"); return label + "\t" + samples; } }