一、Hadoop

Hadoop是一套大数据计数组件,三大核心组件:

        ① HDFS:Hadoop分布式文件系统。解决分布式系统的文件存储问题,本质是提供一套跨机器的文件管理服务。

        ② MapReduce:Hadoop的分布式运算程序编程框架

        ③ Yarn:Hadoop的分布式运算资源调度系统。解决分布式运算程序的启动、资源调度和资源回收。

Hadoop外围组件:

        ① Hive:基于HDFS和MapReduce的sql工具

        ② HBase:基于HDFS实现的分布式Nosql数据库

        ③ Flume:分布式日志采集系统

        ④ Sqoop:数据迁移工具(关系型数据库 <——>Hadoop存储系统)

1.1 MapReduce

核心思想:分而治之。通过一定划分方法将数据分成多个较小的(互不依赖的)具有同样计算过程的数据块,并非给不同节点去处理

② 特点:易于编程;具有良好拓展性;高容错性;适合海量数据离线处理

              (缺点:实时计算性能差,不能进行流式计算)

③ MapReduce处理过程

       Map阶段

        a. 设置InputFormat类,将读取的数据切分成键值对(k1-v1:k1每行数据相对于文件开头的偏移量;v1是每行数据)

        b. 自定义map逻辑,将k1-v1转换为k2-v2,输出结果。(继承Mapper类,重写Map方法)

        (Shuffle阶段)

        c. 分区:将数据标记好分区,发送到环形缓冲区(默认大小100Mb,达到80%阈值溢写)。对key取hash,对reduceTask个数取余,余几就放到哪个分区中

        d. 排序:对分区内的key按字典序排序(快排)

        (溢写:溢写文件默认每十个合成一个,用的归并排序merge)

        e. 局部聚合 Combiner:要求不能对最终数据结果有影响,可以sum,不可以avg

(在map阶段读取数据产生k2,v2之后,我们可以通过使用combiner规约来将map阶段的k2进行合并,v2生成集合,也就是从而减少生成文件的大小,减少reduce读取map阶段文件的网络传输。也就是本该在reduce阶段进行相同key合并,value形成新的集合的这个过程在map阶段通过使用规约提前完成了)

(map数量由块的数量决定,读几个block就有几个mapTask)

        Reduce阶段:

        (Shuffle阶段)

        a. 拉取数据:所有maptask执行完毕后,会启动一定数量的reduceTask主动拉取map端的数据

        b. 合并:复制的数据先存放环形缓冲区,缓冲区溢出则写入磁盘文件。拉去完所有的数据之后采用归并排序将内存和磁盘中的数据都进行一次排序。

        c. 分组:分组操作,将数据以组为单位发送到reduce方法里

         (Shuffle阶段结束)

        c. 编写Reduce函数实现k2-v2转换到k3-v3并输出

        d. 设置OutputFormat处理并爆出输出的key-value(k3-v3)数据

④ 优化:数据倾斜

        数据倾斜:处理数据时,大量数据呗分配到一个分区中,导致单个节点忙碌、其他节点空闲的问题。这背离了MapReduce并行计算的初中,降低了处理数据的效率。

        解决方法:

        a. 重新设计key:在Mapper中给key加上一个随机数,附带随机数的key不会呗大量分配到同一个节点,传到Reducer后去掉随机数即可。

        b. 使用Combiner(局部聚合):选择性地把大量key相同的数据先合并,再传递给Reduce阶段处理,降低Map阶段向Reduce发送地数据量。

        c. 增加Reduce任务数:增加处理数据的节点

        d. 增加虚拟机内存:提高运行效率

        e. 自定义分区:随机

Shuffle缺点:频繁设计数据再内存、磁盘间的多次往复

实现两个表的join

a. map join(适用于有一份数据较小的连接情况):小表直接放内存里,复制很多份,每个maptask内存里都放一个小表。只扫描大表,对大表中每一条记录的key-value都和小表做对比。

b. reduce join:在map阶段,map函数同时读取两个文件,为了区分数据来源,对每一条读到的数据都打标签(取不同key值),但没有对数据进行删减。在reduce段对两个list做乘积运算

MR中排序发生在哪几个阶段?

        map阶段快排;合并文件用归并排序;

        reduce:每个reduceTask会对收到的数据进行归并排序(按照key分成若干组)

       

2. HDFS

一种通过网络实现在多台主机上分布进行分布式存储的分布式文件系统

①  多种服务角色

        a. NameNode:负责管理元数据(维护文件系统的目录树,包括每个文件具体路径、块信息、物理存储位置、副本数量),并为客户端提供查询服务,是访问HDFS的唯一入口。

        b.  DataNode:负责管理文件块(存储块、读取块),是在单独机器上运行的软件。一般以机架形式组织。

        c. Secondary NameNode

Hadoop集群一般包含1个Name Node和大量DataNode

写数据流程

        a. 客户端通过RPC(远程过程调用)向Name Node发送写入文件的请求(参数:路径+名字,副本数量,切块规格)

        b. NameNode执行各种检查判断,返回一个输入流

        c. 客户端请求写入第一个block

        d. NameNode相应,给DataNode名(默认三个),给block_id

        e. 客户端向DataNode1发送请求(带block_id,其他DataNode编号).DataNode1向DataNode2发送请求,DataNode2向DataNode3发送请求,建立PipeLine(要反向进行ack校验)

        f. DataNode响应请求。Client向DataNode1发送数据,DataNode1向2发送数据,2向3发。

        g. 发送完毕,关闭PipeLine(若还有块要写入,则重复c-g)

        h. 向Name Node汇报完成写入,Name Node记录元数据 

(Pipeline充分利用每个机器的带宽,避免网络瓶颈和高延迟连接,最小化推送所有数据的延时)

读数据流程

        a. 客户端向Name Node发送读数据请求

        b. NameNode给客户端发送元数据

        c. 客户端从最近的存数据的DataNode开始请求读取数据,响应后读取

        d. 重复c

副本放置策略(默认三个副本)

        副本a存放在客户端所在节点上(若Client不在集群范围,则随机)

        副本b存放在与a不同机架的一个节点上

        副本c和b在同一个机架,随机放在不同节点 (若有更多副本,则随机放置)

元数据管理机制

        NameNode中两个重要文件(a)fsimage镜像文件:NameNode启动时对整个文件系统的快照(备份)(b)edits日志文件:存储上次fsimage保存至下次fsimage保存之间的所有HDFS操作

        SecondaryNameNode每隔一小时进行一次镜像备份,对edits和fsimage文件进行合并

        NameNode不持久化存储每个文件中各个块送在的DataNode的位置信息,这些信息会在系统启动时从DataNode重建

3. Hive

是基于Hadoop的数仓工具,提供了操作简单的sql查询语言hql。hql语句可转换为MapReduce任务执行。Hive处理的真实数据存储在Hadoop文件系统中,即HDFS

基础

Hive特点:可伸缩、可扩展、能容错,输入格式松散、耦合;简单,避免写mr程序

与关系型数据库sql的不同:hql支持很多语句DDL,DML,常见聚合函数,join查询,条件查询;还提供数据提取、转化、加载功能;提供udf一进一出、udaf多进一处、udtf一进多出,实现对mapper和reducer函数的定制。

缺点:a. 延迟高,不适合低延时的online的联机事务查询OLTP,也不提供实时查询功能,最适合于对大量不可变数据进行批处理。

b. hql表达能力有限:迭代式算法无法表达(循环、迭代、子查询);数据挖掘不擅长

c. hive 效率低:底层实现的物理mr或别的程序,是自动生成的,通常不够智能化;调优比较困难,粒度比较粗

使用场景:将用户的hql通过解释器转化到mapreduce,提交到hadoop集群,hadoop监控作业的执行过程,返回作业执行结果给用户。是为非实时大数据批处理设计的

架构

meta Store是hive自带的元数据存储系统(元数据包括表名、表所属的数据库、表拥有者、列分区字段、表的类型,数据所在目录等),但一般将元数据存到mysql里,因为mysql支持多用户并发访问。Diver结合元数据将sql指令翻译成mapreduce. 

hive有几种模式,执行引擎可以是mapreduce、spark、tez

a. sql解析器:将sql字符串转化成抽象的语法树ast(这一步一般用第三方工具库完成),再对ast进行语法分析(比如表是否存在,字段是否存在,sql语句是否有误等等)

b. 编译器:将ast编译生成逻辑的执行计划

c. 优化器:对逻辑执行计划进行优化

d. 执行器:把优化好的执行计划转换成可以运行的物理计划,提交到计算程序

③  内部表 vs.外部表

       在使用hive搭数据仓库的时候,可以建立两种表格。一种是内部表(managed table),一种是外部表(external table)。

a. 创建

        创建内部表 create table:会将数据移动到数据仓库指向的路径,由hive自身管理;

        创建外部表 create table external:仅记录数据所在的路径,不会对数据的位置做任何改变,由hdfs管理,外部表数据存储位置由用户自己决定。

b. 删除

        内部表:删除元数据(meta data)和存储数据。

        外部表:仅删除元数据(meta data),不删除原始数据。更安全一点,数据的组织更灵活,方便共享原始数据

        应用:外部表:比如某个公司的原始日志数据存放在一个目录中,多个部门对这些原始数据进行分析,那么创建外部表是明智选择,这样原始数据不会被删除;内部表:对原始数据或比较重要的中间数据进行建表存储        

c. 修改

        内部表:直接同步到元数据

        外部表:对表结构和分区进行修改,需要修复这张表(?)

sql转化成mr的过程

a. 编译器定义sql语法规则,完成sql词法语法的解析,将sql转化为抽象语法树ast tree

b. 遍历抽象语法树,抽象出查询的基本组成单位查询块(query block)

c. 遍历query block,将遍历结果翻译为执行操作树

d. 优化器在逻辑层对操作树进行优化,合并不必要的ReduceSink操作,以减少shuffle数据量

e. 遍历优化好的操作树,将其翻译成MapReduce任务(物理的MR)

f. 物理层的优化器再进行优化,生成最终的执行计划

g. 将最终执行计划生成物理mr,提交到hadoop集群进行计算,再将结果返回

自定义函数(内置函数满足不了需求,用户可以自己定义)

a. UDF(一进一出):实际应用举例:用自定义函数加密;放进去一个字符串,返回一个字段。实现步骤:继承UDF类,重写evaluate方法

b. UDAF(多进一出):用户自定义聚合函数(类似count,max,min)

c. UDTF(一进多出):用户自定表生成的函数(类似于行转列)。实现:继承GenerricUDTF,重写3个方法(初始化;处理方法;close)

为什么用自定义函数:可以打印日志,方便做调试

⑥  数据倾斜

数据倾斜原因:shuffle阶段一个key对应很多值,会将这些值分到一个分区。两个表做join时,maptask中一个任务处理的时间明显大于其他task,出现了数据倾斜的问题

解决方法:开启数据倾斜的负载均衡,设置参数 set.hive.groupby.skewindata = true(思想:先随机分发处理,再按照key, group by 分发处理)。使计算变成两个mapreduce

        a. 第一个join:map的输出结果集合会随机分布到reduce中,每个reduce做部分的聚合操作并输出结果(相同group by key可能被分发到不同reduce中,把数据打散,达到负载均衡目的)

        b. 第二个join:根据上一步结果按照group bu key分到reduce当中,保证相同key分到一起,做最终的聚合操作

hive解决数据倾斜问题_hive数据倾斜的解决办法_chenbtravel的博客-CSDN博客

hive解决数据倾斜问题_hive数据倾斜的解决办法_chenbtravel的博客-CSDN博客

小文件合并

小文件存在的问题:a. 元数据层面:每个小文件都有一个元数据,小文件过多会占用namenode服务器大量内存,影响性能; b.计算层面:在进行分布式计算时,会从hsdf拉数据,默认会为每个小文件启动一个maptask,影响计算性能,同时大量小文件也会影响磁盘的寻址时间

合并方法:a. 在map执行前合并,减少map数量:set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat

b. 在map -only任务结束后合并(参数默认使ture): set hive.merge.mapfiles = true

c. 在mapreduce结束合并(参数默认false):set hive.merge.mapredfiles = true

d. 设置合并文件大小默认256M set hive.merge.size.per.task = 268435456

e. 当输出文件平均大小小于设置值时,会单独启动一个mr程序进行合并:set hive.merge.smallfiles.avgsize = 16777216

严格模式

hive提供的严格模式可以防止用户执行那些可能意想不到的会产生不好影响的查询,可通过参数设置(默认是非严格模型)

严格模式禁止三种查询:a. 对于分区表,除非where语句中含有分区字段来限制范围,否则不执行(不允许用户直接扫描所有分区,不能进行没有分区限制的查询,防止资源消耗过大)

b. 要求使用order by 的语句必须使用limit语句(order by会把所有结果数据分发到一个reducer处理,防止reducer额外执行很长一段时间)

c. 限制笛卡尔积(在join时不使用on而使用where,关系型数据库会进行优化,将where条件转化成on,但hive不会)

分区及优势

分区:对表进行粗略划分的机制,可以实现查询速率的提高。分区时会在表目录下,创建一个相应的子目录

        a. 静态分区:创建时手动指定的分区方式

        b. 动态分区:系统对数据进行判断来指定分区

分区字段是表外字段;不建议使用中文;不建议分区数太多;不建议动态分区

常用

数学函数:round,sqrt(取平方根)

集合函数:size(获取map/数组长度),map_keys(获取map数组所有key值)

类型转换函数:cast

日期函数:year,day

条件函数:where

字符串函数:length,concat

聚合函数:count,sum, avg

⑾ 表数据的加载与导出

加载:insert into 直接向分区加载

        通过load函数加载:参数local代表加载的文件数据从本地找,不加该参数则在hdfs上找文件加载;override参数会把之前的东西覆盖掉,不加则不会覆盖

        多插入模式:一个sql语句可以有多个insert into

        创建查询插入:建表时直接通过select查询表的数据加载到创建的表(无需指定字段)

        location:通过读取文件插入,但创建表时需要指定字段,多个字段用分隔符

导出:查询导出;hadoop命令导出;hive shell命令导出;export导出

数据存储

数据存储格式:textfile;sequencefile;rcfile;ORC;parquet

textfile,sequencefile,rcfile都是基于行进存储;ORC,parquet式存储;

a. textfile:默认存储格式,普通文本文件;数据不会被压缩,磁盘开销大

b. sequencefile:二进制格式;支持压缩和分隔,写和查询都比较快;不允许使用load方式加载,可以使用insert加载;生成的文件不能通过cat命令查看

c. rcfile:可以进行行列混合压缩,比如将附近的行和列数据尽量保存到同一个块中;提高查询效率,写入慢;可以配合压缩算法;不能用load方式加载,可以用insert加载

d. ORC:基于rc进行优化的存储格式;条形存储;查和写都比较高效

e. parquet:二进制存储文件;大型查询效率比较高;支持压缩

Hive中4个By Sort By 、Order By、Distrbute By、 Cluster By区别

        Sort By:分区内有序;

        Order By:全局排序,只有一个Reducer;

        Distrbute By:类似MR中Partition,进行分区,结合sort by使用。

        Cluster By:当Distribute by和Sorts by字段相同时,可以使用Cluster by方式。Cluster by除了具有Distribute by的功能外还兼具Sort by的功能。但是排序只能是升序排序,不能指定排序规则为ASC或者DESC

4. Yarn集群 

是通用的资源管理系统和调度平台。MapReduce运行在Yarn集群上

① Yarn三大组件:a. (集群物理层面)ResourceManager:负责整个计算任务的调度和分配

                              b. (集群物理层面)NodeManager:负责执行具体运算

                              c. (App层面)Appmstr

② 流程:a. 客户端提交计算任务,ResourceManager接收,并启动组件

                b. 启动AppMstr,AppMstr向ResourceManager申请资源

                c. ResourceManager回复资源列表给AppMstr

                d. 要求NodeManager分配资源

                e. NodeManager启动MapTask执行具体计算任务(ReduceTask)

                f. NodeManager将状态和计算结果汇报给AppMstr

                g. AppMstr汇报结果给ResourceManager

③ 调度器:a. FIFO先进先出调度器

                   b. Capacity Scheduler 容量调度器(默认):以队列为单位划分资源,每个队列可设定一定比例的资源最低保证和使用上限

                   c. Fair 公平调度器

 二、 Spark

是一种基于内存的快速、通用,可扩展的大数据分析计算引擎(本质是批计算)

1. 特点:运行速度快;易用性好;通用性强;随处运行

2.Spark vs. Hadoop

        根本差异:多个作业之间的数据通信问题。Spark基于内存,Hadoop基于磁盘。

速度a. Spark迭代过程数据存在内存;Hadoop存在磁盘,随后的计算需要从磁盘读结果。而从内存读数据比从磁盘读时间低两个数量级。 b. 复杂的数据计算任务需要多个步骤实现,对于步骤间的依赖性,Hadoop需要借助其他工具处理。Spark在执行任务前,将步骤根据依赖关系形成DAG图(有向无环图),任务执行按图索骥,可以优化I/O操作。

Spark容错性高:引进了弹性分布式数据集,如果数据集丢失,可以根据“血统”重建

Spark更通用:Hadoop只提供Map和Reduce,Spark提供多种操作(可分为两类:转换操作/行动操作)

三、 Flink

是一个分布式大数据处理引擎,可对有界数据流(批计算)无界数据流(流计算)进行有状态计算。可部署在各种集群环境,对各种大小的数据规模进行快速计算。

特点:高吞吐,低延迟,高性能;支持事件时间(Event Time);支持有状态计算;支持高度灵活窗口操作;基于轻量级分布式快照(CheckPoint)实现的容错;基于JVM实现的独立的内存管理;Save Point保存点

四、Kafka

是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域。

1. 消息队列的好处:

        ① 解耦: 允许独立扩展和修改两边的处理过程,只要接口约束相同

        ② 可恢复性: 系统一部分组件失效时不会影响整个系统,降低耦合度

        ③ 缓冲:控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况

        ④ 灵活性 & 峰值处理能力

        ⑤ 异步通信

2. 消息队列两种模式: ① 发布/订阅模式(消息可传给多个消费者使用。基于拉取的,消费者的消费速度可以自己决定);② 点对点模式:

3.  缺点:要维持长轮询,消费者要一直拉取状态

4.  ack机制 Kafka的ack机制_脚踏实地,仰望星空的博客-CSDN博客_kafka的ack机制

kafka解决了什么问题? - 知乎 (zhihu.com)

Kafka高可用,高吞吐量低延迟的高并发的特性背后实现机制 (baidu.com)

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐