打造分布式文件系统-Mapreduce
前面进行了Hadoop配置安装
以及了解了HDFS
接下来就是Mapreduce了.其概念就是Map(映射)Reduce(化简)主要思想,都是从函数式编程语言里借来的,以及从矢量编程语言里借来的特性.
相关的原理咱先忽略,直接进入实例应用.
从hadoop源码中hadoop-2.2.0-src/hadoop-mapreduce-project/hadoop-mapreduce-examples/
打开其示例程序WordCount如下:
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount ");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
分为三步:
- TokenizerMapper 类实现 Mapper 接口中的 map 方法,输入参数中的 value
是文本文件中的一行 - IntSumReducer类实现 Reducer 接口中的 reduce 方法, 输入参数中的 key,
values 是由 Map 任务输出的中间结果,values 是一个 Iterator, 遍历这个
Iterator, 就可以得到属于同一个 key 的所有 value. 此处,key
是一个单词,value 是词频。只需要将所有的 value
相加,就可以得到这个单词的总的出现次数。 - 配置 Job并运行
就这么简单的三步,实现了分布式统计一批文本文件中单词出现的频率的功能.
参数 | 作用 | 缺省值 | 其它实现 |
---|---|---|---|
**InputFormat** |
将输入的数据集切割成小数据集 InputSplits, 每一个 InputSplit 将由一个
Mapper 负责处理。此外 InputFormat 中还提供一个 RecordReader 的实现,
将一个 InputSplit 解析成 |
TextInputFormat
(针对文本文件,按行将文本文件切割成 InputSplits, 并用 LineRecordReader
将 InputSplit 解析成 |
SequenceFileInputFormat |
**OutputFormat** | 提供一个 RecordWriter 的实现,负责输出最终结果 |
TextOutputFormat
(用 LineRecordWriter 将最终结果写成纯文件文件,每个 |
SequenceFileOutputFormat |
**OutputKeyClass** | 输出的最终结果中 key 的类型 | LongWritable | |
**OutputValueClass** | 输出的最终结果中 value 的类型 | Text | |
**MapperClass** |
Mapper 类,实现 map 函数,完成输入的 |
IdentityMapper
(将输入的 |
LongSumReducer, LogRegexMapper, InverseMapper |
**CombinerClass** | 实现 combine 函数,将中间结果中的重复 key 做合并 | null (不对中间结果中的重复 key 做合并) | |
**ReducerClass** | Reducer 类,实现 reduce 函数,对中间结果做合并,形成最终结果 | IdentityReducer (将中间结果直接输出为最终结果) | AccumulatingReducer, LongSumReducer |
**InputPath** | 设定 job 的输入目录, job 运行时会处理输入目录下的所有文件 | null | |
**OutputPath** | 设定 job 的输出目录,job 的最终结果会写入输出目录下 | null | |
**MapOutputKeyClass** | 设定 map 函数输出的中间结果中 key 的类型 | 如果用户没有设定的话,使用 OutputKeyClass | |
**MapOutputValueClass** | 设定 map 函数输出的中间结果中 value 的类型 | 如果用户没有设定的话,使用 OutputValuesClass | |
**OutputKeyComparator** | 对结果中的 key 进行排序时的使用的比较器 | WritableComparable | |
**PartitionerClass** | 对中间结果的 key 排序后,用此 Partition 函数将其划分为R份,每份由一个 Reducer 负责处理。 | HashPartitioner (使用 Hash 函数做 partition) | KeyFieldBasedPartitioner PipesPartitioner |
咱们以后再研究其内部实现及原理
参考资料:
http://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop2/
http://www.infoq.com/cn/articles/MapReduce-Best-Practice-1
http://hadoop.apache.org/docs/r0.19.1/cn/mapred_tutorial.html#%E7%9B%AE%E7%9A%84
http://www.cnblogs.com/xia520pi/archive/2012/06/04/2534533.html