超全笔记:HDFS,MAPREDUCE,YARN,HIVE
NameNode,DataNode,SecondaryNameNode角色NameNode:基于内存存储文件元数据(metadata)、block映射元数据:/path/path/xxx.txt 128M root:root 2021.10… rwxrwxrw映射(比如xxx.ttt): xxx_bk1 node1 node3,xxx_bk2 node1 node2DataNode:基于本地磁盘存
本文主要记录个人学习笔记,引用了大量图片,侵权联系删
Hadoop
狭义Hadoop:指的是HDFS,YARN,MAPREDUCE三大组件,值得注意的是2.x版本将原本负责资源管理和数据处理拆分位YARN 和 MR 两个组件,这样更加灵活,可以在2.0版本使用其他的数据处理组件比如Spark
广义Hadoop:指的是hadoop一系列的生态圈
1. HDFS架构
Hadoop Distributed File System (HDFS)是一个分布式文件系统,与普通文件系统不同的是,HDFS的文件会被分为很多的Block分散存储在不同机器上,高容量,高吞吐,高容错。
1.1 NameNode与DataNode
NameNode是HDFS的核心,架构中的主角色,Client访问入口,维护和管理文件系统元数据(包括名称空间目录树结构、文件和块的位置信息、访问权限等信息)
注意点
- NameNode不持久化存储块的位置信息,这些信息会在系统启动时由DataNode汇报重建保存在内存中,并且如果有某些块的副本数不足,不让节点复制出对应的块
- NameNode需要机器有大内存支持
NameNode是基于内存和磁盘的,因为要速度快,所以会将元数据放于内存中,内存断点易失,所以需要持久化策略,采用基于快照叠加增量日志;其中磁盘上元数据文件包括 FileSystemImage和 Edits log(Journal),对元数据产生增删重命名会先往日志写
内存持久化磁盘,这是一个IO操作,为了克服这个问题,减小edit logs文件大小和得到最新的fsimage文件,减小namenode压力
checkpoint: 节点周期EditLog向FileSystemImage合并生成新的FileSystemImage
DataNode:HDFS的文件会被分为很多的Block基于本地磁盘存储
在一系列的DataNode中;保存block块的校验;与NameNode维持心跳汇报block列表状态;执行由NameNode下达的创建删除复制的Block操作
注意点:
- DataNode所在机器需要大磁盘支持
1.2 Block读写
- HDFS客户端创建文件对象实例
DistributedFileSystem
, 该对象中封装了与HDFS文件系统操作的相关方法。
文件系统的基类,比如本地,比如分布式文件DistributedFileSystem 封装了与分布式文件系统交互的方法
- 调用
DistributedFileSystem
对象的create()
方法,通过RPC请求NameNode
创建文件。NameNode
执行各种检查判断:目标文件是否存在、父目录是否存在、客户端是否具有创建该文件的权限。检查通过,NameNode
就会为本次请求记下一条记录,返回FSDataOutputStream
输出流对象给客户端用于写数据 - 客户端通过
FSDataOutputStream
输出流开始写入数据。 - 客户端写入数据时,将数据分成一个个数据包(
packet 默认64k
), 内部组件DataStreamer
请求NameNode
挑选出适合存储数据副本的一组DataNode
地址,默认是3副本存储。DataStreamer
将数据包流式传输到pipeline
的第一个DataNode,该DataNode存储数据包并将它发送到pipeline的第二个DataNode。同样,第二个DataNode存储数据包并且发送给第三个(也是最后一个)DataNode。 - 传输的反方向上,会通过ACK机制校验数据包传输是否成功;
- 客户端完成数据写入后,在
FSDataOutputStream
输出流上调用close()方法关闭。 DistributedFileSystem
联系NameNode
告知其文件写入完成,等待NameNode
确认。
注意点:
- datanode之间采用pipeline线性传输,而不是拓扑式传输,这样的好处在于避免了网络瓶颈和高延迟的连接,利用了每个机器的带宽
- block分package传输,
- 为了pipeline传输的可靠性在用了pipeline上ack应答:pipeline反方向ack校验
- NameNode怎么才算确认成功呢?因为namenode已经知道文件由哪些块组成(DataStream请求分配数据块),因此仅需等待最小复制块即可成功返回。最小复制是由参数dfs.namenode.replication.min指定,默认是1.
- 副本
- HDFS客户端创建对象实例
DistributedFileSystem
, 调用该对象的open()
方法来打开希望读取的文件。 DistributedFileSystem
使用RPC调用namenode
来确定文件中前几个块的块位置(分批次读取
)信息。对于每个块,namenode
返回具有该块所有副本的datanode
位置地址列表,并且该地址列表是排序好的,与客户端的网络拓扑距离近的排序靠前DistributedFileSystem
将FSDataInputStream
输入流返回到客户端以供其读取数据。- 客户端在
FSDataInputStream
输入流上调用read()
方法。然后,已存储DataNode
地址的InputStream
连接到文件中第一个块的最近的DataNode
。数据从DataNode
流回客户端,结果客户端可以在流上重复调用read()
- 当该块结束时,FSDataInputStream将关闭与DataNode的连接,然后寻找下一个block块的最佳datanode位置。这些操作对用户来说是透明的。所以用户感觉起来它一直在读取一个连续的流。客户端从流中读取数据时,也会根据需要询问NameNode来
检索下一批数据块
的DataNode位置信息。 - 一旦客户端完成读取,就对FSDataInputStream调用close()方法
1.3 HDFS高可用
datanode不会出现某个datanode故障,保证只要不是整个集群不可用:冗余副本机制,EC编码逆向解码
namenode会出现单点故障:非常糟糕,最底层的,会导致上层很多基于hdfs都故障 1.0单个,2.0单主单备,3.0单主多备
QJM:仲裁日志管理器,HA方案之一用于解决 主备切换和脑裂问题
FC:zk的容错控制器,可以主备切换,避免脑裂。每个NN都有一个ZKFC,这是一个ZK客户端,不仅可以通过cmd监控NN,还可以监控所在机器的健康状态。借助了ZK锁,两个ZKFC来抢节点,且临时目录,创建的怪掉就会移除整个临时目录,还有个监听功能,一旦某个节点挂掉,通知另外的来抢
解决主备切换
- 两个zkfc去zk集群上注册节点(短暂的znode),谁注册成功,谁代表机器上的nn就是active,没有注册成功的对于znode进行监听,监听是否消息,返回standby。
- zkfc发现这个有问题了,就会断开和zk的连接,session会话消失,其注册的znode被删除消失,触发监听,把事件通知给standby那台机器上的zkfc,揭竿而起,准备上位。
- standby同样抢锁,抢到就变成active。
解决脑裂问题
如果FC错报还是会出现脑裂,Fencing的隔离机制,sshfence和shellfence
- sshfence:远程补刀,通过远程登陆的方式给对方干掉(fuser,tcp根据端口号定位)
解决数据同步
JN 轻量级分布式系统,读写速度几块,且与zk一样的分布式一致算法
主备需要节点间数据,以保证切换可以使得集群正常运行,所以主备节点都与一组称为"JournalNodes"(JN) 的单独守护程序进行通信,当 Active 节点执行任何命名空间修改时,它会持久地将Edit log记录到大多数这些 JN。Standby节点能够从 JN 读取Edit log,并不断监视它们对Edit log的更改,发生修改,它会将它们应用于自己的命名空间。在发生故障转移时,Standby 将确保在将自身提升为 active状态之前,它会确保 JournalNodes 读取了所有编辑内容。这可确保在发生故障转移之前,命名空间状态已完全同步;Standby节点还必须具有集群中块位置的最新信息,DataNode配置了所有NameNode的位置,并向所有NameNode发送块位置信息和检测信号。
必须至少有 3 个 JournalNode 守护程序,因为Edit log修改必须写入大多数 JN。这将允许系统容忍单个计算机的故障。您也可以运行 3 个以上的 JournalNode,但为了实际增加系统可以容忍的故障数,您应该运行奇数个 JN(即 3、5、7 等)。请注意,当使用 N 个JN运行时,系统最多可以容忍 (N - 1) / 2 次故障并继续正常运行。
JN:实现数据同步
1.4 联邦
- datanode可以不断增加节点,但是namenode无可能无限加内存,如何总线扩展机器加内存
2. MapReduce
MapReduce 易于编程,良好扩展,适合海量数据离线计算,但实时计算差,不能流式计算
分而治之的思想,分为两个阶段 map ,reduce
map: 对一组数据元素进行某种重复式的处理 | reduce: 对Map的中间结果进行某种进一步的结果整理 |
---|---|
2.1 MAP
- 第一阶段:把输入目录下文件按照一定的标准逐个进行
逻辑切片
,对input split 中的数据按照一定的规则读取解析返回<key,value>
对。
逻辑切片:即将一批数据分成几份处理,默认Split size = Block size(128M),得到多个input split,
每一个切片由一个MapTask处理
规则读取:默认是按行读取数据
。key是每一行的起始位置偏移量,value是本行的文本内容。(TextInputFormat)
-
第二阶段:调用Mapper类中的map方法处理数据。
每读取解析出来的一个<key,value>
,调用一次map方法 -
第三阶段:Map输出的中间结果会先放在内存缓冲区中,按照一定的规则对Map输出的键值对进行分区partition。
默认不分区,因为只有一个reducetask。
分区的数量就是reducetask运行的数量
。
- 第四阶段:Map输出数据写入内存缓冲区,达到比例溢出到磁盘上。溢出spill的时候根据key进行排序sort。
默认根据key字典序排序。
- 第五阶段:对所有溢出文件进行最终的merge合并,成为一个文件
2.2 Reduce
- 第一阶段:ReduceTask会
主动从MapTask复制拉取
属于需要自己分区处理的数据。 - 第二阶段:把拉取来数据,
全部进行合并merge
,即把分散的数据合并成一个大的数据。再对合并后的数据排序 - 第三阶段:是对
排序后的键值对调用reduce方法
。键相等的键值对调用一次reduce方法。最后把这些输出的键值对写入到HDFS文件中
2.3 shuffle
Map端Shuffle [收集-溢写-合并]
- Collect阶段:将MapTask的结果收集输出到默认大小为100M的环形缓冲区,保存之前会对key进行分区的计算,
默认Hash分区。
- Spill阶段:当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前需要对数
据进行一次排序的操作,如果配置了combiner,还会将有相同分区号和key的数据进行排序。
- Merge阶段:把所有溢出的临时文件进行一次合并操作,以确保一个MapTask最终只产生一个中间数据文件
Reduce端shuffle [拉去-合并-排序]
- Copy阶段: ReduceTask启动Fetcher线程到已经完成MapTask的节点上复制一份属于自己的数据。
- Merge阶段:在ReduceTask远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作
- Sort阶段:在对数据进行合并的同时,会进行排序操作,由于MapTask阶段已经对数据进行了局部的排序,ReduceTask只需保证Copy的数据的最终整体有效性即可。
Shuffle弊端
- Shuffle是MapReduce程序的核心与精髓,是MapReduce的灵魂所在。
- Shuffle也是MapReduce被诟病最多的地方所在。MapReduce相比较于Spark、Flink计算引擎慢的原因,跟Shuffle机制有很大的关系。
- Shuffle中频繁涉及到数据在内存、磁盘之间的多次往复。
注意:
- 一个MR只能包含一个,不能有多个连续的mapper,连续多个reduce
- MR程序,数据都是以k:v形式流转。所以在每个阶段我们要考虑清楚输入输出的kv是什么
- MapReduce计算引擎天生的弊端(慢),使用率小,但是某些软件的背后还依赖MapReduce引擎
3. YARN
Apache Hadoop YARN (Yet Another Resource Negotiator,另一种资源协调者)是一种新的Hadoop资源管理器,YARN是一个通用资源管理系统和调度平台,可为上层应用提供统一的资源管理和调度
- 资源管理系统:集群的硬件资源,和程序运行相关,比如内存、CPU等。
- 调度平台:多个程序同时申请计算资源如何分配,调度的规则(算法)。
- 通用:不仅仅支持MapReduce程序,理论上支持各种计算程序。YARN不关心你干什么,只关心你要资源,在有的情况下给你,用完之后还我,正是因为YARN的包容,使得其他计算框架能专注于计算性能的提升
3.1 YARN 架构
YARN角色
物理资源管理角色:
ResourceManager:YARN集群中的主角色
,决定系统中所有应用程序之间资源分配的最终权限,即最终仲裁者,有如下核心组件
- Resource Scheduler:负责根据application的要求
分配资源
- ApplicationsManager:
所有application的总负责人
,负责接受client端的application
NodeManager:YARN中的从角色
,一台机器上一个,负责管理本机器上的计算资源。负责监容器资源(cpu,memory,disk,network)的使用,并报告给Scheduler
- MRAppMaster:
每个app的负责人
,用户提交的每个应用程序均包含一个AM - MapTask ,Ruduce Task:会根据分配,分配到对应的Node上
资源管理独立出来
App Master 没有资源管理,不是长启动任务调度
3.1 YARN流程
MR程序有三类进程:
- MRAppMaster:负责整个MR程序的过程调度及状态协调
- MapTask:负责map阶段的整个数据处理流程
- ReduceTask:负责reduce阶段的整个数据处理流程
- 第1步:用户通过客户端向YARN中ResourceManager提交应用程序(比如hadoop jar提交MR程序);
- 第2步:ResourceManager为该应用程序分配第一个Container(容器),并与对应的NodeManager通信,要求它在这个Container中启动这个应用程序的MRAppMaster
- 第3步:MRAppMaster启动成功之后,首先向ResourceManager注册并保持通信,这样用户可以直接通过ResourceManage查看应用程序的运行状态(处理了百分之几);
- 第4步:MRAppMaster为本次程序内部的各个Task任务向RM申请资源,并监控它的运行状态;
- 第5步:一旦 MRAppMaster申请到资源后,便与对应的 NodeManager 通信,要求它启动任务。
- 第6步:NodeManager 为任务设置好运行环境后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务
- 第7步:各个任务通过某个 RPC 协议向 MRAppMaster汇报自己的状态和进度,以让 MRAppMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。在应用程序运行过程中,用户可随时通过
RPC 向 MRAppMaster查询应用程序的当前运行状态。 - 第8步:应用程序运行完成后,MRAppMaster向 ResourceManager 注销并关闭自己
还有另一个
原文链接
第 1 步:Client执行main()函数中run job(),开启作业
通过submit或者waitForCompletion提交作业,waitForCompletion()方法通过每秒循环轮转作业进度,如果发现与上次报告有改变,则将进度报告发送到控制台。其实waitForComplection()方法中还是调用submit()方法。
第 2 步:client向RM发送作业请求同时RM将作业id(在YARN中叫做应用程序ID)以及jar包存放路径返回给Client。
客户端向ResourceManager提交请求GetNewApplicationRequest,ResourceManager为其返回应答GetNewApplicationResponse,该数据结构中包含多种信息,包括ApplicationId、可资源使用上限和下限等。
第 3 步:这时候作业客户端检查输出说明、计算输入分片(可以通过yarn.app.mapreduce.am.computer-splits-in-cluster在集群上产生分片)。Client会把Jar路径为前缀作业id为后缀作为唯一存放路径,并将作业信息(jar、配置文件、分片信息)写入到HDFS集群中,默认情况下jar包写10份,而其他数据只写3份,当该程序运行完后删除这些数据
第 4 步:客户端Client将启动ApplicationMaster所需的所有信息(包括描述更为详细的Jar存放地址)打包到数据结构ApplicationSubmissionContext中,然后调用submitApplication(ApplicationSubmissionContext)将ApplicationMaster提交到ResourceManager上。
第 5 步(5a-5b):资源管理器RM在收到submitApplication()消息后,将其放入调度器(Scheduler),调度器为其分配一个容器Container,向NM发送命令,然后NM在RM的管理下在container中启动MRAPPMaster进程
ApplicationMaster首先需向ResourceManager发送注册请求RegisterApplicationMasterRequest,而ResourceManager将返回RegisterApplicationMasterResponse。
第 6 步:applicationmaster对作业进行初始化,创建过个薄记对象以跟踪作业进度。MR根据HDFS中jar包数据量为NM分配任务.
第 7 步:**applicationmaster接受来自HDFS在客户端计算的输入分片,对每一个分片创建一个map任务,任务对象,由mapreduce.job.reduces属性设置reduce个数。
uber模式
当任务小的时候就会启动一个JVM运行MapReduce作业,这在MapReduce1中是不允许的,这样的作业在YARN中成为uber作业,通过设置mapreduce.job.ubertask.enable设置为false使用。
那什么是小任务呢?
当小于10个mapper且只有1个reducer且输入大小小于一个HDFS块的任务。
但是这三个值可以重新设定:mapreduce.job.ubertask.maxmaps
第 8 步:如果作业不适合uber任务运行,applicationmaster就会为所有的map任务和reduce任务向资源管理器申请容器(Container,仅包含内存和cpu两类资源)。
ApplicationMaster使用ResourceRequest类描述每个Container。一旦为任务构造了Container后,ApplicationMaster会向ResourceManager发送一个AllocateRequest对象,以请求分配这些Container。ResourceManager会为ApplicationMaster返回一个AllocateResponse对象。
请求作业为任务指定内存需求,map任务和reduce任务的默认都会申请1024MB的内存,这个值可以通过mapreduce.map.memory.mb和mapreduce.reduce.memory.mb来设置。
这里的内存分配策略和mapreduce1中不同,在MR1中tasktracker中有固定数量的槽,每个任务运行在一个槽(slot)中,槽有最大内存分配限制,这样集群是固定的,当任务使用较少内存时,无法充分使用槽的内存,造成其他任务不能够获取足够内存因而导致作业失败。
在YARN中,资源分为更细的粒度,所以避免了以上的问题。应用程序可以申请最小到最大内存限制的任意最小值的倍数的内存容量。默认值是1024~10240,可以通过yarn.scheduler.capacity.minimum-allocation-mb和yarn.scheduler.capacity.maximum-allocation.mb设定。任务可以通过设置mapreduce.map.memory.mb和mapreduce.reduce.memory.mb来请求1GB到10GB的任意1GB的整数倍的内存容量。
第 9 步(9a~9b):资源管理器为任务分配了容器,当ApplicationMaster(从ResourceManager端)收到新分配的Container列表后,会向对应的NodeManager发送ContainerLaunchContext以启动Container。NM会开启内部YARNChild,由YarnChild的java应用程序执行。
第 10 步:运行任务之前,首先将资源本地化,包括作业配置、jar文件和所有来自分布式缓存的文件。YARNChild根据命令到HDFS检索作业资源。
第 11 步:YARNChild开启MapTask 或者Reduce Task
3.2 调度策略
FIFO | Capactiy | Fair |
---|---|---|
Fair schedule | ||
易容配置,不适合有优先顺序的任 | 多组织共享集群资源,每个组织分配专门的队列。,但容易产生浪费 | 随着时间的流逝任务获取公平的资源 |
hive 下载地址 https://dlcdn.apache.org/hive/hive-2.3.9/
hive 文档: https://cwiki.apache.org/confluence/display/Hive/AdminManual
4. HIVE
4.1 数仓
数据仓库(英语:Data Warehouse,简称数仓、DW),是一个用于存储、分析、报告的数据系统,是一个集成化的专业的数据分析平台,方便决策
数仓本身并不产生数据
,数据来自于外部系统,同时不消费数据
,结果给其他外部应用使用。所以才叫仓库而不是工厂
联机事务处理系统(OLTP)正好可以满足上述业务需求开展, 其主要任务是执行联机事务处理。其基本特征是前台接收的用户数据可以立即传送到后台进行处理,并在很短的时间内给出处理结果,关系型数据库
(RDBMS)是OLTP典型应用
为什么不在数据库里分析?数据分析也是对数据进行读取操作,会让读取压力倍增,OLTP仅存储数周或数月的数据,数据分散在不同系统不同表中,字段类型属性不统一
数仓特性:
- 面向主题:确定分析对象
- 集成性:各个数据源需要集成到数仓,并且各个系统对某一主题内部的命名与格式不同,所以不同数据源到数仓时需要去除这些不一致,经过ETL(抽取,转换,加载)的操作
- 非易变性:只分析,很少不修改内容
- 时变性:离线分析往往分析过去的数据,数据会与时间有关,所以就有些T+1报表(第二天分析前一天)
4.2 HIVE
Hive的存储用的HDFS,计算用的MR,本身并没有干什么,主要提供了一种SQL来进行数据分析
的能力,Hive核心是将HQL转换为MapReduce程序,然后将程序提交到Hadoop群集执行
HIVE可以将存储在Hadoop文件中的结构化、半结构化
数据文件映射
为一张数据库表,基于表
提供了一种类似SQL的查询模型,称为Hive查询语言(HQL),用于访问和分析存储在Hadoop文件中的大型数据集
HIVE也有局限性,结构化、半结构化
的要求,无结构的图片,视频很难做映射。这种映射是需要存储
,存储进去的往往就是一些元数据信息
4.3 HIVE 架构
发请求给Driver源数据检查是否有元数据是否存在,不存在直接返回,有就编译优化生成jar,driver提jar给hadoop执行
-
CLI(Command Line Interface):用户可以使用Hive自带的命令行接口执行Hive QL、设置参数等功能
-
JDBC/ODBC:用户可以使用JDBC或者ODBC的方式在代码中操作Hive
-
Web GUI:浏览器接口,用户可以在浏览器中对Hive进行操作(2.2之后淘汰)
-
Thrift : Thrift服务运行客户端使用Java、C++、Ruby等多种语言,通过编程的方式远程访问Hive
-
Driver:Hive Driver是Hive的核心,其中包含解释器、编译器、优化器等各个组件,完成从SQL语句到MapReduce任务的解析优化执行过程
- 解释器:调用语法解释器和语义分析器将SQL语句转换成对应的可执行的java代码或者业务代码
- 编译器:将对应的java代码转换成字节码文件或者jar包
- 优化器:从SQL语句到java代码的解析转化过程中需要调用优化器,进行相关策略的优化,实现最优的 查询性能
- 执行器:当业务代码转换完成之后,需要上传到MapReduce的集群中执行
- metastore
Hive的元数据存储服务,一般将数据存储在关系型数据库中,为了实现Hive元数据的持久化操作,Hive的安装包中自带了Derby内存数据库,但是在实际的生产环境中一般使用mysql来存储元数据
why:Derby 内存 元数据不可以持久化了,意外这每次重启都会有数据加载的过程;MYSQL 元数据可以持久化了
官方提供了两种方式实现元数据管理中心
4.4 HIVE 可用客户端
一般使用DataGrip和DBeaver这种可视化来方便开发
4.5 hive表
hive DDL
4.6 hive location
4.7 hive分区
且位置有要求
4.8 hive格式
4.8 hive压缩
不能只看时间,有可能没有压缩太小,跟没有压缩一样
SPARK
更多推荐
所有评论(0)