《MapReduce经典例子WordCount运行详解.doc》由会员分享,可在线阅读,更多相关《MapReduce经典例子WordCount运行详解.doc(16页珍藏版)》请在课桌文档上搜索。
1、wordHadoop MapReduce经典例子WordCount 运行详解1、MapReduce理论简介1.1 MapReduce编程模型 MapReduce 采用“分而治之的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果。简单地说,MapReduce 就是“任务的分解与结果的汇总。在Hadoop 中,用于执行MapReduce 任务的机器角色有两个:一个是JobTracker;另一个是TaskTracker,JobTracker 是用于调度工作的,TaskTracker 是用于执行工作的。一个Hadoop集群中只有一台
2、JobTracker。在分布式计算中,MapReduce 框架负责处理了并行编程中分布式存储、工作调度、负载均衡、容错均衡、容错处理以与网络通信等复杂问题,把处理过程高度抽象为两个函数:map 和reduce,map 负责把任务分解成多个任务,reduce 负责把分解后多任务处理的结果汇总起来。需要注意的是,用MapReduce 来处理的数据集或任务必须具备这样的特点:待处理的数据集可以分解成许多小的数据集,而且每一个小数据集都可以完全并行地进展处理。1.2 MapReduce处理过程在Hadoop 中,每个MapReduce 任务都被初始化为一个Job,每个 Job 又可以分为两种阶段:ma
3、p 阶段和reduce 阶段。这两个阶段分别用两个函数表示,即map 函数和reduce 函数。map 函数接收一个形式的输入,然后同样产生一个形式的中间输出,Hadoop 函数接收一个如形式的输入,然后对这个 value 集合进展处理,每个reduce 产生0 或1 个输出,reduce 的输出也是形式的。MapReduce 处理大数据集的过程116 / 16创建时间:2012/3/1修改时间:2017/3/1 修改次数:02、运行WordCount程序单词计数是最简单也是最能表现MapReduce 思想的程序之一,可以称为MapReduce 版“Hello World,该程序的完整代码可以
4、在Hadoop 安装包的“src/examples目录下找到。单词计数主要完成功能是:统计一系列文本文件中每个单词出现的次数,如如下图所示。2.1 准备工作现在以“hadoop普通用户登录服务器。1创建本地示例文件首先在“/home/hadoop目录下创建文件夹“file。接着创建两个文本文件file1.txt 和, file1.txt 内容为使“Hello World 而,的内容为“Hello Hadoop。2创建时间2012/3/1修改时间:2017/3/1 修改次数:02在 HDFS 上创建输入文件夹3上传本地 file 中文件到集群的input 目录下2.2 运行例子1在集群上运行 W
5、ordCount 程序备注:以input 作为输入目录,output 目录作为输出目录。已经编译好的WordCount 的Jar 在“/usr/hadoop下面, 就是,所以在下面执行命令时记得把路径写全了,不然会提示找不到该Jar 包。2MapReduce 执行过程显示信息Hadoop 命令会启动一个JVM 来运行这个MapReduce 程序,并自动获得Hadoop 的配置,同时把类的路径与其依赖关系参加到Hadoop 的库中。以上就是Hadoop Job 的运行记录,从这里可以看到,这个Job 被赋予了一个ID 号:job_201202292213_0002,而且得知输入文件有两个Tota
6、l input paths to process : 2,同时还可以了解map 的输入输出记录record 数与字节数,以与reduce 输入输出记录。比如说,在本例中,map 的task 数量是2 个,reduce3创建时间:2012/3/1修改时间:2012/3/1 修改次数:0的task 数量是一个。map 的输入record 数是2 个,输出record 数是4 个等信息。2.3 查看结果1查看 HDFS 上output 目录内容从上图中知道生成了三个文件,我们的结果在“part-r-00000中。2查看结果输出文件内容3、WordCount源码分析3.1 特别数据类型介绍Hadoop
7、 提供了如下内容的数据类型,这些数据类型都实现了Writableparable 接口,以便用这些类型定义的数据可以被序列化进展网络传输和文件存储,以与进展大小比拟。BooleanWritable:标准布尔型数值ByteWritable:单字节数值DoubleWritable:双字节数FloatWritable:浮点数IntWritable:整型数LongWritable:长整型数Text:使用 UTF8 格式存储的文本NullWritable:当中的 key 或value 为空时使用3.2 旧的WordCount分析1源代码程序package org.apache.hadoop.example
8、s;4创建时间:2012/3/1修改时间:2012/3/1 修改次数:0import java.io.IOException;import java.util.Iterator;import java.util.StringTokenizer;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred
9、.FileInputFormat;import org.apache.hadoop.mapred.FileOutputFormat;import org.apache.hadoop.mapred.JobClient;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.MapReduceBase;import org.apache.hadoop.mapred.Mapper;import org.apache.hadoop.mapred.OutputCollector;import org.apache.h
10、adoop.mapred.Reducer;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.TextInputFormat;import org.apache.hadoop.mapred.TextOutputFormat;publicclassWordCountpublicprivateprivatestatic class Map extends MapReduceBaseMapper finalstatic IntWritable one = newText word = new Text();
11、implementsIntWritable( 1 );publicvoid map(LongWritable key, Text value,OutputCollector output, Reporter reporter)throws IOExceptionString line = value.toString();StringTokenizer tokenizer = new StringTokenizer(line);while (tokenizer.hasMoreTokens() word.set(tokenizer.nextToken(); output.collect(word
12、, one);5创建时间:2012/3/1修改时间:2012/3/1 修改次数:0publicstatic class Reduce extends MapReduceBaseReducerimplementspublicvoid reduce(Text key, Iterator values,OutputCollector output, Reporter reporter)throws IOExceptionint sum = 0 ;while (values.hasNext() sum += values.next().get();output.collect(key, new Int
13、Writable(sum);public static void main(String args) throws Exception JobConf conf = new JobConf(WordCount. class ); conf.setJobName(wordcount );conf.setOutputKeyClass(Text.class );conf.setOutputValueClass(IntWritable.class );conf.setMapperClass(Map.class );conf.setbinerClass(Reduce.class );conf.setRe
14、ducerClass(Reduce.class );conf.setInputFormat(TextInputFormat.class );conf.setOutputFormat(TextOutputFormat.class );FileInputFormat.setInputPaths(conf, new Path(args 0 );FileOutputFormat.setOutputPath(conf, new Path(args 1 );JobClient.runJob(conf);3主方法Main 分析public staticvoidmain(String args) throws
15、 ExceptionJobConf conf = new JobConf(WordCount. class );conf.setJobName(wordcount );6创建时间:2012/3/1修改时间:2012/3/1 修改次数:0conf.setOutputKeyClass(Text.class );conf.setOutputValueClass(IntWritable.class );conf.setMapperClass(Map.class );conf.setbinerClass(Reduce.class );conf.setReducerClass(Reduce.class )
16、;conf.setInputFormat(TextInputFormat.class );conf.setOutputFormat(TextOutputFormat.class );FileInputFormat.setInputPaths(conf, new Path(args 0 );FileOutputFormat.setOutputPath(conf, new Path(args 1 );JobClient.runJob(conf);首先讲解一下Job 的初始化过程。main 函数调用Jobconf 类来对MapReduce Job 进展初始化,然后调用setJobName()方法命名
17、这个 Job。对Job 进展合理的命名有助于更快地找到Job,以便在 JobTracker 和Tasktracker 的页面中对其进展监视。JobConf conf = new JobConf(WordCount. class );conf.setJobName(wordcount );接着设置Job 输出结果的中key 和value 数据类型,因为结果是,所以key 设置为“Text类型,相当于Java 中String 类型。Value 设置为“IntWritable,相当于Java 中的int 类型。conf.setOutputKeyClass(Text.class );conf.setO
18、utputValueClass(IntWritable.class );然后设置Job 处理的Map拆分、biner中间结果合并以与Reduce合并的相关处理类。这里用Reduce 类来进展Map 产生的中间结果合并,防止给网络数据传输产生压力。conf.setMapperClass(Map.class );conf.setbinerClass(Reduce.class );conf.setReducerClass(Reduce.class );接着就是调用setInputPath()和 setOutputPath()设置输入输出路径。conf.setInputFormat(TextInput
19、Format.class );conf.setOutputFormat(TextOutputFormat.class );7创建时间:2012/3/1修改时间:2012/3/1 修改次数:01InputFormat 和InputSplitInputSplit 是Hadoop 定义的用来传送给每个单独的map 的数据,InputSplit 存储的并非数据本身,而是一个分片长度和一个记录数据位置的数组。生成InputSplit 的方法可以通过InputFormat()来设置。当数据传送给map 时,map 会将输入分片传送到InputFormat,InputFormat 如此调用方法getReco
20、rdReader()生成 RecordReader,RecordReader 再通过creatKey()、creatValue()方法创建可供map 处理的对。简而言之,InputFormat()方法是用来生成可供 map处理的对的。Hadoop 预定义了多种方法将不同类型的输入数据转化为map 能够处理的对,它们都继承自InputFormat,分别是:InputFormat | |-posableInputFormat |-positeInputFormat |-DBInputFormat |-FileInputFormat |-bineFileInputFormat |-KeyValueT
21、extInputFormat |-NLineInputFormat |-SequenceFileInputFormat |-TeraInputFormat |-TextInputFormat其中TextInputFormat 是Hadoop 默认的输入方法,TextInputFormat 中,在每个文件或其一局部都会单独地作为map 的输入,而这个是继承自FileInputFormat 的。之后,每行数据都会生成一条记录,每条记录如此表示成形式:l key 值是每个数据的记录在数据分片中字节偏移量,数据类型是LongWritable;l value 值是每行的内容,数据类型是Text。2Out
22、putFormat每一种输入格式都有一种输出格式与其对应。默认的输出格式是TextOutputFormat,这种输出方式与输入类似,会将每条记录以一行的形式存入文本文件。不过,它的键和值可以是任意形式的,因为程序内容会调用toString()方法将键和值转换为String 类型再输出。3Map 类中map 方法分析publicprivateprivate finalstatic IntWritable one = newText word = new Text();IntWritable( 1 );staticclass Map extends MapReduceBase implements
23、 Mapper8创建时间:2012/3/1修改时间:2012/3/1 修改次数:0publicvoid map(LongWritable key, Text value,OutputCollector output, Reporter reporter)throws IOExceptionString line = value.toString();StringTokenizer tokenizer = new StringTokenizer(line);while (tokenizer.hasMoreTokens() word.set(tokenizer.nextToken(); outpu
24、t.collect(word, one); Map 类继承自MapReduceBase,并且它实现了Mapper 接口,此接口是一个规X类型,它有4 种形式的参数,分别用来指定map 的输入key 值类型、输入value 值类型、输出key值类型和输出value 值类型。在本例中,因为使用的是TextInputFormat,它的输出 key 值是LongWritable 类型,输出value 值是Text 类型,所以map 的输入类型为。在本例中需要输出这样的形式,因此输出的key 值类型是Text,输出的 value 值类型是IntWritable。实现此接口类还需要实现map 方法,map
25、 方法会具体负责对输入进展操作,在本例中,map 方法对输入的行以空格为单位进展切分,然后使用OutputCollect 收集输出的。4Reduce 类中reduce 方法分析publicpublicvoid reduce(Text key, Iterator values,OutputCollector output, Reporter reporter)throws IOExceptionstaticclass Reduce extends MapReduceBase implementsReducerint sum = 0 ;while (values.hasNext() sum +=
26、values.next().get();output.collect(key, new IntWritable(sum);Reduce 类也是继承自MapReduceBase 的,需要实现Reducer 接口。Reduce 类以map的输出作为输入,因此Reduce 的输入类型是。而Reduce 的输出是单词9和它的数目,因此,它的输出类型是。Reduce 类也要实现reduce 方法,在此方法中,reduce 函数将输入的key 值作为输出的key 值,然后将获得多个value 值加起来,作为输出的值。3.3 新的WordCount分析1源代码程序package org.apache.had
27、oop.examples;import java.io.IOException;import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Map
28、per;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;public class WordCount public static class TokenizerMapperextends Mapperprivat
29、e final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(Object key, Text value, Context context ) throws IOException, InterruptedException StringTokenizer itr = new StringTokenizer(value.toString(); while (itr.hasMoreTokens() word.set(itr.nextToken(); conte
30、xt.write(word, one); public static class IntSumReducerextends Reducer 10创建时间:2012/3/1修改时间:2012/3/1 修改次数:0private IntWritable result = new IntWritable();public void reduce(Text key, Iterable values, Context context ) throws IOException, InterruptedException int sum = 0; for (IntWritable val : values)
31、 sum += val.get(); result.set(sum); context.write(key, result);public static void main(String args) throws Exception Configuration conf = new Configuration(); String otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) System.err.println(Usage: wordcount );
32、 System.exit(2); Job job = new Job(conf, word count); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setbinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileIn
33、putFormat.addInputPath(job, new Path(otherArgs0); FileOutputFormat.setOutputPath(job, new Path(otherArgs1); System.exit(job.waitForpletion(true) ? 0 : 1);1Map 过程public static class TokenizerMapperextends Mapperprivate final static IntWritable one = new IntWritable(1);private Text word = new Text();p
34、ublic void map(Object key, Text value, Context context ) throws IOException, InterruptedException 11创建时间:2012/3/1修改时间:2012/3/1 修改次数:0StringTokenizer itr = new StringTokenizer(value.toString();while (itr.hasMoreTokens() word.set(itr.nextToken(); context.write(word, one); Map 过程需要继承org.apache.hadoop.m
35、apreduce 包中Mapper 类,并重写其map 方法。通过在map 方法中添加两句把key 值和value 值输出到控制台的代码,可以发现map 方法中value 值存储的是文本文件中的一行以回车符为行完毕标记,而key 值为该行的首字母相对于文本文件的首地址的偏移量。然后StringTokenizer 类将每一行拆分成为一个个的单词,并将作为map 方法的结果输出,其余的工作都交有MapReduce 框架处理。2Reduce 过程public static class IntSumReducerextends Reducer private IntWritable result =
36、new IntWritable();public void reduce(Text key, Iterable values, Context context ) throws IOException, InterruptedException int sum = 0; for (IntWritable val : values) sum += val.get(); result.set(sum); context.write(key, result);Reduce 过程需要继承org.apache.hadoop.mapreduce 包中Reducer 类,并重写其reduce方法。Map 过
37、程输出中key 为单个单词,而values 是对应单词的计数值所组成的列表,Map 的输出就是Reduce 的输入,所以reduce 方法只要遍历values 并求和,即可得到某个单词的总次数。3执行 MapReduce 任务public static void main(String args) throws Exception Configuration conf = new Configuration(); String otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.l
38、ength != 2) System.err.println(Usage: wordcount );某某工业大学软件工程与理论实验室编辑:虾皮12创建时间:2012/3/1修改时间:2012/3/1 修改次数:0System.exit(2);Job job = new Job(conf, word count);job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setbinerClass(IntSumReducer.class);job.setReducerClass(IntSumR
39、educer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(otherArgs0);FileOutputFormat.setOutputPath(job, new Path(otherArgs1);System.exit(job.waitForpletion(true) ? 0 : 1);在MapReduce 中,由Job 对象负责管理和运行一个计算任务,并通过Job 的一些方法对任务的参
40、数进展相关的设置。此处设置了使用TokenizerMapper 完成Map 过程中的处理和使用IntSumReducer 完成bine 和Reduce 过程中的处理。还设置了Map 过程和Reduce过程的输出类型:key 的类型为Text,value 的类型为IntWritable。任务的输出和输入路径如此由命令行参数指定,并由FileInputFormat 和FileOutputFormat 分别设定。完成相应任务的参数设定后,即可调用job.waitForpletion()方法执行任务。4、WordCount处理过程本节将对WordCount 进展更详细的讲解。详细执行步骤如下:1将文件
41、拆分成 splits,由于测试用的文件较小,所以每个文件为一个 split,并将文件按行分割形成对,如图4-1 所示。这一步由MapReduce 框架自动完成,其中偏移量即key 值包括了回车所占的字符数Windows 和Linux 环境会不同。图4-1 分割过程2将分割好的对交给用户定义的 map 方法进展处理,生成新的对,如图4-2 所示。13分割结果map方法输出 map() map()图4-2 执行map 方法 3得到 map 方法输出的对后,Mapper 会将它们按照key 值进展排序,并执行bine 过程,将key 至一样value 值累加,得到Mapper 的最终输出结果。如图4-3 所示。map方法输出 排序结果 bine输出 Map端排序bine过程Map端排序bine过程 图4-3 Map 端排序与bine