zhangbo 1 year ago
parent
commit
f9627ce190
30 changed files with 2215 additions and 1 deletion
  1. + 3 - 1      MyFirstUDF/.gitignore
  2. + 56 - 0     MyFirstUDF/examples/com/aliyun/odps/examples/TestUtil.java
  3. + 259 - 0    MyFirstUDF/examples/com/aliyun/odps/examples/graph/Kmeans.java
  4. + 107 - 0    MyFirstUDF/examples/com/aliyun/odps/examples/graph/PageRank.java
  5. + 130 - 0    MyFirstUDF/examples/com/aliyun/odps/examples/graph/SSSP.java
  6. + 84 - 0     MyFirstUDF/examples/com/aliyun/odps/examples/mr/Resource.java
  7. + 124 - 0    MyFirstUDF/examples/com/aliyun/odps/examples/mr/WordCount.java
  8. + 104 - 0    MyFirstUDF/examples/com/aliyun/odps/examples/mr/test/WordCountTest.java
  9. + 47 - 0     MyFirstUDF/examples/com/aliyun/odps/examples/udf/UDAFExample.java
  10. + 93 - 0    MyFirstUDF/examples/com/aliyun/odps/examples/udf/UDAFResource.java
  11. + 35 - 0    MyFirstUDF/examples/com/aliyun/odps/examples/udf/UDFExample.java
  12. + 60 - 0    MyFirstUDF/examples/com/aliyun/odps/examples/udf/UDFResource.java
  13. + 24 - 0    MyFirstUDF/examples/com/aliyun/odps/examples/udf/UDTFExample.java
  14. + 68 - 0    MyFirstUDF/examples/com/aliyun/odps/examples/udf/UDTFResource.java
  15. + 63 - 0    MyFirstUDF/examples/com/aliyun/odps/examples/udf/test/UDAFTest.java
  16. + 69 - 0    MyFirstUDF/examples/com/aliyun/odps/examples/udf/test/UDFTest.java
  17. + 68 - 0    MyFirstUDF/examples/com/aliyun/odps/examples/udf/test/UDTFTest.java
  18. + 90 - 0    MyFirstUDF/examples/com/aliyun/odps/examples/udj/PayUserLogMergeJoin.java
  19. + 240 - 0   MyFirstUDF/examples/com/aliyun/odps/examples/unstructured/SpeechSentenceSnrExtractor.java
  20. + 18 - 0    MyFirstUDF/examples/com/aliyun/odps/examples/unstructured/SpeechStorageHandler.java
  21. + 140 - 0   MyFirstUDF/examples/com/aliyun/odps/examples/unstructured/TextExtractor.java
  22. + 56 - 0    MyFirstUDF/examples/com/aliyun/odps/examples/unstructured/TextOutputer.java
  23. + 18 - 0    MyFirstUDF/examples/com/aliyun/odps/examples/unstructured/TextStorageHandler.java
  24. + 111 - 0   MyFirstUDF/examples/com/aliyun/odps/examples/unstructured/test/ExtractorTest.java
  25. + 133 - 0   MyFirstUDF/examples/com/aliyun/odps/examples/unstructured/test/OutputerTest.java
  26. + 6 - 0     MyFirstUDF/examples/data/ambulance_csv/1.csv
  27. + 9 - 0     MyFirstUDF/examples/data/ambulance_csv/2.csv
  28. BIN         MyFirstUDF/examples/data/speech_wav/tsh148_seg_2_3013_3_6_48_80bd359827e24dd7_0.wav
  29. BIN         MyFirstUDF/examples/data/speech_wav/tsh148_seg_3013_1_31_11_9d7c87aef9f3e559_0.wav
  30. BIN         MyFirstUDF/examples/data/speech_wav/tsh148_seg_3013_2_29_49_f4cb0990a6b4060c_0.wav

+ 3 - 1
MyFirstUDF/.gitignore

@@ -1 +1,3 @@
-.idea/*
+.idea/*
+target/*
+warehouse/*

+ 56 - 0
MyFirstUDF/examples/com/aliyun/odps/examples/TestUtil.java

@@ -0,0 +1,56 @@
+package com.aliyun.odps.examples;
+
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.account.Account;
+import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.local.common.WareHouse;
+
+import java.io.File;
+
+public class TestUtil {
+  private final static String accessId = "accessId";
+  private final static String accessKey = "accessKey";
+  private final static String endpoint = "endpoint";
+  private final static String defaultProject = "example_project";
+
+  static Odps odps;
+  static {
+    Account account = new AliyunAccount(accessId, accessKey);
+    odps = new Odps(account);
+    odps.setEndpoint(endpoint);
+    odps.setDefaultProject(defaultProject);
+  }
+
+  public static String join(Object[] obj) {
+    if (obj == null) {
+      return null;
+    }
+    StringBuffer sb = new StringBuffer();
+    for (int i = 0; i < obj.length; i++) {
+      if (sb.length() > 0) {
+        sb.append(",");
+      }
+      sb.append(obj[i]);
+    }
+    return sb.toString();
+  }
+
+  public static Odps getOdps() {
+    return odps;
+  }
+
+  public static WareHouse initWarehouse() {
+    //init the warehouse in project dir
+    File exampleProjectDir = new File("warehouse" + File.separator + defaultProject);
+    if (exampleProjectDir.exists()) {
+      return WareHouse.getInstance("warehouse");
+    } else {
+      exampleProjectDir = new File("../warehouse" + File.separator + defaultProject);
+      if (exampleProjectDir.exists()) {
+        return WareHouse.getInstance("../warehouse");
+      }
+    }
+    throw new RuntimeException("warehouse dir not exists");
+  }
+
+}

+ 259 - 0
MyFirstUDF/examples/com/aliyun/odps/examples/graph/Kmeans.java

@@ -0,0 +1,259 @@
+package com.aliyun.odps.examples.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.aliyun.odps.data.TableInfo;
+import com.aliyun.odps.graph.Aggregator;
+import com.aliyun.odps.graph.ComputeContext;
+import com.aliyun.odps.graph.GraphJob;
+import com.aliyun.odps.graph.GraphLoader;
+import com.aliyun.odps.graph.MutationContext;
+import com.aliyun.odps.graph.Vertex;
+import com.aliyun.odps.graph.WorkerContext;
+import com.aliyun.odps.io.DoubleWritable;
+import com.aliyun.odps.io.LongWritable;
+import com.aliyun.odps.io.NullWritable;
+import com.aliyun.odps.io.Text;
+import com.aliyun.odps.io.Tuple;
+import com.aliyun.odps.io.Writable;
+import com.aliyun.odps.io.WritableRecord;
+
+/**
+ * Set resources arguments:
+ *  kmeans_centers 
+ *  Set program arguments: 
+ *  kmeans_in kmeans_out
+ */
+public class Kmeans {
+  private final static Log LOG = LogFactory.getLog(Kmeans.class);
+
+  public static class KmeansVertex extends Vertex<Text, Tuple, NullWritable, NullWritable> {
+
+    @Override
+    public void compute(ComputeContext<Text, Tuple, NullWritable, NullWritable> context,
+        Iterable<NullWritable> messages) throws IOException {
+      context.aggregate(getValue());
+    }
+
+  }
+
+  public static class KmeansVertexReader extends
+      GraphLoader<Text, Tuple, NullWritable, NullWritable> {
+    @Override
+    public void load(LongWritable recordNum, WritableRecord record,
+        MutationContext<Text, Tuple, NullWritable, NullWritable> context) throws IOException {
+      KmeansVertex vertex = new KmeansVertex();
+      vertex.setId(new Text(String.valueOf(recordNum.get())));
+      vertex.setValue(new Tuple(record.getAll()));
+      context.addVertexRequest(vertex);
+    }
+
+  }
+
+  public static class KmeansAggrValue implements Writable {
+
+    Tuple centers = new Tuple();
+    Tuple sums = new Tuple();
+    Tuple counts = new Tuple();
+
+    public void write(DataOutput out) throws IOException {
+      centers.write(out);
+      sums.write(out);
+      counts.write(out);
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      centers = new Tuple();
+      centers.readFields(in);
+      sums = new Tuple();
+      sums.readFields(in);
+      counts = new Tuple();
+      counts.readFields(in);
+    }
+
+    @Override
+    public String toString() {
+      return "centers " + centers.toString() + ", sums " + sums.toString() + ", counts "
+          + counts.toString();
+    }
+
+  }
+
+  public static class KmeansAggregator extends Aggregator<KmeansAggrValue> {
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public KmeansAggrValue createInitialValue(WorkerContext context) throws IOException {
+      KmeansAggrValue aggrVal = null;
+      if (context.getSuperstep() == 0) {
+        aggrVal = new KmeansAggrValue();
+        aggrVal.centers = new Tuple();
+        aggrVal.sums = new Tuple();
+        aggrVal.counts = new Tuple();
+
+        byte[] centers = context.readCacheFile("kmeans_centers");
+        String lines[] = new String(centers).split("\n");
+
+        for (int i = 0; i < lines.length; i++) {
+          String[] ss = lines[i].split(",");
+          Tuple center = new Tuple();
+          Tuple sum = new Tuple();
+          for (int j = 0; j < ss.length; ++j) {
+            center.append(new DoubleWritable(Double.valueOf(ss[j].trim())));
+            sum.append(new DoubleWritable(0.0));
+          }
+          LongWritable count = new LongWritable(0);
+          aggrVal.sums.append(sum);
+          aggrVal.counts.append(count);
+          aggrVal.centers.append(center);
+        }
+      } else {
+        aggrVal = (KmeansAggrValue) context.getLastAggregatedValue(0);
+      }
+
+      return aggrVal;
+    }
+
+    @Override
+    public void aggregate(KmeansAggrValue value, Object item) {
+      int min = 0;
+      double mindist = Double.MAX_VALUE;
+      Tuple point = (Tuple) item;
+
+      for (int i = 0; i < value.centers.size(); i++) {
+        Tuple center = (Tuple) value.centers.get(i);
+        // use Euclidean Distance, no need to calculate sqrt
+        double dist = 0.0d;
+        for (int j = 0; j < center.size(); j++) {
+          double v = ((DoubleWritable) point.get(j)).get() - ((DoubleWritable) center.get(j)).get();
+          dist += v * v;
+        }
+        if (dist < mindist) {
+          mindist = dist;
+          min = i;
+        }
+      }
+
+      // update sum and count
+      Tuple sum = (Tuple) value.sums.get(min);
+      for (int i = 0; i < point.size(); i++) {
+        DoubleWritable s = (DoubleWritable) sum.get(i);
+        s.set(s.get() + ((DoubleWritable) point.get(i)).get());
+      }
+      LongWritable count = (LongWritable) value.counts.get(min);
+      count.set(count.get() + 1);
+    }
+
+    @Override
+    public void merge(KmeansAggrValue value, KmeansAggrValue partial) {
+      for (int i = 0; i < value.sums.size(); i++) {
+        Tuple sum = (Tuple) value.sums.get(i);
+        Tuple that = (Tuple) partial.sums.get(i);
+
+        for (int j = 0; j < sum.size(); j++) {
+          DoubleWritable s = (DoubleWritable) sum.get(j);
+          s.set(s.get() + ((DoubleWritable) that.get(j)).get());
+        }
+      }
+
+      for (int i = 0; i < value.counts.size(); i++) {
+        LongWritable count = (LongWritable) value.counts.get(i);
+        count.set(count.get() + ((LongWritable) partial.counts.get(i)).get());
+      }
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public boolean terminate(WorkerContext context, KmeansAggrValue value) throws IOException {
+
+      // compute new centers
+      Tuple newCenters = new Tuple(value.sums.size());
+      for (int i = 0; i < value.sums.size(); i++) {
+        Tuple sum = (Tuple) value.sums.get(i);
+        Tuple newCenter = new Tuple(sum.size());
+        LongWritable c = (LongWritable) value.counts.get(i);
+        for (int j = 0; j < sum.size(); j++) {
+
+          DoubleWritable s = (DoubleWritable) sum.get(j);
+          double val = s.get() / c.get();
+          newCenter.set(j, new DoubleWritable(val));
+
+          // reset sum for next iteration
+          s.set(0.0d);
+        }
+        // reset count for next iteration
+        c.set(0);
+        newCenters.set(i, newCenter);
+      }
+
+      // update centers
+      Tuple oldCenters = value.centers;
+      value.centers = newCenters;
+
+      LOG.info("old centers: " + oldCenters + ", new centers: " + newCenters);
+
+      // compare new/old centers
+      boolean converged = true;
+      for (int i = 0; i < value.centers.size() && converged; i++) {
+        Tuple oldCenter = (Tuple) oldCenters.get(i);
+        Tuple newCenter = (Tuple) newCenters.get(i);
+        double sum = 0.0d;
+        for (int j = 0; j < newCenter.size(); j++) {
+          double v =
+              ((DoubleWritable) newCenter.get(j)).get() - ((DoubleWritable) oldCenter.get(j)).get();
+          sum += v * v;
+        }
+        double dist = Math.sqrt(sum);
+        LOG.info("old center: " + oldCenter + ", new center: " + newCenter + ", dist: " + dist);
+        // converge threshold for each center: 0.05
+        converged = dist < 0.05d;
+      }
+
+      if (converged || context.getSuperstep() == context.getMaxIteration() - 1) {
+        // converged or reach max iteration, output centers
+        for (int i = 0; i < value.centers.size(); i++) {
+          context.write(((Tuple) value.centers.get(i)).toArray());
+        }
+        // true means to terminate iteration
+        return true;
+      }
+
+      // false means to continue iteration
+      return false;
+    }
+  }
+
+  private static void printUsage() {
+    System.out.println("Usage: <in> <out> [Max iterations (default 30)]");
+    System.exit(-1);
+  }
+
+  public static void main(String[] args) throws IOException {
+    if (args.length < 2)
+      printUsage();
+
+    GraphJob job = new GraphJob();
+
+    job.setGraphLoaderClass(KmeansVertexReader.class);
+    job.setRuntimePartitioning(false);
+    job.setVertexClass(KmeansVertex.class);
+    job.setAggregatorClass(KmeansAggregator.class);
+    job.addInput(TableInfo.builder().tableName(args[0]).build());
+    job.addOutput(TableInfo.builder().tableName(args[1]).build());
+
+    // default max iteration is 30
+    job.setMaxIteration(30);
+    if (args.length >= 3)
+      job.setMaxIteration(Integer.parseInt(args[2]));
+
+    long start = System.currentTimeMillis();
+    job.run();
+    System.out.println("Job Finished in " + (System.currentTimeMillis() - start) / 1000.0
+        + " seconds");
+  }
+}
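
A note on the inputs this job reads: createInitialValue() loads the kmeans_centers resource as plain text, one initial center per line with comma-separated coordinates, and each kmeans_in row becomes a vertex whose Tuple value must line up with the center dimensions. A hypothetical kmeans_centers resource for three 2-dimensional centers (values purely illustrative):

    0.0,0.0
    5.0,5.0
    10.0,10.0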

+ 107 - 0
MyFirstUDF/examples/com/aliyun/odps/examples/graph/PageRank.java

@@ -0,0 +1,107 @@
+package com.aliyun.odps.examples.graph;
+
+import java.io.IOException;
+
+import com.aliyun.odps.data.TableInfo;
+import com.aliyun.odps.graph.ComputeContext;
+import com.aliyun.odps.graph.GraphJob;
+import com.aliyun.odps.graph.GraphLoader;
+import com.aliyun.odps.graph.MutationContext;
+import com.aliyun.odps.graph.Vertex;
+import com.aliyun.odps.graph.WorkerContext;
+import com.aliyun.odps.io.DoubleWritable;
+import com.aliyun.odps.io.LongWritable;
+import com.aliyun.odps.io.NullWritable;
+import com.aliyun.odps.io.Text;
+import com.aliyun.odps.io.Writable;
+import com.aliyun.odps.io.WritableRecord;
+
+/**
+ * Set program arguments:
+ * pagerank_in pagerank_out
+ * 
+ */
+public class PageRank {
+
+  public static class PageRankVertex extends
+      Vertex<Text, DoubleWritable, NullWritable, DoubleWritable> {
+
+    @Override
+    public void compute(ComputeContext<Text, DoubleWritable, NullWritable, DoubleWritable> context,
+        Iterable<DoubleWritable> messages) throws IOException {
+      if (context.getSuperstep() == 0) {
+        setValue(new DoubleWritable(1.0 / context.getTotalNumVertices()));
+      } else if (context.getSuperstep() >= 1) {
+        double sum = 0;
+        for (DoubleWritable msg : messages) {
+          sum += msg.get();
+        }
+        DoubleWritable vertexValue =
+            new DoubleWritable((0.15f / context.getTotalNumVertices()) + 0.85f * sum);
+        setValue(vertexValue);
+      }
+      if (hasEdges()) {
+        context.sendMessageToNeighbors(this, new DoubleWritable(getValue().get()
+            / getEdges().size()));
+      }
+    }
+
+    @Override
+    public void cleanup(WorkerContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
+        throws IOException {
+      context.write(getId(), getValue());
+    }
+  }
+
+  public static class PageRankVertexReader extends
+      GraphLoader<Text, DoubleWritable, NullWritable, DoubleWritable> {
+
+    @Override
+    public void load(LongWritable recordNum, WritableRecord record,
+        MutationContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
+        throws IOException {
+      PageRankVertex vertex = new PageRankVertex();
+      vertex.setValue(new DoubleWritable(0));
+      vertex.setId((Text) record.get(0));
+      System.out.println(record.get(0));
+
+      for (int i = 1; i < record.size(); i++) {
+        Writable edge = record.get(i);
+        System.out.println(edge.toString());
+        if (!(edge.equals(NullWritable.get()))) {
+          vertex.addEdge(new Text(edge.toString()), NullWritable.get());
+        }
+      }
+      System.out.println("vertex edgs size: " + (vertex.hasEdges() ? vertex.getEdges().size() : 0));
+      context.addVertexRequest(vertex);
+    }
+
+  }
+
+  private static void printUsage() {
+    System.out.println("Usage: <in> <out> [Max iterations (default 30)]");
+    System.exit(-1);
+  }
+
+  public static void main(String[] args) throws IOException {
+    if (args.length < 2)
+      printUsage();
+
+    GraphJob job = new GraphJob();
+
+    job.setGraphLoaderClass(PageRankVertexReader.class);
+    job.setVertexClass(PageRankVertex.class);
+    job.addInput(TableInfo.builder().tableName(args[0]).build());
+    job.addOutput(TableInfo.builder().tableName(args[1]).build());
+
+    // default max iteration is 30
+    job.setMaxIteration(30);
+    if (args.length >= 3)
+      job.setMaxIteration(Integer.parseInt(args[2]));
+
+    long startTime = System.currentTimeMillis();
+    job.run();
+    System.out.println("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0
+        + " seconds");
+  }
+}
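
PageRankVertexReader takes column 0 of each pagerank_in row as the vertex id and every following non-NULL column as the id of an outgoing edge. A hypothetical three-row input (ids purely illustrative), where vertex 1 links to 2 and 3 while 2 and 3 each have one outgoing link:

    1    2    3
    2    3    NULL
    3    1    NULL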

+ 130 - 0
MyFirstUDF/examples/com/aliyun/odps/examples/graph/SSSP.java

@@ -0,0 +1,130 @@
+package com.aliyun.odps.examples.graph;
+
+import java.io.IOException;
+
+import com.aliyun.odps.io.WritableRecord;
+import com.aliyun.odps.graph.Combiner;
+import com.aliyun.odps.graph.ComputeContext;
+import com.aliyun.odps.graph.Edge;
+import com.aliyun.odps.graph.GraphJob;
+import com.aliyun.odps.graph.GraphLoader;
+import com.aliyun.odps.graph.MutationContext;
+import com.aliyun.odps.graph.Vertex;
+import com.aliyun.odps.graph.WorkerContext;
+import com.aliyun.odps.io.LongWritable;
+import com.aliyun.odps.data.TableInfo;
+
+/**
+ * Set program arguments: 
+ * 1 sssp_in sssp_out
+ * 
+ */
+public class SSSP {
+
+  public static final String START_VERTEX = "sssp.start.vertex.id";
+
+  public static class SSSPVertex extends
+      Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {
+
+    private static long startVertexId = -1;
+
+    public SSSPVertex() {
+      this.setValue(new LongWritable(Long.MAX_VALUE));
+    }
+
+    public boolean isStartVertex(
+        ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context) {
+      if (startVertexId == -1) {
+        String s = context.getConfiguration().get(START_VERTEX);
+        startVertexId = Long.parseLong(s);
+      }
+      return getId().get() == startVertexId;
+    }
+
+    @Override
+    public void compute(
+        ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
+        Iterable<LongWritable> messages) throws IOException {
+      long minDist = isStartVertex(context) ? 0 : Integer.MAX_VALUE;
+
+      for (LongWritable msg : messages) {
+        if (msg.get() < minDist) {
+          minDist = msg.get();
+        }
+      }
+
+      if (minDist < this.getValue().get()) {
+        this.setValue(new LongWritable(minDist));
+        if (hasEdges()) {
+          for (Edge<LongWritable, LongWritable> e : this.getEdges()) {
+            context
+                .sendMessage(e.getDestVertexId(), new LongWritable(minDist + e.getValue().get()));
+          }
+        }
+      } else {
+        voteToHalt();
+      }
+    }
+
+    @Override
+    public void cleanup(
+        WorkerContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
+        throws IOException {
+      context.write(getId(), getValue());
+    }
+  }
+
+  public static class MinLongCombiner extends Combiner<LongWritable, LongWritable> {
+
+    @Override
+    public void combine(LongWritable vertexId, LongWritable combinedMessage,
+        LongWritable messageToCombine) throws IOException {
+      if (combinedMessage.get() > messageToCombine.get()) {
+        combinedMessage.set(messageToCombine.get());
+      }
+    }
+
+  }
+
+  public static class SSSPVertexReader extends
+      GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {
+
+    @Override
+    public void load(LongWritable recordNum, WritableRecord record,
+        MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
+        throws IOException {
+      SSSPVertex vertex = new SSSPVertex();
+      vertex.setId((LongWritable) record.get(0));
+      String[] edges = record.get(1).toString().split(";");
+      for (int i = 0; i < edges.length; i++) {
+        String[] ss = edges[i].split(":");
+        vertex.addEdge(new LongWritable(Long.parseLong(ss[0])),
+            new LongWritable(Long.parseLong(ss[1])));
+      }
+
+      context.addVertexRequest(vertex);
+    }
+
+  }
+
+  public static void main(String[] args) throws IOException {
+    if (args.length < 3) {
+      System.out.println("Usage: <startnode> <input> <output>");
+      System.exit(-1);
+    }
+
+    GraphJob job = new GraphJob();
+    job.setGraphLoaderClass(SSSPVertexReader.class);
+    job.setVertexClass(SSSPVertex.class);
+    job.setCombinerClass(MinLongCombiner.class);
+
+    job.set(START_VERTEX, args[0]);
+    job.addInput(TableInfo.builder().tableName(args[1]).build());
+    job.addOutput(TableInfo.builder().tableName(args[2]).build());
+
+    long startTime = System.currentTimeMillis();
+    job.run();
+    System.out.println("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0
+        + " seconds");
+  }
+}
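
SSSPVertexReader expects column 0 of sssp_in to hold the numeric vertex id and column 1 an adjacency string of destination:weight pairs separated by semicolons. With the program arguments from the header comment (1 sssp_in sssp_out), vertex 1 is the start vertex. A hypothetical input (values illustrative):

    1    2:2;3:1;4:4
    2    1:2;3:2;4:1
    3    1:1;2:2;5:1
    4    1:4;2:1;5:1
    5    3:1;4:1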

+ 84 - 0
MyFirstUDF/examples/com/aliyun/odps/examples/mr/Resource.java

@@ -0,0 +1,84 @@
+package com.aliyun.odps.examples.mr;
+
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.data.TableInfo;
+import com.aliyun.odps.mapred.JobClient;
+import com.aliyun.odps.mapred.MapperBase;
+import com.aliyun.odps.mapred.conf.JobConf;
+import com.aliyun.odps.mapred.utils.InputUtils;
+import com.aliyun.odps.mapred.utils.OutputUtils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+
+/*
+ * This example shows how to read file resources in a MapReduce program.
+ * It is mainly meant for debugging in local mode; to run it in the online environment,
+ * remove the statement "job.setResources("file_resource.txt");" from the main method.
+ * 
+ * Usage: 
+ *  Set Resource arguments:
+ *  file_resource.txt 
+ *  Set program arguments:
+ *  wc_in1 rs_out
+ */
+public class Resource {
+
+  public static class TokenizerMapper extends MapperBase {
+    Record result;
+
+    @Override
+    public void setup(TaskContext context) throws IOException {
+      result = context.createOutputRecord();
+      long fileResourceLineCount = 0;
+
+      InputStream in = context.readResourceFileAsStream("file_resource.txt");
+      BufferedReader br = new BufferedReader(new InputStreamReader(in));
+      String line;
+      while ((line = br.readLine()) != null) {
+        fileResourceLineCount++;
+      }
+      br.close();
+
+      result.set(0, "file_resource_line_count");
+      result.set(1, fileResourceLineCount);
+      context.write(result);
+
+      Iterator<Record> it = context.readResourceTable("table_resource1");
+      long tableResourceRecordCount = 0;
+      while (it.hasNext()) {
+        Record r = it.next();
+        ++tableResourceRecordCount;
+      }
+      result.set(0, "table_resource1_record_count");
+      result.set(1, tableResourceRecordCount);
+      context.write(result);
+
+      it = context.readResourceTable("table_resource2");
+      tableResourceRecordCount = 0;
+      while (it.hasNext()) {
+        Record r = it.next();
+        ++tableResourceRecordCount;
+      }
+      result.set(0, "table_resource2_record_count");
+      result.set(1, tableResourceRecordCount);
+      context.write(result);
+
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    JobConf job = new JobConf();
+    job.setMapperClass(TokenizerMapper.class);
+    job.setNumReduceTasks(0);
+    InputUtils.addTable(TableInfo.builder().tableName("wc_in1").build(), job);
+    OutputUtils.addTable(TableInfo.builder().tableName("rs_out").build(), job);
+
+    JobClient.runJob(job);
+  }
+
+}
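
The resources read in setup() must exist wherever the job runs: file_resource.txt as a file resource and table_resource1/table_resource2 as table resources. The UDF tests added later in this commit assert fileResourceLineCount=3 and 4 records per table resource, so the local warehouse evidently ships a three-line file_resource.txt; a hypothetical equivalent:

    first line
    second line
    third line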

+ 124 - 0
MyFirstUDF/examples/com/aliyun/odps/examples/mr/WordCount.java

@@ -0,0 +1,124 @@
+package com.aliyun.odps.examples.mr;
+
+import com.aliyun.odps.counter.Counter;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.data.TableInfo;
+import com.aliyun.odps.mapred.JobClient;
+import com.aliyun.odps.mapred.MapperBase;
+import com.aliyun.odps.mapred.ReducerBase;
+import com.aliyun.odps.mapred.RunningJob;
+import com.aliyun.odps.mapred.conf.JobConf;
+import com.aliyun.odps.mapred.utils.InputUtils;
+import com.aliyun.odps.mapred.utils.OutputUtils;
+import com.aliyun.odps.mapred.utils.SchemaUtils;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/*
+ * This example shows the basic structure of a MapReduce program.
+ */
+public class WordCount {
+
+  public static class TokenizerMapper extends MapperBase {
+
+    Record word;
+    Record one;
+    Counter gCnt;
+
+    @Override
+    public void setup(TaskContext context) throws IOException {
+      word = context.createMapOutputKeyRecord();
+      one = context.createMapOutputValueRecord();
+      one.set(new Object[] {1L});
+      gCnt = context.getCounter("MyCounters", "global_counts");
+    }
+
+    @Override
+    public void map(long recordNum, Record record, TaskContext context) throws IOException {
+      for (int i = 0; i < record.getColumnCount(); i++) {
+        String[] words = record.get(i).toString().split("\\s+");
+        for (String w : words) {
+          word.set(new Object[] {w});
+          Counter cnt = context.getCounter("MyCounters", "map_outputs");
+          cnt.increment(1);
+          gCnt.increment(1);
+          context.write(word, one);
+        }
+      }
+    }
+  }
+
+  /**
+   * A combiner class that combines map outputs by summing them.
+   */
+  public static class SumCombiner extends ReducerBase {
+    private Record count;
+
+    @Override
+    public void setup(TaskContext context) throws IOException {
+      count = context.createMapOutputValueRecord();
+    }
+
+    @Override
+    public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
+      long c = 0;
+      while (values.hasNext()) {
+        Record val = values.next();
+        c += (Long) val.get(0);
+      }
+      count.set(0, c);
+      context.write(key, count);
+    }
+  }
+
+  /**
+   * A reducer class that just emits the sum of the input values.
+   */
+  public static class SumReducer extends ReducerBase {
+    private Record result;
+    Counter gCnt;
+
+    @Override
+    public void setup(TaskContext context) throws IOException {
+      result = context.createOutputRecord();
+      gCnt = context.getCounter("MyCounters", "global_counts");
+    }
+
+    @Override
+    public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
+      long count = 0;
+      while (values.hasNext()) {
+        Record val = values.next();
+        count += (Long) val.get(0);
+      }
+      result.set(0, key.get(0));
+      result.set(1, count);
+      Counter cnt = context.getCounter("MyCounters", "reduce_outputs");
+      cnt.increment(1);
+      gCnt.increment(1);
+
+      context.write(result);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+
+    JobConf job = new JobConf();
+    job.setMapperClass(TokenizerMapper.class);
+    job.setCombinerClass(SumCombiner.class);
+    job.setReducerClass(SumReducer.class);
+
+    job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
+    job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
+
+    InputUtils.addTable(TableInfo.builder().tableName("wc_in1").cols(new String[] {"col2", "col3"})
+        .build(), job);
+    InputUtils.addTable(TableInfo.builder().tableName("wc_in2").partSpec("p1=2/p2=1").build(), job);
+    OutputUtils.addTable(TableInfo.builder().tableName("wc_out").build(), job);
+
+    RunningJob rj = JobClient.runJob(job);
+    rj.waitForCompletion();
+  }
+
+}
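
WordCount hard-codes its I/O in main(): columns col2 and col3 of wc_in1, partition p1=2/p2=1 of wc_in2, and output table wc_out. Outside of local mode, the usual way to launch such a job is to upload the built jar as a resource and start it with the odpscmd jar command; a sketch with an assumed jar name, exact flags depending on your client version:

    -- "MyFirstUDF.jar" is an assumed resource name for the packaged project
    add jar MyFirstUDF.jar;
    jar -resources MyFirstUDF.jar -classpath MyFirstUDF.jar com.aliyun.odps.examples.mr.WordCount;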

+ 104 - 0
MyFirstUDF/examples/com/aliyun/odps/examples/mr/test/WordCountTest.java

@@ -0,0 +1,104 @@
+package com.aliyun.odps.examples.mr.test;
+
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.data.TableInfo;
+import com.aliyun.odps.examples.TestUtil;
+import com.aliyun.odps.examples.mr.WordCount;
+import com.aliyun.odps.io.Text;
+import com.aliyun.odps.mapred.conf.JobConf;
+import com.aliyun.odps.mapred.utils.InputUtils;
+import com.aliyun.odps.mapred.utils.OutputUtils;
+import com.aliyun.odps.mapred.utils.SchemaUtils;
+import java.io.IOException;
+import java.util.List;
+import junit.framework.Assert;
+import org.junit.Test;
+
+public class WordCountTest extends MRUnitTest {
+  // Define the schemas of the input and output tables
+  private final static String INPUT_SCHEMA = "a:string,b:string";
+  private final static String OUTPUT_SCHEMA = "k:string,v:bigint";
+  private final JobConf job;
+
+  public WordCountTest() throws Exception {
+    TestUtil.initWarehouse();
+    // Prepare the job configuration
+    job = new JobConf();
+
+    job.setMapperClass(WordCount.TokenizerMapper.class);
+    job.setCombinerClass(WordCount.SumCombiner.class);
+    job.setReducerClass(WordCount.SumReducer.class);
+
+    job.setMapOutputKeySchema(SchemaUtils.fromString("key:string"));
+    job.setMapOutputValueSchema(SchemaUtils.fromString("value:bigint"));
+
+    InputUtils.addTable(TableInfo.builder().tableName("wc_in").build(), job);
+    OutputUtils.addTable(TableInfo.builder().tableName("wc_out").build(), job);
+  }
+
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testMap() throws IOException, ClassNotFoundException, InterruptedException {
+    MapUTContext mapContext = new MapUTContext();
+    mapContext.setInputSchema(INPUT_SCHEMA);
+    mapContext.setOutputSchema(OUTPUT_SCHEMA, job);
+    // Prepare the test data
+    Record record = mapContext.createInputRecord();
+    record.set(new Text[] {new Text("hello"), new Text("c")});
+    mapContext.addInputRecord(record);
+
+    record = mapContext.createInputRecord();
+    record.set(new Text[] {new Text("hello"), new Text("java")});
+    mapContext.addInputRecord(record);
+    // Run the map stage
+    TaskOutput output = runMapper(job, mapContext);
+
+    // Verify the map output (the combiner has run): 3 key/value pairs
+    List<KeyValue<Record, Record>> kvs = output.getOutputKeyValues();
+    Assert.assertEquals(3, kvs.size());
+    Assert.assertEquals(new KeyValue<String, Long>(new String("c"), new Long(1)),
+        new KeyValue<String, Long>((String) (kvs.get(0).getKey().get(0)), (Long) (kvs.get(0)
+            .getValue().get(0))));
+    Assert.assertEquals(new KeyValue<String, Long>(new String("hello"), new Long(2)),
+        new KeyValue<String, Long>((String) (kvs.get(1).getKey().get(0)), (Long) (kvs.get(1)
+            .getValue().get(0))));
+    Assert.assertEquals(new KeyValue<String, Long>(new String("java"), new Long(1)),
+        new KeyValue<String, Long>((String) (kvs.get(2).getKey().get(0)), (Long) (kvs.get(2)
+            .getValue().get(0))));
+  }
+
+  @Test
+  public void testReduce() throws IOException, ClassNotFoundException, InterruptedException {
+    ReduceUTContext context = new ReduceUTContext();
+    context.setOutputSchema(OUTPUT_SCHEMA,  job);
+    // Prepare the test data
+    Record key = context.createInputKeyRecord(job);
+    Record value = context.createInputValueRecord(job);
+    key.set(0, "world");
+    value.set(0, new Long(1));
+    context.addInputKeyValue(key, value);
+    key.set(0, "hello");
+    value.set(0, new Long(1));
+    context.addInputKeyValue(key, value);
+    key.set(0, "hello");
+    value.set(0, new Long(1));
+    context.addInputKeyValue(key, value);
+    key.set(0, "odps");
+    value.set(0, new Long(1));
+    context.addInputKeyValue(key, value);
+
+    // Run the reduce stage
+    TaskOutput output = runReducer(job, context);
+
+    // Verify the reduce output: 3 records
+    List<Record> records = output.getOutputRecords();
+    Assert.assertEquals(3, records.size());
+    Assert.assertEquals(new String("hello"), records.get(0).get("k"));
+    Assert.assertEquals(new Long(2), records.get(0).get("v"));
+    Assert.assertEquals(new String("odps"), records.get(1).get("k"));
+    Assert.assertEquals(new Long(1), records.get(1).get("v"));
+    Assert.assertEquals(new String("world"), records.get(2).get("k"));
+    Assert.assertEquals(new Long(1), records.get(2).get("v"));
+  }
+
+}

+ 47 - 0
MyFirstUDF/examples/com/aliyun/odps/examples/udf/UDAFExample.java

@@ -0,0 +1,47 @@
+package com.aliyun.odps.examples.udf;
+
+import com.aliyun.odps.io.LongWritable;
+import com.aliyun.odps.io.Text;
+import com.aliyun.odps.io.Writable;
+import com.aliyun.odps.udf.Aggregator;
+import com.aliyun.odps.udf.UDFException;
+import com.aliyun.odps.udf.annotation.Resolve;
+
+/**
+ * project: example_project 
+ * table: wc_in2 
+ * partitions: p2=1,p1=2 
+ * columns: colc,colb,cola
+ */
+@Resolve("string->bigint")
+public class UDAFExample extends Aggregator {
+
+  @Override
+  public void iterate(Writable buffer, Writable[] args) throws UDFException {
+    LongWritable result = (LongWritable) buffer;
+    for (Writable item : args) {
+      Text txt = (Text) item;
+      result.set(result.get() + txt.getLength());
+    }
+
+  }
+
+  @Override
+  public void merge(Writable buffer, Writable partial) throws UDFException {
+    LongWritable result = (LongWritable) buffer;
+    LongWritable partialResult = (LongWritable) partial;
+    result.set(result.get() + partialResult.get());
+
+  }
+
+  @Override
+  public Writable newBuffer() {
+    return new LongWritable(0L);
+  }
+
+  @Override
+  public Writable terminate(Writable buffer) throws UDFException {
+    return buffer;
+  }
+
+}

+ 93 - 0
MyFirstUDF/examples/com/aliyun/odps/examples/udf/UDAFResource.java

@@ -0,0 +1,93 @@
+package com.aliyun.odps.examples.udf;
+
+import com.aliyun.odps.io.LongWritable;
+import com.aliyun.odps.io.Text;
+import com.aliyun.odps.io.Writable;
+import com.aliyun.odps.udf.Aggregator;
+import com.aliyun.odps.udf.ExecutionContext;
+import com.aliyun.odps.udf.UDFException;
+import com.aliyun.odps.udf.annotation.Resolve;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+
+/**
+ * project: example_project 
+ * table: wc_in2 
+ * partitions: p2=1,p1=2 
+ * columns: colc,colb,cola
+ */
+@Resolve("string->bigint")
+public class UDAFResource extends Aggregator {
+  ExecutionContext ctx;
+  long fileResourceLineCount;
+  long tableResource1RecordCount;
+  long tableResource2RecordCount;
+
+  @Override
+  public void setup(ExecutionContext ctx) throws UDFException {
+    this.ctx = ctx;
+    try {
+      InputStream in = ctx.readResourceFileAsStream("file_resource.txt");
+      BufferedReader br = new BufferedReader(new InputStreamReader(in));
+      fileResourceLineCount = 0;
+      String line;
+      while ((line = br.readLine()) != null) {
+        fileResourceLineCount++;
+      }
+      br.close();
+
+      Iterator<Object[]> iterator = ctx.readResourceTable("table_resource1").iterator();
+      tableResource1RecordCount = 0;
+      while (iterator.hasNext()) {
+        tableResource1RecordCount++;
+        iterator.next();
+      }
+
+      iterator = ctx.readResourceTable("table_resource2").iterator();
+      tableResource2RecordCount = 0;
+      while (iterator.hasNext()) {
+        tableResource2RecordCount++;
+        iterator.next();
+      }
+
+    } catch (IOException e) {
+      throw new UDFException(e);
+    }
+  }
+
+  @Override
+  public void iterate(Writable arg0, Writable[] arg1) throws UDFException {
+    LongWritable result = (LongWritable) arg0;
+    for (Writable item : arg1) {
+      Text txt = (Text) item;
+      result.set(result.get() + txt.getLength());
+    }
+
+  }
+
+  @Override
+  public void merge(Writable arg0, Writable arg1) throws UDFException {
+    LongWritable result = (LongWritable) arg0;
+    LongWritable partial = (LongWritable) arg1;
+    result.set(result.get() + partial.get());
+
+  }
+
+  @Override
+  public Writable newBuffer() {
+    return new LongWritable(0L);
+  }
+
+  @Override
+  public Writable terminate(Writable arg0) throws UDFException {
+    LongWritable result = (LongWritable) arg0;
+    result.set(result.get() + fileResourceLineCount + tableResource1RecordCount
+        + tableResource2RecordCount);
+    return result;
+  }
+
+}

+ 35 - 0
MyFirstUDF/examples/com/aliyun/odps/examples/udf/UDFExample.java

@@ -0,0 +1,35 @@
+package com.aliyun.odps.examples.udf;
+
+import com.aliyun.odps.udf.UDF;
+
+public class UDFExample extends UDF {
+
+  /**
+   * project: example_project
+   * table: wc_in1
+   * columns: col1
+   */
+  public String evaluate(String a) {
+    return "s2s:" + a;
+  }
+
+  /**
+   * project: example_project 
+   * table: wc_in1 
+   * columns: col1,col2
+   */
+  public String evaluate(String a, String b) {
+    return "ss2s:" + a + "," + b;
+  }
+
+  /**
+   * project: example_project 
+   * table: wc_in2 
+   * partitions: p2=1,p1=2 
+   * columns: colc,colb,cola
+   */
+  public String evaluate(String a, String b, String c) {
+    return "sss2s:" + a + "," + b + "," + c;
+  }
+
+}
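
To call this UDF from MaxCompute SQL, the packaged jar is added as a resource and the class is registered as a function; the function and jar names below are illustrative, and the UDAF/UDTF classes in this commit are registered the same way:

    -- jar and function names are assumed for illustration
    add jar MyFirstUDF.jar;
    create function my_udf as 'com.aliyun.odps.examples.udf.UDFExample' using 'MyFirstUDF.jar';
    select my_udf(col1, col2) from wc_in1;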

+ 60 - 0
MyFirstUDF/examples/com/aliyun/odps/examples/udf/UDFResource.java

@@ -0,0 +1,60 @@
+package com.aliyun.odps.examples.udf;
+
+import com.aliyun.odps.udf.ExecutionContext;
+import com.aliyun.odps.udf.UDF;
+import com.aliyun.odps.udf.UDFException;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+
+public class UDFResource extends UDF {
+  ExecutionContext ctx;
+  long fileResourceLineCount;
+  long tableResource1RecordCount;
+  long tableResource2RecordCount;
+
+  @Override
+  public void setup(ExecutionContext ctx) throws UDFException {
+    this.ctx = ctx;
+    try {
+      InputStream in = ctx.readResourceFileAsStream("file_resource.txt");
+      BufferedReader br = new BufferedReader(new InputStreamReader(in));
+      String line;
+      fileResourceLineCount = 0;
+      while ((line = br.readLine()) != null) {
+        fileResourceLineCount++;
+      }
+      br.close();
+
+      Iterator<Object[]> iterator = ctx.readResourceTable("table_resource1").iterator();
+      tableResource1RecordCount = 0;
+      while (iterator.hasNext()) {
+        tableResource1RecordCount++;
+        iterator.next();
+      }
+
+      iterator = ctx.readResourceTable("table_resource2").iterator();
+      tableResource2RecordCount = 0;
+      while (iterator.hasNext()) {
+        tableResource2RecordCount++;
+        iterator.next();
+      }
+
+    } catch (IOException e) {
+      throw new UDFException(e);
+    }
+  }
+
+  /**
+   * project: example_project table: wc_in2 partitions: p2=1,p1=2 columns: colc,colb
+   */
+  public String evaluate(String a, String b) {
+    return "ss2s:" + a + "," + b + "|fileResourceLineCount=" + fileResourceLineCount
+        + "|tableResource1RecordCount=" + tableResource1RecordCount + "|tableResource2RecordCount="
+        + tableResource2RecordCount;
+  }
+
+}

+ 24 - 0
MyFirstUDF/examples/com/aliyun/odps/examples/udf/UDTFExample.java

@@ -0,0 +1,24 @@
+package com.aliyun.odps.examples.udf;
+
+import com.aliyun.odps.udf.UDFException;
+import com.aliyun.odps.udf.UDTF;
+import com.aliyun.odps.udf.annotation.Resolve;
+
+/**
+ * project: example_project 
+ * table: wc_in2 
+ * partitions: p2=1,p1=2 
+ * columns: colc,colb
+ */
+@Resolve({"string,string->string,bigint"})
+public class UDTFExample extends UDTF {
+
+  @Override
+  public void process(Object[] args) throws UDFException {
+    String a = (String) args[0];
+    long b = args[1] == null ? 0 : ((String) args[1]).length();
+
+    forward(a, b);
+
+  }
+}
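
Because a UDTF emits several output columns (string,bigint here), its SQL invocation aliases them explicitly; a sketch assuming the class was registered as my_udtf (names illustrative):

    -- assumes the function was registered as my_udtf
    select my_udtf(colc, colb) as (word, len) from wc_in2;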

+ 68 - 0
MyFirstUDF/examples/com/aliyun/odps/examples/udf/UDTFResource.java

@@ -0,0 +1,68 @@
+package com.aliyun.odps.examples.udf;
+
+import com.aliyun.odps.udf.ExecutionContext;
+import com.aliyun.odps.udf.UDFException;
+import com.aliyun.odps.udf.UDTF;
+import com.aliyun.odps.udf.annotation.Resolve;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+
+/**
+ * project: example_project 
+ * table: wc_in2 
+ * partitions: p2=1,p1=2 
+ * columns: colc,colb
+ */
+@Resolve({"string,string->string,bigint,string"})
+public class UDTFResource extends UDTF {
+  ExecutionContext ctx;
+  long fileResourceLineCount;
+  long tableResource1RecordCount;
+  long tableResource2RecordCount;
+
+  @Override
+  public void setup(ExecutionContext ctx) throws UDFException {
+    this.ctx = ctx;
+    try {
+      InputStream in = ctx.readResourceFileAsStream("file_resource.txt");
+      BufferedReader br = new BufferedReader(new InputStreamReader(in));
+      String line;
+      fileResourceLineCount = 0;
+      while ((line = br.readLine()) != null) {
+        fileResourceLineCount++;
+      }
+      br.close();
+
+      Iterator<Object[]> iterator = ctx.readResourceTable("table_resource1").iterator();
+      tableResource1RecordCount = 0;
+      while (iterator.hasNext()) {
+        tableResource1RecordCount++;
+        iterator.next();
+      }
+
+      iterator = ctx.readResourceTable("table_resource2").iterator();
+      tableResource2RecordCount = 0;
+      while (iterator.hasNext()) {
+        tableResource2RecordCount++;
+        iterator.next();
+      }
+
+    } catch (IOException e) {
+      throw new UDFException(e);
+    }
+  }
+
+  @Override
+  public void process(Object[] args) throws UDFException {
+    String a = (String) args[0];
+    long b = args[1] == null ? 0 : ((String) args[1]).length();
+
+    forward(a, b, "fileResourceLineCount=" + fileResourceLineCount + "|tableResource1RecordCount="
+        + tableResource1RecordCount + "|tableResource2RecordCount=" + tableResource2RecordCount);
+
+  }
+}

+ 63 - 0
MyFirstUDF/examples/com/aliyun/odps/examples/udf/test/UDAFTest.java

@@ -0,0 +1,63 @@
+package com.aliyun.odps.examples.udf.test;
+
+import com.aliyun.odps.examples.TestUtil;
+import com.aliyun.odps.udf.local.datasource.InputSource;
+import com.aliyun.odps.udf.local.datasource.TableInputSource;
+import com.aliyun.odps.udf.local.runner.AggregatorRunner;
+import com.aliyun.odps.udf.local.runner.BaseRunner;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+
+public class UDAFTest {
+
+  @BeforeClass
+  public static void initWarehouse() {
+    TestUtil.initWarehouse();
+  }
+
+  @Test
+  public void simpleInput() throws Exception{
+    BaseRunner runner = new AggregatorRunner(null,
+        "com.aliyun.odps.examples.udf.UDAFExample");
+    runner.feed(new Object[] { "one", "one" }).feed(new Object[] { "three", "three" })
+        .feed(new Object[] { "four", "four" });
+    List<Object[]> out = runner.yield();
+    Assert.assertEquals(1, out.size());
+    Assert.assertEquals(24L, out.get(0)[0]);
+  }
+
+  @Test
+  public void inputFromTable() throws Exception{
+    BaseRunner runner = new AggregatorRunner(TestUtil.getOdps(),
+        "com.aliyun.odps.examples.udf.UDAFExample");
+    // partition table
+    String project = "example_project";
+    String table = "wc_in2";
+    String[] partitions = new String[] { "p2=1", "p1=2" };
+    String[] columns = new String[] { "colc", "cola" };
+    InputSource inputSource = new TableInputSource(project, table, partitions, columns);
+    Object[] data;
+    while ((data = inputSource.getNextRow()) != null) {
+      runner.feed(data);
+    }
+    List<Object[]> out = runner.yield();
+    Assert.assertEquals(1, out.size());
+    Assert.assertEquals(36L, out.get(0)[0]);
+  }
+
+  @Test
+  public void resourceTest() throws Exception{
+    BaseRunner runner = new AggregatorRunner(TestUtil.getOdps(),
+        "com.aliyun.odps.examples.udf.UDAFResource");
+    runner.feed(new Object[] { "one", "one" }).feed(new Object[] { "three", "three" })
+        .feed(new Object[] { "four", "four" });
+    List<Object[]> out = runner.yield();
+    Assert.assertEquals(1, out.size());
+    // 24+3+4+4
+    Assert.assertEquals(35L, out.get(0)[0]);
+  }
+
+}

+ 69 - 0
MyFirstUDF/examples/com/aliyun/odps/examples/udf/test/UDFTest.java

@@ -0,0 +1,69 @@
+package com.aliyun.odps.examples.udf.test;
+
+import com.aliyun.odps.examples.TestUtil;
+import com.aliyun.odps.udf.local.datasource.InputSource;
+import com.aliyun.odps.udf.local.datasource.TableInputSource;
+import com.aliyun.odps.udf.local.runner.BaseRunner;
+import com.aliyun.odps.udf.local.runner.UDFRunner;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+
+public class UDFTest {
+
+  @BeforeClass
+  public static void initWarehouse() {
+    TestUtil.initWarehouse();
+  }
+
+  @Test
+  public void simpleInput() throws Exception{
+    BaseRunner runner = new UDFRunner(null, "com.aliyun.odps.examples.udf.UDFExample");
+    runner.feed(new Object[] { "one", "one" }).feed(new Object[] { "three", "three" })
+        .feed(new Object[] { "four", "four" });
+    List<Object[]> out = runner.yield();
+
+    Assert.assertEquals(3, out.size());
+    Assert.assertEquals("ss2s:one,one", TestUtil.join(out.get(0)));
+    Assert.assertEquals("ss2s:three,three", TestUtil.join(out.get(1)));
+    Assert.assertEquals("ss2s:four,four", TestUtil.join(out.get(2)));
+  }
+
+  @Test
+  public void inputFromTable() throws Exception{
+    BaseRunner runner = new UDFRunner(TestUtil.getOdps(), "com.aliyun.odps.examples.udf.UDFExample");
+    String project = "example_project";
+    String table = "wc_in2";
+    String[] partitions = new String[] { "p2=1", "p1=2" };
+    String[] columns = new String[] { "colc", "cola" };
+    InputSource inputSource = new TableInputSource(project, table, partitions, columns);
+    Object[] data;
+    while ((data = inputSource.getNextRow()) != null) {
+      runner.feed(data);
+    }
+    List<Object[]> out = runner.yield();
+    Assert.assertEquals(3, out.size());
+    Assert.assertEquals("ss2s:three3,three1", TestUtil.join(out.get(0)));
+    Assert.assertEquals("ss2s:three3,three1", TestUtil.join(out.get(1)));
+    Assert.assertEquals("ss2s:three3,three1", TestUtil.join(out.get(2)));
+  }
+
+  @Test
+  public void resourceTest() throws Exception{
+    BaseRunner runner = new UDFRunner(TestUtil.getOdps(), "com.aliyun.odps.examples.udf.UDFResource");
+    runner.feed(new Object[] { "one", "one" }).feed(new Object[] { "three", "three" })
+        .feed(new Object[] { "four", "four" });
+    List<Object[]> out = runner.yield();
+
+    Assert.assertEquals(3, out.size());
+    Assert.assertEquals("ss2s:one,one|fileResourceLineCount=3|tableResource1RecordCount=4|tableResource2RecordCount=4",
+        TestUtil.join(out.get(0)));
+    Assert.assertEquals("ss2s:three,three|fileResourceLineCount=3|tableResource1RecordCount=4|tableResource2RecordCount=4",
+        TestUtil.join(out.get(1)));
+    Assert.assertEquals("ss2s:four,four|fileResourceLineCount=3|tableResource1RecordCount=4|tableResource2RecordCount=4",
+        TestUtil.join(out.get(2)));
+  }
+
+}

+ 68 - 0
MyFirstUDF/examples/com/aliyun/odps/examples/udf/test/UDTFTest.java

@@ -0,0 +1,68 @@
+package com.aliyun.odps.examples.udf.test;
+
+import com.aliyun.odps.examples.TestUtil;
+import com.aliyun.odps.udf.local.datasource.InputSource;
+import com.aliyun.odps.udf.local.datasource.TableInputSource;
+import com.aliyun.odps.udf.local.runner.BaseRunner;
+import com.aliyun.odps.udf.local.runner.UDTFRunner;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+
+public class UDTFTest {
+
+  @BeforeClass
+  public static void initWarehouse() {
+    TestUtil.initWarehouse();
+  }
+
+  @Test
+  public void simpleInput() throws Exception{
+    BaseRunner runner = new UDTFRunner(null, "com.aliyun.odps.examples.udf.UDTFExample");
+    runner.feed(new Object[] { "one", "one" }).feed(new Object[] { "three", "three" })
+        .feed(new Object[] { "four", "four" });
+    List<Object[]> out = runner.yield();
+    Assert.assertEquals(3, out.size());
+    Assert.assertEquals("one,3", TestUtil.join(out.get(0)));
+    Assert.assertEquals("three,5", TestUtil.join(out.get(1)));
+    Assert.assertEquals("four,4", TestUtil.join(out.get(2)));
+  }
+
+  @Test
+  public void inputFromTable() throws Exception{
+    BaseRunner runner = new UDTFRunner(TestUtil.getOdps(), "com.aliyun.odps.examples.udf.UDTFExample");
+    String project = "example_project";
+    String table = "wc_in2";
+    String[] partitions = new String[] { "p2=1", "p1=2" };
+    String[] columns = new String[] { "colc", "cola" };
+
+    InputSource inputSource = new TableInputSource(project, table, partitions, columns);
+    Object[] data;
+    while ((data = inputSource.getNextRow()) != null) {
+      runner.feed(data);
+    }
+    List<Object[]> out = runner.yield();
+    Assert.assertEquals(3, out.size());
+    Assert.assertEquals("three3,6", TestUtil.join(out.get(0)));
+    Assert.assertEquals("three3,6", TestUtil.join(out.get(1)));
+    Assert.assertEquals("three3,6", TestUtil.join(out.get(2)));
+  }
+
+  @Test
+  public void resourceTest() throws Exception{
+    BaseRunner runner = new UDTFRunner(TestUtil.getOdps(), "com.aliyun.odps.examples.udf.UDTFResource");
+    runner.feed(new Object[] { "one", "one" }).feed(new Object[] { "three", "three" })
+        .feed(new Object[] { "four", "four" });
+    List<Object[]> out = runner.yield();
+    Assert.assertEquals(3, out.size());
+    Assert.assertEquals("one,3,fileResourceLineCount=3|tableResource1RecordCount=4|tableResource2RecordCount=4",
+        TestUtil.join(out.get(0)));
+    Assert.assertEquals("three,5,fileResourceLineCount=3|tableResource1RecordCount=4|tableResource2RecordCount=4",
+        TestUtil.join(out.get(1)));
+    Assert.assertEquals("four,4,fileResourceLineCount=3|tableResource1RecordCount=4|tableResource2RecordCount=4",
+        TestUtil.join(out.get(2)));
+  }
+
+}

+ 90 - 0
MyFirstUDF/examples/com/aliyun/odps/examples/udj/PayUserLogMergeJoin.java

@@ -0,0 +1,90 @@
+package com.aliyun.odps.examples.udj;
+
+import com.aliyun.odps.Column;
+import com.aliyun.odps.OdpsType;
+import com.aliyun.odps.Yieldable;
+import com.aliyun.odps.data.ArrayRecord;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.udf.DataAttributes;
+import com.aliyun.odps.udf.ExecutionContext;
+import com.aliyun.odps.udf.UDJ;
+import com.aliyun.odps.udf.annotation.Resolve;
+import java.util.ArrayList;
+import java.util.Iterator;
+/** For each record of the right table, find the nearest record of the left table and
+ * merge the two records.
+ */
+@Resolve("->string,bigint,string")
+public class PayUserLogMergeJoin extends UDJ {
+  private Record outputRecord;
+  /** Will be called prior to the data processing phase. User could implement
+   * this method to do initialization work.
+   */
+  @Override
+  public void setup(ExecutionContext executionContext, DataAttributes dataAttributes) {
+    //
+    outputRecord = new ArrayRecord(new Column[]{
+        new Column("user_id", OdpsType.STRING),
+        new Column("time", OdpsType.BIGINT),
+        new Column("content", OdpsType.STRING)
+    });
+  }
+
+  /** Override this method to implement join logic.
+   * @param key Current join key
+   * @param left Group of records of left table corresponding to the current key
+   * @param right Group of records of right table corresponding to the current key
+   * @param output Used to output the result of UDJ
+   */
+  @Override
+  public void join(Record key, Iterator<Record> left, Iterator<Record> right, Yieldable<Record> output) {
+    outputRecord.setString(0, key.getString(0));
+    if (!right.hasNext()) {
+      // Empty right group, do nothing.
+      return;
+    } else if (!left.hasNext()) {
+      // Empty left group. Output all records of right group without merge.
+      while (right.hasNext()) {
+        Record logRecord = right.next();
+        outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
+        outputRecord.setString(2, logRecord.getString(1));
+        output.yield(outputRecord);
+      }
+      return;
+    }
+    ArrayList<Record> pays = new ArrayList<>();
+    // The left group of records will be iterated from start to end once
+    // for each record of the right group, but the iterator cannot be reset,
+    // so we save every record of the left group in an ArrayList.
+    left.forEachRemaining(pay -> pays.add(pay.clone()));
+    while (right.hasNext()) {
+      Record log = right.next();
+      long logTime = log.getDatetime(0).getTime();
+      long minDelta = Long.MAX_VALUE;
+      Record nearestPay = null;
+      // Iterate through all records of left, and find the pay record that has
+      // the minimal difference in terms of time.
+      for (Record pay: pays) {
+        long delta = Math.abs(logTime - pay.getDatetime(0).getTime());
+        if (delta < minDelta) {
+          minDelta = delta;
+          nearestPay = pay;
+        }
+      }
+      // Merge the log record with nearest pay record and output to the result.
+      outputRecord.setBigint(1, log.getDatetime(0).getTime());
+      outputRecord.setString(2, mergeLog(nearestPay.getString(1), log.getString(1)));
+      output.yield(outputRecord);
+    }
+  }
+
+  String mergeLog(String payInfo, String logContent) {
+    return logContent + ", pay " + payInfo;
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+}
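
A UDJ is invoked from MaxCompute SQL through a JOIN ... USING clause that names the registered function, the columns passed to each side, and the output aliases. A rough sketch under the assumption of the standard UDJ syntax, with hypothetical table names and the column layout this class expects (a datetime followed by a string on each side):

    -- table names payment/user_client_log and the registered function name are assumed
    select r.user_id, r.time, r.content
    from (select user_id, time, pay_info from payment) p
    join (select user_id, time, content from user_client_log) u
    on p.user_id = u.user_id
    using pay_user_log_merge_join(p.time, p.pay_info, u.time, u.content)
    r as (user_id, time, content);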

+ 240 - 0
MyFirstUDF/examples/com/aliyun/odps/examples/unstructured/SpeechSentenceSnrExtractor.java

@@ -0,0 +1,240 @@
+package com.aliyun.odps.examples.unstructured;
+
+import com.aliyun.odps.Column;
+import com.aliyun.odps.OdpsType;
+import com.aliyun.odps.data.ArrayRecord;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.io.InputStreamSet;
+import com.aliyun.odps.io.SourceInputStream;
+import com.aliyun.odps.udf.DataAttributes;
+import com.aliyun.odps.udf.ExecutionContext;
+import com.aliyun.odps.udf.Extractor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.security.InvalidParameterException;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+public class SpeechSentenceSnrExtractor extends Extractor {
+  private final static Log logger = LogFactory.getLog(SpeechSentenceSnrExtractor.class);
+
+  private static final String MLF_FILE_ATTRIBUTE_KEY = "mlfFileName";
+  private static final String SPEECH_SAMPLE_RATE_KEY = "speechSampleRateInKHz";
+
+  private String mlfFileName;
+  private HashMap<String, UtteranceLabel> utteranceLabels;
+  private InputStreamSet inputs;
+  private DataAttributes attributes;
+  private double sampleRateInKHz;
+
+  public SpeechSentenceSnrExtractor(){
+    this.utteranceLabels = new HashMap<String, UtteranceLabel>();
+  }
+
+  @Override
+  public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes){
+    this.inputs = inputs;
+    this.attributes = attributes;
+    this.mlfFileName = this.attributes.getValueByKey(MLF_FILE_ATTRIBUTE_KEY);
+    if (this.mlfFileName == null){
+      throw new IllegalArgumentException("A mlf file must be specified in extractor attribute.");
+    }
+    String sampleRateInKHzStr = this.attributes.getValueByKey(SPEECH_SAMPLE_RATE_KEY);
+    if (sampleRateInKHzStr == null){
+      throw new IllegalArgumentException("The speech sampling rate must be specified in extractor attribute.");
+    }
+    this.sampleRateInKHz = Double.parseDouble(sampleRateInKHzStr);
+    try {
+      BufferedInputStream inputStream = ctx.readResourceFileAsStream(mlfFileName);
+      loadMlfLabelsFromResource(inputStream);
+      inputStream.close();
+    } catch (IOException e) {
+      throw new RuntimeException("reading model from mlf failed with exception " + e.getMessage());
+    }
+  }
+
+  @Override
+  public Record extract() throws IOException {
+    SourceInputStream inputStream = inputs.next();
+    if (inputStream == null){
+      return null;
+    }
+
+    String fileName = inputStream.getFileName();
+    fileName = fileName.substring(fileName.lastIndexOf('/') + 1);
+    logger.info("Processing wav file " + fileName);
+    // full file path: path/to/XXX.wav => XXX as id
+    String id = fileName.substring(0, fileName.lastIndexOf('.'));
+
+    long fileSize = inputStream.getFileSize();
+    if (fileSize > Integer.MAX_VALUE){
+      // technically a larger file can be read via multiple batches,
+      // but we simply do not support it in this example.
+      throw new IllegalArgumentException("Do not support speech file larger than 2G bytes");
+    }
+    byte[] buffer = new byte[(int)fileSize];
+
+    Column[] outputColumns = this.attributes.getRecordColumns();
+    ArrayRecord record = new ArrayRecord(outputColumns);
+    if (outputColumns.length != 2 || outputColumns[0].getType() != OdpsType.DOUBLE
+        || outputColumns[1].getType() != OdpsType.STRING){
+      throw new IllegalArgumentException("Expecting output to of schema double|string.");
+    }
+    int readSize = inputStream.readToEnd(buffer);
+    inputStream.close();
+    double snr = computeSnr(id, buffer, readSize);
+    record.setDouble(0, snr);
+    record.setString(1, id);
+    logger.info(String.format("file [%s] snr computed to be [%f]db", fileName, snr));
+    return record;
+  }
+
+  @Override
+  public void close(){
+    //no-op
+  }
+
+  private void loadMlfLabelsFromResource(BufferedInputStream fileInputStream)
+      throws IOException {
+    BufferedReader br = new BufferedReader(new InputStreamReader(fileInputStream));
+    String line;
+    String id = "";
+    // here we rely on the particular format of the mlf to load labels from the file
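+    // Assumed mlf layout (illustrative, inferred from this parser and UtteranceLabel, not
+    // quoted from the sample resource):
+    //   id: <utteranceId>
+    //   <labelIndex> <frameCount> <label_1> ... <label_frameCount>
+    // i.e. an "id:" line naming the utterance, followed by one line of frame labels.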
+    while ((line = br.readLine()) != null) {
+      if (line.trim().isEmpty()){
+        continue;
+      }
+      if (line.startsWith("id:")){
+        id = line.split(":")[1].trim();
+      }
+      else{
+        // in this branch, line must be the label
+        this.utteranceLabels.put(id, new UtteranceLabel(id, line, " "));
+      }
+    }
+  }
+
+  // compute the snr of the speech sentence, assuming the input buffer contains the entire content of a wav file
+  private double computeSnr(String id, byte[] buffer, int validBufferLen){
+    final int headerLength = 44;
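+    // 44 bytes is the canonical PCM RIFF/WAVE header size; the header is skipped rather than
+    // parsed, so the sample format (16-bit little-endian samples) is assumed, not validated.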
+    if (validBufferLen < headerLength){
+      throw new IllegalArgumentException("A wav buffer must be at least larger than standard wav header size.");
+    }
+    // each frame is 10 ms
+    int sampleCountPerFrame = (int)this.sampleRateInKHz * 10;
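+    // e.g. with the 16 kHz rate used in the tests: 16 * 10 = 160 samples (320 bytes of
+    // 16-bit PCM) per 10 ms frame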
+    // each data point denoted by a short integer (2 bytes)
+    int dataLen = (validBufferLen - headerLength) / 2;
+
+    if (dataLen % sampleCountPerFrame != 0){
+      throw new IllegalArgumentException(
+          String.format("Invalid wav file where dataLen %d does not divide sampleCountPerFrame %d",
+              dataLen, sampleCountPerFrame));
+    }
+    // total number of frames in the wav file
+    int frameCount = dataLen / sampleCountPerFrame;
+
+    UtteranceLabel utteranceLabel = this.utteranceLabels.get(id);
+    if (utteranceLabel == null){
+      throw new IllegalArgumentException(String.format("Cannot find label of id %s from MLF.", id));
+    }
+    ArrayList<Long> labels = utteranceLabel.getLabels();
+    // usually frameCount should be larger than labels.size() by a small margin
+    // in our sample data, this margin is 2.
+    if (labels.size() + 2 != frameCount){
+      throw new IllegalArgumentException(String.format("Mismatched frame label count %d and frameCount %d.",
+          labels.size() + 2, frameCount));
+    }
+    int offset = headerLength;
+    short[] data = new short[sampleCountPerFrame];
+    double[] energies = new double[frameCount];
+    for (int i = 0; i < frameCount; i++ ){
+      ByteBuffer.wrap(buffer, offset, sampleCountPerFrame * 2)
+          .order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().get(data);
+      double frameEnergy = 0;
+      for (int j = 0; j < sampleCountPerFrame; j++){
+        frameEnergy += data[j] * data[j];
+      }
+      energies[i] = frameEnergy;
+      offset += sampleCountPerFrame * 2;
+    }
+
+    double averageSpeechPower = 0;
+    double averageNoisePower  = 0.00000001;
+    int speechframeCount = 0;
+    int noiseframeCount = 0;
+
+    for (int i = 0; i < labels.size(); i++){
+      if (labels.get(i) == 0){
+        averageNoisePower += energies[i];
+        noiseframeCount++;
+      } else {
+        averageSpeechPower += energies[i];
+        speechframeCount++;
+      }
+    }
+
+    if (noiseframeCount > 0){
+      averageNoisePower /= noiseframeCount;
+    } else {
+      // no noise, pure speech snr = max of 100db
+      return 100;
+    }
+
+    if (speechframeCount > 0) {
+      averageSpeechPower /= speechframeCount;
+    } else {
+      // no speech, pure noise, snr = min of -100db
+      return -100;
+    }
+
+    return 10 * Math.log10(averageSpeechPower/averageNoisePower);
+  }
+}
+
+
+class UtteranceLabel {
+  private String id; // id is the same as file name
+  private ArrayList<Long> labels;
+  private long labelIndex;
+  private long frameCount;
+
+  public String getId(){
+    return id;
+  }
+
+  public ArrayList<Long> getLabels(){
+    return this.labels;
+  }
+
+  UtteranceLabel(String id, String labelString, String labelDelimiter){
+    // note: no error checking here
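+    // Illustrative label line (assumed format, with labelDelimiter " "): "0 3 2058 1500 2058"
+    // parses to labelIndex=0, frameCount=3 and, after the voice/non-voice mapping below,
+    // labels [0, 1, 0].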
+    this.labels = new ArrayList<Long>();
+    this.id = id;
+    final String[] splits = labelString.split(labelDelimiter);
+    if (splits.length < 2){
+      throw new InvalidParameterException("Invalid label line: at least index and length should be provided.");
+    }
+    this.labelIndex = Long.parseLong(splits[0]);
+    this.frameCount = Long.parseLong(splits[1]);
+    if (splits.length != frameCount + 2){
+      throw new InvalidParameterException("Label length mismatches label header meta.");
+    }
+    for (int i = 2; i < splits.length; i++){
+      long label = Long.parseLong(splits[i]);
+      // normalize vector entry to denote voice/non-voice, we only need this for snr computation
+      if (label >= 2057 && label <= 2059){
+        label = 0;
+      } else {
+        label = 1;
+      }
+      labels.add(label);
+    }
+  }
+}

+ 18 - 0
MyFirstUDF/examples/com/aliyun/odps/examples/unstructured/SpeechStorageHandler.java

@@ -0,0 +1,18 @@
+package com.aliyun.odps.examples.unstructured;
+
+import com.aliyun.odps.udf.Extractor;
+import com.aliyun.odps.udf.OdpsStorageHandler;
+import com.aliyun.odps.udf.Outputer;
+
+public class SpeechStorageHandler extends OdpsStorageHandler {
+
+  @Override
+  public Class<? extends Extractor> getExtractorClass() {
+    return SpeechSentenceSnrExtractor.class;
+  }
+
+  @Override
+  public Class<? extends Outputer> getOutputerClass() {
+    throw new UnsupportedOperationException();
+  }
+}

+ 140 - 0
MyFirstUDF/examples/com/aliyun/odps/examples/unstructured/TextExtractor.java

@@ -0,0 +1,140 @@
+package com.aliyun.odps.examples.unstructured;
+
+import com.aliyun.odps.Column;
+import com.aliyun.odps.data.ArrayRecord;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.io.InputStreamSet;
+import com.aliyun.odps.udf.DataAttributes;
+import com.aliyun.odps.udf.ExecutionContext;
+import com.aliyun.odps.udf.Extractor;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+/**
+ * Text extractor that extracts schematized records from formatted plain text (csv, tsv, etc.)
+ **/
+public class TextExtractor extends Extractor {
+
+  private InputStreamSet inputs;
+  private String columnDelimiter;
+  private DataAttributes attributes;
+  private BufferedReader currentReader;
+  private boolean firstRead = true;
+
+  public TextExtractor() {
+    // default to ",", this can be overwritten if a specific delimiter is provided (via DataAttributes)
+    this.columnDelimiter = ",";
+  }
+
+  // no particular usage for execution context in this example
+  @Override
+  public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes) {
+    this.inputs = inputs;
+    this.attributes = attributes;
+    // check if "delimiter" attribute is supplied via SQL query
+    String columnDelimiter = this.attributes.getValueByKey("delimiter");
+    if (columnDelimiter != null) {
+      this.columnDelimiter = columnDelimiter;
+    }
+    System.out.println("TextExtractor using delimiter [" + this.columnDelimiter + "].");
+    // note: more properties can be initialized from attributes if needed
+  }
+
+  @Override
+  public Record extract() throws IOException {
+    String line = readNextLine();
+    if (line == null) {
+      return null;
+    }
+    return textLineToRecord(line);
+  }
+
+  @Override
+  public void close(){
+    // no-op
+  }
+
+  private Record textLineToRecord(String line) throws IllegalArgumentException {
+    Column[] outputColumns = this.attributes.getRecordColumns();
+    ArrayRecord record = new ArrayRecord(outputColumns);
+    if (this.attributes.getRecordColumns().length != 0){
+      // string copies are needed, not the most efficient one, but suffice as an example here
+      String[] parts = line.split(columnDelimiter);
+      int[] outputIndexes = this.attributes.getNeededIndexes();
+      if (outputIndexes == null){
+        throw new IllegalArgumentException("No outputIndexes supplied.");
+      }
+      if (outputIndexes.length != outputColumns.length){
+        throw new IllegalArgumentException("Mismatched output schema: Expecting "
+            + outputColumns.length + " columns but get " + parts.length);
+      }
+      int index = 0;
+      for(int i = 0; i < parts.length; i++){
+        // only parse data in columns indexed by output indexes
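+        // e.g. (illustrative): for the line "1,foo,2.5" with needed indexes [0, 2] and output
+        // columns bigint,double this sets column 0 to 1L and column 1 to 2.5, skipping "foo"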
+        if (index < outputIndexes.length && i == outputIndexes[index]){
+          switch (outputColumns[index].getType()) {
+            case STRING:
+              record.setString(index, parts[i]);
+              break;
+            case BIGINT:
+              record.setBigint(index, Long.parseLong(parts[i]));
+              break;
+            case BOOLEAN:
+              record.setBoolean(index, Boolean.parseBoolean(parts[i]));
+              break;
+            case DOUBLE:
+              record.setDouble(index, Double.parseDouble(parts[i]));
+              break;
+            case DATETIME:
+            case DECIMAL:
+            case ARRAY:
+            case MAP:
+            default:
+              throw new IllegalArgumentException("Type " + outputColumns[index].getType() + " not supported for now.");
+          }
+          index++;
+        }
+      }
+    }
+    return record;
+  }
+
+  /**
+   * Read the next line from the underlying input streams.
+   * @return The next line as a String object, or null once the contents of all
+   * input streams have been read.
+   */
+  private String readNextLine() throws IOException {
+    if (firstRead) {
+      firstRead = false;
+      // the first read, initialize things
+      currentReader = moveToNextStream();
+      if (currentReader == null) {
+        // empty input stream set
+        return null;
+      }
+    }
+    while (currentReader != null) {
+      String line = currentReader.readLine();
+      if (line != null) {
+        return line;
+      }
+      currentReader = moveToNextStream();
+    }
+    return null;
+  }
+
+  private BufferedReader moveToNextStream() throws IOException {
+    InputStream stream = inputs.next();
+    if (stream == null) {
+      return null;
+    } else {
+      return new BufferedReader(new InputStreamReader(stream));
+    }
+  }
+}

+ 56 - 0
MyFirstUDF/examples/com/aliyun/odps/examples/unstructured/TextOutputer.java

@@ -0,0 +1,56 @@
+package com.aliyun.odps.examples.unstructured;
+
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.io.OutputStreamSet;
+import com.aliyun.odps.io.SinkOutputStream;
+import com.aliyun.odps.udf.DataAttributes;
+import com.aliyun.odps.udf.ExecutionContext;
+import com.aliyun.odps.udf.Outputer;
+
+import java.io.IOException;
+
+public class TextOutputer extends Outputer {
+  private SinkOutputStream outputStream;
+  private DataAttributes attributes;
+  private String delimiter;
+
+  public TextOutputer (){
+    // default delimiter, this can be overwritten if a delimiter is provided through the attributes.
+    this.delimiter = "|";
+  }
+
+  @Override
+  public void output(Record record) throws IOException {
+    this.outputStream.write(recordToString(record).getBytes());
+  }
+
+  // no particular usage of execution context in this example
+  @Override
+  public void setup(ExecutionContext ctx, OutputStreamSet outputStreamSet, DataAttributes attributes) throws IOException {
+    this.outputStream = outputStreamSet.next();
+    this.attributes = attributes;
+  }
+
+  @Override
+  public void close() {
+    // no-op
+  }
+
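+  // e.g. (illustrative): a record (1, 2.5, "row0") with the default "|" delimiter is
+  // rendered as "1|2.5|row0\n"; null columns are written as the literal "NULL"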
+  private String recordToString(Record record){
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < record.getColumnCount(); i++)
+    {
+      if (null == record.get(i)){
+        sb.append("NULL");
+      }
+      else{
+        sb.append(record.get(i).toString());
+      }
+      if (i != record.getColumnCount() - 1){
+        sb.append(this.delimiter);
+      }
+    }
+    sb.append("\n");
+    return sb.toString();
+  }
+}

+ 18 - 0
MyFirstUDF/examples/com/aliyun/odps/examples/unstructured/TextStorageHandler.java

@@ -0,0 +1,18 @@
+package com.aliyun.odps.examples.unstructured;
+
+import com.aliyun.odps.udf.Extractor;
+import com.aliyun.odps.udf.OdpsStorageHandler;
+import com.aliyun.odps.udf.Outputer;
+
+public class TextStorageHandler extends OdpsStorageHandler {
+
+  @Override
+  public Class<? extends Extractor> getExtractorClass() {
+    return TextExtractor.class;
+  }
+
+  @Override
+  public Class<? extends Outputer> getOutputerClass() {
+    return TextOutputer.class;
+  }
+}

+ 111 - 0
MyFirstUDF/examples/com/aliyun/odps/examples/unstructured/test/ExtractorTest.java

@@ -0,0 +1,111 @@
+package com.aliyun.odps.examples.unstructured.test;
+
+import com.aliyun.odps.Column;
+import com.aliyun.odps.data.ArrayRecord;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.examples.TestUtil;
+import com.aliyun.odps.examples.unstructured.SpeechSentenceSnrExtractor;
+import com.aliyun.odps.examples.unstructured.TextExtractor;
+import com.aliyun.odps.udf.local.runner.ExtractorRunner;
+import com.aliyun.odps.udf.local.util.LocalDataAttributes;
+import com.aliyun.odps.udf.local.util.UnstructuredUtils;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ExtractorTest {
+  private String ambulanceFullSchema =
+      "vehicle:bigint;id:bigint;patient:bigint;calls:bigint;latitude:double;longitude:double;time:string;direction:string";
+  private String speechDataFullSchema = "sentence_snr:double;id:string";
+
+  @BeforeClass
+  public static void initWarehouse() {
+    TestUtil.initWarehouse();
+  }
+
+  @Test
+  public void testTextExtractor() throws Exception {
+    /**
+     * Equivalent to the following SQL:
+     CREATE EXTERNAL TABLE  ambulance_data_external
+     ( vehicle bigint, id bigint, patient bigint, calls bigint,
+     Latitude double, Longitude double, time string, direction string)
+     STORED BY 'com.aliyun.odps.examples.unstructured.TextStorageHandler'
+     LOCATION 'oss://.../data/ambulance_csv/'
+     USING 'jar_file_name.jar';
+
+     SELECT * FROM ambulance_data_external;
+     */
+    Column[] externalTableSchema = UnstructuredUtils.parseSchemaString(ambulanceFullSchema);
+    // note: default delimiter used in TextExtractor is ','
+    LocalDataAttributes attributes = new LocalDataAttributes(null, externalTableSchema);
+    ExtractorRunner runner = new ExtractorRunner(TestUtil.getOdps(), new TextExtractor(), attributes);
+    // use a local file directory to mock the data source
+    runner.feedDirectory(TestUtil.class.getResource("/data/ambulance_csv/").getPath());
+    List<Record> records = runner.yieldRecords();
+    // do verification below
+    Assert.assertEquals(records.size(), 15);
+    ArrayRecord record0 = new ArrayRecord(externalTableSchema);
+    record0.set(0, (long)1);
+    record0.set(1, (long)1);
+    record0.set(2, (long)51);
+    record0.set(3, (long)1);
+    record0.set(4, 46.81006);
+    record0.set(5, -92.08174);
+    record0.set(6, "9/14/2014 0:00");
+    record0.set(7, "S");
+    Assert.assertTrue(UnstructuredUtils.recordsEqual(record0, records.get(0)));
+  }
+
+  @Test
+  public void testSpeechExtraction() throws Exception {
+    /**
+     * Equivalent to the following SQL:
+     CREATE EXTERNAL TABLE speech_snr_external
+     (sentence_snr double, id string)
+     STORED BY 'com.aliyun.odps.examples.unstructured.SpeechStorageHandler'
+     WITH SERDEPROPERTIES ('mlfFileName'='speech_model_random_5_utterance' , 'speechSampleRateInKHz' = '16')
+     LOCATION 'oss://.../data/speech_wav/'
+     USING 'jar_file_name.jar';
+
+     SELECT * FROM speech_snr_external;
+     */
+    Column[] externalTableSchema = UnstructuredUtils.parseSchemaString(speechDataFullSchema);
+    Map<String, String> userProperties = new HashMap<String, String>();
+    // a file resource
+    userProperties.put("mlfFileName", "speech_model_random_5_utterance");
+    // an extractor parameter
+    userProperties.put("speechSampleRateInKHz", "16");
+    LocalDataAttributes attributes = new LocalDataAttributes(userProperties, externalTableSchema);
+    // SpeechSentenceSnrExtractor will analyze a speech wav file and output
+    // 1. the average sentence snr of a wav file
+    // 2. the corresponding wav file name
+    ExtractorRunner runner = new ExtractorRunner(TestUtil.getOdps(), new SpeechSentenceSnrExtractor(), attributes);
+
+    runner.feedDirectory(TestUtil.class.getResource("/data/speech_wav/").getPath());
+    List<Record> records = runner.yieldRecords();
+
+    // do verification below
+    Assert.assertEquals(records.size(), 3);
+
+    ArrayRecord record0 = new ArrayRecord(externalTableSchema);
+    record0.set(0, 31.39050062838079);
+    record0.set(1, "tsh148_seg_2_3013_3_6_48_80bd359827e24dd7_0");
+    Assert.assertTrue(UnstructuredUtils.recordsEqual(record0, records.get(0)));
+
+    ArrayRecord record1 = new ArrayRecord(externalTableSchema);
+    record1.set(0, 35.477360745366035);
+    record1.set(1, "tsh148_seg_3013_1_31_11_9d7c87aef9f3e559_0");
+    Assert.assertTrue(UnstructuredUtils.recordsEqual(record1, records.get(1)));
+
+    ArrayRecord record2 = new ArrayRecord(externalTableSchema);
+    record2.set(0, 16.046150955268665);
+    record2.set(1, "tsh148_seg_3013_2_29_49_f4cb0990a6b4060c_0");
+    Assert.assertTrue(UnstructuredUtils.recordsEqual(record2, records.get(2)));
+  }
+
+}

+ 133 - 0
MyFirstUDF/examples/com/aliyun/odps/examples/unstructured/test/OutputerTest.java

@@ -0,0 +1,133 @@
+package com.aliyun.odps.examples.unstructured.test;
+
+import com.aliyun.odps.Column;
+import com.aliyun.odps.data.ArrayRecord;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.examples.TestUtil;
+import com.aliyun.odps.examples.unstructured.TextOutputer;
+import com.aliyun.odps.udf.local.runner.OutputerRunner;
+import com.aliyun.odps.udf.local.util.LocalDataAttributes;
+import com.aliyun.odps.udf.local.util.UnstructuredUtils;
+import com.aliyun.odps.utils.StringUtils;
+import org.junit.*;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class OutputerTest {
+
+  private String simpleTableSchema = "a:bigint;b:double;c:string";
+  private String adsLogTableSchema = "AdId:BIGINT;Rand:DOUBLE;AdvertiserName:STRING;Comment:STRING";
+  private File outputDirectory = null;
+
+  @BeforeClass
+  public static void initWarehouse() {
+    TestUtil.initWarehouse();
+  }
+
+  @Before
+  public void before() throws IOException{
+    // output directory preparation
+    outputDirectory = new File("temp/" + UnstructuredUtils.generateOutputName());
+    outputDirectory.delete();
+    outputDirectory.mkdirs();
+  }
+
+  @Test
+  public void testOutputSimpleText() throws Exception {
+    /**
+     * Test outputting manually constructed records to text
+     */
+    Column[] externalTableSchema = UnstructuredUtils.parseSchemaString(simpleTableSchema);
+    LocalDataAttributes attributes = new LocalDataAttributes(null, externalTableSchema);
+    // TextOutputer will output one single file
+    OutputerRunner runner = new OutputerRunner(TestUtil.getOdps(), new TextOutputer(), attributes);
+    List<Record> records = new ArrayList<Record>();
+    records.add(new ArrayRecord(externalTableSchema, new Object[]{(long)1, 2.5, "row0"}));
+    records.add(new ArrayRecord(externalTableSchema, new Object[]{(long)1234567, 8.88, "row1"}));
+    records.add(new ArrayRecord(externalTableSchema, new Object[]{(long)12, 123.1, "testrow"}));
+    // run outputer
+    runner.feedRecords(records);
+    runner.yieldTo(outputDirectory.getAbsolutePath());
+
+    String expcetedOutput = "1|2.5|row0\n" +
+        "1234567|8.88|row1\n" +
+        "12|123.1|testrow\n";
+
+    verifySingleFileOutput(expectedOutput);
+  }
+
+  @Test
+  public void testOutputSpecialText() throws Exception {
+    /**
+     * Test reading from internal table and outputting to text file, with a user defined delimiter.
+     * Equivalent to the following SQL:
+     *
+     CREATE EXTERNAL TABLE ads_log_external
+     (AdId bigint, Rand double,
+     AdvertiserName string, Comment string)
+     STORED BY 'com.aliyun.odps.examples.unstructured.TextStorageHandler'
+     WITH SERDEPROPERTIES ('delimiter'='\t')
+     LOCATION 'oss://path/to/output/'
+     USING 'jar_file_name.jar';
+
+     INSERT OVERWRITE ads_log_external SELECT * FROM ads_log;
+     * Here ads_log is an internal table (locally defined in warehouse directory)
+     */
+    Column[] externalTableSchema = UnstructuredUtils.parseSchemaString(adsLogTableSchema);
+    Map<String, String> userProperties = new HashMap<String, String>();
+    userProperties.put("delimiter", "\t");
+    LocalDataAttributes attributes = new LocalDataAttributes(userProperties, externalTableSchema);
+    // TextOutputer outputs one single file
+    OutputerRunner runner = new OutputerRunner(TestUtil.getOdps(), new TextOutputer(), attributes);
+    String internalTableName = "ads_log";
+    // We are doing SELECT * FROM here, so the two tables have the same schema
+    Column[] internalTableSchema = externalTableSchema;
+
+    List<Record> records = new ArrayList<Record>();
+    Record record;
+    while ((record = UnstructuredUtils.readFromInternalTable("example_project", internalTableName,
+        internalTableSchema, null)) != null){
+      records.add(record.clone());
+    }
+    // run outputer
+    runner.feedRecords(records);
+    runner.yieldTo(outputDirectory.getAbsolutePath());
+
+    String expcetedOutput = "399266\t0.5\tDoritos\twhat is up\n" +
+        "399266\t0.0\tTacobell\thello!\n" +
+        "382045\t-76.0\tVoelkl\trandom comments\n" +
+        "382045\t6.4\tWhistler Resort\ta\n" +
+        "106479\t98.7\tAmazon Prime\tbdcd\n" +
+        "906441\t-9865788.2\tHayden Planetarium\tplatium\n" +
+        "351530\t0.005\tMicrosoft Azure Services\ttst\n";
+
+    verifySingleFileOutput(expectedOutput);
+  }
+
+  private void verifySingleFileOutput(String expectedOutput) throws IOException {
+    verifyFilesOutput(new String[]{expectedOutput});
+  }
+
+  private void verifyFilesOutput(String[] expectedOutputs) throws IOException {
+    File[] outputs = outputDirectory.listFiles();
+    Assert.assertEquals(outputs.length, expectedOutputs.length);
+    for (int i = 0; i < outputs.length; i++){
+      File outputFile = outputs[i];
+      FileInputStream fis = new FileInputStream(outputFile);
+      byte[] data = new byte[(int)outputFile.length()];
+      fis.read(data);
+      String content = new String(data);
+      String[] rows = StringUtils.split(content, '\n');
+      String[] expectedRows = StringUtils.split(expectedOutputs[i], '\n');
+      // due to double-precision representation differences, the output may not exactly match
+      // the expected text, therefore we only verify that the numbers of rows match.
+      Assert.assertEquals(rows.length, expectedRows.length);
+    }
+  }
+}

+ 6 - 0
MyFirstUDF/examples/data/ambulance_csv/1.csv

@@ -0,0 +1,6 @@
+1,1,51,1,46.81006,-92.08174,9/14/2014 0:00,S
+1,2,13,1,46.81006,-92.08174,9/14/2014 0:00,NE
+1,3,48,1,46.81006,-92.08174,9/14/2014 0:00,NE
+1,4,30,1,46.81006,-92.08174,9/14/2014 0:00,W
+1,5,47,1,46.81006,-92.08174,9/14/2014 0:00,S
+1,6,9,1,46.81006,-92.08174,9/14/2014 0:00,S

+ 9 - 0
MyFirstUDF/examples/data/ambulance_csv/2.csv

@@ -0,0 +1,9 @@
+1,1,40,1,46.81006,-92.08174,9/15/2014 0:00,NE
+1,2,33,1,46.81006,-92.08174,9/15/2014 0:00,NE
+1,3,60,1,46.81006,-92.08174,9/15/2014 0:00,NW
+1,4,50,1,46.81006,-92.08174,9/15/2014 0:00,SW
+1,5,50,1,46.81006,-92.08174,9/15/2014 0:00,S
+1,6,53,1,46.81006,-92.08174,9/15/2014 0:00,NE
+1,7,60,1,46.81006,-92.08174,9/15/2014 0:00,NE
+1,8,75,1,46.81006,-92.08174,9/15/2014 0:00,E
+1,9,75,1,46.81006,-92.08174,9/15/2014 0:00,E

BIN
MyFirstUDF/examples/data/speech_wav/tsh148_seg_2_3013_3_6_48_80bd359827e24dd7_0.wav


BIN
MyFirstUDF/examples/data/speech_wav/tsh148_seg_3013_1_31_11_9d7c87aef9f3e559_0.wav


BIN
MyFirstUDF/examples/data/speech_wav/tsh148_seg_3013_2_29_49_f4cb0990a6b4060c_0.wav