转载

5.hadoop流原理、实例和新旧API下Wordcount详解

前四篇文章讲了Hadoop的配置和测试以及eclipse下的使用,有兴趣的可以先看下。

1.Hadoop流简介

用可执行文件作为Mapper和Reducer,接受的都是标准输入,输出的都是标准输出。

当一个可执行文件作为Mapper时,每一个Map任务会以一个独立的进程启动这个可执行文件,然后在Map任务运行时,会把输入切分成行提供给可 执行文件,并作为它的标准输入(stdin)内容。当可执行文件运行出结果时,Map从标准输出(stdout)中收集数据,并将其转化 为<key, value>对,作为Map的输出。

Reduce与Map相同,如果可执行文件做Reducer时,Reduce任务会启动这个可执行文件,并且将<key, value>对转化为行作为这个可执行文件的标准输入(stdin)。然后Reduce会收集这个可执行文件的标准输出(stdout)的内容。并 把每一行转化为<key, value>对,作为Reduce的输出。

Map与Reduce将输出转化为<key , value>对的默认方法是:将每行的第一个tab符号(制表符)之前的内容作为key,之后的内容作为value。如果没有tab符号,那么这一 行的所有内容会作为key,而value值为null。当然这是可以更改的。

值得一提的是,可以使用Java类作为Map,而用一个可执行程序作为Reduce;或使用Java类作为Reduce,而用可执行程序作为Map。

下面先看一个简单例子,用/bin/cat作Mapper,用/usr/bin/wc作Reducer

/input下两个文件为:

hello world bye world

hello hadoop bye hadoop

我在root账户/usr/local/hadoop/hadoop-2.2.0/bin目录(和安装路径有关)下运行此代码,可以统计文件中的行数,单词数和字节数。

root@master:/usr/local/hadoop/hadoop-2.2.0/bin# hadoop jar /usr/local/hadoop/hadoop-2.2.0/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -input /input -output /output/stream -mapper /bin/cat -reducer /usr/bin/wc

运行结果为: 2 8 46

wc命令用来统计文件中的行数、单词数与字节数,可以看到,这个结果是正确的。

2.1 Hadoop流命令

参数        可选/必选 参数      可选/必选

-input 必选  -cmdenv   可选

-output  必选  -inputreader    可选

-mapper  必选  -verbose    可选

-reducer 必选  -lazyOutput     可选

-file   可选  -numReduce tasks 可选

-inputformat 可选  -mapdebug    可选

-outputformat 可选  -reducedebug   可选

-partitioner    可选  -io     可选

-combiner  可选 

Hadoop流命令中,必选的4个很好理解,分别用于指定输入/输出文件的位置及Map/Reduce函数。在其他的可选命令中,这里我们只解释常用的几个。

-file

这个指令用于将文件加入到Hadoop的Job中。上面的例子中,cat和wc都是 Linux 系统中的命令,而在Hadoop流的使用中,往往需要使用自己写的文件(作为Map函数或Reduce函数)。一般而言,这些文件是Hadoop集群中的机器上没有的,这时就需要使用Hadoop流中的-file命令将这个可执行文件加入到Hadoop的Job中。

-combiner

这个命令用来加入combiner程序。

-inputformat和-outputformat

这两个命令用来设置输入输出文件的处理方法,这两个命令后面的参数必须是Java类。

2.2  Hadoop流通用的命令选项

Hadoop流的通用命令用来配置Hadoop流的Job。需要注意的是,如果使用这部分配置,就必须将其置于流命令配置之前,否则命令会失败。这里简要列出命令列表,供大家参考。

Hadoop流的Job设置命令

参数   可选/必选  参数   可选/必选

-conf  可选  -files       可选

-D      可选  -libjars    可选

-fs     可选  -archives  可选

-jt      可选

从上面的内容可以知道,Hadoop流的API是一个扩展性非常强的框架,它与程序相连的部分只有数据,因此可以接受任何适用于UNIX标准输入/输出的脚本语言,比如Bash、PHP、Ruby、Python等。下面举两个非常简单的例子来进一步说明它的特性。(来源:《Hadoop实战》-陆嘉恒,中国人民大学)

3. 1 Bash

MapReduce框架是一个非常适合在大规模的非结构化数据中查找数据的 编程 模型,grep就是这种类型的一个例子。

在Linux中,grep命令用来在一个或多个文件中查找某个字符模式(这个字符模式可以代表字符串,多用正则表达式表示)。

下面尝试在如下的数据中查找带有Hadoop字符串的行,如下所示。

输入文件为:

file01:

hello world bye world

file02:

hello hadoop bye hadoop

reduce文件为:

reduce.sh:

grep hadoop

输入命令为:

root@master:/usr/local/hadoop/hadoop-2.2.0/bin# jar /usr/local/hadoop/hadoop-2.2.0/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -input /input -output /output -mapper /bin/cat -reducer ~/Desktop/test/reducer.sh -file ~/Desktop/test/reducer.sh

结果为:hello      hadoop bye hadoop

显然,这个结果是正确的。

3.2 Python

对于 Python 来说,情况有些特殊。因为Python是可以编译为JAR包的,如果将程序编译为JAR包,那么就可以采用运行JAR包的方式来运行了。

不过,同样也可以用流的方式运行Python程序。请看如下代码:Reduce.py

 1  1 #!/usr/bin/python  2  2   3  3 import sys;  4  4   5  5 def generateLongCountToken(id):  6  6     return "LongValueSum:" + id + "/t" + "1"  7  7 def main(argv):  8  8     line = sys.stdin.readline();  9  9     try: 10 10         while line: 11 11             line = line[:-1]; 12 12             fields = line.split("/t"); 13 13             print generateLongCountToken(fields[0]); 14 14             line = sys.stdin.readline(); 15 15     except "end of file": 16 16         return None 17 17 if __name__ == "__main__": 18 18      main(sys.argv)

使用如下命令来运行:

root@master:/usr/local/hadoop/hadoop-2.2.0/bin# jar /usr/local/hadoop/hadoop-2.2.0/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -input /input -output /pyoutput -mapper reduce.py -reducer aggregate -file reduce.py

注意其中的aggregate是Hadoop提供的一个包,它提供一个Reduce函数和一个combine函数。这个函数实现一些简单的类似求和、取最大值最小值等的功能。

4.Hadoop Pipes

Hadoop Pipes提供了一个在Hadoop上运行C++程序的方法。与流不同的是,流使用的是标准输入输出作为可执行程序与Hadoop相关进程间通信的工具,而Pipes使用的是Sockets。先看一个示例程序wordcount.cpp:

 1 #include "hadoop/Pipes.hh"  2 #include "hadoop/TemplateFactory.hh"  3 #include "hadoop/StringUtils.hh"  4   5 const std::string WORDCOUNT = "WORDCOUNT";  6 const std::string INPUT_WORDS = "INPUT_WORDS";  7 const std::string OUTPUT_WORDS = "OUTPUT_WORDS";  8   9 class WordCountMap: public HadoopPipes::Mapper { 10 public: 11   HadoopPipes::TaskContext::Counter* inputWords; 12   13   WordCountMap(HadoopPipes::TaskContext& context) { 14     inputWords = context.getCounter(WORDCOUNT, INPUT_WORDS); 15   } 16   17   void map(HadoopPipes::MapContext& context) { 18     std::vector<std::string> words = 19       HadoopUtils::splitString(context.getInputValue(), " "); 20     for(unsigned int i=0; i < words.size(); ++i) { 21       context.emit(words[i], "1"); 22     } 23     context.incrementCounter(inputWords, words.size()); 24   } 25 }; 26  27 class WordCountReduce: public HadoopPipes::Reducer { 28 public: 29   HadoopPipes::TaskContext::Counter* outputWords; 30  31 WordCountReduce(HadoopPipes::TaskContext& context) { 32     outputWords = context.getCounter(WORDCOUNT, OUTPUT_WORDS); 33   } 34  35 void reduce(HadoopPipes::ReduceContext& context) { 36     int sum = 0; 37     while (context.nextValue()) { 38       sum += HadoopUtils::toInt(context.getInputValue()); 39     } 40     context.emit(context.getInputKey(), HadoopUtils::toString(sum)); 41     context.incrementCounter(outputWords, 1); 42   } 43 }; 44 int main(int argc, char *argv[]) { 45   return HadoopPipes::runTask(HadoopPipes::TemplateFactory<WordCountMap, WordCountReduce>()); 46 }

这个程序连接的是一个C++库,结构类似于Java编写的程序。如新版API一样,这个程序使用context方法读入和收集<key, value>对。在使用时要重写HadoopPipes名字空间下的Mapper和Reducer函数,并用context.emit()方法输 出<key, value>对。main函数是应用程序的入口,它调用HadoopPipes::runTask方法,这个方法由一个 TemplateFactory参数来创建Map和Reduce实例,也可以重载factory设置combiner()、partitioner()、 record reader、record writer。

接下来,编译这个程序。这个编译命令需要用到g++,读者可以使用apt自动安装这个程序。g++的命令格式如下所示:

apt-get install g++

然后建立文件Makerfile,如下所示:

HADOOP_INSTALL="你的hadoop安装文件夹"

PLATFORM= Linux -i386-32(如果是AMD的CPU,请使用 Linux -amd64-64)

CC = g++CPPFLAGS = -m32 -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include

wordcount: wordcount.cpp$(CC) $(CPPFLAGS) $< -Wall -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes -lhadooputils -lpthread -g -O2 -o $@

注意在$(CC)前有一个<tab>符号,这个分隔符是很关键的。

在当前目录下建立一个WordCount可执行文件。

接着,上传可执行文件到HDFS上,这是为了TaskTracker能够获得这个可执行文件。这里上传到bin文件夹内。

~/hadoop/bin/hadoop fs –mkdir bin ~/hadoop/bin/hadoop dfs –put wordcount /bin

然后,就可以运行这个MapReduce程序了,可以采用两种配置方式运行这个程序。一种方式是直接在命令中运行指定配置,如下所示:

~/hadoop/bin/hadoop pipes/ -D hadoop.pipes.java.recordreader=true/ -D hadoop.pipes.java.recordwriter=true/ -input input/ -output Coutput/ -program /bin/wordcount

另一种方式是预先将配置写入配置文件中,如下所示:

<?xml version="1.0"?> <configuration>   <property>     // Set the binary path on DFS     <name>hadoop.pipes.executable</name>     <value>bin/wordcount</value>   </property>   <property>     <name>hadoop.pipes.java.recordreader</name>     <value>true</value>   </property>   <property>     <name>hadoop.pipes.java.recordwriter</name>     <value>true</value>   </property> </configuration>

然后通过如下命令运行这个程序:

~/hadoop/bin/hadoop pipes -conf word.xml -input /input -output /output

将参数hadoop.pipes.executable和hadoop.pipes.java.recordreader设置为true表示使用 Hadoop默认的输入输出方式(即Java的)。同样的,也可以设置一个Java语言编写的Mapper函数、Reducer函数、combiner函 数和partitioner函数。实际上,在任何一个作业中,都可以混用Java类和C++类。

5.1 旧API  WordCount分析

1)源代码程序

package org.apache.hadoop.examples;

import java.io.IOException;

import java.util.Iterator;

import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.FileInputFormat;

import org.apache.hadoop.mapred.FileOutputFormat;

import org.apache.hadoop.mapred.JobClient;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.Mapper;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.Reducer;

import org.apache.hadoop.mapred.Reporter;

import org.apache.hadoop.mapred.TextInputFormat;

import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount {

public static class Map extends MapReduceBase implements

Mapper<LongWritable, Text, Text, IntWritable> {

private final static IntWritable one = new IntWritable(1);

private Text word = new Text();

public void map(LongWritable key, Text value,

OutputCollector<Text, IntWritable> output, Reporter reporter)

throws IOException {

String line = value.toString();

StringTokenizer tokenizer = new StringTokenizer(line);

while (tokenizer.hasMoreTokens()) {

word.set(tokenizer.nextToken());

output.collect(word, one);

}

}

}

public static class Reduce extends MapReduceBase implements

Reducer<Text, IntWritable, Text, IntWritable> {

public void reduce(Text key, Iterator<IntWritable> values,

OutputCollector<Text, IntWritable> output, Reporter reporter)

throws IOException {

int sum = 0;

while (values.hasNext()) {

sum += values.next().get();

}

output.collect(key, new IntWritable(sum));

}

}

public static void main(String[] args) throws Exception {

JobConf conf = new JobConf(WordCount.class);

conf.setJobName("wordcount");

conf.setOutputKeyClass(Text.class);conf.setOutputValueClass(IntWritable.class);

conf.setMapperClass(Map.class);

conf.setCombinerClass(Reduce.class);

conf.setReducerClass(Reduce.class);

conf.setInputFormat(TextInputFormat.class);conf.setOutputFormat(TextOutputFormat.class);

FileInputFormat.setInputPaths(conf, new Path(args[0]));FileOutputFormat.setOutputPath(conf, new Path(args[1]));

JobClient.runJob(conf);

}

}

3)主方法 Main 分析

public static void main(String[] args) throws Exception {

JobConf conf = new JobConf(WordCount.class);

conf.setJobName("wordcount");

conf.setOutputKeyClass(Text.class);conf.setOutputValueClass(IntWritable.class);

conf.setMapperClass(Map.class);

conf.setCombinerClass(Reduce.class);

conf.setReducerClass(Reduce.class);

conf.setInputFormat(TextInputFormat.class);conf.setOutputFormat(TextOutputFormat.class);

FileInputFormat.setInputPaths(conf, new Path(args[0]));FileOutputFormat.setOutputPath(conf, new Path(args[1]));

JobClient.runJob(conf);}

首先讲解一下Job的初始化过程。main函数调用Jobconf类来对MapReduce Job进行初始化,然后调用setJobName()方法命名这个Job。对Job进行合理的命名有助于更快地找到Job,以便在JobTracker和Tasktracker的页面中对其进行监视。

JobConf conf = new JobConf(WordCount. class ); conf.setJobName("wordcount" );

接着设置Job输出结果<key,value>的中key和value数据类型,因为结果是<单词,个数>,所以 key设置为"Text"类型,相当于Java中String类型。Value设置为"IntWritable",相当于Java中的int类型。

conf.setOutputKeyClass(Text.class );

conf.setOutputValueClass(IntWritable.class );

然后设置Job处理的Map(拆分)、Combiner(中间结果合并)以及Reduce(合并)的相关处理类。这里用Reduce类来进行Map产生的中间结果合并,避免给网络数据传输产生压力。

conf.setMapperClass(Map.class );

conf.setCombinerClass(Reduce.class );

conf.setReducerClass(Reduce.class );

接着就是调用setInputPath()和setOutputPath()设置输入输出路径

conf.setInputFormat(TextInputFormat.class );

conf.setOutputFormat(TextOutputFormat.class );

(1)InputFormat和InputSplit

InputSplit是Hadoop定义的用来传送给每个单独的map的数据,InputSplit存储的并非数据本身,而是一个分片长度和一个记录数据位置的数组。生成InputSplit的方法可以通过InputFormat()来设置。

当数据传送给map时,map会将输入分片传送到InputFormat,InputFormat则调用方法getRecordReader()生成RecordReader,RecordReader再通过creatKey()、creatValue()方法创建可供map处理的<key,value>对。简而言之,InputFormat()方法是用来生成可供map处理的<key,value>对的。

Hadoop预定义了多种方法将不同类型的输入数据转化为map能够处理的<key,value>对,它们都继承自InputFormat,分别是:

InputFormat

|

|---BaileyBorweinPlouffe.BbpInputFormat

|---ComposableInputFormat

|---CompositeInputFormat

|---DBInputFormat

|---DistSum.Machine.AbstractInputFormat

|---FileInputFormat

|---CombineFileInputFormat

|---KeyValueTextInputFormat

|---NLineInputFormat

|---SequenceFileInputFormat

|---TeraInputFormat

|---TextInputFormat

其中TextInputFormat是Hadoop默认的输入方法,在TextInputFormat中,每个文件(或其一部分)都会单独地作为map的输入,而这个是继承自FileInputFormat的。之后,每行数据都会生成一条记录,每条记录则表示成<key,value>形式:

  • key值是每个数据的记录在数据分片中字节偏移量,数据类型是LongWritable;

value值是每行的内容,数据类型是Text。

(2)OutputFormat

每一种输入格式都有一种输出格式与其对应。默认的输出格式是TextOutputFormat,这种输出方式与输入类似,会将每条记录以一行的形式存入文本文件。不过,它的键和值可以是任意形式的,因为程序内容会调用toString()方法将键和值转换为String类型再输出。

3)Map类中map方法分析

public static class Map extends MapReduceBase implements

Mapper<LongWritable, Text, Text, IntWritable> {

private final static IntWritable one = new IntWritable(1);

private Text word = new Text();

public void map(LongWritable key, Text value,

OutputCollector<Text, IntWritable> output, Reporter reporter)

throws IOException {

String line = value.toString();

StringTokenizer tokenizer = new StringTokenizer(line);

while (tokenizer.hasMoreTokens()) {

word.set(tokenizer.nextToken());

output.collect(word, one);

}

}

}

Map类继承自MapReduceBase,并且它实现了Mapper接口,此接口是一个规范类型,它有4种形式的参数,分别用来指定map的输入key值类型、输入value值类型、输出key值类型和输出value 值类型。在本例中,因为使用的是TextInputFormat,它的输出key值是LongWritable类型,输出value值是Text类型,所 以map的输入类型为<LongWritable,Text>。在本例中需要输出<word,1>这样的形式,因此输出的key 值类型是Text,输出的value值类型是IntWritable。

实现此接口类还需要实现map方法,map方法会具体负责对输入进行操作,在本例中,map方法对输入的行以空格为单位进行切分,然后使用OutputCollect收集输出的<word,1>。

4)Reduce类中reduce方法分析

public static class Reduce extends MapReduceBase implements

Reducer<Text, IntWritable, Text, IntWritable> {

public void reduce(Text key, Iterator<IntWritable> values,

OutputCollector<Text, IntWritable> output, Reporter reporter)

throws IOException {

int sum = 0;

while (values.hasNext()) {

sum += values.next().get();

}

output.collect(key, new IntWritable(sum));

}

}

Reduce类也是继承自MapReduceBase的,需要实现Reducer接口。Reduce类以map的输出作为输入,因此Reduce的输入类型是<Text,Intwritable>。而Reduce的输出是单词和它的数目,因此,它的输出类型是<Text,IntWritable>。Reduce类也要实现reduce方法,在此方法中,reduce函数将输入的key值作为输出的key值,然后将获得多个value值加起来,作为输出的值。

5.2 新API WordCount分析

1)源代码程序

package org.apache.hadoop.examples;

import java.io.IOException;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

public static class TokenizerMapper

extends Mapper<Object, Text, Text, IntWritable>{

private final static IntWritable one = new IntWritable(1);

private Text word = new Text();

public void map(Object key, Text value, Context context)

throws IOException, InterruptedException {

StringTokenizer itr = new StringTokenizer(value.toString());

while (itr.hasMoreTokens()) {

word.set(itr.nextToken());

context.write(word, one);

}

}

}

public static class IntSumReducer

extends Reducer<Text,IntWritable,Text,IntWritable> {

private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values,Context context)

throws IOException, InterruptedException {

int sum = 0;

for (IntWritable val : values) {

sum += val.get();

}

result.set(sum);

context.write(key, result);

}

}

public static void main (String[] args) throws Exception {

Configuration conf = new Configuration();

String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

if (otherArgs.length != 2) {

System.err.println("Usage: wordcount <in> <out>");

System.exit(2);

}

Job job = new Job(conf, "word count");

job.setJarByClass(WordCount.class);

job.setMapperClass(TokenizerMapper.class);

job.setCombinerClass(IntSumReducer.class);

job.setReducerClass(IntSumReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}

1)Map过程

public static class TokenizerMapper

extends Mapper<Object, Text, Text, IntWritable>{

private final static IntWritable one = new IntWritable(1);

private Text word = new Text();

public void map(Object key, Text value, Context context)

throws IOException, InterruptedException {

StringTokenizer itr = new StringTokenizer(value.toString());

while (itr.hasMoreTokens()) {

word.set(itr.nextToken());

context.write(word, one);

}

}

Map过程需要继承org.apache.hadoop.mapreduce包中Mapper类,并重写其 map方法。通过在map方法中添加两句把key值和value值输出到控制台的代码,可以发现map方法中value值存储的是文本文件中的一行(以回 车符为行结束标记),而key值为该行的首字母相对于文本文件的首地址的偏移量。然后StringTokenizer类将每一行拆分成为一个个的单词,并 将<word,1>作为map方法的结果输出,其余的工作都交有MapReduce框架处理。

2)Reduce过程

public static class IntSumReducer

extends Reducer<Text,IntWritable,Text,IntWritable> {

private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values,Context context)

throws IOException, InterruptedException {

int sum = 0;

for (IntWritable val : values) {

sum += val.get();

}

result.set(sum);

context.write(key, result);

}

}

Reduce过程需要继承org.apache.hadoop.mapreduce包中Reducer类,并重写其reduce方法。Map过程输出<key,values>中key为单个单词,而values是对应单词的计数值所组成的列表,Map的输出就是Reduce的输入,所以reduce方法只要遍历values并求和,即可得到某个单词的总次数。

3)执行MapReduce任务

public static void main (String[] args) throws Exception {

Configuration conf = new Configuration();

String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

if (otherArgs.length != 2) {

System.err.println("Usage: wordcount <in> <out>");

System.exit(2);

}

Job job = new Job(conf, "word count");

job.setJarByClass(WordCount.class);

job.setMapperClass(TokenizerMapper.class);

job.setCombinerClass(IntSumReducer.class);

job.setReducerClass(IntSumReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

System.exit(job.waitForCompletion(true) ? 0 : 1);

}

在MapReduce中,由Job对象负责管理和运行一个计算任务,并通过Job的一些方法对任务的参数进行相关的设置。此处设置了使用 TokenizerMapper完成Map过程中的处理和使用IntSumReducer完成Combine 和Reduce过程中的处理。还设置了Map 过程和Reduce过程的输出类型:key的类型为Text,value的类型为IntWritable。任务的输出和输入 路径 则由命令行参数指定,并由FileInputFormat和FileOutputFormat分别设定。完成相应任务的参数设定后,即可调用 job.waitForCompletion() 方法执行任务。

5.3 新旧API区别

Hadoop最新版本的MapReduce Release 0.20.0的API包括了一个全新的Mapreduce JAVA API,有时候也称为上下文对象。

新的API类型上不兼容以前的API,所以,以前的应用程序需要重写才能使新的API发挥其作用 。

新的API和旧的API之间有下面几个明显的区别。

  • 新的API倾向于使用抽象类,而不是接口,因为这更容易扩展。例如,你可以添加一个方法(用默认的实现)到一个抽象类而不需修改类之前的实现方法。在新的API中,Mapper和Reducer是抽象类。

  • 新的API是在org.apache.hadoop.mapreduce包(和子包)中的。之前版本的API则是放在org.apache.hadoop.mapred中的。

  • 新的API广泛使用context object(上下文对象),并允许用户代码与MapReduce系统进行通信。例如,MapContext基本上充当着JobConf的OutputCollector和Reporter的角色。

  • 新的API同时支持"推"和"拉"式的迭代。在这两个新老API中,键/值记录对被推mapper中,但除此之外,新的API允许把记录从map()方法中拉出,这也适用于reducer。"拉"式的一个有用的例子是分批处理记录,而不是一个接一个。

  • 新的API统一了配置。旧的API有一个特殊的JobConf对象用于作业配置, 这是一个对于Hadoop通常的Configuration对象的扩展。在新的API中,这种区别没有了,所以作业配置通过Configuration来 完成。作业控制的执行由Job类来负责,而不是JobClient,它在新的API中已经荡然无存。

(最后的部分参考http://www.cnblogs.com/xia520pi/archive/2012/05/16/2504205.html)

正文到此结束
Loading...