了解MapReduce

了解MapReduce

MapReduce引入

首先,我们需要了解什么是MapReduce,它是用来做什么的?

比如说,我们有10TB的数据,放在一台机子上肯定是不现实的,因此需要存放在之前我们介绍的存储系统:HDFS中。但是,如果放在HDFS上,由于大文件被切成一个个小块,如果这时候要对这些文件建立倒排索引、统计词频的话,按照原来的逻辑,可能会出现一定的问题——如果按照原来的逻辑,我们可以写一个客户端,将所有DataNode上的数据读进来进行处理,但这样说到底还是一个单机程序,没有体现出分布式系统的优势。

这时候一个合理的逻辑是把我的程序发送到集群的每一台DataNode上去做统计,也就是把运算往数据靠近,而不是数据往运算靠近,数据在哪里我就去那里运算,最终得到了DataNode上的局部结果。因此,我们不但需要分布式存储, 还需要分布式运算.

但分布式运算的构想很丰满,显示却很“骨干”,这边有几个痛点问题需要解决:

  1. 怎么把程序包分发到集群中的DataNode上?

我们该用什么方法去保证每个节点上的环境都是相同的,该用什么系统去分发程序。总不可能用U盘一个一个拷贝过来。

  1. 程序如何分发到有数据的Datanode上?

假设有30个节点存放着我们要跑的数据,那么我希望我的程序就运行在这30个节点上,如果在其他节点上运行也不是不可以,只不过数据需要靠网络传输,又变成数据向计算靠拢的情况了。

  1. 在DataNode上运行的结果,怎么将其汇总起来?

如果只在一台机器上汇总,那么这台机器的负载肯定会很高。但如果在多台机器上汇总,逻辑就会变得异常复杂。比如说,如果要统计词频,应该如何设计那些单词分发到哪个节点上去统计。

这些问题是很难解决的,这时候就需要MapReduce了。对于程序员来说,我们只关心如何处理数据的逻辑,并不关心数据是如何被分配,又是如何被汇总的,因为这些我们不擅长却异常复杂。MapReduce就是将这些不擅长又必须克服的痛点给解决了,它将这些东西全部封装起来。因此,对程序员来说底层的分发、计算、汇总都是透明的。

MPI

其实在MapReduce出现以前,程序员可以使用MPI并行处理数据。但是这个接口库存在以下缺点:

  1. 从用户编程角度快来看,程序员需要考虑到进程之间的并行问题,并且进程之间的通信需要用户在程序中显式表达,这增加了编程的复杂性,使得底层对用户不够透明。
  2. 从系统实现角度来看,MPI 以多进程的方式运行。因此除非在编写程序的时候加入了故障恢复功能,否则MPI不具备容错能力,系统鲁棒性很差。

因此,分布式计算系统本身需要具备容错能力,这也是MapReduce系统与MPI的重要区别。

MR数据模型

上面可以看做是对MapReduce的抽象了解。其实,MR会将数据抽象为一系列键值对,这些键值对通常来自于存储在HDFS中的文件。如下图,我们看到MapReduce的输入是一组键值对,进行转换后的输出也是一组键值对

MR计算模型

我们可以用做菜来比如MapReduce的计算过程:统合来看,MapReduce就是你有很多各种各样的蔬菜水果面包(Input),有很多厨师,不同的厨师分到了不同的蔬菜水果面包,自己主动去拿过来(Split),拿到手上以后切碎(Map),切碎以后给到不同的烤箱里,冷藏机里(Shuffle),冷藏机往往需要主动去拿,拿到这些东西存放好以后会根据不同的顾客需求拿不同的素材拼装成最终的结果,这就是Reduce,产生结果以后会放到顾客那边等待付费(Ticket),这个过程是Finalize。

从上图我们可以看出,Map转换前后的键值对的内容通常都会不同。MapReduce框架会将Map转换的道德键值对按照键进行分组,这就是Shuffle过程 (紫色的归为紫色,蓝色的归为蓝色)。之后,Reduce会对相同键的键值对进行计算,并可根据需要将计算结果进行一次键值对转换后输出。

从用户编程角度看,程序员不再需要像MPI一样关注节点之间的通信,可以像编写集中式程序那样便携MapReduce代码。

从系统角度看,系统可以并行启动一系列的进程去执行Map和Reduce操作,而且一旦这些进程出现故障,MapReduce框架可以自动进行容错处理,无需用户编写程序。

MR体系架构

如上图所示,MapReduce系统采用主从架构,里面有主要工作部件:

  • JobTracker: 主节点运行的后台进程,负责整个系统的资源管理作业管理
    • 资源管理是指JobTracker通过监控TaskTracker来管理系统拥有的计算资源。
    • 作业管理是指JobTracker负责将作业(Job)拆分成任务(Task),并进行任务调度以及跟踪任务的运行进度、资源使用量等信息。
  • TaskTracker: 从节点运行的后台进程,负责管理本节点的资源、执行JobTracker的命令并会报情况。TaskTracker使用slot等量划分本节点上的资源量(如CPU,RAM), 接收JobTracker发送过来的命令并执行,通过心跳将本节点上资源使用情况和任务运行进度汇报给JobTracker
  • Task : 从节点在应用程序执行过程中所启动的进程,负责任务执行。JobTracker根据TaskTracker回报的信息进行调度,命令存在空闲slot 的TaskTracker启动Task进程执行map或者reduce任务,即MapTaskReduceTask。在Hadoop MapReduce的实现中,Task又被称为Child
  • 客户端:客户端所在节点为提交应用程序启动的进程,负责将用户编写的MapReduce程序提交给JobTracker。在Haddop MapReduce的实现中,客户端被称为RunJar

通常,MapReduce和HDFS会同时部署在一个节点,否则会导致MapReduce从远程DataNode中读入数据,而且需要通过网络将输出结果写入远程DataNode中,这样的代价是很高的。

因此,JobTracker和NameNode部署在同一个物理节点,而TaskTracker和DataNode部署在同一个物理节点。在这种方式下,可以最大程度避免远程写入的网络开销。如下图所示:

这种部署方式使得MapReduce在输入输出数据的读写时,无需通过网络移动数据,体现了计算向数据靠拢的理念。

执行流程

上图是MapReduce执行应用程序的流程:

  1. 客户端将用户编写的MapReduce作业的配置信息,jar包等上传到共享文件系统,通常是HDFS
  2. 客户端提交作业给JobTracker,并告诉JobTracker作业信息的位置
  3. JobTracker读取作业的信息,生成一系列Map和Reduce任务,调度给拥有空闲slot的TaskTracker
  4. TaskTracker根据JobTracker的指令启动Child进程执行Map任务,Map任务将从共享文件系统读取输入数据。
  5. JobTracker从TaskTracker处获得Map任务进度信息
  6. 一旦Map任务完成之后,JobTracker会将Reduce任务分发给TaskTracker
  7. TaskTracker根据JobTracker的指令启动Child进程执行Reduce作用,Reduce任务将从Map任务所在节点的本地磁盘中拉取Map的输出结果
  8. JobTracker从TaskTracker处获取Reduce任务进度信息
  9. 当Reduce任务运行结束并将结果写入共享文件系统,则意味着整个作业执行完毕。

工作原理

MapReduce 工作原理如上图所示:每个Map任务从分布式文件系统(如HDFS)读取需要处理的数据,解析出键值对。然后新生成的键值对经过Shuffle传给Reduce任务,键相同的键值对都会发送给相同的Reduce任务,Reduce任务针对这些键值对进行计算后将结果以新的键值形式写入到分布式文件系统当中。

在这个过程中主要有Map、Shuffle、Reduce三个阶段,如果加上输入和输出就是五个阶段

输入

通常一个Map任务需要开启多个MapTask,那么我们可不就可以把存放在HDFS中的每一个Block”投喂”给一个Map任务呢?听起来这是可行的,但实际上,由于一个键值对极有可能被分在两个物理块当中,因此采用这种方法会导致Map任务读到的输入数据不完整

因此,MapReduce有自己的输入格式,它提供了数据逻辑划分和键值对解析功能。我们从下图可以看出,文件实际上会被划分为一个个Split(分片),每个分片会包含一些meta data(如数据长度、起始位置、所在节点),而且Split不会存在夸块的键值对(因为重新划分了)。因此,Map任务读取的单位是分片而不是文件块, 而且分片的数量决定了map任务的个数。

简单来说,输入格式通过定义分片和键值对当解析的方法完成了从物理存储(HDFS物理文件块)MapReduce程序可处理的逻辑数据之间的映射。

Map阶段

第一步:对于一个键值对来说,Map的过程就是将其转换为一个或者多个键值对,即$[K_1,V_1]\longrightarrow \text{List}([K_2,V_2])$ . 例如,在词频统计的例子中,$[行偏移量,An,An]$ 或转变为 $[An,1],[An,1]$

第二步:接着会根据partition方式,确定$[K_2,V_2]$键值对所属的分区,而总的分区数目等于Reduce任务的数量。得到$[K_2,V_2,\text{partition ID}]$ ,并将其放入缓冲区。

第三步:当缓冲区的数据到达阈值(如80%), Map任务将会锁定当前阈值以内的缓冲区,现将缓冲区内的$[K_2,V_2,\text{partition ID}]$ 按照partition ID进行排序,再将同一个分区内的键值对按照键进行排序。这样做的目的是让发往同一个Reduce任务的键值对聚集在一起,且拥有相同键的键值对也聚集在一起。然后,Map任务将排序后的结果写到磁盘形成文件。排序和写入的过程并不阻塞新的键值对写入缓冲区,因此同一Map任务一般会产生多个磁盘文件,属于同一分区的键值对也会分布在多个磁盘文件当中。

值得指出的是,入股哦此时定义了combine方法,那么此时就会对相同的键值对进行Combine.例如WordCount示例中的合并操作将两个相同的键值对$[An,1],[An,1]$ ,合并得到 $[An,2]$

第四步:随着磁盘中溢写到达阈值,Map任务会进一步归并形成一个文件,使得数以一个分区的所有键值对连续存储。这本质上是一个多路归并排序的过程,最终会形成一个连续的大文件形式存储在磁盘中。

问:为什么要把结果写入到磁盘中去呢?为了容错,但为了保证容错牺牲了一定的性能。

Shuffle阶段

现在,Map阶段已经将键值对分为很多分区,但是有很多Map任务,因此在不同的DataNode上会存在相同名字的分区。SHuffle过程就是将这些位于不同DataNode但是分区相同的键值对发送给同一个Reduce任务。也就是将$List([K_2,V_2])\longrightarrow [K_2,List(V_2)]$ 。仍用$[An,1],[An,1]$ 为例,Shuffle的过程就是将两个键值对发送给同一个Reduce任务并形成 $[An,{1,1}]$ 。

Map任务完成后,会通知所在节点的TaskTracker,并将Map输出文件所在的位置告知TaskTracker,之后,TaskTracker会进一步通知JobTracker。当系统中的Map任务完成率到达设定阈值的时候,系统就会启动Reduce任务。比如说,当阈值设定为60%的话,100个Map任务只需要完成60个,就可以qidongReduce任务,而不必等到100个Map任务全部完成

所以说,Shuffle阶段是一个中间阶段。

Reduce 阶段

Reduce阶段启动之后,会定期想JobTracker获取各个节点已经完成的Map任务的输出位置。要记住,Reduce任务虽然不会等到所有Map任务执行结束才拉取Map任务的输出结果,但是拉取的数据必然来自于已经完成运行的Map任务

对于一键值对来说,Reduce过程是将其转换为一个或者多个键值对。即 $[K_2,\text{List}(V_2)]$键值转换为 $[K_3,V_3]$ . 例如,在词频统计的例子中,Reduce过程对$[An,{1,1,1,1,1}]$进行求和操作转变为 $[An,5]$

Reduce和Map阶段非常类似,Reduce任务从磁盘拉取的任务也会先放入内存缓冲区 ,其中存储的是$[K_2,V_2]$的键值对,当内存缓存区中的数据占用空间到达阈值之后,Reduce任务将对缓存区中的键值按键进行归并排序。如果此时用户定义了conbine方法,则此时Reduce任务先执行合并操作然后将内存缓存区中的数据溢写到磁盘文件。注意,此时Reduce任务会继续拉取数据,因此随着数据的拉取将不断形成多个存在磁盘上的溢写文件

因为存在多个溢写文件,因此拥有相同键的键值对可能分在在不同文件中,因而这些文件需要再次归并。注意,并不是将所有溢写文件一次性归并,而是在达到一定数量的时候就执行归并操作。与Map端不同的是,最后一次归并操作的结果无需写入磁盘,而是直接可以执行reduce方法处理,这样可以省去一次磁盘读写操作。

Reduce的结果会写入到用户指定的存储系统中,通常是HDFS

数据输出

事实上,每个Reduce 任务都会输出一个文件。也就是说,MapReduce的输出结果是一组文件而不是一个文件。

容错

JobTracker故障

JobTracker是MapReduce框架的”首脑“,一旦它发生故障,那么正在运行的所有作业内部状态信息将会丢失。因此,重新启动JobTracker之后,所有作业需要重新执行。

从而,MapReduce中JobTracker的单节点故障瓶颈是该架构设计的缺陷。

TaskTracker故障

当TaskTracker进程故障时,JobTracker将接收不到TaskTracker发来的心跳。因此,在该TaskTracker所在节点运行过的任务都会被标记为失败。JobTracker会将这些任务调度到别的TaskTracker所在节点重新执行。这个过程对用户来说是透明的,只会感觉到该作业执行变慢了而已

Task故障

Task进程故障比较常见,因其错误的原因通常有代码错误、磁盘损坏等。当TaskTracker检测到一个任务故障时,TaskTracker将在下一次心跳里JobTracker报告该故障,JobTracker收到报告的消息后会重新调度该任务。

重新调动的额任务可以再集群的任一节点重试,但是当一个任务经过最大尝试数的尝试运行后仍然失败,那么整个作业将被标记为失败。

对于Map和Reduce任务来说,重试的过程有所不同:

  • 对于重试的Map任务来说,将从输入路径重新读取数据。
  • 对于重试的Reduce任务来说,将重新拉取Map端的输出文件,如果读取不到该文件,那么相应的Map任务也需要重新执行。因为Reduce只会读取Map任务结束后输出的文件,因此只要磁盘不损坏,Reduce都可以重新读取。

编程实例

代码框架

map方法的框架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package cn.edu.ecnu.mapreduce.example.java.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/* 步骤1:确定输入键值对[K1,V1]和 输出键值对 [K2,V2]的数据类型 */
public class WordCountMapper extends Mapper<K1数据类型, V1数据类型, K2数据类型, V2数据类型> {

@Override
protected void map(K1数据类型 key, V1数据类型 value, Context context)
throws IOException, InterruptedException {
/* 步骤2:编写处理逻辑将[K1,V1]转换为[K2,V2]并输出 */
//.....逻辑.....
context.write(K2, V2);
}
}

reduce方法的框架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package cn.edu.ecnu.mapreduce.example.java.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/* 步骤1:确定输入键值对[K2,List(V2)] 和 输出键值对[K3,V3]的数据类型 */
public class WordCountReducer extends Reducer<K2数据类型, V2数据类型, K3数据类型, V3数据类型> {
@Override
protected void reduce(K2数据类型 key, Iterable<V2数据类型> values, Context context)
throws IOException, InterruptedException {
/* 步骤2:编写处理逻辑将[K2,List(V2)]转换为[K3,V3]并输出 */
//....逻辑.....
// 输出计数结果
context.write(K3, V3);
}
}

主方法框架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {

@Override
public int run(String[] args) throws Exception {
/* 步骤1:设置作业的信息 */
Job job = Job.getInstance(getConf(), getClass().getSimpleName());
// 设置程序的类名
job.setJarByClass(getClass());

// 设置数据的输入输出路径
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 设置map和reduce方法
job.setMapperClass(自定义Map类);
job.setReducerClass(自定义Reduce类);
job.setCombinerClass(自定义Combine类);

// 设置map方法的输出键值对数据类型
// 如果不设置的话,默认为 TextInputFormat 和 TextOutputFormat
job.setMapOutputKeyClass(map方法的输出键数据类型);
job.setMapOutputValueClass(map方法的输出值数据类型);

// 设置reduce方法的输出键值对数据类型
job.setOutputKeyClass(reduce方法的输出键数据类型);
job.setOutputValueClass(reduce方法的输出值数据类型);

return job.waitForCompletion(true) ? 0 : 1;
}

public static void main(String[] args) throws Exception {
/* 步骤2:运行作业 */
int exitCode = ToolRunner.run(new WordCount(), args);
System.exit(exitCode);
}
}

词频统计

词频统计是一个很简单但很经典的MapReduce的应用案例。

输入:是一个包含大量单词的文本文件

输出:文件中每个单词及其出现次数(频数),每个单词和其频数占一行,单词和频数之间有间隔。

如下所示:

总体来说,词频统计的流程如下:

Map过程:会把文本的每行内容转换为键值对:[单词,1]

Reduce过程:

  • 单词相同的键值对会被发送到同一个Reduce中(shuffle)
  • 对单词相同的键值对进行计数
  • 输出计数后的结果[单词, 频数]

具体的过程如下:

  1. 每个Map任务通过map方法将 输入键值对$[\text{行偏移量,每行内容}]$ 转换为输出键值对 $[\text{单词,频数}]$ 。比如将$[\text{offset},An,An]$ 转换为$[An,1]$
  2. Map任务在缓冲区占用量达到一定阈值的时候(这里阈值为2)将键值对溢写到本地磁盘文件中。并在溢写的过程中执行归并操作。例如,把三个$[An,1]$转换为 $[An,{1,1,1}]$
  3. 之后,Map任务将归并产生的键值对经过Shuffle过程发送给Reduce任务。
  4. 在Reduce阶段,Reduce任务会进行归并操作。例如将$[My,1],[My,1],[My,{1,1,1}]$ 转换为 $[My,{1,1,1,1,1}]$
  5. 在归并操作完成之后,Reduce任务会通过Reduce方法将归并产生的键值对$[\text{单词,List(频数)}]$ 转换为$[\text{单词,频数}]$ 并输出。例如:将$[My,{1,1,1,1,1}]$ 转化为 $[My,5]$

代码

  • WordCountMapper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
/* 步骤2:编写处理逻辑将[K1,V1]转换为[K2,V2]并输出 */
// 以空格作为分隔符拆分成单词
String[] datas = value.toString().split(" ");
for (String data : datas) {
// 输出分词结果
context.write(new Text(data), new IntWritable(1));
}
}
}
  • WordCountReducer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
/* 步骤2:编写处理逻辑将[K2,List(V2)]转换为[K3,V3]并输出 */
int sum = 0;
// 遍历累加求和
for (IntWritable value : values) {
sum += value.get();
}
// 输出计数结果
context.write(key, new IntWritable(sum));
}
}
  • WordCount 主方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class WordCount extends Configured implements Tool {

@Override
public int run(String[] args) throws Exception {
/* 步骤1:设置作业的信息 */
Job job = Job.getInstance(getConf(), getClass().getSimpleName());
// 设置程序的类名
job.setJarByClass(getClass());

// 设置数据的输入输出路径
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 设置map和reduce方法
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setCombinerClass(WordCountReducer.class);

// 设置map方法的输出键值对数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置reduce方法的输出键值对数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

return job.waitForCompletion(true) ? 0 : 1;
}

public static void main(String[] args) throws Exception {
/* 步骤2:运行作业 */
int exitCode = ToolRunner.run(new WordCount(), args);
System.exit(exitCode);
}
}

关系表自然连接和优化

网页链接排名

K-Means聚类

问题

MapReduce中的Merge和Combine有什么区别

Combine函数需要程序员显示定义,如果定义了combine方法,那么

而Merge和程序员是没有关系的

MapReduce中的分布式缓存机制有什么用

-------------本文结束,感谢您的阅读-------------