Hadoop源代码分析(完整图文版) part 1
Hadoop源代码分析(一)关键字: 分布式 云计算 Google的核心竞争技术是它的计算平台。Google的大牛们用了下面5篇文章,介绍了它们的计算设施。 GoogleCluster: http://research.google.com/archive/googlecluster.html Chubby:http://labs.google.com/papers/chubb
在网上看到了很多此文章的装载,但是都是纯文字,这篇文章在没有图片的情况下阅读起来意义不大了。花了点时间上传了100多张图片,希望对大家学习hadoop有帮助。
Hadoop源代码分析(一)
关键字: 分布式 云计算
Google的核心竞争技术是它的计算平台。Google的大牛们用了下面5篇文章,介绍了它们的计算设施。
GoogleCluster: http://research.google.com/archive/googlecluster.html
Chubby:http://labs.google.com/papers/chubby.html
GFS:http://labs.google.com/papers/gfs.html
BigTable:http://labs.google.com/papers/bigtable.html
MapReduce:http://labs.google.com/papers/mapreduce.html
很快,Apache上就出现了一个类似的解决方案,目前它们都属于Apache的Hadoop项目,对应的分别是:
Chubby-->ZooKeeper
GFS-->HDFS
BigTable-->HBase
MapReduce-->Hadoop
目前,基于类似思想的Open Source项目还很多,如Facebook用于用户分析的Hive。
HDFS作为一个分布式文件系统,是所有这些项目的基础。分析好HDFS,有利于了解其他系统。由于Hadoop的HDFS和MapReduce是同一个项目,我们就把他们放在一块,进行分析。
下图是MapReduce整个项目的顶层包图和他们的依赖关系。Hadoop包之间的依赖关系比较复杂,原因是HDFS提供了一个分布式文件系统,该系统提供API,可以屏蔽本地文件系统和分布式文件系统,甚至象Amazon S3这样的在线存储系统。这就造成了分布式文件系统的实现,或者是分布式文件系统的底层的实现,依赖于某些貌似高层的功能。功能的相互引用,造成了蜘蛛网型的依赖关系。一个典型的例子就是包conf,conf用于读取系统配置,它依赖于fs,主要是读取配置文件的时候,需要使用文件系统,而部分的文件系统的功能,在包fs中被抽象了。
Hadoop的关键部分集中于图中蓝色部分,这也是我们考察的重点。
· 大小: 78.3 KB
Hadoop源代码分析(二)
下面给出了Hadoop的包的功能分析。
Package | Dependences |
tool | 提供一些命令行工具,如DistCp,archive |
mapreduce | Hadoop的Map/Reduce实现 |
filecache | 提供HDFS文件的本地缓存,用于加快Map/Reduce的数据访问速度 |
fs | 文件系统的抽象,可以理解为支持多种文件系统实现的统一文件访问接口 |
hdfs | HDFS,Hadoop的分布式文件系统实现 |
ipc | 一个简单的IPC的实现,依赖于io提供的编解码功能 参考:http://zhangyu8374.javaeye.com/blog/86306 |
io | 表示层。将各种数据编码/解码,方便于在网络上传输 |
net | 封装部分网络功能,如DNS,socket |
security | 用户和用户组信息 |
conf | 系统的配置参数 |
metrics | 系统统计数据的收集,属于网管范畴 |
util | 工具类 |
record | 根据DDL(数据描述语言)自动生成他们的编解码函数,目前可以提供C++和Java |
http | 基于Jetty的HTTP Servlet,用户通过浏览器可以观察文件系统的一些状态信息和日志 |
log | 提供HTTP访问日志的HTTP Servlet |
Hadoop源代码分析(三)
由于Hadoop的MapReduce和HDFS都有通信的需求,需要对通信的对象进行序列化。Hadoop并没有采用Java的序列化,而是引入了它自己的系统。
org.apache.hadoop.io中定义了大量的可序列化对象,他们都实现了Writable接口。实现了Writable接口的一个典型例子如下:
</pre><pre name="code" class="java">1.public class MyWritable implements Writable {
2. // Some data
3. private int counter;
4. private long timestamp;
5.
6. public void write(DataOutput out) throws IOException {
7. out.writeInt(counter);
8. out.writeLong(timestamp);
9. }
10.
11. public void readFields(DataInput in) throws IOException {
12. counter = in.readInt();
13. timestamp = in.readLong();
14. }
15.
16. public static MyWritable read(DataInput in) throws IOException {
17. MyWritable w = new MyWritable();
18. w.readFields(in);
19. return w;
20. }
21.}
其中的write和readFields分别实现了把对象序列化和反序列化的功能,是Writable接口定义的两个方法。下图给出了庞大的org.apache.hadoop.io中对象的关系。
这里,我把ObjectWritable标为红色,是因为相对于其他对象,它有不同的地位。当我们讨论Hadoop的RPC时,我们会提到RPC上交换的信息,必须是Java的基本类型,String和Writable接口的实现类,以及元素为以上类型的数组。ObjectWritable对象保存了一个可以在RPC上传输的对象和对象的类型信息。这样,我们就有了一个万能的,可以用于客户端/服务器间传输的Writable对象。例如,我们要把上面例子中的对象作为RPC请求,需要根据MyWritable创建一个ObjectWritable,ObjectWritable往流里会写如下信息
对象类名长度,对象类名,对象自己的串行化结果
这样,到了对端,ObjectWritable可以根据对象类名创建对应的对象,并解串行。应该注意到,ObjectWritable依赖于WritableFactories,那存储了Writable子类对应的工厂。我们需要把MyWritable的工厂,保存在WritableFactories中(通过WritableFactories.setFactory)。
Hadoop源代码分析(五)
介绍完org.apache.hadoop.io以后,我们开始来分析org.apache.hadoop.rpc。RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。当我们讨论HDFS的,通信可能发生在:
· Client-NameNode之间,其中NameNode是服务器
· Client-DataNode之间,其中DataNode是服务器
· DataNode-NameNode之间,其中NameNode是服务器
· DataNode-DateNode之间,其中某一个DateNode是服务器,另一个是客户端
如果我们考虑Hadoop的Map/Reduce以后,这些系统间的通信就更复杂了。为了解决这些客户机/服务器之间的通信,Hadoop引入了一个RPC框架。该RPC框架利用的Java的反射能力,避免了某些RPC解决方案中需要根据某种接口语言(如CORBA的IDL)生成存根和框架的问题。但是,该RPC框架要求调用的参数和返回结果必须是Java的基本类型,String和Writable接口的实现类,以及元素为以上类型的数组。同时,接口方法应该只抛出IOException异常。(参考自http://zhangyu8374.javaeye.com/blog/86306)
既然是RPC,当然就有客户端和服务器,当然,org.apache.hadoop.rpc也就有了类Client和类Server。但是类Server是一个抽象类,类RPC封装了Server,利用反射,把某个对象的方法开放出来,变成RPC中的服务器。
下图是org.apache.hadoop.rpc的类图。
Hadoop源代码分析(六)
既然是RPC,自然就有客户端和服务器,当然,org.apache.hadoop.rpc也就有了类Client和类Server。在这里我们来仔细考察org.apache.hadoop.rpc.Client。下面的图包含了org.apache.hadoop.rpc.Client中的关键类和关键方法。
由于Client可能和多个Server通信,典型的一次HDFS读,需要和NameNode打交道,也需要和某个/某些DataNode通信。这就意味着某一个Client需要维护多个连接。同时,为了减少不必要的连接,现在Client的做法是拿ConnectionId(图中最右侧)来做为Connection的ID。ConnectionId包括一个InetSocketAddress(IP地址+端口号或主机名+端口号)对象和一个用户信息对象。这就是说,同一个用户到同一个InetSocketAddress的通信将共享同一个连接。
连接被封装在类Client.Connection中,所有的RPC调用,都是通过Connection,进行通信。一个RPC调用,自然有输入参数,输出参数和可能的异常,同时,为了区分在同一个Connection上的不同调用,每个调用都有唯一的id。调用是否结束也需要一个标记,所有的这些都体现在对象Client.Call中。Connection对象通过一个Hash表,维护在这个连接上的所有Call:
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
一个RPC调用通过addCall,把请求加到Connection里。为了能够在这个框架上传输Java的基本类型,String和Writable接口的实现类,以及元素为以上类型的数组,我们一般把Call需要的参数打包成为ObjectWritable对象。
Client.Connection会通过socket连接服务器,连接成功后回校验客户端/服务器的版本号(Client.ConnectionwriteHeader()方法),校验成功后就可以通过Writable对象来进行请求的发送/应答了。注意,每个Client.Connection会起一个线程,不断去读取socket,并将收到的结果解包,找出对应的Call,设置Call并通知结果已经获取。
Call使用Obejct的wait和notify,把RPC上的异步消息交互转成同步调用。
还有一点需要注意,一个Client会有多个Client.Connection,这是一个很自然的结果。
Hadoop源代码分析(七)
聊完了Client聊Server,按惯例,先把类图贴出来。
需要注意的是,这里的Server类是个抽象类,唯一抽象的地方,就是
public abstract Writable call(Writable param, long receiveTime) throws IOException;
这表明,Server提供了一个架子,Server的具体功能,需要具体类来完成。而具体类,当然就是实现call方法。
我们先来分析Server.Call,和Client.Call类似,Server.Call包含了一次请求,其中,id和param的含义和Client.Call是一致的。不同点在后面三个属性,connection是该Call来自的连接,当然,当请求处理结束时,相应的结果会通过相同的connection,发送给客户端。属性timestamp是请求到达的时间戳,如果请求很长时间没被处理,对应的连接会被关闭,客户端也就知道出错了。最后的response是请求处理的结果,可能是一个Writable的串行化结果,也可能一个异常的串行化结果。
Server.Connection维护了一个来之客户端的socket连接。它处理版本校验,读取请求并把请求发送到请求处理线程,接收处理结果并把结果发送给客户端。
Hadoop的Server采用了Java的NIO,这样的话就不需要为每一个socket连接建立一个线程,读取socket上的数据。在Server中,只需要一个线程,就可以accept新的连接请求和读取socket上的数据,这个线程,就是上面图里的Listener。
请求处理线程一般有多个,它们都是Server.Handle类的实例。它们的run方法循环地取出一个Server.Call,调用Server.call方法,搜集结果并串行化,然后将结果放入Responder队列中。
对于处理完的请求,需要将结果写回去,同样,利用NIO,只需要一个线程,相关的逻辑在Responder里。
Hadoop源代码分析(八)
(注:本节需要用到一些Java反射的背景)
有了Client和Server,很自然就能RPC啦。下面轮到RPC.java啦。
一般来说,分布式对象一般都会要求根据接口生成存根和框架。如CORBA,可以通过IDL,生成存根和框架。但是,在org.apache.hadoop.rpc,我们就不需要这样的步骤了。上类图。
为了分析Invoker,我们需要介绍一些Java反射实现Dynamic Proxy的背景。
Dynamic Proxy是由两个class实现的:java.lang.reflect.Proxy 和 java.lang.reflect.InvocationHandler,后者是一个接口。所谓Dynamic Proxy是这样一种class:它是在运行时生成的class,在生成它时你必须提供一组interface给它,然后该class就宣称它实现了这些interface。
这个Dynamic Proxy其实就是一个典型的Proxy模式,它不会替你作实质性的工作,在生成它的实例时你必须提供一个handler,由它接管实际的工作。这个handler,在Hadoop的RPC中,就是Invoker对象。
我们可以简单地理解:就是你可以通过一个接口来生成一个类,这个类上的所有方法调用,都会传递到你生成类时传递的InvocationHandler实现中。
在Hadoop的RPC中,Invoker实现了InvocationHandler的invoke方法(invoke方法也是InvocationHandler的唯一方法)。Invoker会把所有跟这次调用相关的调用方法名,参数类型列表,参数列表打包,然后利用前面我们分析过的Client,通过socket传递到服务器端。就是说,你在proxy类上的任何调用,都通过Client发送到远方的服务器上。
Invoker使用Invocation。Invocation封装了一个远程调用的所有相关信息,它的主要属性有: methodName,调用方法名,parameterClasses,调用方法参数的类型列表和parameters,调用方法参数。注意,它实现了Writable接口,可以串行化。
RPC.Server实现了org.apache.hadoop.ipc.Server,你可以把一个对象,通过RPC,升级成为一个服务器。服务器接收到的请求(通过Invocation),解串行化以后,就变成了方法名,方法参数列表和参数列表。利用Java反射,我们就可以调用对应的对象的方法。调用的结果再通过socket,返回给客户端,客户端把结果解包后,就可以返回给Dynamic Proxy的使用者了。
Hadoop源代码分析(九)
一个典型的HDFS系统包括一个NameNode和多个DataNode。NameNode维护名字空间;而DataNode存储数据块。
DataNode负责存储数据,一个数据块在多个DataNode中有备份;而一个DataNode对于一个块最多只包含一个备份。所以我们可以简单地认为DataNode上存了数据块ID和数据块内容,以及他们的映射关系。
一个HDFS集群可能包含上千DataNode节点,这些DataNode定时和NameNode通信,接受NameNode的指令。为了减轻NameNode的负担,NameNode上并不永久保存那个DataNode上有那些数据块的信息,而是通过DataNode启动时的上报,来更新NameNode上的映射表。
DataNode和NameNode建立连接以后,就会不断地和NameNode保持心跳。心跳的返回其还也包含了NameNode对DataNode的一些命令,如删除数据库或者是把数据块复制到另一个DataNode。应该注意的是:NameNode不会发起到DataNode的请求,在这个通信过程中,它们是严格的客户端/服务器架构。
DataNode当然也作为服务器接受来自客户端的访问,处理数据块读/写请求。DataNode之间还会相互通信,执行数据块复制任务,同时,在客户端做写操作的时候,DataNode需要相互配合,保证写操作的一致性。
下面我们就来具体分析一下DataNode的实现。DataNode的实现包括两部分,一部分是对本地数据块的管理,另一部分,就是和其他的实体打交道。我们先来看本地数据块管理部分。
安装Hadoop的时候,我们会指定对应的数据块存放目录,当我们检查数据块存放目录目录时,我们回发现下面有个叫dfs的目录,所有的数据就存放在dfs/data里面。
其中有两个文件,storage里存的东西是一些出错信息,貌似是版本不对…云云。in_use.lock是一个空文件,它的作用是如果需要对整个系统做排斥操作,应用应该获取它上面的一个锁。
接下来是3个目录,current存的是当前有效的数据块,detach存的是快照(snapshot,目前没有实现),tmp保存的是一些操作需要的临时数据块。
但我们进入current目录以后,就会发现有一系列的数据块文件和数据块元数据文件。同时还有一些子目录,它们的名字是subdir0到subdir63,子目录下也有数据块文件和数据块元数据。这是因为HDFS限定了每个目录存放数据块文件的数量,多了以后会创建子目录来保存。
数据块文件显然保存了HDFS中的数据,数据块最大可以到64M。每个数据块文件都会有对应的数据块元数据文件。里面存放的是数据块的校验信息。下面是数据块文件名和它的元数据文件名的例子:
blk_3148782637964391313
blk_3148782637964391313_242812.meta
上面的例子中,3148782637964391313是数据块的ID号,242812是数据块的版本号,用于一致性检查。
在current目录下还有下面几个文件:
VERSION,保存了一些文件系统的元信息。
dncp_block_verification.log.curr和dncp_block_verification.log.prev,它记录了一些DataNode对文件系定时统做一致性检查需要的信息。
Hadoop源代码分析(一零)
在继续分析DataNode之前,我们有必要看一下系统的工作状态。启动HDFS的时候,我们可以选择以下启动参数:
· FORMAT("-format"):格式化系统
· REGULAR("-regular"):正常启动
· UPGRADE("-upgrade"):升级
· ROLLBACK("-rollback"):回滚
· FINALIZE("-finalize"):提交
· IMPORT("-importCheckpoint"):从Checkpoint恢复。
作为一个大型的分布式系统,Hadoop内部实现了一套升级机制(http://wiki.apache.org/hadoop/Hadoop_Upgrade)。upgrade参数就是为了这个目的而存在的,当然,升级可能成功,也可能失败。如果失败了,那就用rollback进行回滚;如果过了一段时间,系统运行正常,那就可以通过finalize,正式提交这次升级(跟数据库有点像啊)。
importCheckpoint选项用于NameNode发生故障后,从某个检查点恢复。
有了上面的描述,我们得到下面左边的状态图:
大家应该注意到,上面的升级/回滚/提交都不可能一下就搞定,就是说,系统故障时,它可能处于上面右边状态中的某一个。特别是分布式的各个节点上,甚至可能出现某些节点已经升级成功,但有些节点可能处于中间状态的情况,所以Hadoop采用类似于数据库事务的升级机制也就不是很奇怪。
大家先理解一下上面的状态图,它是下面我们要介绍DataNode存储的基础。
Hadoop源代码分析(一一)
我们来看一下升级/回滚/提交时的DataNode上会发生什么(在类DataStorage中实现)。
前面我们提到过VERSION文件,它保存了一些文件系统的元信息,这个文件在系统升级时,会发生对应的变化。
升级时,NameNode会将新的版本号,通过DataNode的登录应答返回。DataNode收到以后,会将当前的数据块文件目录改名,从current改名为previous.tmp,建立一个snapshot,然后重建current目录。重建包括重建VERSION文件,重建对应的子目录,然后建立数据块文件和数据块元数据文件到previous.tmp的硬连接。建立硬连接意味着在系统中只保留一份数据块文件和数据块元数据文件,current和previous.tmp中的相应文件,在存储中,只保留一份。当所有的这些工作完成以后,会在current里写入新的VERSION文件,并将previous.tmp目录改名为previous,完成升级。
了解了升级的过程以后,回滚就相对简单。因为说有的旧版本信息都保存在previous目录里。回滚首先将current目录改名为removed.tmp,然后将previous目录改名为current,最后删除removed.tmp目录。
提交的过程,就是将上面的previous目录改名为finalized.tmp,然后启动一个线程,将该目录删除。
下图给出了上面的过程:
需要注意的是,HDFS的升级,往往只是支持从某一个特点的老版本升级到当前版本。回滚时能够恢复到的版本,也是previous中记录的版本。
下面我们继续分析DataNode。
文字分析完DataNode存储在文件上的数据以后,我们来看一下运行时对应的数据结构。从大到小,Hadoop中最大的结构是Storage,最小的结构,在DataNode上是block。
类Storage保存了和存储相关的信息,它继承了StorageInfo,应用于DataNode的DataStorage,则继承了Storage,总体类图如下:
StorageInfo包含了3个字段,分别是layoutVersion:版本号,如果Hadoop调整文件结构布局,版本号就会修改,这样可以保证文件结构和应用一致。namespaceID是Storage的ID,cTime,creation time。
和StorageInfo相比,Storage就是个大家伙了。
Storage可以包含多个根(参考配置项dfs.data.dir的说明),这些根通过Storage的内部类StorageDirectory来表示。StorageDirectory中最重要的方法是analyzeStorage,它将根据系统启动时的参数和我们上面提到的一些判断条件,返回系统现在的状态。StorageDirectory可能处于以下的某一个状态(与系统的工作状态一定的对应):
NON_EXISTENT:指定的目录不存在;
NOT_FORMATTED:指定的目录存在但未被格式化;
COMPLETE_UPGRADE:previous.tmp存在,current也存在
RECOVER_UPGRADE:previous.tmp存在,current不存在
COMPLETE_FINALIZE:finalized.tmp存在,current也存在
COMPLETE_ROLLBACK:removed.tmp存在,current也存在,previous不存在
RECOVER_ROLLBACK:removed.tmp存在,current不存在,previous存在
COMPLETE_CHECKPOINT:lastcheckpoint.tmp存在,current也存在
RECOVER_CHECKPOINT:lastcheckpoint.tmp存在,current不存在
NORMAL:普通工作模式。
StorageDirectory处于某些状态是通过发生对应状态改变需要的工作文件夹和正常工作的current夹来进行判断。状态改变需要的工作文件夹包括:
previous:用于升级后保存以前版本的文件
previous.tmp:用于升级过程中保存以前版本的文件
removed.tmp:用于回滚过程中保存文件
finalized.tmp:用于提交过程中保存文件
lastcheckpoint.tmp:应用于从NameNode中,导入一个检查点
previous.checkpoint:应用于从NameNode中,结束导入一个检查点
有了这些状态,就可以对系统进行恢复(通过方法doRecover)。恢复的动作如下(结合上面的状态转移图):
COMPLETE_UPGRADE:mv previous.tmp -> previous
RECOVER_UPGRADE:mv previous.tmp -> current
COMPLETE_FINALIZE:rm finalized.tmp
COMPLETE_ROLLBACK:rm removed.tmp
RECOVER_ROLLBACK:mv removed.tmp -> current
COMPLETE_CHECKPOINT:mv lastcheckpoint.tmp -> previous.checkpoint
RECOVER_CHECKPOINT:mv lastcheckpoint.tmp -> current
我们以RECOVER_UPGRADE为例,分析一下。根据升级的过程,
1. current->previous.tmp
2. 重建current
3. previous.tmp->previous
当我们发现previous.tmp存在,current不存在,我们知道只需要将previous.tmp改为current,就能恢复到未升级时的状态。
StorageDirectory还管理着文件系统的元信息,就是我们上面提过StorageInfo信息,当然,StorageDirectory还保存每个具体用途自己的信息。这些信息,其实都存储在VERSION文件中,StorageDirectory中的read/write方法,就是用于对这个文件进行读/写。下面是某一个DataNode的VERSION文件的例子:
1. #Fri Nov 14 10:27:35 CST 2008
2. namespaceID=1950997968
3. storageID=DS-697414267-127.0.0.1-50010-1226629655026
4. cTime=0
5. storageType=DATA_NODE
6. layoutVersion=-16
#Fri Nov 14 10:27:35 CST 2008
namespaceID=1950997968
storageID=DS-697414267-127.0.0.1-50010-1226629655026
cTime=0
storageType=DATA_NODE
layoutVersion=-16
对StorageDirectory的排他操作需要锁,还记得我们在分析系统目录时提到的in_use.lock文件吗?它就是用来给整个系统加/解锁用的。StorageDirectory提供了对应的lock和unlock方法。
分析完StorageDirectory以后,Storage类就很简单了。基本上都是对一系列StorageDirectory的操作,同时Storage提供一些辅助方法。
DataStorage是Storage的子类,专门应用于DataNode。上面我们对DataNode的升级/回滚/提交过程,就是对DataStorage的doUpgrade/doRollback/doFinalize分析得到的。
DataStorage提供了format方法,用于创建DataNode上的Storage,同时,利用StorageDirectory,DataStorage管理存储系统的状态。
Hadoop源代码分析(一二)
分析完Storage相关的类以后,我们来看下一个大家伙,FSDataset相关的类。
上面介绍Storage时,我们并没有涉及到数据块Block的操作,所有和数据块相关的操作,都在FSDataset相关的类中进行处理。下面是类图:
Block是对一个数据块的抽象,通过前面的讨论我们知道一个Block对应着两个文件,其中一个存数据,一个存校验信息,如下:
blk_3148782637964391313
blk_3148782637964391313_242812.meta
上面的信息中,blockId是3148782637964391313,242812是数据块的版本号,当然,系统还会保存数据块的大小,在类中是属性numBytes。Block提供了一系列的方法来操作对象的属性。
DatanodeBlockInfo存放的是Block在文件系统上的信息。它保存了Block存放的卷(FSVolume),文件名和detach状态。这里有必要解释一下detach状态:我们前面分析过,系统在升级时会创建一个snapshot,snapshot的文件和current里的数据块文件和数据块元文件是通过硬链接,指向了相同的内容。当我们需要改变current里的文件时,如果不进行detach操作,那么,修改的内容就会影响snapshot里的文件,这时,我们需要将对应的硬链接解除掉。方法很简单,就是在临时文件夹里,复制文件,然后将临时文件改名成为current里的对应文件,这样的话,current里的文件和snapshot里的文件就detach了。这样的技术,也叫copy-on-write,是一种有效提高系统性能的方法。DatanodeBlockInfo中的detachBlock,能够对Block对应的数据文件和元数据文件进行detach操作。
介绍完类Block和DatanodeBlockInfo后,我们来看FSVolumeSet,FSVolume和FSDir。我们知道在一个DataNode上可以指定多个Storage来存储数据块,由于HDFS规定了一个目录能存放Block的数目,所以一个Storage上存在多个目录。对应的,FSDataset中用FSVolume来对应一个Storage,FSDir对应一个目录,所有的FSVolume由FSVolumeSet管理,FSDataset中通过一个FSVolumeSet对象,就可以管理它的所有存储空间。
FSDir对应着HDFS中的一个目录,目录里存放着数据块文件和它的元文件。FSDir的一个重要的操作,就是在添加一个Block时,根据需要有时会扩展目录结构,上面提过,一个Storage上存在多个目录,所有的目录,都对应着一个FSDir,目录的关系,也由FSDir保存。FSDir的getBlockInfo方法分析目录下的所有数据块文件信息,生成Block对象,存放到一个集合中。getVolumeMap方法能,则会建立Block和DatanodeBlockInfo的关系。以上两个方法,用于系统启动时搜集所有的数据块信息,便于后面快速访问。
FSVolume对应着是某一个Storage。数据块文件,detach文件和临时文件都是通过FSVolume来管理的,这个其实很自然,在同一个存储系统上移动文件,往往只需要修改文件存储信息,不需要搬数据。FSVolume有一个recoverDetachedBlocks的方法,用于恢复detach文件。和Storage的状态管理一样,detach文件有可能在复制文件时系统崩溃,需要对detach的操作进行回复。FSVolume还会启动一个线程,不断更新FSVolume所在文件系统的剩余容量。创建Block的时候,系统会根据各个FSVolume的容量,来确认Block的存放位置。
FSVolumeSet就不讨论了,它管理着所有的FSVolume。
HDFS中,对一个chunk的写会使文件处于活跃状态,FSDataset中引入了类ActiveFile。ActiveFile对象保存了一个文件,和操作这个文件的线程。注意,线程有可能有多个。ActiveFile的构造函数会自动地把当前线程加入其中。
有了上面的基础,我们可以开始分析FSDataset。FSDataset实现了接口FSDatasetInterface。FSDatasetInterface是DataNode对底层存储的抽象。
下面给出了FSDataset的关键成员变量:
FSVolumeSet volumes;
private HashMap<Block,ActiveFile> ongoingCreates = new HashMap<Block,ActiveFile>();
private HashMap<Block,DatanodeBlockInfo> volumeMap = null;
其中,volumes就是FSDataset使用的所有Storage,ongoingCreates是Block到ActiveFile的映射,也就是说,说有正在创建的Block,都会记录在ongoingCreates里。
下面我们讨论FSDataset中的方法。
public long getMetaDataLength(Block b) throws IOException;
得到一个block的元数据长度。通过block的ID,找对应的元数据文件,返回文件长度。
public MetaDataInputStream getMetaDataInputStream(Block b) throws IOException;
得到一个block的元数据输入流。通过block的ID,找对应的元数据文件,在上面打开输入流。下面对于类似的简单方法,我们就不再仔细讨论了。
public boolean metaFileExists(Block b) throws IOException;
判断block的元数据的元数据文件是否存在。简单方法。
public long getLength(Block b) throws IOException;
block的长度。简单方法。
public Block getStoredBlock(long blkid) throws IOException;
通过Block的ID,找到对应的Block。简单方法。
public InputStream getBlockInputStream(Block b) throws IOException;
public InputStream getBlockInputStream(Block b, long seekOffset) throws IOException;
得到Block数据的输入流。简单方法。
public BlockInputStreams getTmpInputStreams(Block b, long blkoff, long ckoff) throws IOException;
得到Block的临时输入流。注意,临时输入流是指对应的文件处于tmp目录中。新创建块时,块数据应该写在tmp目录中,直到写操作成功,文件才会被移动到current目录中,如果失败,就不会影响current目录了。简单方法。
public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException;
得到一个block的输出流。BlockWriteStreams既包含了数据输出流,也包含了元数据(校验文件)输出流,这是一个相当复杂的方法。
参数isRecovery说明这次写是不是对以前失败的写的一次恢复操作。我们先看正常的写操作流程:首先,如果输入的block是个正常的数据块,或当前的block已经有线程在写,writeToBlock会抛出一个异常。否则,将创建相应的临时数据文件和临时元数据文件,并把相关信息,创建一个ActiveFile对象,记录到ongoingCreates中,并创建返回的BlockWriteStreams。前面我们已经提过,建立新的ActiveFile时,当前线程会自动保存在ActiveFile的threads中。
我们以blk_3148782637964391313为例,当DataNode需要为Block ID为3148782637964391313创建写流时,DataNode创建文件tmp/blk_3148782637964391313做为临时数据文件,对应的meta文件是tmp/blk_3148782637964391313_XXXXXX.meta。其中XXXXXX是版本号。
isRecovery为true时,表明我们需要从某一次不成功的写中恢复,流程相对于正常流程复杂。如果不成功的写是由于提交(参考finalizeBlock方法)后的确认信息没有收到,先创建一个detached文件(备份)。接着,writeToBlock检查是否有还有对文件写的线程,如果有,则通过线程的interrupt方法,强制结束线程。这就是说,如果有线程还在写对应的文件块,该线程将被终止。同时,从ongoingCreates中移除对应的信息。接下来将根据临时文件是否存在,创建/复用临时数据文件和临时数据元文件。后续操作就和正常流程一样,根据相关信息,创建一个ActiveFile对象,记录到ongoingCreates中……
由于这块涉及了一些HDFS写文件时的策略,以后我们还会继续讨论这个话题。
public void updateBlock(Block oldblock, Block newblock) throws IOException;
更新一个block。这也是一个相当复杂的方法。
updateBlock的最外层是一个死循环,循环的结束条件,是没有任何和这个数据块相关的写线程。每次循环,updateBlock都会去调用一个叫tryUpdateBlock的内部方法。tryUpdateBlock发现已经没有线程在写这个块,就会跟新和这个数据块相关的信息,包括元文件和内存中的映射表volumeMap。如果tryUpdateBlock发现还有活跃的线程和该块关联,那么,updateBlock会试图结束该线程,并等在join上等待。
public void finalizeBlock(Block b) throws IOException;
提交(或叫:结束finalize)通过writeToBlock打开的block,这意味着写过程没有出错,可以正式把Block从tmp文件夹放到current文件夹。
在FSDataset中,finalizeBlock将从ongoingCreates中删除对应的block,同时将block对应的DatanodeBlockInfo,放入volumeMap中。我们还是以blk_3148782637964391313为例,当DataNode提交Block ID为3148782637964391313数据块文件时,DataNode将把tmp/blk_3148782637964391313移到current下某一个目录,以subdir12为例,这是tmp/blk_3148782637964391313将会挪到current/subdir12/blk_3148782637964391313。对应的meta文件也在目录current/subdir12下。
public void unfinalizeBlock(Block b) throws IOException;
取消通过writeToBlock打开的block,与finalizeBlock方法作用相反。简单方法。
public boolean isValidBlock(Block b);
该Block是否有效。简单方法。
public void invalidate(Block invalidBlks[]) throws IOException;
使block变为无效。简单方法。
public void validateBlockMetadata(Block b) throws IOException;
检查block的有效性。简单方法。
Hadoop源代码分析(一三)
通过上面的一系列介绍,我们知道了DataNode工作时的文件结构和文件结构在内存中的对应对象。下面我们可以来开始分析DataNode上的动态行为。首先我们来分析DataXceiverServer和DataXceiver。DataNode上数据块的接受/发送并没有采用我们前面介绍的RPC机制,原因很简单,RPC是一个命令式的接口,而DataNode处理数据部分,往往是一种流式机制。DataXceiverServer和DataXceiver就是这个机制的实现。其中,DataXceiver还依赖于两个辅助类:BlockSender和BlockReceiver。下面是类图:
(为了简单起见,BlockSender和BlockReceiver的成员变量没有进入UML模型中)
DataXceiverServer很简单,它打开一个端口,然后每接收到一个连接,就创建一个DataXceiver,服务于该连接,并记录该连接的socket,对应的实现在DataXceiverServer的run方法里。当系统关闭时,DataXceiverServer将关闭监听的socket和所有DataXceiver的socket,这样就导致了DataXceiver出错并结束线程。
DataXceiver才是真正干活的地方,目前,DataXceiver支持的操作总共有六条,分别是:
OP_WRITE_BLOCK (80):写数据块
OP_READ_BLOCK (81):读数据块
OP_READ_METADATA (82):读数据块元文件
OP_REPLACE_BLOCK (83):替换一个数据块
OP_COPY_BLOCK (84):拷贝一个数据块
OP_BLOCK_CHECKSUM (85):读数据块检验码
DataXceiver首先读取客户端的版本号并检验,然后再读取一个字节的操作码,并转入相关的子程序进行处理。我们先看一下读数据块的过程吧。
首先看输入,下图是读数据块时,客户端发送过来的信息:
包括了要读取的Block的ID,时间戳,开始偏移和读取的长度,最后是客户端的名字(貌似只是在写日志的时候用到了)。根据上面的信息,我们可以创建一个BlockSender,如果BlockSender没有出错,返回客户端一个正确指示后,否则,返回错误码。成功创建BlockSender以后,就可以开始通过BlockSender.sendBlock发送数据。
下面我们就来分析BlockSender。BlockSender的构造函数看似很复杂,其实就是根据需求(特别是在处理checksum上,因为checksum是基于块的),打开相应的数据流。close()用于释放各种资源,如已经打开的数据流。sendBlock用于发送数据,数据发送包括应答头和后续的数据包。应答头如下(包含DataXceiver中发送的成功标识):
然后后面的数据就组织成数据包来发送,包结构如下:
各个字段含义:
packetLen:包长度,包括包头
offset:偏移量
seqno:包序列号
tail:是否是最后一个包
len:数据长度
checksum:检验数据
data:数据块数据
需要注意的,在写数据前,BlockSender会校验数据,保证数据包中的checksum和数据的一致性。同时,如果数据出错,将会有ChecksumException抛出。
数据传输结束的标志,是一个packetLen长度为0的包。客户端可以返回一个两字节的应答OP_STATUS_CHECKSUM_OK(5)
Hadoop源代码分析(一四)
继续DataXceiver分析,下一块硬骨头是写数据块。HDFS的写数据操作,比读数据复杂N多倍。读数据的时候,只需要在多个数据块文件的选一个读,就可以了;但是,写数据需要同时写到多个数据块文件上,这就比较复杂了。HDFS实现了了Google写文件时的机制,如下图:
数据流从客户端开始,流经一系列的节点,到达最后一个DataNode。图中的所有DataNode只需要写一次硬盘,DataNode1和DataNode2会将从socket上接受到的数据,直接写到到下个节点的socket上。
我们来看一下写数据块的请求。
首先是客户端的版本号和一个字节的操作码,接下来是我们熟悉的blockId和generationStamp。参数pipelineSize是整个数据流链的长度,以上面为例,pipelineSize=3。isRecovery指示这次写是否是一次恢复操作,还记得我们在讨论FSDataset.writeToBlock时的那个参数吗?isRecovery来自客户端。client是客户端的名字,就是发起请求的节点名,需要特别注意的是,如果是从NameNode来的复制请求,client为空。hasSrcDataNode是一个标志位,如果被设置,表明源节点是个DataNode,接下来读取的数据就是DataNode的信息。numTargets是目标节点的数目,包括当前节点,以上面的图为例,DataNode1上这个参数值为3,到了DataNode3,就只有1了。targets包含了目标节点的相关信息,根据这些信息,就可以创建到它们上面的socket连接。targets后跟着的是校验头。
writeBlock最开始是处理上面提到的消息包,然后创建一个BlockReceiver。接下来就是创建一堆用于读写的流,如下图(图中除了in外,都是在writeBlock中创建,这个图还不涉及在BlockReceiver对本地文件读写的流):
在进行实际的数据写之前,上面的这些流会被建立起来(也就是说,DataNode1到DataNode3都可写以后,才开始处理写数据)。如果其中某一个点出错了,那么,出错的节点名会通过mirrorIn发送回来,一直沿着这条链,传播到客户端。
如果一切正常,那么,BlockReceiver.receiveBlock就开始干活了。
BlockReceiver的构造函数会创建写数据块和校验数据的输出流。剩下的就交给receiveBlock这个大家伙了。首先receiveBlock会再启动一个线程(一般来说,BlockReceiver就跑在它自己的线程上),用于处理应答(内部类PacketResponder定义了该线程),然后就不断调用receivePacket读数据。
数据是以分块的形式传送,格式和读Block的时候是一样的。如下图(很奇怪,为啥不抽象为类):
注意:如果当前DataNode处于数据流的中间,该数据包会发送到下一个节点。
接下来的处理,就是处理数据和校验,并分别写到数据块文件和数据块元数据文件。如果出错,抛出的异常会导致receiveBlock关闭相关的输出流,并终止传输。注意,数据校验出错还会上报到NameNode上。
PacketResponder用于处理应答。也就是上面讲的mirrorIn和replyOut。PacketResponder里有一个队列ackQueue,receivePacket每收到一个包,都会往队列里添加一项。PacketResponder的run方法,根据工作的DataNode所处的位置,行为不一样。
最后一个DataNode由于没有后续节点,PacketResponder的ackQueue每收到一项,表明对应的数据块已经处理完毕,那么就可以发送成功应答。如果该应答是最后一个包的,PacketResponder会关闭相关的输出流,并提交(前面讲FSDataset时后我们讨论过的finalizeBlock方法)。
如果DataNode有后续节点,那么,它必须等到后续节点的成功应答,才可以发送应答到它前面的节点。
PacketResponder的run方法还引入了心跳机制,用于检测连接是否还存在。
注意:所有改变DataNode的操作,需要把信息更新到NameNode上,这是通过DataNode.notifyNamenodeReceivedBlock方法,然后通过DataNode统一发送到NameNode上。
Hadoop源代码分析(一五)
DataXceiver支持的的6条操作,我们已经分析完最重要的两条。剩下的分别是:
OP_READ_METADATA (82):读数据块元文件
OP_REPLACE_BLOCK (83):替换一个数据块
OP_COPY_BLOCK (84):拷贝一个数据块
OP_BLOCK_CHECKSUM (85):读数据块检验码
我们逐个讨论。
读数据块元文件的请求如图(操作码82):
应答很简单,应答码(如OP_STATUS_SUCCESS),文件长度(int),数据。
拷贝数据块和替换数据块是一对相对应操作。
替换数据块的请求如图(操作码83)。这个比起上面的读数据块元文件请求,有点复杂。替换一个数据块是系统平衡操作的一部分,用于接收一个数据块。它和普通的数据块写的差别是,它只发生在两个节点上,一个写,一个读,而不需要建立数据链。我们可以比较一下它们在创建BlockReceiver对象时的差别:
blockReceiver = new BlockReceiver(block, proxyReply,
proxySock.getRemoteSocketAddress().toString(),
proxySock.getLocalSocketAddress().toString(),
false, "", null, datanode); //OP_REPLACE_BLOCK
blockReceiver = new BlockReceiver(block, in,
s.getRemoteSocketAddress().toString(),
s.getLocalSocketAddress().toString(),
isRecovery, client, srcDataNode, datanode); //OP_WRITE_BLOCK
首先,proxyReply和in不一样,这是因为发起请求的节点和提供数据的节点并不是同一个。写数据块发起请求方也提供数据,替换数据块请求方不提供数据,而是提供了一个数据源(proxySource参数),由replaceBlock发起一个拷贝数据块的请求,建立数据源。对于拷贝数据块操作,isRecovery=false,client=””, srcDataNode=null。注意,我们在分析BlockReceiver是,讨论过client=””的情况,就是应用于这种场景。
在创建BlockReceiver对象前,需要利用下面介绍的拷贝数据块的请求建立到数据源的socket连接并发送拷贝数据块请求。然后通过BlockReceiver.receiveBlock接收数据。任务成功后将结果通知notifyNamenodeReceivedBlock。
拷贝数据块的请求如图(操作码84)。和读数据块操作请求类似,但是读取的是整个数据块,所以少了很多参数。
读数据块检验码的请求如图(操作码85)。它能够读取某个数据块的检验和的MD5结果,实现的方法很简单。
Hadoop源代码分析(一六)
通过上面的讨论,DataNode上的读/写流程已经基本清楚了。我们来看下一个非主流流程,
DataBlockScanner用于定时对数据块文件进行校验。类图如下:
DataBlockScanner拥有它单独的线程,能定时地从目前DataNode管理的数据块文件进行校验。其实最重要的方法就是verifyBlock,我们来看这个方法最关键的地方:
blockSender = new BlockSender(block, 0, -1, false, false, true, datanode);
DataOutputStream out = new DataOutputStream(new IOUtils.NullOutputStream());
blockSender.sendBlock(out, null, throttler);
校验利用了BlockSender,因为我们知道BlockSender中,发送数据的同时,会对数据进行校验。verifyBlock只需要读一个Block到一个空输出设备(NullOutputStream),如果有异常,那么校验失败,如果正常,校验成功。
DataBlockScanner其他的辅助方法用于对DataBlockScanner管理的数据块文件信息进行增加/删除,排序操作。同时,校验的信息还会保持在Storage上,保存在dncp_block_verification.log.curr和dncp_block_verification.log.prev中。
Hadoop源代码分析(一七)
周围的障碍扫清以后,我们可以开始分析类DataNode。类图如下:
public class DataNode extends Configured
implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable
上面给出了DataNode的继承关系,我们发现,DataNode实现了两个通信接口,其中ClientDatanodeProtocol是用于和Client交互的,InterDatanodeProtocol,就是我们前面提到的DataNode间的通信接口。ipcServer(类图的左下方)是DataNode的一个成员变量,它启动了一个IPC服务,这样,DataNode就能提供ClientDatanodeProtocol和InterDatanodeProtocol的能力了。
我们从main函数开始吧。这个函数很简单,调用了createDataNode的方法,然后就等着DataNode的线程结束。createDataNode首先调用instantiateDataNode初始化DataNode,然后执行runDatanodeDaemon。runDatanodeDaemon会向NameNode注册,如果成功,才启动DataNode线程,DataNode就开始干活了。
初始化DataNode的方法instantiateDataNode会读取DataNode需要的配置文件,同时读取配置的storage目录(可能有多个,看storage的讨论部分),然后把这两参数送到makeInstance中,makeInstance会先检查目录(存在,是目录,可读,可写),然后调用:
new DataNode(conf, dirs);
接下来控制流就到了构造函数上。构造函数调用startDataNode,完成和DataNode相关的初始化工作(注意,DataNode工作线程不在这个函数里启动)。首先是初始化一堆的配置参数,什么NameNode地址,socket参数等等。然后,向NameNode请求配置信息(DatanodeProtocol.versionRequest),并检查返回的NamespaceInfo和本地的版本是否一致。
正常情况的下一步是检查文件系统的状态并做必要的恢复,初始化FSDataset(到这个时候,上面图中storage和data成员变量已经初始化)。
然后,找一个端口并创建DataXceiverServer(run方法里启动),创建DataBlockScanner(根据需要在offerService中启动,只启动一次),创建DataNode上的HttpServer,启动ipcServer。这样就结束了DataNode相关的初始化工作。
在启动DataNode工作线程前,DataNode需要向NameNode注册。注册信息在初始化的时候已经构造完毕,包括DataXceiverServer端口,ipcServer端口,文件布局版本号等重要信息。注册成功后就可以启动DataNode线程。
DataNode的run方法,循环里有两种选择,升级(暂时不讨论)/正常工作。我们来看正常工作的offerService方法。offerService也是个循环,在循环里,offerService会定时向NameNode发送心跳,报告系统中Block状态的变化,报告DataNode现在管理的Block状态。发送心跳和Block状态报告时,NameNode会返回一些命令,DataNode将执行这些命令。
心跳的处理比较简单,以heartBeatInterval间隔发送。
Block状态变化报告,会利用保存在receivedBlockList和delHints两个列表中的信息。receivedBlockList表明在这个DataNode成功创建的新的数据块,而delHints,是可以删除该数据块的节点。如在DataXceiver的replaceBlock中,有调用:
datanode.notifyNamenodeReceivedBlock(block, sourceID)
这表明,DataNode已经从sourceID上接收了一个Block,sourceID上对应的Block可以删除了(这个场景出现在当系统需要做负载均衡时,Block在DataNode之间拷贝)。
Block状态变化报告通过NameNode.blockReceived来报告。
Block状态报告也比较简单,以blockReportInterval间隔发送。
心跳和Block状态报告可以返回命令,这也是NameNode先DataNode发起请求的唯一方法。我们来看一下都有那些命令:
DNA_TRANSFER:拷贝数据块到其他DataNode
DNA_INVALIDATE:删除数据块(简单方法)
DNA_SHUTDOWN:关闭DataNode(简单方法)
DNA_REGISTER:DataNode重新注册(简单方法)
DNA_FINALIZE:提交升级(简单方法)
DNA_RECOVERBLOCK:恢复数据块
拷贝数据块到其他DataNode由transferBlocks方法执行。注意,返回的命令可以包含多个数据块,每一个数据块可以包含多个目标地址。transferBlocks方法将为每一个Block启动一个DataTransfer线程,用于传输数据。
DataTransfer是一个DataNode的内部类,它利用我们前面介绍的OP_WRITE_BLOCK写数据块操作,发送数据到多个目标上面。
恢复数据块和NameNode的租约(lease)恢复有关,我们后面再讨论。
更多推荐
所有评论(0)