123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- package com.aliyun.odps.examples.mr;
- import com.aliyun.odps.data.Record;
- import com.aliyun.odps.data.TableInfo;
- import com.aliyun.odps.mapred.JobClient;
- import com.aliyun.odps.mapred.MapperBase;
- import com.aliyun.odps.mapred.conf.JobConf;
- import com.aliyun.odps.mapred.utils.InputUtils;
- import com.aliyun.odps.mapred.utils.OutputUtils;
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStream;
- import java.io.InputStreamReader;
- import java.util.Iterator;
- public class Resource {
- public static class TokenizerMapper extends MapperBase {
- Record result;
- @Override
- public void setup(TaskContext context) throws IOException {
- result = context.createOutputRecord();
- long fileResourceLineCount = 0;
- InputStream in = context.readResourceFileAsStream("file_resource.txt");
- BufferedReader br = new BufferedReader(new InputStreamReader(in));
- String line;
- while ((line = br.readLine()) != null) {
- fileResourceLineCount++;
- }
- br.close();
- result.set(0, "file_resource_line_count");
- result.set(1, fileResourceLineCount);
- context.write(result);
- br.close();
- Iterator<Record> it = context.readResourceTable("table_resource1");
- long tableResourceRecordCount = 0;
- while (it.hasNext()) {
- Record r = it.next();
- ++tableResourceRecordCount;
- }
- result.set(0, "table_resource1_record_count");
- result.set(1, tableResourceRecordCount);
- context.write(result);
- it = context.readResourceTable("table_resource2");
- tableResourceRecordCount = 0;
- while (it.hasNext()) {
- Record r = it.next();
- ++tableResourceRecordCount;
- }
- result.set(0, "table_resource2_record_count");
- result.set(1, tableResourceRecordCount);
- context.write(result);
- }
- }
- public static void main(String[] args) throws Exception {
- JobConf job = new JobConf();
- job.setMapperClass(TokenizerMapper.class);
- job.setNumReduceTasks(0);
- InputUtils.addTable(TableInfo.builder().tableName("wc_in1").build(), job);
- OutputUtils.addTable(TableInfo.builder().tableName("rs_out").build(), job);
- JobClient.runJob(job);
- }
- }
|