1、5种存储格式

Apache Hive支持Apache Hadoop中使用的几种熟悉的文件格式,如TextFile,RCFile,SequenceFile,AVRO,ORC和Parquet格式。

Cloudera Impala也支持这些文件格式。

在建表时使用STORED AS (TextFile|RCFile|SequenceFile|AVRO|ORC|Parquet)来指定存储格式

TextFile每一行都是一条记录,每行都以换行符(\ n)结尾。数据不做压缩,磁盘开销大,数据解析开销大。可结合Gzip、Bzip2使用(系统自动检查,执行查询时自动解压),但使用这种方式,hive不会对数据进行切分,从而无法对数据进行并行操作。 

SequenceFile是Hadoop API提供的一种二进制文件支持,其具有使用方便、可分割、可压缩的特点。支持三种压缩选择:NONE, RECORD, BLOCK。 Record压缩率低,一般建议使用BLOCK压缩。 

RCFile是一种行列存储相结合的存储方式。首先,其将数据按行分块,保证同一个record在一个块上,避免读一个记录需要读取多个block。其次,块数据列式存储,有利于数据压缩和快速的列存取。 

AVRO是开源项目,为Hadoop提供数据序列化和数据交换服务。您可以在Hadoop生态系统和以任何编程语言编写的程序之间交换数据。Avro是基于大数据Hadoop的应用程序中流行的文件格式之一。 

ORC文件代表了优化排柱状的文件格式。ORC文件格式提供了一种将数据存储在Hive表中的高效方法。这个文件系统实际上是为了克服其他Hive文件格式的限制而设计的。Hive从大型表读取,写入和处理数据时,使用ORC文件可以提高性能。

Parquet是一个面向列的二进制文件格式。Parquet对于大型查询的类型是高效的。对于扫描特定表格中的特定列的查询,Parquet特别有用。Parquet桌子使用压缩Snappy,gzip;目前Snappy默认。

2、列式存储和行式存储

1.行存储的特点

查询满足条件的一整行数据的时候,列存储则需要去每个聚集的字段找到对应的每个列的值,行存储只需要找到其中一个值,其余的值都在相邻地方,所以此时行存储查询的速度更快。

Hadoop block中的基于行存储的示例图

优点是:具备快速数据加载动态负载的高适应能力,因为行存储保证了相同记录的所有域都在同一个集群节点
缺点是:但是它不太满足快速的查询响应时间的要求,特别是在当查询仅仅针对所有列中的少数几列时,它就不能直接定位到所需列而跳过不需要的列,由于混合着不同数据值的列,行存储不易获得一个极高的压缩比。

Hadoop block中的基于行存储的示例图

优点是:这种结构使得在查询时能够直接读取需要的列而避免不必要列的读取,并且对于相似数据也可以有一个更好的压缩比。
缺点是:它并不能提供基于Hadoop系统的快速查询处理,也不能保证同一记录的所有列都存储在同一集群节点之上,也不适应高度动态的数据负载模式。 

2.列存储的特点

因为每个字段的数据聚集存储,在查询只需要少数几个字段的时候,能大大减少读取的数据量;每个字段的数据类型一定是相同的,列式存储可以针对性的设计更好的设计压缩算法。

TEXTFILE和SEQUENCEFILE的存储格式都是基于行存储的。

ORC和PARQUET是基于列式存储的。

RCFile结合列存储和行存储的优缺点,混合存储

3、TextFile

默认格式,数据不做压缩,磁盘开销大,数据解析开销大。
可结合Gzip、Bzip2使用(系统自动检查,执行查询时自动解压),但使用这种方式,hive不会对数据进行切分,从而无法对数据进行并行操作。
示例:

create table if not exists textfile_table(
    site string,
    url  string,
    pv   bigint,
    label string)
row format delimited fields terminated by '\t'
stored as textfile;

插入数据操作:

set hive.exec.compress.output=true;  
set mapred.output.compress=true;  
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;  
set io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec;  
insert overwrite table textfile_table select * from textfile_table;  

4、SequenceFile

SequenceFile是Hadoop API提供的一种二进制文件支持,其具有使用方便、可分割、可压缩的特点。
SequenceFile支持三种压缩选择:none,record,block。Record压缩率低,一般建议使用BLOCK压缩。
示例:

create table if not exists seqfile_table(
    site string,
    url  string,
    pv   bigint,
    label string)
row format delimited fields terminated by '\t'
stored as sequencefile;

插入数据操作:

set hive.exec.compress.output=true;  
set mapred.output.compress=true;  
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;  
set io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec;  
set mapred.output.compression.type=BLOCK;
insert overwrite table seqfile_table select * from textfile_table;  

5、 ORC File

ORCFile 它的全名是Optimized Row Columnar (ORC) file

其实就是对RCFile做了一些优化

据官方文档介绍,这种文件格式可以提供一种高效的方法来存储Hive数据

它的设计目标是来克服Hive其他格式的缺陷

运用ORC File可以提高Hive的读、写以及处理数据的性能。

5.1 ORCFile格式的组成

每个Orc文件由1个或多个stripe组成,每个stripe一般为HDFS的块大小,每一个stripe包含多条记录,这些记录按照列进行独立存储,对应到Parquet中的row group的概念。每个Stripe里有三部分组成,分别是Index Data,Row Data,Stripe Footer:

1)Index Data:一个轻量级的index,默认是每隔1W行做一个索引。这里做的索引应该只是记录某行的各字段在Row Data中的offset。

2)Row Data:存的是具体的数据,先取部分行,然后对这些行按列进行存储。对每个列进行了编码,分成多个Stream来存储

3)Stripe Footer:存的是各个Stream的类型,长度等信息。

每个文件有一个File Footer,这里面存的是每个Stripe的行数,每个Column的数据类型信息等;每个文件的尾部是一个PostScript,这里面记录了整个文件的压缩类型以及FileFooter的长度信息等。在读取文件时,会seek到文件尾部读PostScript,从里面解析到File Footer长度,再读FileFooter,从里面解析到各个Stripe信息,再读各个Stripe,即从后往前读。

  1. ORC File包含一组组的行数据,称为stripes
  2. 除此之外,ORC File的file footer还包含了该ORC File文件中stripes的信息,每个stripe中有多少行,以及每列的数据类型。当然,它里面还包含了列级别的一些聚合的结果,比如:count, min, max, and sum
  3. 在ORC File文件的最后,有一个被称为postscript的区,它主要是用来存储压缩参数及压缩页脚的大小。
  4. 在默认情况下,一个stripe的大小为250MB。大尺寸的stripes使得从HDFS读数据更高效。

5.2 ORCFile格式的优点

  1. 每个task只输出单个文件,这样可以减少NameNode的负载;
  2. 支持各种复杂的数据类型,比如: datetime, decimal, 以及一些复杂类型(struct, list, map, and union);
  3. 在文件中存储了一些轻量级的索引数据
  4. 基于数据类型的块模式压缩:a、integer类型的列用行程长度编码(run-length encoding);b、String类型的列用字典编码(dictionary encoding);
  5. 用多个互相独立的RecordReaders并行读相同的文件;
  6. 无需扫描markers就可以分割文件;
  7. 绑定读写所需要的内存;
  8. metadata的存储是用 Protocol Buffers的,所以它支持添加和删除一些列

5.2 ORCFile设计思想

5.3 ORC存储方式的压缩

官网:LanguageManual ORC - Apache Hive - Apache Software Foundation

ORC存储方式的压缩:

6、Parquet

6.1 概述

Apache Parquet是Hadoop生态圈中一种新型列式存储格式,它可以兼容Hadoop生态圈中大多数计算框架(Mapreduce、Spark等),被多种查询引擎支持(Hive、Impala、Drill等),并且它是语言和平台无关的。Parquet最初是由Twitter和Cloudera合作开发完成并开源,2015年5月从Apache的孵化器里毕业成为Apache顶级项目。

Parquet最初的灵感来自Google于2010年发表的Dremel论文,文中介绍了一种支持嵌套结构的存储格式,并且使用了列式存储的方式提升查询性能,在Dremel论文中还介绍了Google如何使用这种存储格式实现并行查询的。

Parquet文件是以二进制方式存储的,所以是不可以直接读取的,文件中包括该文件的数据和元数据

因此Parquet格式文件是自解析的。

6.2 parquet的数据模型

Parquet支持嵌套的数据模型,类似于Protocol Buffers,每一个数据模型的schema包含多个字段,每一个字段有三个属性:重复次数、数据类型和字段名,重复次数可以是以下三种:

  1. required(只出现1次)
  2. repeated(出现0次或多次)
  3. optional(出现0次或1次)

每一个字段的数据类型可以分成两种:

  1. group(复杂类型)
  2. primitive(基本类型)

schema示例:

可以把这个Schema转换成树状结构,根节点可以理解为repeated类型

可以看出在Schema中所有的基本类型字段都是叶子节点,在这个Schema中一共存在6个叶子节点,如果把这样的Schema转换成扁平式的关系模型,就可以理解为该表包含六个列。Parquet中没有Map、Array这样的复杂数据结构,但是可以通过repeated和group组合来实现的。由于一条记录中某一列可能出现零次或者多次,需要标示出哪些列的值构成一条完整的记录。这是由Striping/Assembly算法实现的。

由于Parquet支持的数据模型比较松散,可能一条记录中存在比较深的嵌套关系,如果为每一条记录都维护一个类似的树状结可能会占用较大的存储空间,因此Dremel论文中提出了一种高效的对于嵌套数据格式的压缩算法:Striping/Assembly算法。它的原理是每一个记录中的每一个成员值有三部分组成:Value、Repetition level和Definition levelvalue记录了该成员的原始值,可以根据特定类型的压缩算法进行压缩,两个level值用于记录该值在整个记录中的位置。对于repeated类型的列,Repetition level值记录了当前值属于哪一条记录以及它处于该记录的什么位置;对于repeated和optional类型的列,可能一条记录中某一列是没有值的,假设我们不记录这样的值就会导致本该属于下一条记录的值被当做当前记录的一部分,从而造成数据的错误,因此对于这种情况需要一个占位符标示这种情况。

通过Striping/Assembly算法,parquet可以使用较少的存储空间表示复杂的嵌套格式,并且通常Repetition level和Definition level都是较小的整数值,可以通过RLE算法对其进行压缩,进一步降低存储空间。

6.3 parquet文件结构

Parquet文件在磁盘所有数据分成多个RowGroup 和 Footer。

  1. RowGroup: 真正存储数据区域,每一个RowGroup存储多个ColumnChunk的数据。
  2. ColumnChunk就代表当前RowGroup某一列的数据,因为可能这一列还在其他RowGroup有数据。ColumnChunk可能包含一个Page。
  3. Page是压缩和编码的单元,主要包括PageHeader,RepetitionLevel,DefinitionLevel和Values.
  4. PageHeader: 包含一些元数据,诸如编码和压缩类型,有多少数据,当前page第一个数据的偏移量,当前Page第一个索引的偏移量,压缩和解压的大小
  5. DefinitionLevel: 当前字段在路径中的深度
  6. RepetitionLevel: 当前字段是否可以重复
  7. Footer:主要当前文件的元数据和一些统计信息

通常情况下,在存储Parquet数据的时候会按照Block大小设置行组的大小,由于一般情况下每一个Mapper任务处理数据的最小单位是一个Block,这样可以把每一个行组由一个Mapper任务处理,增大任务执行并行度。Parquet文件的格式:

Definition Level

  1. 指明该列的路径上有多少个可选的字段被定义了。A.B.C 表示C列这个路径上有三个可选的字段被定义了。也可以理解为definition Level是该路径上有定义的repeated field 和optional field的个数,不包括required field,因为requiredfield是必须有定义的嵌套数据的特点是有的字段可以为空,比如optional或者repeated。
  2. 如果一个字段被定义,那么它的所有父节点都是被定义的。我们从root节点开始遍历,当某一个字段路径上的节点为空或者我们说已经没有子节点的节点的时候,我们就记录下当前的深度作为这个字段的DefinitionLevel. 当一个字段的DefinitionLevel = Max Definition Level,表示这个字段是有数据的。另外,required类型是字段定义的,所以它不需要DefinitionLevel
messageDemo {--- D = 0
  optional group field1 { ----D = 1
    required group fiel2 {----D = 1(required是不使用DefinitionLevel的)
      optional string field3;----D = 2
    }
  }
}

Repetition Level

RepetitionLevel是针对repeated字段的,对于optional和required,是没有啥关系的。意思就是指在哪一个深度上进行重复。
简单的说,就是通过数字让程序明白在路径中什么repeated字段重复了,以此来确定这个字段的位置
举个例子:
我们定一个Author的数据模型:

最后生成的数据:

分析:AuthorID:因为该字段是required,必须定义的,所以,它是没有DefinitionValue,所以都是0

Addresses

因为该字段是repeated,允许0个或多个值,所以DefinitionLevel  = 1;第一个Author的第一个Addresses由于之前没有重复,是一个新的record,所以RepetitionLevel = 0; 第二个 Addresses由于在之前已经出现过一次,所以它是重复的,重复深度是1,所以RepetitionLevel = 1;

到第二Author的时候,Address是一个新的record,所以没有重复,RepetitionLevel = 0,DefinitionLevel  = 1

Books.BookID

因为该字段是required,必须定义的,所以,他没有DefinitionValue,那么他的DefinitionValue和父父节点的DefinitionValue相同,DefinitionValue = 1. 因为Books是Repeated的,但是Books.BookId只出现一次,所以RepetitionLevel = 0。

到第二个Books.BookId的时候,由于之前已经有过Books,所以现在是重复的,只是Books重复,所以重复深度为1,那么此时RepetitionLevel = 1,DefinitionValue = 1. 到第三个Books.BookkId的时候,同样他也是重复的,重复深度也还是1,所以RepetitionLevel = 1,DefinitionValue = 1.

Books.Price

由于price是optional,所以在树种有两个DefinitionLevel=2,由于第一次出现Books.Price,所以RepetitionLevel = 0;

第二个Books.Price的时候,DefinitionLevel=2,但是Books已经是重复的,所以现在RepetitionLevel = 1;第三个没有Books.Price,所以DefinitionLevel = 1(和Books的DefinitionLevel一样),RepetitionLevel = 1;

Books.Descs.Type

由于是Required,所以DefinitionLevel没有,和父节点的DefinitionLevel是一样的,故DefinitionLevel  = 2;第一次出现Books.Descs.Type,所以RepetitionLevel = 0;第二次出现Books.Descs.Type,由于之前已经存在了Books.Descs,所以现在他重复了,Descs重复深度是2,所以DefinitionLevel  = 2, Repetition Level = 2; 下一个Books.Descs.Type由于没有Descs,所以DefinitionLevel  = 1,Repetition Level只是Books重复,所以深度为1,值为NULL;到下一个Books.Descs.Type,由于只是Books重复,所以重复深度为1,DefinitionLevel  = 2

Metadata

7、RCFile

create table if not exists rcfile_table(
    site string,
    url  string,
    pv   bigint,
    label string)
row format delimited fields terminated by '\t'
stored as rcfile;

7.1 RCFile的设计思想

RCFile结合列存储和行存储的优缺点,Facebook于是提出了基于行列混合存储的RCFile,该存储结构遵循的是“先水平划分,再垂直划分”的设计理念。先将数据按行水平划分为组,这样一行的数据就可以保证存储在同一个集群节点;然后在对行进行垂直划分。 
RCFile是在Hadoop HDFS之上的存储结构,该结构强调: 

  1. RCFile存储的表是水平划分的,分为多个行组,每个行组再被垂直划分,以便每列单独存储; 
  2. RCFile在每个行组中利用一个列维度的数据压缩,并提供一种Lazy解压(decompression)技术来在查询执行时避免不必要的列解压; 
  3. RCFile支持弹性的行组大小,行组大小需要权衡数据压缩性能查询性能两方面。
  4. RCFile的每个行组中,元数据头部表格数据段(每个列被独立压缩)分别进行压缩,RCFile使用重量级的Gzip压缩算法,是为了获得较好的压缩比。另外在由于Lazy压缩策略,当处理一个行组时,RCFile只需要解压使用到的列,因此相对较高的Gzip解压开销可以减少。 
  5. RCFile具备相当于存储的数据加载速度和负载适应能力,在读数据时可以在扫描表格时避免不必要的列读取,它比其他结构拥有更好的性能,使用列维度的压缩能够有效提升存储空间利用率。

7.2 源码分析

通常而言,RCFile文件的整个写入过程大致可以分为三步:

  1. 构建RCFile.Writer实例——Writer(...)
  2. 通过RCFile.Writer实例写入数据——append
  3. 关闭RCFile.Writer实例——close

我们也按照这三步来分析相应的源码。

1. Writer

Writer在构建函数中大体做了以下三件事情:
1)初始化一些变量值;
a. RECORD_INTERVAL:表示多少“行”数据形成一个Row Split(Record)和columnsBufferSize配合使用;
b. columnNumber:表示当前RCFile文件存储着多少“列”的数据;
c. Metadata:Metadata实例仅仅保存一个属性“hive.io.rcfile.column.number”,值为columnNumber,该实例会被序列化到RCFile文件头部;
d. columnsBufferSize:缓存数目(行数)上限阀值,超过这个数值就会将缓存的数据(行)形成一个Row Split(Record);
2)构建一些数据结构;
a. columnValuePlainLength:保存着一个Row Split(Record)内部各列原始数据的大小;
b. columnBuffers:保存着一个Row Split(Record)内部各列原始数据;
c. key:保存着一个Row Split(Record)的元数据;
d. plainTotalColumnLength:保存着一个RCFile文件内各列原始数据的大小;
e. comprTotalColumnLength:保存着一个RCFile文件内各列原始数据被压缩后的大小;

3)初始化文件输出流,并写入文件头部信息;
a. 初始化RCFile文件输出流(FSDataOutputStream);useNewMagic默认值为true,本文也以此默认值进行讨论。
b. initializeFileHeader;1. 写出MAGIC;2.  写出当前RCFile版本号(不同版本的RCFile具有不同的格式);
c. writeFileHeader;1. 写出是否使用压缩,本文按使用压缩讨论;2. 写出压缩编/解码器(CompressionCodec)类名;3. 序列化Metadata实例;
d. finalizeFileHeader;

写出一个“同步标志位”,表示RCFile文件头部信息到此结束。

我们可以得出RCFile Header的结构如下:

 version3 bytes of magic header “RCF”, followed by 1 byte of actual version number
compression A boolean which specifies if compression is turned on for keys/values in this file
compression codecCompressionCodec class which is used for compression of keys and/or values
metadataMetadata for this file
syncA sync marker to denote end of the header

2. append

RCFile.Writer写入数据时要求以BytesRefArrayWritable实例的形式进行“追加”,亦即一个BytesRefArrayWritable实例表示一“行”数据。
“追加”“行”数据的过程如下:
1)从一“行”数据(即BytesRefArrayWritable实例val)中解析出各“列”数据缓存到对应的ColumnBuffer(即columnBuffers[i])中;如果这“行”数据包含的“列”小于columnNumber,则缺失的列会被填充为“空值”(即BytesRefWritable.ZeroBytesRefWritable);


我们可以看出,RCFile在“追加”数据的时候还是以“行”的方式进行,“行转列”是在内部进行转换的。转换之后的列数据(列数为columnNumber)被缓存到各自的“Buffer”中,也就是说每一列都有自己独立的缓存区(ColumnBuffer),这是为后来的“列式存储”作准备的。
ColumnBuffer
这里重点介绍一下这个ColumnBuffer,它的作用就是用来缓存“列数据”的,
内部包含两个实例变量,如它们的变量名称所言,它们实际也是用来缓存数据的,columnValBuffer用来缓存“列值”的数据valLenBuffer用来缓存“列值”各自的长度,这两个内部的缓存区都是NonSyncDataOutputBuffer实例。

从这三部分代码可以看出,NonSyncDataOutputBuffer内部的缓存区实际是使用内存中的一个字节数组(buf)构建的,而且继承自DataOutputStream,方便我们使用“流”的形式操作数据。而且valLenBuffer在缓存“列值”的长度的时候,为了有效的节约存储空间,使用了一个技巧,也就是说,如果需要保存的“列值”长度为“1,1,1,2”,需要存储四个整数,而且前面三个整数的值是一样的,那么我们将其变为“1,~2,2”,“~2”即表示我们需要将它前面的整数“1”重复两次。如果数据的重复度较高,这种方式会节省大量的存储空间。
RowSplit
2)一“行”数据转换为多“列”数据,并被缓存到各自对应的缓存区之后,需要进行两个判断:

  1. 缓存的“列”数据(这里指columnBuffers中的全部列数据)大小是否超过上限阀值columnsBufferSize?
  2. 缓存的“行”记录数目是否超过上限阀值RECORD_INTERVAL?

如果上述两者条件满足其一,我们认为已经缓存足够多的数据,可以将缓存区的这些数据形成一个Row Split或Record,进行“溢写”。
这两个上限阀值(columnsBufferSize、RECORD_INTERVAL)也提示我们在实际应用中需要根据实际情况对这两个值进行调整。
“溢写”是通过flushRecords进行的,可以说是整个RCFile写入过程中最为“复杂”的操作。

前面提到过,RCFile Record(Row Split)实际是由Key、Value组成的,现在这些“列”数据已经被缓存到columnBuffers中,那么Key的数据在哪里呢?
这个Key实际上就是这个Row Split(Record)的元数据,也可以理解为Row Split(Record)的索引,它是由KeyBuffer表示的,
columnNumber:列数;
numberRows:RCFile Record(Row Split)内部存储着多少“行”数据,同一个RCFile文件,不同的Record内保存的行数可能不同;
RCFile Record Value实际就是前面提到的columnBuffers中的那些列值(可能经过压缩处理),这些columnBuffers的元数据由以下三个变量表示:

  1. eachColumnValueLen:eachColumnValueLen[i]表示columnBuffers[i]中缓存的列数据(原始数据)的总大小;
  2. eachColumnUncompressedValueLen:eachColumnUncompressedValueLen[i]表示columnBuffers[i]中的缓存的列数据被压缩之后的总大小;如果没有经过压缩处理,该值与columnBuffers[i]相同;
  3. allCellValLenBuffer:allCellValLenBuffer[i]表示columnBuffers[i]中那些列数据各自的长度(注意前方提到的这些长度的保存技巧);

KeyBuffer被序列化之后,它的结构如下:

numberRowsNumber_of_rows_in_this_record(vint)
columnValueLenColumn_1_ondisk_compressed_length(vint)
columnUncompressedValueLenColumn_1_ondisk_uncompressed_length(vint)
Column_1_row_1_value_plain_length
Column_1_row_2_value_plain_length
...
columnValueLenColumn_2_ondisk_compressed_length(vint)
columnUncompressedValueLenColumn_2_ondisk_uncompressed_length(vint)
Column_2_row_1_value_plain_length
Column_2_row_2_value_plain_length
...

RCFile的索引机制


注意到上面的多个columnValueLen(columnUncompressedValueLen),它保存着Record Value内多个列(簇)各自的总长度,而每个columnValueLen(columnUncompressedValueLen)后面保存着该列(簇)内多个列值各自的长度。如果我们仅仅需要读取第n列的数据,我们可以根据columnValueLen(columnUncompressedValueLen)直接跳过Record Value前面(n - 1)列的数据。


KeyBuffer的数据是在“溢写”的过程中被构建的。

flushRecords的具体逻辑

key是KeyBuffer的实例,相当于在元数据中记录这个Row Split(Record)的“行数”;
这段代码在使用压缩的场景下才有意义,它构建了一个缓存区valueBuffer,并且使用“装饰器”模式构建了一个压缩输出流,用于后期将columnBuffers中的数据写入缓存区valueBuffer,valueBuffer中的数据是压缩过的
接下来就是逐个处理columnBuffers中的数据,简要来说,对于某个columnBuffers[i]而言需要做两件事情:
1)如果使用压缩,需要将columnBuffers[i]的数据通过压缩输出流deflateOut写入valueBuffer中;
2)维护相关的几个变量值;


 这段代码看似较长,对于某个columnBuffers[i]而言,实际做的事情可以概括为四步:
1)如果使用压缩,将columnBuffers[i]中的全部数据写入deflateOut(实际是valueBuffer);
2)记录columnBuffers[i]经过压缩之后的长度colLen;如果没有使用使用压缩,则该值与原始数据长度相同;
3)记录columnBuffers[i]相关元数据:columnBuffers[i]压缩/未压缩数据的长度、columnBuffers[i]中各个列值的长度;
4)维护plainTotalColumnLength、comprTotalColumnLength;
代码至此,一个Record(Row Split)的所有元数据已构建完毕;如果启用压缩,columnBuffers中的数据已全部被压缩写入valueBuffer,接下来就是Record Key、Value的“持久化”。

RCFile的Sync机制

比如我们有一个“大”的文本文件,需要使用MapReduce进行分析。Hadoop MapReduce在提交Job之前会将这个大的文本文件根据“切片”大小(假设为128M)进行“切片”,每一个MapTask处理这个文件的一个“切片”(这里不考虑处理多个切片的情况),也就是这个文件的一部分数据。文本文件是按行进行存储的,那么MapTask从某个“切片”的起始处读取文件数据时,如何定位一行记录的起始位置呢?


毕竟“切片”是按照字节大小直接切分的,很有可能正好将某行记录“切断”。这时就需要有这样的一个“sync”,相当于一个标志位的作用,让我们可以识别一行记录的起始位置,对于文本文件而言,这个“sync”就是换行符。所以,MapTask从某个“切片”的起始处读取数据时,首先会“过滤”数据,直到遇到一个换行符,然后才开始读取数据;如果读取某行数据结束之后,发现“文件游标”超过该“切片”的范围,则读取结束。


RCFile同样也需要这样的一个“sync”,对于文本文件而言,是每行文本一个“sync”;RCFile是以Record为单位进行存储的,但是并没有每个Record使用一个“sync”,而是两个“sync”之间有一个间隔限制SYNC_INTERVAL,
SYNC_INTERVAL = 100 * (4 + 16)
 每次开始输出下一个Record的数据之前,都会计算当前文件的输出位置相对于上个“sync”的偏移量,如果超过SYNC_INTERVAL就输出一个“sync”。
ii. write total record length、key portion length
iii. write keyLength、keyBuffer
注意这里的keyLength与ii中的keyLength不同:ii中的keyLength相当于记录的是keyBuffer原始数据的长度;而iii中的keyLength相当于记录的是keyBuffer原始数据被压缩之后的长度,如果没有压缩,该值与ii中的keyLength相同。

代码至此,我们就完成了一个Row Split(Record)的输出。
最后就是清空相关记录,为下一个Row Split(Record)的缓存输出作准备,

RCFileclose过程

RCFile文件的“关闭”操作大致可分为两步:
1)如果缓存区中仍有数据,调用flushRecords将数据“溢写”出去;
2)关闭文件输出流。

数据读取和Lazy解压

在MapReduce框架中,mapper将顺序处理HDFS块中的每个行组。当处理一个行组时,RCFile无需全部读取行组的全部内容到内存。相反,它仅仅读元数据头部和给定查询需要的列。因此,它可以跳过不必要的列以获得列存储的I/O优势。(例如,表tbl(c1, c2, c3, c4)有4个列,做一次查询“SELECT c1 FROM tbl WHERE c4 = 1”,对每个行组,RCFile仅仅读取c1和c4列的内容。).在元数据头部和需要的列数据加载到内存中后,它们需要解压。元数据头部总会解压并在内存中维护直到RCFile处理下一个行组。然而,RCFile不会解压所有加载的列,相反,它使用一种Lazy解压技术。

Lazy解压意味着列将不会在内存解压,直到RCFile决定列中数据真正对查询执行有用。由于查询使用各种WHERE条件,Lazy解压非常有用。如果一个WHERE条件不能被行组中的所有记录满足,那么RCFile将不会解压WHERE条件中不满足的列。例如,在上述查询中,所有行组中的列c4都解压了。然而,对于一个行组,如果列c4中没有值为1的域,那么就无需解压列c1。

行组大小

I/O性能是RCFile关注的重点,因此RCFile需要行组够大并且大小可变。行组大小和下面几个因素相关。

  1. 行组大的话,数据压缩效率会比行组小时更有效。根据对Facebook日常应用的观察,当行组大小达到一个阈值后,增加行组大小并不能进一步增加Gzip算法下的压缩比。
  2. 行组变大能够提升数据压缩效率并减少存储。因此,如果对缩减存储空间方面有强烈需求,则不建议选择使用小行组。需要注意的是,当行组的大小超过4MB,数据的压缩比将趋于一致。
  3. 尽管行组变大有助于减少表格的存储规模,但是可能会损害数据的读性能,因为这样减少了Lazy解压带来的性能提升。而且行组变大会占用更多的内存,这会影响并发执行的其他MapReduce作业。考虑到存储空间和查询效率两个方面,Facebook选择4MB作为默认的行组大小,当然也允许用户自行选择参数进行配置。

8、存储方式和压缩

在实际的项目开发当中,

hive表的数据存储格式一般选择:orc或parquet。

压缩方式一般选择snappy,lzo。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐