Resource.java 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package com.aliyun.odps.examples.mr;
  2. import com.aliyun.odps.data.Record;
  3. import com.aliyun.odps.data.TableInfo;
  4. import com.aliyun.odps.mapred.JobClient;
  5. import com.aliyun.odps.mapred.MapperBase;
  6. import com.aliyun.odps.mapred.conf.JobConf;
  7. import com.aliyun.odps.mapred.utils.InputUtils;
  8. import com.aliyun.odps.mapred.utils.OutputUtils;
  9. import java.io.BufferedReader;
  10. import java.io.IOException;
  11. import java.io.InputStream;
  12. import java.io.InputStreamReader;
  13. import java.util.Iterator;
  14. /*
  15. * This example shows how to read file resources in a MapReduce program.
  16. * It is mainly intended for debugging in Local mode. To run it in the online environment,
  17. * remove the statement "job.setResources("file_resource.txt");" from the main method (if present).
  18. *
  19. * Usage:
  20. * Set Resource arguments:
  21. * file_resource.txt
  22. * Set program arguments:
  23. * wc_in1 rs_out
  24. */
  25. public class Resource {
  26. public static class TokenizerMapper extends MapperBase {
  27. Record result;
  28. @Override
  29. public void setup(TaskContext context) throws IOException {
  30. result = context.createOutputRecord();
  31. long fileResourceLineCount = 0;
  32. InputStream in = context.readResourceFileAsStream("file_resource.txt");
  33. BufferedReader br = new BufferedReader(new InputStreamReader(in));
  34. String line;
  35. while ((line = br.readLine()) != null) {
  36. fileResourceLineCount++;
  37. }
  38. br.close();
  39. result.set(0, "file_resource_line_count");
  40. result.set(1, fileResourceLineCount);
  41. context.write(result);
  42. br.close();
  43. Iterator<Record> it = context.readResourceTable("table_resource1");
  44. long tableResourceRecordCount = 0;
  45. while (it.hasNext()) {
  46. Record r = it.next();
  47. ++tableResourceRecordCount;
  48. }
  49. result.set(0, "table_resource1_record_count");
  50. result.set(1, tableResourceRecordCount);
  51. context.write(result);
  52. it = context.readResourceTable("table_resource2");
  53. tableResourceRecordCount = 0;
  54. while (it.hasNext()) {
  55. Record r = it.next();
  56. ++tableResourceRecordCount;
  57. }
  58. result.set(0, "table_resource2_record_count");
  59. result.set(1, tableResourceRecordCount);
  60. context.write(result);
  61. }
  62. }
  63. public static void main(String[] args) throws Exception {
  64. JobConf job = new JobConf();
  65. job.setMapperClass(TokenizerMapper.class);
  66. job.setNumReduceTasks(0);
  67. InputUtils.addTable(TableInfo.builder().tableName("wc_in1").build(), job);
  68. OutputUtils.addTable(TableInfo.builder().tableName("rs_out").build(), job);
  69. JobClient.runJob(job);
  70. }
  71. }