MapReduce

admin
admin 2019年09月06日
  • 在其它设备中阅读本文章

MapReduce

MapReduce是一种 编程模型 ,用于 大规模数据集 并行运算 。概念 "Map( 映射 )" 和 "Reduce( 归约 )",是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。当前的软件实现是指定一个 Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的 Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。

MapReduce 定义

MapReduce 是面向大数据并行处理的计算模型、框架和平台,它隐含了以下三层含义:

  1. MapReduce 是一个基于集群的 高性能并行计算平台 (Cluster Infrastructure)。它允许用市场上普通的商用服务器构成一个包含数十、数百至数千个节点的分布和并行计算集群。
  2. MapReduce 是一个 并行计算与运行软件框架 (Software Framework)。它提供了一个庞大但设计精良的并行计算软件框架,能自动完成计算任务的并行化处理,自动划分计算数据和计算任务,在集群节点上自动分配和执行任务以及收集计算结果,将数据分布存储、数据通信、容错处理等并行计算涉及到的很多系统底层的复杂细节交由系统负责处理,大大减少了软件开发人员的负担。
  3. MapReduce 是一个 并行程序设计模型与方法 (Programming Model & Methodology)。它借助于函数式程序设计语言 Lisp 的设计思想,提供了一种简便的并行程序设计方法,用 Map 和 Reduce 两个函数编程实现基本的并行计算任务,提供了抽象的操作和并行编程接口,以简单方便地完成大规模数据的编程和计算处理。

MapReduce 由来

MapReduce 最早是由 Google 公司研究提出的一种面向大规模数据处理的并行计算模型和方法。Google 公司设计 MapReduce 的初衷主要是为了解决其搜索引擎中大规模网页数据的并行化处理。Google 公司发明了 MapReduce 之后首先用其重新改写了其搜索引擎中的 Web 文档索引处理系统。但由于 MapReduce 可以普遍应用于很多大规模数据的计算问题,因此自发明 MapReduce 以后,Google 公司内部进一步将其广泛应用于很多大规模数据处理问题。到目前为止,Google 公司内有上万个各种不同的算法问题和程序都使用 MapReduce 进行处理。

2003 年和 2004 年,Google 公司在国际会议上分别发表了两篇关于 Google 分布式文件系统和 MapReduce 的论文,公布了 Google 的 GFS 和 MapReduce 的基本原理和主要设计思想。

Hadoop 的思想来源于 Google 的几篇论文,Google 的那篇 MapReduce 论文里说:Our abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages。这句话提到了 MapReduce 思想的渊源,大致意思是,MapReduce 的灵感来源于函数式语言(比如 Lisp)中的内置函数 map 和 reduce。函数式语言也算是阳春白雪了,离我们普通开发者总是很远。简单来说,在函数式语言里,map 表示对一个列表(List)中的每个元素做计算,reduce 表示对一个列表中的每个元素做迭代计算。它们具体的计算是通过传入的函数来实现的,map 和 reduce 提供的是计算的框架。不过从这样的解释到现实中的 MapReduce 还太远,仍然需要一个跳跃。再仔细看,reduce 既然能做迭代计算,那就表示列表中的元素是相关的,比如我想对列表中的所有元素做相加求和,那么列表中至少都应该是数值吧。而 map 是对列表中每个元素做单独处理的,这表示列表中可以是杂乱无章的数据。这样看来,就有点联系了。在 MapReduce 里,Map 处理的是原始数据,自然是杂乱无章的,每条数据之间互相没有关系;到了 Reduce 阶段,数据是以 key 后面跟着若干个 value 来组织的,这些 value 有相关性,至少它们都在一个 key 下面,于是就符合函数式语言里 map 和 reduce 的基本思想了。

这样我们就可以把 MapReduce 理解为,把一堆杂乱无章的数据按照某种特征归纳起来,然后处理并得到最后的结果。Map 面对的是杂乱无章的互不相关的数据,它解析每个数据,从中提取出 key 和 value,也就是提取了数据的特征。经过 MapReduce 的 Shuffle 阶段之后,在 Reduce 阶段看到的都是已经归纳好的数据了,在此基础上我们可以做进一步的处理以便得到结果。这就回到了最初,终于知道 MapReduce 为何要这样设计。
2004 年,开源项目 Lucene(搜索索引程序库)和 Nutch(搜索引擎)的创始人 Doug Cutting 发现 MapReduce 正是其所需要的解决大规模 Web 数据处理的重要技术,因而模仿 Google MapReduce,基于 Java 设计开发了一个称为 Hadoop 的开源 MapReduce 并行计算框架和系统。自此,Hadoop 成为 Apache 开源组织下最重要的项目,自其推出后很快得到了全球学术界和工业界的普遍关注,并得到推广和普及应用。

MapReduce 的推出给大数据并行处理带来了巨大的革命性影响,使其已经成为事实上的大数据处理的工业标准。尽管 MapReduce 还有很多局限性,但人们普遍公认,MapReduce 是到目前为止最为成功、最广为接受和最易于使用的大数据并行处理技术。MapReduce 的发展普及和带来的巨大影响远远超出了发明者和开源社区当初的意料,以至于马里兰大学教授、2010 年出版的《Data-Intensive Text Processing with MapReduce》一书的作者 Jimmy Lin 在书中提出:MapReduce 改变了我们组织大规模计算的方式,它代表了第一个有别于冯·诺依曼结构的计算模型,是在集群规模而非单个机器上组织大规模计算的新的抽象模型上的第一个重大突破,是到目前为止所见到的最为成功的基于大规模计算资源的计算模型。

MapReduce 映射和化简

简单说来,一个映射函数就是对一些独立元素组成的概念上的列表(例如,一个测试成绩的列表)的每一个元素进行指定的操作(比如前面的例子里,有人发现所有学生的成绩都被高估了一分,它可以定义一个“减一”的映射函数,用来修正这个错误。)。事实上,每个元素都是被独立操作的,而原始列表没有被更改,因为这里创建了一个新的列表来保存新的答案。这就是说,Map 操作是可以高度并行的,这对高性能要求的应用以及并行计算领域的需求非常有用。

而化简操作指的是对一个列表的元素进行适当的合并(继续看前面的例子,如果有人想知道班级的平均分该怎么做?它可以定义一个化简函数,通过让列表中的元素跟自己的相邻的元素相加的方式把列表减半,如此递归运算直到列表只剩下一个元素,然后用这个元素除以人数,就得到了平均分。)。虽然他不如映射函数那么并行,但是因为化简总是有一个简单的答案,大规模的运算相对独立,所以化简函数在高度并行环境下也很有用。

MapReduce 分布可靠

MapReduce 通过把对数据集的大规模操作分发给网络上的每个节点实现可靠性;每个节点会周期性的返回它所完成的工作和最新的状态。如果一个节点保持沉默超过一个预设的时间间隔,主节点(类同 Google File System 中的主服务器)记录下这个节点状态为死亡,并把分配给这个节点的数据发到别的节点。每个操作使用命名文件的原子操作以确保不会发生并行线程间的冲突;当文件被改名的时候,系统可能会把他们复制到任务名以外的另一个名字上去。(避免副作用)。

化简操作工作方式与之类似,但是由于化简操作的可并行性相对较差,主节点会尽量把化简操作只分配在一个节点上,或者离需要操作的数据尽可能近的节点上;这个特性可以满足 Google 的需求,因为他们有足够的带宽,他们的内部网络没有那么多的机器。

MapReduce 用途

在 Google,MapReduce 用在非常广泛的应用程序中,包括“分布 grep,分布排序,web 连接图反转,每台机器的词矢量,web 访问日志分析,反向索引构建,文档聚类,机器学习,基于统计的机器翻译...”值得注意的是,MapReduce 实现以后,它被用来重新生成 Google 的整个索引,并取代老的 ad hoc 程序去更新索引。
MapReduce 会生成大量的临时文件,为了提高效率,它利用 Google 文件系统来管理和访问这些文件。
在谷歌,超过一万个不同的项目已经采用 MapReduce 来实现, 包括大规模的算法图形处理、文字处理、数据挖掘、机器学习、统计机器翻译以及众多其他领域。

其他实现
Nutch 项目开发了一个实验性的 MapReduce 的实现,也即是后来大名鼎鼎的 hadoop
Phoenix 是斯坦福大学开发的基于多核多处理器、共享内存的 MapReduce 实现。

MapReduce 主要功能

MapReduce 提供了以下的主要功能:

1)数据划分和计算任务调度:
系统自动将一个作业(Job)待处理的大数据划分为很多个数据块,每个数据块对应于一个计算任务(Task),并自动 调度计算节点来处理相应的数据块。作业和任务调度功能主要负责分配和调度计算节点(Map 节点或 Reduce 节点),同时负责监控这些节点的执行状态,并负责 Map 节点执行的同步控制。

2)数据 / 代码互定位:
为了减少数据通信,一个基本原则是本地化数据处理,即一个计算节点尽可能处理其本地磁盘上所分布存储的数据,这实现了代码向 数据的迁移;当无法进行这种本地化数据处理时,再寻找其他可用节点并将数据从网络上传送给该节点(数据向代码迁移),但将尽可能从数据所在的本地机架上寻 找可用节点以减少通信延迟。

3)系统优化:
为了减少数据通信开销,中间结果数据进入 Reduce 节点前会进行一定的合并处理;一个 Reduce 节点所处理的数据可能会来自多个 Map 节点,为了避免 Reduce 计算阶段发生数据相关性,Map 节点输出的中间结果需使用一定的策略进行适当的划分处理,保证相关性数据发送到同一个 Reduce 节点;此外,系统还进行一些计算性能优化处理,如对最慢的计算任务采用多备份执行、选最快完成者作为结果。

4)出错检测和恢复:
以低端商用服务器构成的大规模 MapReduce 计算集群中,节点硬件(主机、磁盘、内存等)出错和软件出错是常态,因此 MapReduce 需要能检测并隔离出错节点,并调度分配新的节点接管出错节点的计算任务。同时,系统还将维护数据存储的可靠性,用多备份冗余存储机制提 高数据存储的可靠性,并能及时检测和恢复出错的数据。

MapReduce 主要技术特征

MapReduce 设计上具有以下主要的技术特征:

1)向“外”横向扩展,而非向“上”纵向扩展
即 MapReduce 集群的构建完全选用价格便宜、易于扩展的低端商用服务器,而非价格昂贵、不易扩展的高端服务器。
对于大规模数据处理,由于有大 量数据存储需要,显而易见,基于低端服务器的集群远比基于高端服务器的集群优越,这就是为什么 MapReduce 并行计算集群会基于低端服务器实现的原 因。

2)失效被认为是常态
MapReduce 集群中使用大量的低端服务器,因此,节点硬件失效和软件出错是常态,因而一个良好设计、具有高容错性的并行计算系统不能因为节点 失效而影响计算服务的质量,任何节点失效都不应当导致结果的不一致或不确定性;任何一个节点失效时,其他节点要能够无缝接管失效节点的计算任务;当失效节 点恢复后应能自动无缝加入集群,而不需要管理员人工进行系统配置。
MapReduce 并行计算软件框架使用了多种有效的错误检测和恢复机制,如节点自动重 启技术,使集群和计算框架具有对付节点失效的健壮性,能有效处理失效节点的检测和恢复。

3)把处理向数据迁移
传统高性能计算系统通常有很多处理器节点与一些外存储器节点相连,如用存储区域网络(Storage Area,SAN Network)连接的磁盘阵列,因此,大规模数据处理时外存文件数据 I / O 访问会成为一个制约系统性能的瓶颈。
为了减少大规模数据并行计算系统中的数据 通信开销,代之以把数据传送到处理节点(数据向处理器或代码迁移),应当考虑将处理向数据靠拢和迁移。MapReduce 采用了数据 / 代码互定位的技术方法,计算节点将首先尽量负责计算其本地存储的数据,以发挥数据本地化特点,仅当节点无法处理本地数据时,再采用就近原则寻找其他可用计算节点,并把数据传送到该可用计算节点。

4)顺序处理数据、避免随机访问数据
大规模数据处理的特点决定了大量的数据记录难以全部存放在内存,而通常只能放在外存中进行处理。由于磁盘的顺序访问要远比随机访问快得多,因此 MapReduce 主要设计为面向顺序式大规模数据的磁盘访问处理。
为了实现面向大数据集批处理的高吞吐量的并行处理,MapReduce 可以利用集群中 的大量数据存储节点同时访问数据,以此利用分布集群中大量节点上的磁盘集合提供高带宽的数据访问和传输。

5)为应用开发者隐藏系统层细节
软件工程实践指南中,专业程序员认为之所以写程序困难,是因为程序员需要记住太多的编程细节(从变量名到复杂算法的边界情况处理),这对大脑记忆是 一个巨大的认知负担,需要高度集中注意力;而并行程序编写有更多困难,如需要考虑多线程中诸如同步等复杂繁琐的细节。由于并发执行中的不可预测性,程序的 调试查错也十分困难;而且,大规模数据处理时程序员需要考虑诸如数据分布存储管理、数据分发、数据通信和同步、计算结果收集等诸多细节问题。
MapReduce 提供了一种抽象机制将程序员与系统层细节隔离开来,程序员仅需描述需要计算什么(What to compute),而具体怎么去计算(How to compute)就交由系统的执行框架处理,这样程序员可从系统层细节中解放出来,而致力于其应用本身计算问题的算法设计。

6)平滑无缝的可扩展性
这里指出的可扩展性主要包括两层意义上的扩展性:数据扩展和系统规模扩展性。
理想的软件算法应当能随着数据规模的扩大而表现出持续的有效性,性能上的下降程度应与数据规模扩大的倍数相当;在集群规模上,要求算法的计算性能应能随着节点数的增加保持接近线性程度的增长。绝大多数现有的单机算法都达不到 以上理想的要求;把中间结果数据维护在内存中的单机算法在大规模数据处理时很快失效;从单机到基于大规模集群的并行计算从根本上需要完全不同的算法设计。奇妙的是,MapReduce 在很多情形下能实现以上理想的扩展性特征。

多项研究发现,对于很多计算问题,基于 MapReduce 的计算性能可随节点数目增长保持近似于线性的增长。

MapReduce 案例

如果想统计下过去 10 年计算机论文出现最多的几个单词,看看大家都在研究些什么,那收集好论文后,该怎么办呢?
方法一:我可以写一个小程序,把所有论文按顺序遍历一遍,统计每一个遇到的单词的出现次数,最后就可以知道哪几个单词最热门了。
这种方法在数据集比较耗时,是非常有效的,而且实现最简单,用来解决这个问题很合适。

方法二:写一个多线程程序,并发遍历论文。
这个问题理论上是可以高度并发的,因为统计一个文件时不会影响统计另一个文件。当我们的机器是多核或者多处理器,方法二肯定比方法一高效。但是写一个多线程程序要比方法一困难多了,我们必须自己同步共享数据,比如要防止两个线程重复统计文件。

方法三:把作业交给多个计算机去完成。
我们可以使用方法一的程序,部署到 N 台机器上去,然后把论文集分成 N 份,一台机器跑一个作业。这个方法跑得足够快,但是部署起来很麻烦,我们要人工把程序 copy 到别的机器,要人工把论文集分开,最痛苦的是还要把 N 个运行结果进行整合(当然我们也可以再写一个程序)。

方法四:让 MapReduce 来帮帮我们吧!
MapReduce 本质上就是方法三,但是如何拆分文件集,如何 copy 程序,如何整合结果这些都是框架定义好的。我们只要定义好这个任务(用户程序),其它都交给 MapReduce。

MapReduce 伪代码
实现 Map 和 Reduce 两个函数
Map 函数和 Reduce 函数是交给用户实现的,这两个函数定义了任务本身。

Map 函数
接受一个键值对(key-value pair),产生一组中间键值对。MapReduce 框架会将 map 函数产生的中间键值对里键相同的值传递给一个 reduce 函数。

ClassMapper
methodmap(String input_key, String input_value):
// input_key: text document name
// input_value: document contents
for eachword w ininput_value:
EmitIntermediate(w, "1");

Reduce 函数
接受一个键,以及相关的一组值,将这组值进行合并产生一组规模更小的值(通常只有一个或零个值)。

ClassReducer
method reduce(String output_key,Iterator intermediate_values):
// output_key: a word
// output_values: a list of counts
intresult = 0;
for each v in intermediate_values:
result += ParseInt(v);
Emit(AsString(result));

在统计词频的例子里,map 函数接受的键是文件名,值是文件的内容,map 逐个遍历单词,每遇到一个单词 w,就产生一个中间键值对 <w, "1">,这表示单词 w 咱又找到了一个;MapReduce 将键相同(都是单词 w)的键值对传给 reduce 函数,这样 reduce 函数接受的键就是单词 w,值是一串 "1"(最基本的实现是这样,但可以优化),个数等于键为 w 的键值对的个数,然后将这些“1”累加就得到单词 w 的出现次数。最后这些单词的出现次数会被写到用户定义的位置,存储在底层的分布式存储系统(GFS 或 HDFS)。

工作原理
工作原理.jpg
一切都是从最上方的 user program 开始的,user program 链接了 MapReduce 库,实现了最基本的 Map 函数和 Reduce 函数。图中执行的顺序都用数字标记了。

  1. MapReduce 库先把 user program 的输入文件划分为 M 份(M 为用户定义),每一份通常有 16MB 到 64MB,如图左方所示分成了 split0~4;然后使用 fork 将用户进程拷贝到集群内其它机器上。
  2. user program 的副本中有一个称为 master,其余称为 worker,master 是负责调度的,为空闲 worker 分配作业(Map 作业或者 Reduce 作业),worker 的数量也是可以由用户指定的。
  3. 被分配了 Map 作业的 worker,开始读取对应分片的输入数据,Map 作业数量是由 M 决定的,和 split 一一对应;Map 作业从输入数据中抽取出键值对,每一个键值对都作为参数传递给 map 函数,map 函数产生的中间键值对被缓存在内存中。
  4. 缓存的中间键值对会被定期写入本地磁盘,而且被分为 R 个区,R 的大小是由用户定义的,将来每个区会对应一个 Reduce 作业;这些中间键值对的位置会被通报给 master,master 负责将信息转发给 Reduce worker。
  5. master 通知分配了 Reduce 作业的 worker 它负责的分区在什么位置(肯定不止一个地方,每个 Map 作业产生的中间键值对都可能映射到所有 R 个不同分区),当 Reduce worker 把所有它负责的中间键值对都读过来后,先对它们进行排序,使得相同键的键值对聚集在一起。因为不同的键可能会映射到同一个分区也就是同一个 Reduce 作业(谁让分区少呢),所以排序是必须的。
  6. reduce worker 遍历排序后的中间键值对,对于每个唯一的键,都将键与关联的值传递给 reduce 函数,reduce 函数产生的输出会添加到这个分区的输出文件中。
  7. 当所有的 Map 和 Reduce 作业都完成了,master 唤醒正版的 user program,MapReduce 函数调用返回 user program 的代码。

所有执行完毕后,MapReduce 输出放在了 R 个分区的输出文件中(分别对应一个 Reduce 作业)。用户通常并不需要合并这 R 个文件,而是将其作为输入交给另一个 MapReduce 程序处理。整个过程中,输入数据是来自底层分布式文件系统(GFS)的,中间数据是放在本地文件系统的,最终输出数据是写入底层分布式文件系统(GFS)的。而且我们要注意 Map/Reduce 作业和 map/reduce 函数的区别:Map 作业处理一个输入数据的分片,可能需要调用多次 map 函数来处理每个输入键值对;Reduce 作业处理一个分区的中间键值对,期间要对每个不同的键调用一次 reduce 函数,Reduce 作业最终也对应一个输出文件。

MapReduce 经典实例

MapReduce 的一个经典实例是 Hadoop。用于处理大型分布式数据库。由于 Hadoop 关联到云以及云部署,大多数人忽略了一点,Hadoop 有些属性不适合一般企业的需求,特别是移动应用程序。下面是其中的一些特点:

  • Hadoop 的最大价值在于数据库,而 Hadoop 所用的数据库是移动应用程序所用数据库的 10 到 1000 倍。对于许多人来说,使用 Hadoop 就是杀鸡用牛刀。
  • Hadoop 有显著的设置和处理开销。Hadoop 工作可能会需要几分钟的时间,即使相关数据量不是很大。
  • Hadoop 在支持具有多维上下文数据结构方面不是很擅长。例如,一个定义给定地理变量值的记录,然后使用垂直连接,来连续定义一个比 hadoop 使用的键值对定义更复杂的数据结构关系。
  • Hadoop 必须使用迭代方法处理的问题方面用处不大——尤其是几个连续有依赖性步骤的问题。

MapReduce (EMR),这是一项 Hadoop 服务。Hadoop 旨在同期文件系统工作,以 HDFS 著称。
当用户用 EMR 创建了一个 Hadoop 集群,他们可以从 AWS S3 或者一些其他的数据存储复制数据到集群上的 HDFS,或者也可以直接从 S3 访问数据。HDFS 使用本地存储,而且通常提供了比从 S3 恢复更好的性能,但是在运行 Hadoop 工作之前,也需要时间从 S3 复制数据到 HDFS。如果 EMR 集群要运行一段时间,且针对多项工作使用相同的数据,可能值得额外的启动时间来从 S3 复制数据到 HDFS。