1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859 |
- package examples.sparksql;
- import com.aliyun.odps.TableSchema;
- import com.aliyun.odps.data.Record;
- import org.apache.spark.SparkConf;
- import org.apache.spark.aliyun.odps.OdpsOps;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function2;
- import java.util.ArrayList;
- public class SparkAdCVRSampleTester {
- public static void main(String[] args) {
- String partition = args[0];
- String accessId = "LTAIWYUujJAm7CbH";
- String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
- String odpsUrl = "http://service.odps.aliyun.com/api";
- String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
- String project = "loghubods";
- String table = "alg_ad_view_sample";
- String hdfsPath = "/dw/recommend/model/ad_cvr_samples_test/" + partition;
- SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)");
- JavaSparkContext jsc = new JavaSparkContext(sparkConf);
- OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
- System.out.println("Read odps table...");
- JavaRDD<Record> readData = odpsOps.readTableWithJava(project, table, partition, new RecordsToSamples(), Integer.valueOf(30));
- readData.filter(row -> row.get("type") != null)
- .filter(row -> row.get("lrsample") != null)
- .filter(row -> row.getString("adclick_ornot").equals("0"))
- .map(line -> singleParse(line))
- .saveAsTextFile(hdfsPath);
- }
- static class RecordsToSamples implements Function2<Record, TableSchema, Record> {
- @Override
- public Record call(Record record, TableSchema schema) throws Exception {
- return record;
- }
- }
- public static String singleParse(Record record) {
- // 数据解析
- String label = record.getString("adinvert_ornot");
- if (label == null || label.equals("1")) {
- label = "0";
- } else {
- label = "1";
- }
- String samples = record.getString("lrsample").replaceAll("\\\\t","\t");
- return label + "\t" + samples;
- }
- }
|