Doris学习笔记之数据的导入导出
本文介绍了Doris的数据导入和导出功能,对其中字段、参数进行了详细说明,并配合了大量案例
数据导入
导入(Load)功能就是将用户的原始数据导入到Doris中。导入成功后,用户即可通过Mysql客户端查询数据。为适配不同的数据导入需求,Doris系统提供了 6 种不同的导入方式。每种导入方式支持不同的数据源,存在不同的使用方式(异步、同步)。所有导入方式都支持csv数据格式。其中Broker load还支持parquet和orc数据格式。
Broker load:通过Broker进程访问并读取外部数据源(如 HDFS)导入到Doris。用户通过 Mysql协议提交导入作业后,异步执行。通过SHOW LOAD命令查看导入结果。
Stream load:用户通过HTTP协议提交请求并携带原始数据创建导入。主要用于快速将本地文件或数据流中的数据导入到Doris。导入命令同步返回导入结果。
Insert:类似MySQL中的Insert语句,Doris提供INSERT INTO tbl SELECT ...;
的方式从Doris 的表中读取数据并导入到另一张表。或者通过INSERT INTO tbl VALUES(...);
插入单条数据。
Multi load:用户通过HTTP协议提交多个导入作业。Multi Load可以保证多个导入作业的原子生效。
Routine load:用户通过MySQL协议提交例行导入作业,生成一个常驻线程,不间断的从数据源(如Kafka)中读取数据并导入到Doris中。
通过 S3 协议直接导入:用户通过S3协议直接导入数据,用法和Broker Load类似。Broker load是一个异步的导入方式,支持的数据源取决于Broker进程支持的数据源。用户需要通过 MySQL协议创建Broker load导入,并通过查看导入命令检查导入结果。
Broker导入
适用场景
源数据在Broker可以访问的存储系统中,如HDFS,且数据量在几十到百GB级别。
基本原理
用户在提交导入任务后,FE会生成对应的Plan并根据目前BE的个数和文件的大小,将Plan分给多个BE执行,每个BE执行一部分导入数据。
BE在执行的过程中会从Broker拉取数据,在对数据transform之后将数据导入系统。所有BE均完成导入,由FE最终决定导入是否成功。
基本语法
LOAD LABEL db_name.label_name
(data_desc, ...)
WITH BROKER broker_name broker_properties
[PROPERTIES (key1=value1, ... )]
* data_desc:
DATA INFILE ('file_path', ...)
[NEGATIVE]
INTO TABLE tbl_name
[PARTITION (p1, p2)]
[COLUMNS TERMINATED BY separator ]
[(col1, ...)]
[PRECEDING FILTER predicate]
[SET (k1=f1(xx), k2=f2(xx))]
[WHERE predicate]
* broker_properties:
(key1=value1, ...)
Label
导入任务的标识。每个导入任务,都有一个在单database内部唯一的Label。Label是用户在导入命令中自定义的名称。通过这个Label,用户可以查看对应导入任务的执行情况。Label的另一个作用,是防止用户重复导入相同的数据。强烈推荐用户同一批次数据使用相同的label。这样同一批次数据的重复请求只会被接受一次,保证了At-Most-Once语义。当Label对应的导入作业状态为 CANCELLED时,可以再次使用该Label提交导入作业。
数据描述类参数
数据描述类参数主要指的是Broker load创建导入语句中的属于data_desc部分的参数。每组data_desc主要表述了本次导入涉及到的数据源地址,ETL函数,目标表及分区等信息。下面主要对数据描述类的部分参数详细解释:
- 多表导入
Broker load支持一次导入任务涉及多张表,每个Broker load导入任务可在多个data_desc声明多张表来实现多表导入。每个单独的data_desc还可以指定属于该表的数据源地址。Broker load保证了单次导入的多张表之间原子性成功或失败。 - negative
data_desc中还可以设置数据取反导入。这个功能主要用于,当数据表中聚合列的类型都为SUM类型时。如果希望撤销某一批导入的数据。则可以通过negative参数导入同一批数据。Doris会自动为这一批数据在聚合列上数据取反,以达到消除同一批数据的功能。 - partition
在data_desc中可以指定待导入表的partition信息,如果待导入数据不属于指定的partition则不会被导入。同时,不在指定Partition的数据会被认为是错误数据。 - set column mapping
在data_desc中的SET语句负责设置列函数变换,这里的列函数变换支持所有查询的等值表达式变换。如果原始数据的列和表中的列不一一对应,就需要用到这个属性。 - preceding filter predicate
用于过滤原始数据。原始数据是未经列映射、转换的数据。用户可以在对转换前的数据前进行一次过滤,选取期望的数据,再进行转换。 - where predicate
在data_desc中的WHERE语句负责过滤已经完成transform的数据,被filter的数据不会进入容忍率的统计中。如果多个data_desc中声明了同一张表的多个条件的话,则会merge同一张表的多个条件,merge策略是 AND 。
导入作业参数
导入作业参数主要指的是Broker load创建导入语句中的属于opt_properties部分的参数。导入作业参数是作用于整个导入作业的。下面主要对导入作业参数的部分参数详细解释:
- timeout:
导入作业的超时时间(以秒为单位),用户可以在opt_properties中自行设置每个导入的超时时间。导入任务在设定的timeout时间内未完成则会被系统取消,变成CANCELLED。Broker load的默认导入超时时间为4小时。通常情况下,用户不需要手动设置导入任务的超时时间。当在默认超时时间内无法完成导入时,可以手动设置任务的超时时间。 - 推荐超时时间:
总文件大小(MB) / 用户Doris集群最慢导入速度(MB/s) > timeout >((总文件大小(MB) * 待导入的表及相关Roll up表的个数)/ (10 * 导入并发数))
导入并发数见文档最后的导入系统配置说明,公式中的10为目前的导入限速10MB/s。例如一个1G的待导入数据,待导入表包含3个Rollup表,当前的导入并发数为3。则timeout的最小值为 (1 * 1024 * 3 ) / (10 * 3) = 102秒
由于每个Doris集群的机器环境不同且集群并发的查询任务也不同,所以用户Doris集群的最慢导入速度需要用户自己根据历史的导入任务速度进行推测。 - max_filter_ratio
导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的错误率超过该值,则导入失败。
如果用户希望忽略错误的行,可以通过设置这个参数大于0,来保证导入可以成功。
计算公式为:max_filter_ratio = (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL) )
dpp.abnorm.ALL表示数据质量不合格的行数。如类型不匹配,列数不匹配,长度不匹配等等。
dpp.norm.ALL指的是导入过程中正确数据的条数。可以通过SHOW LOAD命令查询导入任务的正确数据量。
原始文件的行数 = dpp.abnorm.ALL + dpp.norm.ALL - exec_mem_limit:
导入内存限制。默认是2GB。单位为字节。 - strict_mode:
Broker load导入可以开启strict mode模式。开启方式为properties (“strict_mode” =“true”) 。默认的strict mode为关闭。
strict mode 模式的意思是:对于导入过程中的列类型转换进行严格过滤。严格过滤的策略如下:
① 对于列类型转换来说,如果strict mode为true,则错误的数据将被filter。这里的错误数据是指:原始数据并不为空值,在参与列类型转换后结果为空值的这一类数据。
② 对于导入的某列由函数变换生成时,strict mode对其不产生影响。
③ 对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,strict mode对其也不产生影响。例如:如果类型是decimal(1,0), 原始数据为10,则属于可以通过类型转换但不在列声明的范围内。这种数据strict对其不产生影响。 - merge_type:
数据的合并类型,一共支持三种类型:APPEND、DELETE、MERGE。其中,APPEND是默认值,表示这批数据全部需要追加到现有数据中,DELETE表示删除与这批数据key相同的所有行,MERGE语义需要与delete 条件联合使用,表示满足delete条件的数据按照DELETE语义处理,其余的按照APPEND语义处理。
导入示例
首先在Doris中创建表:
create table student_result
(
id int ,
name varchar(50),
age int ,
score decimal(10,4)
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES
(
"replication_num" = "1",
"storage_medium" = "SSD"
);
启动HDFS,将文件上传:
[root@scentos szc]# hadoop fs -put students.csv /
csv文件导入:
LOAD LABEL test.student_result
(
DATA INFILE("hdfs://scentos:8020/students.csv")
INTO TABLE `student_result`
COLUMNS TERMINATED BY ","
FORMAT AS "csv"
(id, name, age, score)
)
WITH BROKER broker_name
(
"dfs.nameservices" = "my_cluster",
"dfs.ha.namenodes.my_cluster" = "scentos",
"dfs.namenode.rpc-address.my_cluster.scentos" = "scentos:8020",
"dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)
PROPERTIES
(
"timeout" = "3600"
);
查看导入
Broker load导入方式由于是异步的,所以用户必须将创建导入的Label记录,并且在查看导入命令中使用Label来查看导入结果。查看导入命令在所有导入方式中是通用的,具体语法可执行HELP SHOW LOAD
查看。
mysql> show load order by createtime desc limit 1\G
*************************** 1. row ***************************
JobId: 11054
Label: student_result
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=4
TaskInfo: cluster:N/A; timeout(s):3600; max_filter_ratio:0.0
ErrorMsg: NULL
CreateTime: 2022-05-15 15:16:35
EtlStartTime: 2022-05-15 15:16:38
EtlFinishTime: 2022-05-15 15:16:38
LoadStartTime: 2022-05-15 15:16:38
LoadFinishTime: 2022-05-15 15:16:39
URL: NULL
JobDetails: {"Unfinished backends":{"31dbce4885cc4bd6-8a34d9df79597c56":[]},"ScannedRows":4,"TaskNumber":1,"All backends":{"31dbce4885cc4bd6-8a34d9df79597c56":[10002]},"FileNumber":1,"FileSize":52}
1 row in set (0.00 sec)
下面主要介绍了查看导入命令返回结果集中参数意义:
- JobId
导入任务的唯一ID,每个导入任务的JobId都不同,由系统自动生成。与Label不同的是,JobId永远不会相同,而Label则可以在导入任务失败后被复用。 - Label
导入任务的标识。 - State
导入任务当前所处的阶段。在Broker load导入过程中主要会出现PENDING和LOADING这两个导入中的状态。如果Broker load处于PENDING状态,则说明当前导入任务正在等待被执行;LOADING状态则表示正在执行中。导入任务的最终阶段有两个:CANCELLED 和 FINISHED,当 Load job处于这两个阶段时,导入完成。其中CANCELLED为导入失败,FINISHED为导入成功。 - Progress
导入任务的进度描述。分为两种进度:ETL和LOAD,对应了导入流程的两个阶段ETL和LOADING,进度范围为:0~100%。
LOAD进度 = 当前完成导入的表个数 / 本次导入任务设计的总表个数 * 100%,如果所有导入表均完成导入,此时LOAD的进度为99% 导入进入到最后生效阶段,整个导入完成后,LOAD的进度才会改为100%。导入进度并不是线性的。所以如果一段时间内进度没有变化,并不代表导入没有在执行。 - Type
导入任务的类型。Broker load的type取值只有BROKER。 - EtlInfo
主要显示了导入的数据量指标unselected.rows、dpp.norm.ALL和dpp.abnorm.ALL。用户可以根据第一个数值判断where条件过滤了多少行,后两个指标验证当前导入任务的错误率是否超过 max_filter_ratio。三个指标之和就是原始数据量的总行数。 - TaskInfo
主要显示了当前导入任务参数,也就是创建Broker load导入任务时用户指定的导入任务参数,包括:cluster,timeout和max_filter_ratio。 - ErrorMsg
在导入任务状态为CANCELLED,会显示失败的原因,显示分两部分:type和msg,如果导入任务成功则显示N/A。
type 的取值意义:
值 | 含义 |
---|---|
USER_CANCEL | 用户取消的任务 |
ETL_RUN_FAIL | 在ETL阶段失败的导入任务 |
ETL_QUALITY_UNSATISFIED | 数据质量不合格,也就是错误数据率超过了max_filter_ratio |
LOAD_RUN_FAIL | 在LOADING阶段失败的导入任务 |
TIMEOUT | 导入任务没在超时时间内完成 |
UNKNOWN | 未知的导入错误 |
- CreateTime/EtlStartTime/EtlFinishTime/LoadStartTime/LoadFinishTime
这几个值分别代表导入创建的时间、ETL阶段开始的时间、ETL阶段完成的时间、Loading阶段开始的时间和整个导入任务完成的时间。
Broker load导入由于没有ETL阶段,所以其EtlStartTime、EtlFinishTime和LoadStartTime被设置为同一个值。
导入任务长时间停留在CreateTime,而LoadStartTime为N/A 则说明目前导入任务堆积严重。用户可减少导入提交的频率。
LoadFinishTime - CreateTime = 整个导入任务所消耗时间
LoadFinishTime - LoadStartTime = 整个Broker load导入任务执行时间 = 整个导入任务所消耗时间 - 导入任务等待的时间 - URL
导入任务的错误数据样例,访问URL地址既可获取本次导入的错误数据样例。当本次导入不存在错误数据时,URL字段则为N/A。 - JobDetails
显示一些作业的详细运行状态。包括导入文件的个数、总大小(字节)、子任务个数、已处理的原始行数,运行子任务的BE节点Id,未完成的BE节点Id。
{"Unfinished backends":{"31dbce4885cc4bd6-8a34d9df79597c56":[]},"ScannedRows":4,"TaskNumber":1,"All backends":{"31dbce4885cc4bd6-8a34d9df79597c56":[10002]},"FileNumber":1,"FileSize":52}
其中ScannedRows表示已处理的原始行数,每5秒更新一次。该行数仅用于展示当前的进度,不代表最终实际的处理行数。实际处理行数以EtlInfo中显示的为准。
取消导入
当 Broker load作业状态不为CANCELLED或FINISHED 时,可以被用户手动取消。取消时需要指定待取消导入任务的Label ,取消导入命令语法可执行HELP CANCEL LOAD
查看:
CANCEL LOAD [FROM db_name] WHERE LABEL=”load_label”;
Stream导入
Stream load是一个同步的导入方式,用户通过发送HTTP协议发送请求将本地文件或数据流导入到Doris中。Stream load同步执行导入并返回导入结果。用户可直接通过请求的返回体判断本次导入是否成功。
适用场景
Stream load主要适用于导入本地文件,或通过程序导入数据流中的数据。目前Stream Load支持两个数据格式:CSV(文本)和JSON。
基本原理
下图展示了Stream load的主要流程,省略了一些导入细节:
Stream load中,Doris会选定一个节点作为Coordinator节点。该节点负责接数据并分发数据到其他数据节点。用户通过HTTP协议提交导入命令。如果提交到FE,则FE会通过HTTP redirect指令将请求转发给某一个BE。用户也可以直接提交导入命令给某一指定BE。导入的最终结果由 Coordinator BE返回给用户。
基本语法
Stream load通过HTTP协议提交和传输数据。这里通过curl命令展示如何提交导入。用户也可以通过其他HTTP client进行操作。
curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load
Header中支持属性见下面的‘导入任务参数’说明,格式为: -H “key1:value1”。创建导入的详细语法帮助可通过HELP STREAM LOAD
查看, 下面主要介绍创建Streamload的部分参数意义。
- 签名参数
user/passwd:Stream load由于创建导入的协议使用的是HTTP协议,通过Basic access authentication进行签名。Doris系统会根据签名验证用户身份和导入权限。 - 导入任务参数
Stream load由于使用的是HTTP协议,所以所有导入任务有关的参数均设置在Header中。下面主要介绍了Stream load导入任务参数的部分参数意义。
参数 | 含义 |
---|---|
Label | 导入任务的标识 |
column_separator | 用于指定导入文件中的列分隔符,默认为\t。如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。如hive文件的分隔符\x01,需要指定为-H "column_separator:\x01" 。可以使用多个字符的组合作为列分隔符 |
max_filter_ratio | 导入任务的最大容忍率 |
where | 导入任务指定的过滤条件。Stream load支持对原始数据指定where语句进行过滤。被过滤的数据将不会被导入,也不会参与filter ratio的计算,但会被计入num_rows_unselected |
partition | 待导入表的Partition信息,如果待导入数据不属于指定的Partition则不会被导入,这些数据将计入 dpp.abnorm.ALL |
columns | 待导入数据的函数变换配置,目前Stream load支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致 |
exec_mem_limit | 导入内存限制。默认为2GB,单位为字节 |
two_phase_commit:Stream | Stream load导入可以开启两阶段事务提交模式。开启方式为在HEADER中声明two_phase_commit=true 。默认的两阶段批量事务提交为关闭 |
对于columns参数,此处对列顺序变换和表达式变换进行举例:
列顺序变换例子:
原始数据有三列(src_c1,src_c2,src_c3), 目前 doris 表也有三列(dst_c1,dst_c2,dst_c3),如果原始表的 src_c1 列对应目标表 dst_c1 列,原始表的 src_c2 列对应目标表 dst_c2 列,原始表的 src_c3 列对应目标表 dst_c3 列,则写法为:columns: dst_c1, dst_c2, dst_c3;
原始表的 src_c1 列对应目标表 dst_c2 列,原始表的 src_c2 列对应目标表 dst_c3 列,原始表的 src_c3 列对应目标表 dst_c1 列,则写法为:columns: dst_c2, dst_c3, dst_c1
。
表达式变换例子:
原始文件有两列,目标表也有两列(c1,c2)但是原始文件的两列均需要经过函数变换才能对应目标表的两列,则写法为:columns: tmp_c1, tmp_c2, c1 = year(tmp_c1), c2 = month(tmp_c2)
,其中 tmp_*是一个占位符,代表的是原始文件中的两个原始列。
对于two_phase_commit参数中的两阶段批量事务提交模式,其含义是:Stream load过程中,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为PRECOMMITTED,用户手动触发commit操作之后,数据才可见。
用户可以调用如下接口对stream load事务触发commit操作:
curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:commit" http://fe_host:http_port/api/{db}/_stream_load_2pc
或
curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:commit" http://be_host:webserver_port/api/{db}/_stream_load_2pc
用户可以调用如下接口对stream load事务触发 abort 操作:
curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:abort" http://fe_host:http_port/api/{db}/_stream_load_2pc
或
curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:abort" http://be_host:webserver_port/api/{db}/_stream_load_2pc
导入示例
[root@scentos szc]# curl --location-trusted -u root -H "label:123" -H"column_separator:," -T students.csv -X PUT http://scentos:8030/api/test/student_result/_stream_load
Enter host password for user 'root':
{
"TxnId": 1004,
"Label": "123",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 4,
"NumberLoadedRows": 4,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 52,
"LoadTimeMs": 78,
"BeginTxnTimeMs": 8,
"StreamLoadPutTimeMs": 15,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 21,
"CommitAndPublishTimeMs": 32
}
由于Stream load是一种同步的导入方式,所以导入的结果会通过创建导入的返回值直接返回给用户。
注意:由于Stream load是同步的导入方式,所以并不会在Doris系统中记录导入信息,用户无法异步的通过查看导入命令看到Stream load。使用时需监听创建导入请求的返回值获取导入结果。
取消导入
用户无法手动取消Stream load,Stream load在超时或者导入错误后会被系统自动取消。Stream Load是一个同步的导入方式,用户通过发送HTTP协议将本地文件或数据流导入到Doris中,Stream load同步执行导入并返回结果。用户可以直接通过返回判断导入是否成功。
Routine导入
例行导入(Routine Load)功能为用户提供了一种自动从指定数据源进行数据导入的功能。
适用场景
当前仅支持从Kafka系统进行例行导入,使用限制:
- 支持无认证的Kafka访问,以及通过SSL方式认证的Kafka集群。
- 支持的消息格式为csv、json文本格式。对于csv,每一个message为一行,且行尾不包含换行符。
- 默认支持Kafka 0.10.0.0(含)以上版本。如果要使用Kafka 0.10.0.0以下版本(0.9.0, 0.8.2, 0.8.1, 0.8.0),需要修改be的配置,将
kafka_broker_version_fallback
的值设置为要兼容的旧版本,或者在创建routine load的时候直接设置property.broker.version.fallback
的值为要兼容的旧版本,使用旧版本的代价是routine load的部分新特性可能无法使用,如根据时间设置kafka分区的offset。
基本原理
如上图,Client向FE提交一个例行导入作业:
- FE通过JobScheduler将一个导入作业拆分成若干个Task。每个Task负责导入指定的一部分数据。Task被TaskScheduler分配到指定的BE上执行;
- 在BE上,一个Task被视为一个普通的导入任务,通过Stream Load的导入机制进行导入。导入完成后,向FE汇报;
- FE中的JobScheduler根据汇报结果,继续生成后续新的Task,或者对失败的Task进行重试;
- 整个例行导入作业通过不断的产生新的Task,来完成数据不间断的导入。
基本语法
CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]
执行HELP ROUTINE LOAD
可以查看语法帮助,下面是参数说明:
- [db.]job_name
导入作业的名称,在同一个database内,相同名称只能有一个job在运行。 - tbl_name
指定需要导入的表的名称。 - merge_type
数据的合并类型,一共支持三种类型:APPEND、DELETE、MERGE。其中,APPEND是默认值,表示这批数据全部需要追加到现有数据中,DELETE表示删除与这批数据key相同的所有行,MERGE语义需要与delete on
条件联合使用,表示满足delete条件的数据按照DELETE 语义处理其余的按照APPEND语义处理 , 语法为:
[WITH MERGE|APPEND|DELETE]
- load_properties
用于描述导入数据。语法:
[column_separator], [columns_mapping], [where_predicates], [delete_on_predicates], [source_sequence], [partitions], [preceding_predicates]
(1)column_separator:指定列分隔符,如:COLUMNS TERMINATED BY ","
。这个只在文本数据导入的时候需要指定,JSON格式的数据导入不需要指定这个参数,默认为:\t
(2)columns_mapping:指定源数据中列的映射关系,以及定义衍生列的生成方式。
映射列:按顺序指定,源数据中各个列,对应目的表中的哪些列。对于希望跳过的列,可以指定一个不存在的列名。假设目的表有三列k1, k2, v1。源数据有4列,其中第1、2、4列分别对应k2, k1, v1。则书写如下:
COLUMNS (k2, k1, xxx, v1)
其中 xxx 为不存在的一列,用于跳过源数据中的第三列。
衍生列:以col_name = expr
的形式表示的列,我们称为衍生列。即支持通过expr计算得出目的表中对应列的值。 衍生列通常排列在映射列之后,虽然这不是强制的规定,但是Doris总是先解析映射列,再解析衍生列。接上一个示例,假设目的表还有第4列v2,v2由k1和k2的和产生。则可以书写如下:
COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2);
再举例,假设用户需要导入只包含k1一列的表,列类型为int。并且需要将源文件中的对应列进行处理:将负数转换为正数,而将正数乘以100。这个功能可以通过case when函数实现,正确写法应如下:
COLUMNS (xx, k1 = case when xx < 0 then cast(-xx as varchar) else cast((xx + '100') as varchar) end)
(3)where_predicates:用于指定过滤条件,以过滤掉不需要的列。过滤列可以是映射列或衍生列。 例如我们只希望导入k1大于100并且k2等于1000的列,则书写如下:
WHERE k1 > 100 and k2 = 1000
(4)partitions:指定导入目的表的哪些partition中。如果不指定,则会自动导入到对应的 partition中。示例:
PARTITION(p1, p2, p3)
(5)delete_on_predicates:表示删除条件,仅在merge type为MERGE时有意义,语法与where 相同
(6)source_sequence:只适用于UNIQUE_KEYS,在相同key列下,保证value列按照 source_sequence列进行REPLACE, source_sequence可以是数据源中的列,也可以是表结构中的一列。
(7)preceding_predicates:PRECEDING FILTER predicate用于过滤原始数据。原始数据是未经列映射、转换的数据。用户可以在对转换前的数据前进行一次过滤,选取期望的数据,再进行转换。
5. job_properties
用于指定例行导入作业的通用参数。 语法:
PROPERTIES (
"key1" = "val1",
"key2" = "val2"
)
目前支持以下参数:
(1)desired_concurrent_number:期望的并发度。一个例行导入作业会被分成多个子任务执行。这个参数指定一个作业最多有多少任务可以同时执行。必须大于0。默认为3。这个并发度并不是实际的并发度,实际的并发度会通过集群的节点数、负载情况,以及数据源的情况综合考虑。一个作业,最多有多少task 同时在执行。对于Kafka导入而言,当前的实际并发度计算方式为:Min(partition num, desired_concurrent_number, alive_backend_num, Config.max_routine_load_task_concurrrent_num)。
其中Config.max_routine_load_task_concurrrent_num是系统默认的最大并发数限制。这是一个FE配置,可以通过改配置调整,默认为5,partition num指订阅的Kafka topic的partition数量,alive_backend_num是当前正常的 BE 节点数。
(2)max_batch_interval/max_batch_rows/max_batch_size:这三个参数分别表示:每个子任务最大执行时间,单位是秒,范围为5到60,默认为10;每个子任务最多读取的行数,必须大于等于 200000,默认是200000;每个子任务最多读取的字节数。单位是字节,范围是100MB到1GB。默认是100MB。这三个参数,用于控制一个子任务的执行时间和处理量。当任意一个达到阈值,则任务结束。 例如:
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
(3)max_error_number:采样窗口内,允许的最大错误行数。必须大于等于0。默认是0,即不允许有错误行。采样窗口为max_batch_rows * 10。即如果在采样窗口内,错误行数大于 max_error_number,则会导致例行作业被暂停,需要人工介入检查数据质量问题。被where条件过滤掉的行不算错误行
(4)strict_mode:是否开启严格模式,默认为关闭。如果开启后,非空原始数据的列类型变换如果结果为NULL,则会被过滤。指定方式为"strict_mode" = "true"
(5)timezone:指定导入作业所使用的时区。默认为使用Session的timezone参数。该参数会影响所有导入涉及的和时区有关的函数结果
(6)format:指定导入数据格式,默认是csv,支持json格式
(7)jsonpaths:导入json方式,分为:简单模式和匹配模式。如果设置了jsonpath则为匹配模式导入,否则为简单模式导入,具体可参考示例
(8)strip_outer_array:布尔类型,为true表示json数据以数组对象开始且将数组对象中进行展平,默认值是false
(9)json_root:json_root为合法的jsonpath字符串,用于指定json document的根节点,默认值为""
(10)send_batch_parallelism:整型,用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的max_send_batch_parallelism_per_job
, 那么作为协调点的BE将使用max_send_batch_parallelism_per_job
的值
6)data_source_properties:数据源的类型,当前支持:Kafka
(
"key1" = "val1",
"key2" = "val2"
)
Kafka导入示例
首先在doris中创建表:
create table student_kafka
(
id int,
name varchar(50),
age int
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES
(
"replication_num" = "1",
"storage_medium" = "SSD"
);
然后启动kafka并准备数据:
[root@scentos szc]# kafka-topics.sh --create --zookeeper scentos:2181 --replication-factor 1 --partitions 1 --topic test_doris1
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "test_doris1".
[root@scentos szc]# kafka-console-producer.sh --broker-list scentos:9092 --topic test_doris1
>
创建导入任务:
CREATE ROUTINE LOAD test.kafka_test ON student_kafka
COLUMNS TERMINATED BY ",",
COLUMNS(id, name, age)
PROPERTIES
(
"desired_concurrent_number"="3",
"strict_mode" = "false"
)
FROM KAFKA
(
"kafka_broker_list"= "scentos:9092",
"kafka_topic" = "test_doris1",
"property.group.id"="test_doris_group",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.enable.auto.commit"="false"
);
查看表数据,刚开始是空的:
mysql> select * from student_kafka;
Empty set (0.02 sec)
在kafka生产者中发数据:
[root@scentos szc]# kafka-console-producer.sh --broker-list scentos:9092 --topic test_doris1
>1,a,13
>2,b,14
>3,c,13
等一会儿数据就同步过来了:
mysql> select * from student_kafka;
+------+------+------+
| id | name | age |
+------+------+------+
| 3 | c | 13 |
| 2 | b | 14 |
| 1 | a | 13 |
+------+------+------+
3 rows in set (0.01 sec)
查看导入作业状态
查看作业状态的具体命令和示例可以通过HELP SHOW ROUTINE LOAD;
命令查看。
查看任务运行状态的具体命令和示例可以通过HELP SHOW ROUTINE LOAD TASK;
命令查看。
只能查看当前正在运行中的任务,已结束和未开始的任务无法查看
修改作业属性
用户可以修改已经创建的作业。具体说明可以通过HELP ALTER ROUTINE LOAD;
命令查看,或参阅 ALTER ROUTINE LOAD。
作业控制
用户可以通过STOP/PAUSE/RESUME
三个命令来控制作业的停止,暂停和重启。可以通过HELP STOP ROUTINE LOAD; HELP PAUSE ROUTINE LOAD;
以及HELP RESUMEROUTINE LOAD;
三个命令查看帮助和示例。
其他说明
例行导入作业和 ALTER TABLE 操作的关系
例行导入不会阻塞SCHEMA CHANGE和ROLLUP操作。但是注意如果SCHEMA CHANGE完成后,列映射关系无法匹配,则会导致作业的错误数据激增,最终导致作业暂停。建议通过在例行导入作业中显式指定列映射关系,以及通过增加Nullable列或带Default值的列来减少这类问题。
删除表的Partition可能会导致导入数据无法找到对应的Partition,作业进入暂停。
例行导入作业和其他导入作业的关系(LOAD, DELETE, INSERT)
例行导入和其他LOAD作业以及INSERT操作没有冲突。当执行DELETE操作时,对应表分区不能有任何正在执行的导入任务。所以在执行DELETE操作前,可能需要先暂停例行导入作业,并等待已下发的 task 全部完成后,才可以执行DELETE。
例行导入作业和 DROP DATABASE/TABLE 操作的关系
当例行导入对应的database或table被删除后,作业会自动CANCEL。
kafka 类型的例行导入作业和 kafka topic 的关系
当用户在创建例行导入声明的kafka_topic在kafka集群中不存在时:
(1)如果用户kafka集群的broker设置了auto.create.topics.enable = true
,则kafka_topic会先被自动创建,自动创建的partition个数是由用户方的kafka集群中的broker配置num.partitions
决定的。例行作业会正常的不断读取该topic的数据。
(2)如果用户kafka 集群的broker设置了auto.create.topics.enable = false
, 则topic不会被自动创建,例行作业会在没有读取任何数据之前就被暂停,状态为PAUSED。所以,如果用户希望当kafka topic不存在的时候,被例行作业自动创建的话,只需要将用户方的kafka集群中的broker设置 auto.create.topics.enable = true
即可。
在网络隔离的环境中可能出现的问题
在有些环境中存在网段和域名解析的隔离措施,所以需要注意:
(1)创建Routine load任务中指定的Broker list必须能够被Doris服务访问
(2)Kafka 中如果配置了advertised.listeners, advertised.listeners中的地址必须能够被Doris服务访问
关于指定消费的 Partition 和 Offset
Doris支持指定Partition和Offset开始消费。新版中还支持了指定时间点进行消费的功能。这里说明下对应参数的配置关系。
有三个相关参数:
参数名 | 含义 |
---|---|
kafka_partitions | 指定待消费的partition列表,如:“0, 1, 2, 3” |
kafka_offsets | 指定每个分区的起始offset,必须和kafka_partitions列表个数对应。如:“1000, 1000, 2000, 2000” |
property.kafka_default_offset | 指定分区默认的起始offset |
在创建导入作业时,这三个参数可以有以下组合:
STOP 和 PAUSE 的区别
FE会自动定期清理STOP状态的ROUTINE LOAD,而PAUSE状态的则可以再次被恢复启用。
Binlog导入
Binlog Load提供了一种使Doris增量同步用户在Mysql数据库的对数据更新操作的CDC(Change Data Capture)功能。
适用场景
- INSERT/UPDATE/DELETE支持;
- 过滤Query;
- 暂不兼容DDL语句。
基本原理
在第一期的设计中,Binlog Load需要依赖canal作为中间媒介,让canal伪造成一个从节点去获取Mysql主节点上的Binlog并解析,再由Doris去获取Canal上解析好的数据,主要涉及Mysql端、Canal端以及Doris端,总体数据流向如下:
如上图,用户向FE提交一个数据同步作业:
(1)FE会为每个数据同步作业启动一个canal client,来向canal server端订阅并获取数据;
(2)client中的receiver将负责通过Get 命令接收数据,每获取到一个数据batch,都会由consumer根据对应表分发到不同的channel,每个channel都会为此数据batch产生一个发送数据的子任务Task;
(3)在FE上,一个Task是channel向BE发送数据的子任务,里面包含分发到当前channel的同一个batch的数据;
(4)channel控制着单个表事务的开始、提交、终止。一个事务周期内,一般会从consumer获取到多个batch的数据,因此会产生多个向BE发送数据的子任务Task,在提交事务成功前,这些Task不会实际生效;
(5)满足一定条件时(比如超过一定时间、达到提交最大数据大小),consumer将会阻塞并通知各个channel提交事务;
(6)当且仅当所有channel都提交成功,才会通过Ack命令通知canal并继续获取并消费数据;
(7)如果有任意channel提交失败,将会重新从上一次消费成功的位置获取数据并再次提交(已提交成功的channel不会再次提交以保证幂等性);
(8)整个数据同步作业中,FE通过以上流程不断地从canal获取数据并提交到BE,来完成数据同步。
配置MySQL端
在MySQLCluster模式的主从同步中,二进制日志文件(Binlog)记录了主节点上的所有数据变化,数据在Cluster的多个节点间同步、备份都要通过Binlog日志进行,从而提高集群的可用性。架构通常由一个主节点(负责写)和一个或多个从节点(负责读)构成,所有在主节点上发生的数据变更将会复制给从节点。
注意:目前只有Mysql 5.7及以上的版本才能支持Binlog Load功能。
1)打开mysql的二进制binlog日志功能,编辑my.cnf配置文件:
[mysqld]
server-id=1
log-bin = mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
binlog-do-db=test #指定具体要同步的数据库,也可以不设置
2)开启GTID模式 [可选]
一个全局事务Id(global transaction identifier)标识出了一个曾在主节点上提交过的事务,在全局都是唯一有效的。开启了Binlog后,GTID会被写入到Binlog文件中,与事务一一对应,编辑my.cnf 配置文件:
gtid-mode=on # 开启gtid模式
enforce-gtid-consistency=1 # 强制gtid和事务的一致性
在GTID模式下,主服务器可以不需要Binlog的文件名和偏移量,就能很方便的追踪事务、恢复数据、复制副本。同时,由于GTID的全局有效性,从节点将不再需要通过保存文件名和偏移量来定位主节点上的Binlog位置,而通过数据本身就可以定位了。在进行数据同步中,从节点会跳过执行任意被识别为已执行的GTID事务。GTID的表现形式为一对坐标, source_id标识出主节点,transaction_id表示此事务在主节点上执行的顺序(最大2^63-1)。
3)重启 MySQL 使配置生效
sudo systemctl restart mysqld
4)创建用户并授权
mysql> set global validate_password_length=4;
Query OK, 0 rows affected (0.00 sec)
mysql> set global validate_password_policy=0;
Query OK, 0 rows affected (0.00 sec)
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' ;
Query OK, 0 rows affected, 1 warning (0.01 sec)
5)准备测试表
CREATE TABLE `test`.`tbl1` (
`a` int(11) NOT NULL COMMENT "",
`b` int(11) NOT NULL COMMENT ""
);
insert into test.tbl1 values(1,1),(2,2),(3,3);
配置Canal端
参见数据库CDC中间件学习之Canal中Canal安装配置一章,此时只需修改conf/example/instance.properties中的MySQL URL、用户名和密码即可。
配置目标表
在doris中创建与MySQL中对应的目标表:
CREATE TABLE `binlog_test` (
`a` int(11) NOT NULL COMMENT "",
`b` int(11) NOT NULL COMMENT ""
) ENGINE=OLAP
UNIQUE KEY(`a`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`a`) BUCKETS 8
PROPERTIES
(
"replication_num" = "1",
"storage_medium" = "SSD"
);
Binlog Load只能支持Unique类型的目标表,且必须激活目标表的Batch Delete功能。
然后,开启SYNC功能。在fe.conf中将enable_create_sync_job
设为true,不想修改配置文件重启,可以执行如下命令:
mysql> ADMIN SET FRONTEND CONFIG ("enable_create_sync_job" = "true");
Query OK, 0 rows affected (0.01 sec)
基本语法
创建数据同步作业的的详细语法在可以连接到Doris 后,执行 HELP CREATE SYNC JOB;查看语法帮助。
CREATE SYNC [db.]job_name
(
channel_desc,
channel_desc
...
)
binlog_desc
job_name:数据同步作业在当前数据库内的唯一标识,相同job_name的作业只能有一个在运行。
channel_desc:用来定义任务下的数据通道,可表示MySQL源表到doris目标表的映射关系。在设置此项时,如果存在多个映射关系,必须满足MySQL源表应该与doris目标表是一一对应关系,其他的任何映射关系(如一对多关系),检查语法时都被视为不合法。
FROM mysql_db.src_tbl INTO des_tbl
[partitions]
[columns_mapping]
column_mapping:主要指MySQL源表和doris目标表的列之间的映射关系,如果不指定,FE会默认源表和目标表的列按顺序一一对应。但是我们依然建议显式的指定列的映射关系,这样当目标表的结构发生变化(比如增加一个nullable的列),数据同步作业依然可以进行。否则,当发生上述变动后,因为列映射关系不再一一对应,导入将报错。
binlog_desc:binlog_desc中的属性定义了对接远端Binlog地址的一些必要信息,目前可支持的对接类型只有canal方式,所有的配置项前都需要加上canal前缀。
FROM BINLOG
(
"key1" = "value1",
"key2" = "value2"
)
canal.server.ip: canal server的地址
canal.server.port: canal server的端口
canal.destination: 前文提到的 instance的字符串标识
canal.batchSize: 每批从canal server处获取的batch大小的最大值,默认8192
canal.username: instance的用户名
canal.password: instance的密码
canal.debug: 设置为true时,会将batch和每一行数据的详细信息都打印出来,会影响性能。
示例
创建同步作业:
CREATE SYNC test.job1
(
FROM test.tbl1 INTO binlog_test
)
FROM BINLOG
(
"type" = "canal",
"canal.server.ip" = "scentos",
"canal.server.port" = "11111",
"canal.destination" = "example",
"canal.username" = "root",
"canal.password" = "root"
);
查看作业状态,展示当前数据库的所有数据同步作业状态:
mysql> SHOW SYNC JOB;
+-------+---------+-------+---------+------------------------+--------------+-------------------------------------------------------------+---------------------+---------------------+--------------+------------+------+
| JobId | JobName | Type | State | Channel | Status | JobConfig | CreateTime | LastStartTime | LastStopTime | FinishTime | Msg |
+-------+---------+-------+---------+------------------------+--------------+-------------------------------------------------------------+---------------------+---------------------+--------------+------------+------+
| 11100 | job1 | CANAL | RUNNING | test.tbl1->binlog_test | position:N/A | address:scentos:11111,destination:example,batchSize:8192 | 2022-05-15 16:49:04 | 2022-05-15 16:49:05 | NULL | NULL | NULL |
+-------+---------+-------+---------+------------------------+--------------+-------------------------------------------------------------+---------------------+---------------------+--------------+------------+------+
1 row in set (0.02 sec)
展示数据库test下的所有数据同步作业状态:
mysql> SHOW SYNC JOB FROM `test`;
+-------+---------+-------+---------+------------------------+--------------+-------------------------------------------------------------+---------------------+---------------------+--------------+------------+------+
| JobId | JobName | Type | State | Channel | Status | JobConfig | CreateTime | LastStartTime | LastStopTime | FinishTime | Msg |
+-------+---------+-------+---------+------------------------+--------------+-------------------------------------------------------------+---------------------+---------------------+--------------+------------+------+
| 11100 | job1 | CANAL | RUNNING | test.tbl1->binlog_test | position:N/A | address:scentos:11111,destination:example,batchSize:8192 | 2022-05-15 16:49:04 | 2022-05-15 16:49:05 | NULL | NULL | NULL |
+-------+---------+-------+---------+------------------------+--------------+-------------------------------------------------------------+---------------------+---------------------+--------------+------------+------+
1 row in set (0.00 sec)
返回结果集的参数意义如下:
State:作业当前所处的阶段。作业状态之间的转换如下图所示:
作业提交之后状态为PENDING,由FE调度执行启动canal client后状态变成RUNNING,用户可以通过STOP/PAUSE/RESUME
三个命令来控制作业的停止,暂停和恢复,操作后作业状态分别为 CANCELLED/PAUSED/RUNNING。
作业的最终阶段只有一个:CANCELLED,当作业状态变为CANCELLED后,将无法再次恢复。当作业发生了错误时,若错误是不可恢复的,状态会变成CANCELLED,否则会变成PAUSED。
Channel:作业所有源表到目标表的映射关系。
Status:当前binlog的消费位置(若设置了 GTID 模式,会显示GTID),以及doris端执行时间相 mysql端的延迟时间。
JobConfig:对接的远端服务器信息,如canal server的地址与连接 instance的destination。
往MySQL表继续插入数据,观察Doris表:
mysql> select * from binlog_test;
+------+------+
| a | b |
+------+------+
| 3 | 3 |
| 1 | 1 |
| 2 | 2 |
+------+------+
3 rows in set (0.07 sec)
控制作业
用户可以通过STOP/PAUSE/RESUME
三个命令来控制作业的停止,暂停和恢复。
# 停止名称为`job_name`的数据同步作业
STOP SYNC JOB [db.]job_name
# 暂停名称为`job_name`的数据同步作业
PAUSE SYNC JOB [db.]job_name
# 恢复名称为`job_name`的数据同步作业
RESUME SYNC JOB `job_name
InsertInto
Insert Into语句的使用方式和MySQL等数据库中Insert Into语句的使用方式类似。但在Doris中,所有的数据写入都是一个独立的导入作业。所以这里将Insert Into也作为一种导入方式介绍。
分类
主要的 Insert Into 命令包含以下两种;
INSERT INTO tbl SELECT ...
INSERT INTO tbl (col1, col2, ...) VALUES (1, 2, ...), (1,3, ...);
其中第二种命令仅用于Demo,不要使用在测试或生产环境中。Insert Into命令需要通过MySQL 协议提交,创建导入请求会同步返回导入结果。
基本语法
INSERT INTO table_name [partition_info] [WITH LABEL label] [col_list] [query_stmt] [VALUES];
WITH LABEL:INSERT操作作为一个导入任务,也可以指定一个label。如果不指定,则系统会自动指定一个UUID作为label。该功能需要 0.11+ 版本。
注意:建议指定Label而不是由系统自动分配。如果由系统自动分配,但在Insert Into语句执行过程中,因网络错误导致连接断开等,则无法得知Insert Into是否成功。而如果指定Label,则可以再次通过Label查看任务结果。
示例
INSERT INTO tbl2 WITH LABEL label1 SELECT * FROM tbl3;
INSERT INTO tbl1 VALUES ("qweasdzxcqweasdzxc"), ("a");
注意:
当需要使用CTE(Common Table Expressions) 作为insert操作中的查询部分时,必须指
定WITH LABEL和column list部分。示例:
INSERT INTO tbl1 WITH LABEL label1
WITH cte1 AS (SELECT * FROM tbl1), cte2 AS (SELECT * FROM tbl2)
SELECT k1 FROM cte1 JOIN cte2 WHERE cte1.k1 = 1;
INSERT INTO tbl1 (k1)
WITH cte1 AS (SELECT * FROM tbl1), cte2 AS (SELECT * FROM tbl2)
SELECT k1 FROM cte1 JOIN cte2 WHERE cte1.k1 = 1;
S3导入
参考官网:https://doris.apache.org/zh-CN/administrator-guide/load-data/s3-load-manual.html
数据导出
Export导出
数据导出是Doris提供的一种将数据导出的功能。该功能可以将用户指定的表或分区的数据以文本的格式,通过Broker进程导出到远端存储上,如HDFS/BOS 等。
基本原理
用户提交一个Export作业后。Doris会统计这个作业涉及的所有Tablet。然后对这些Tablet进行分组,每组生成一个特殊的查询计划。该查询计划会读取所包含的Tablet上的数据,然后通过 Broker将数据写到远端存储指定的路径中,也可以通过S3协议直接导出到支持S3协议的远端存储上。
调度方式
(1)用户提交一个Export作业到FE。
(2)FE的Export调度器会通过两阶段来执行一个Export作业:
PENDING:FE生成ExportPendingTask,向BE发送snapshot命令,对所有涉及到的Tablet 做一个快照。并生成多个查询计划。
EXPORTING:FE生成ExportExportingTask,开始执行查询计划。
查询计划拆分
Export作业会生成多个查询计划,每个查询计划负责扫描一部分Tablet。每个查询计划扫描的 Tablet个数由FE配置参数export_tablet_num_per_task
指定,默认为5。即假设一共100个Tablet,则会生成20个查询计划。用户也可以在提交作业时,通过作业属性tablet_num_per_task
指定这个数值。
查询计划执行
一个作业的多个查询计划顺序执行。一个查询计划扫描多个分片,将读取的数据以行的形式组织,每1024行为一个batch,调用Broker写入到远端存储上。查询计划遇到错误会整体自动重试3 次。如果一个查询计划重试3次依然失败,则整个作业失败。
Doris会首先在指定的远端存储的路径中,建立一个名为__doris_export_tmp_12345
的临时目录(其中12345为作业id)。导出的数据首先会写入这个临时目录。每个查询计划会生成一个文件,文件名示例:
export-data-c69fcf2b6db5420f-a96b94c1ff8bccef-1561453713822
其中c69fcf2b6db5420f-a96b94c1ff8bccef为查询计划的query id。1561453713822为文件生成的时间戳。
当所有数据都导出后,Doris会将这些文件rename到用户指定的路径中。
基本语法
EXPORT TABLE db1.tbl1
PARTITION (p1,p2)
[WHERE [expr]]
TO "hdfs://host/path/to/export/"
PROPERTIES
(
"label" = "mylabel",
"column_separator"=",",
"columns" = "col1,col2",
"exec_mem_limit"="2147483648",
"timeout" = "3600"
)
WITH BROKER "hdfs"
(
"username" = "user",
"password" = "passwd"
);
label:本次导出作业的标识。后续可以使用这个标识查看作业状态。
column_separator:列分隔符。默认为\t。支持不可见字符,比如’\x07’。
columns:要导出的列,使用英文状态逗号隔开,如果不填这个参数默认是导出表的所有列。
line_delimiter:行分隔符。默认为\n。支持不可见字符,比如’\x07’。
exec_mem_limit:表示Export作业中,一个查询计划在单个BE上的内存使用限制。默认2GB。单位字节。
timeout:作业超时时间。默认2小时。单位秒。
tablet_num_per_task:每个查询计划分配的最大分片数。默认为5。
导出示例
首先,启动Hadoop,然后在doris中执行导出:
export table example_site_visit2
to "hdfs://scentos:8020/doris-export"
PROPERTIES
(
"label" = "mylabel",
"column_separator"="|",
"timeout" = "3600"
)
WITH BROKER "broker_name"
(
"dfs.nameservices"="mycluster",
"dfs.ha.namenodes.mycluster"="scentos",
"dfs.namenode.rpc-address.mycluster.scentos"= "scentos:8020",
"dfs.client.failover.proxy.provider.mycluster"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);
最后查看导出结果:
查询导出作业状态
mysql> show export\G
*************************** 1. row ***************************
JobId: 11115
Label: mylabel
State: FINISHED
Progress: 100%
TaskInfo: {"partitions":["*"],"exec mem limit":2147483648,"column separator":"|","line delimiter":"\n","tablet num":10,"broker":"broker_name","coord num":2,"db":"default_cluster:test","tbl":"example_site_visit2"}
Path: hdfs://scentos:8020/doris-export
CreateTime: 2022-05-15 18:36:41
StartTime: 2022-05-15 18:36:43
FinishTime: 2022-05-15 18:36:49
Timeout: 3600
ErrorMsg: NULL
1 row in set (0.01 sec)
JobId:作业的唯一ID
Label:自定义作业标识
State:作业状态:
状态值 | 值含义 |
---|---|
PENDING | 作业待调度 |
EXPORTING | 数据导出中 |
FINISHED | 作业成功 |
CANCELLED | 作业失败 |
Progress:作业进度。该进度以查询计划为单位。假设一共10个查询计划,当前已完成3个,则进度为30%。
TaskInfo:以 Json 格式展示的作业信息:
字段 | 含义 |
---|---|
db | 数据库名 |
tbl | 表名 |
partitions | 指定导出的分区。* 表示所有分区。 |
exec mem limit | 查询计划内存使用限制。单位字节。 |
column separator | 导出文件的列分隔符。 |
line delimiter | 导出文件的行分隔符。 |
tablet num | 涉及的总 Tablet 数量。 |
broker | 使用的 broker 的名称。 |
coord num | 查询计划的个数。 |
Path:远端存储上的导出路径。
CreateTime/StartTime/FinishTime:作业的创建时间、开始调度时间和结束时间。
Timeout:作业超时时间。单位是秒。该时间从CreateTime开始计算。
ErrorMsg:如果作业出现错误,这里会显示错误原因
注意事项
(1)不建议一次性导出大量数据。一个Export作业建议的导出数据量最大在几十GB。过大的导出会导致更多的垃圾文件和更高的重试成本。
(2)如果表数据量过大,建议按照分区导出。
(3)在Export作业运行过程中,如果FE发生重启或切主,则Export作业会失败,需要用户重新提交。
(4)如果 Export 作业运行失败,在远端存储中产生的__doris_export_tmp_xxx临时目录,以及已经生成的文件不会被删除,需要用户手动删除。
(5)如果Export作业运行成功,在远端存储中产生的__doris_export_tmp_xxx 目录,根据远端存储的文件系统语义,可能会保留,也可能会被清除。比如在百度对象存储(BOS)中,通过 rename操作将一个目录中的最后一个文件移走后,该目录也会被删除。如果该目录没有被清除,用户可以手动清除。
(6)当Export运行完成后(成功或失败),FE发生重启或切主,则SHOW EXPORT
展示的作业的部分信息会丢失,无法查看。
(7)Export作业只会导出Base 表的数据,不会导出Rollup Index的数据。
(8)Export作业会扫描数据,占用IO资源,可能会影响系统的查询延迟
查询结果导出
SELECT INTO OUTFILE
语句可以将查询结果导出到文件中。目前支持通过Broker进程, 通过S3协议, 或直接通过HDFS协议,导出到远端存储,如HDFS,S3,BOS,COS(腾讯云)上。
语法
query_stmt
INTO OUTFILE "file_path"
[format_as]
[properties]
file_path:指向文件存储的路径以及文件前缀,如hdfs://path/to/my_file_。最终的文件名将由my_file_,文件序号以及文件格式后缀组成。其中文件序号由0开始,数量为文件被分割的数量。如:
my_file_abcdefg_0.csv
my_file_abcdefg_1.csv
my_file_abcdegf_2.csv
[format_as]:FORMAT AS CSV,指定导出格式。默认为CSV。
[properties]:指定相关属性。目前支持通过Broker进程, 或通过S3协议进行导出。
Broker相关属性需加前缀broker。具体参阅Broker文档。HDFS相关属性需加前缀hdfs。其中hdfs.fs.defaultFS用于填写namenode地址和端口,属于必填项。S3协议则直接执行S3协议配置即可。
示例:
("broker.prop_key" = "broker.prop_val", ...)
or
("hdfs.fs.defaultFS" = "xxx", "hdfs.hdfs_user" = "xxx")
or
("AWS_ENDPOINT" = "xxx", ...)
其他属性:
("key1" = "val1", "key2" = "val2", ...)
目前支持以下属性:
属性名 | 含义 |
---|---|
column_separator | 列分隔符,仅对CSV格式适用。默认为\t |
line_delimiter | 行分隔符,仅对CSV格式适用。默认为\n |
max_file_size | 单个文件的最大大小,默认为1GB,取值范围在5MB到2GB之间,超过这个大小的文件将会被切分 |
schema | PARQUET文件schema信息。仅对PARQUET格式适用。导出文件格式为PARQUET时,必须指定schema |
并发导出
并发导出的条件
默认情况下,查询结果集的导出是非并发的,也就是单点导出。如果用户希望查询结果集可以并发导出,需要满足以下条件:
(1)session variable 'enable_parallel_outfile'
开启并发导出:
set enable_parallel_outfile = true;
(2)导出方式为S3 , 或者HDFS,而不是使用broker
(3)查询可以满足并发导出的需求,比如顶层不包含sort 等单点节点。(后面会举例说明,哪种属于不可并发导出结果集的查询)
满足以上三个条件,就能触发并发导出查询结果集了,并发度 = be_instacne_num * parallel_fragment_exec_instance_num
验证结果集被并发导出。
用户通过session变量设置开启并发导出后,如果想验证当前查询是否能进行并发导出,则可以通过下面这个方法。
explain select xxx from xxx where xxx into outfile "s3://xxx"
format as csv properties ("AWS_ENDPOINT" = "xxx", ...);
对查询进行explain后,Doris会返回该查询的规划,如果发现RESULT FILE SINK
出现在PLAN FRAGMENT 1
中,就说明导出并发开启成功了。如果RESULT FILE SIN
出现在PLAN FRAGMENT 0
中,则说明当前查询不能进行并发导出(当前查询不同时满足并发导出的三个条件)。
使用示例
示例1:broker方式导出
SELECT * FROM example_site_visit
INTO OUTFILE "hdfs://scentos:8020/doris-out/broker_a_"
FORMAT AS CSV
PROPERTIES
(
"broker.name" = "broker_name",
"column_separator" = ",",
"line_delimiter" = "\n",
"max_file_size" = "100MB"
);
导出结果:
示例2:broker方式导出,且导出格式为parquet
SELECT city, age FROM example_site_visit
INTO OUTFILE "hdfs://scentos:8020/doris-out/broker_b_"
FORMAT AS PARQUET
PROPERTIES
(
"broker.name" = "broker_name",
"schema"="required,byte_array,city;required,int32,age"
);
导出到parquet文件时要明确指定schema,查看导出结果:
示例3:HDFS方式导出
SELECT * FROM example_site_visit
INTO OUTFILE "hdfs://doris-out/hdfs_"
FORMAT AS CSV
PROPERTIES
(
"hdfs.fs.defaultFS" = "hdfs://scentos:8020",
"hdfs.hdfs_user" = "root",
"column_separator" = ","
);
导出结果:
示例4:HDFS并发导出
set enable_parallel_outfile = true;
EXPLAIN SELECT * FROM example_site_visit
INTO OUTFILE "hdfs://doris-out/hdfs_"
FORMAT AS CSV
PROPERTIES
(
"hdfs.fs.defaultFS" = "hdfs://scentos:8020",
"hdfs.hdfs_user" = "root",
"column_separator" = ","
);
查看结果:
示例5:将CTE语句的查询结果导出:
WITH
x1 AS
(SELECT k1, k2 FROM tbl1),
x2 AS
(SELECT k3 FROM tbl2)
SELECT k1 FROM x1 UNION SELECT k3 FROM x2
INTO OUTFILE "hdfs://scentos:8020/doris-out/result_"
PROPERTIES
(
"broker.name" = "broker_name",
"broker.username"="root",
"broker.password"="root",
"broker.dfs.nameservices" = "my_ha",
"broker.dfs.ha.namenodes.my_ha" = "scentos",
"broker.dfs.namenode.rpc-address.my_ha.scentos" = "scentos:8020",
"broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);
查看结果:
更多推荐
所有评论(0)