一、环境准备

Stream Load可以说是StarRocks最为核心的导入方式,StarRocks的主要导入方式例如Routine Load、Flink Connector、DataX StarRocksWriter等,底层实现都是基于Stream Load的思想,所以我们着重介绍。

Stream Load是由用户发送HTTP请求将本地文件或数据流导入至StarRocks中的导入方式,其本身不依赖其他组件。Stream Load支持导入本地数据文件(csv、txt等)和json文件,建议单次导入的数据文件不超过10G,编码格式要求为utf-8。

Stream Load作为一种同步导入方式,我们可以直接通过请求的返回值判断导入是否成功。也由于是同步的方式,我们无法手动取消Stream Load任务,Stream Load在超时或者导入错误后会被系统自动取消。

以下为本次测试使用的集群环境,StarRocks部署路径为/opt/module/starrocks,集群服务部署情况如下表:

集群节点

192.168.110.101

(node01)

192.168.110.102

(node02)

192.168.110.103

(node03)

部署服务

1 FE(Leader)

1 BE

1 Broker

1 MySQL-Client

1 FE(Observer)

1 BE

1 Broker

1 BE

1 Broker

备注:下文演示时使用root用户,密码也为root。导入使用的数据库名称为starrocks。

二、导入说明

Stream Load用于向指定的表导入数据,它可以保证每次导入任务的原子性(整批数据要么全部导入成功,要么全部失败),其完整语法为:

curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load

其中:

-u参数后为我们导入数据时使用StarRocks的用户名和密码(格式为user:passwd,该用户需要拥有对应库表的LOAD_PRIV权限)。

-T后为我们需要导入的数据文件完整路径,当前仅支持访问“执行Stream Load命令的节点”上的本地文件。

-H后为HTTP的Header部分,用来传入导入参数,例如导入标签、行/列分隔符、导入条件等等。

为方便理解,我们先创建表car_status:

CREATE TABLE IF NOT EXISTS starrocks.`car_status` (

  `did` int(11) NOT NULL COMMENT "",

  `event_time` bigint(20) NOT NULL COMMENT "",

  `load_weight` int(10) NULL COMMENT "",

  `speed` float NULL COMMENT "",

  `voltage` float NULL COMMENT ""

)

DUPLICATE KEY(`did`, `event_time`)

DISTRIBUTED BY HASH(`did`) BUCKETS 10;

1、简单导入

假设在node01上有本地数据文件/opt/datafiles/car_status_1.csv,文件中有三行数据,数据顺序和car_status表中的顺序一致(其中的\N表示NULL):

10581,1537436416686,0,13.0,475.0

10581,1537436447655,0,25.0,495.0

10581,1537436475628,0,\N,465.0

Stream Load中默认的列分隔符为\t(即Tab键输入的大空格,当数据的列分隔符为\t时不要再显式的指定,因为http header会对\t特殊解释,直接写反而会报错,得写成ascii码),行分隔符为\n(即回车输入的换行符,由row_delimiter参数指定)。

观察数据文件,我们发现这里的列分隔符为英文逗号,行分隔符为默认的换行符。指定label为car_status_20211026001(label要求单数据库内唯一,默认保存3天),本次导入命令的写法为:

curl --location-trusted -u root:root -H "label:car_status_20211026001" -H "column_separator:," -T /opt/datafiles/car_status_1.csv http://192.168.110.101:8030/api/starrocks/car_status/_stream_load

在node01的命令窗口中回车执行上面的导入命令,导入完成后,会以json格式返回这次导入的相关内容,例如:

{

    "TxnId": 12597,

    "Label": "car_status_20211026001",

    "Status": "Success",

    "Message": "OK",

    "NumberTotalRows": 3,

    "NumberLoadedRows": 3,

    "NumberFilteredRows": 0,

    "NumberUnselectedRows": 0,

    "LoadBytes": 105,

    "LoadTimeMs": 50,

    "BeginTxnTimeMs": 0,

    "StreamLoadPutTimeMs": 1,

    "ReadDataTimeMs": 0,

    "WriteDataTimeMs": 24,

    "CommitAndPublishTimeMs": 23

}

我们主要关注以下几个参数:

Status:导入最后的状态,包括:

  • Success:表示导入成功,数据已经可见;
  • Publish Timeout:表述导入作业已经成功Commit,但是由于某种原因并不能立即可见。我们可以视作已经成功不必重试导入;
  • Label Already Exists:表明该Label已经被其他作业占用,可能是导入成功,也可能是正在导入;
  • Fail:此次导入失败,可以指定Label重试此次作业。

Message:导入状态详细的说明。失败时会返回具体的失败原因。

NumberTotalRows:从数据流中读取到的总行数。

NumberLoadedRows:此次导入的数据行数,只有在Success时有效。

NumberFilteredRows:此次导入过滤掉的行数,即数据质量不合格的行数。

NumberUnselectedRows:此次导入,通过where条件被过滤掉的行数。

LoadBytes:此次导入的源文件数据量大小。

LoadTimeMs:此次导入所用的时间。

ErrorURL: 被过滤数据的具体内容,仅保留前1000条。当导入失败时,我们可以curl这里返回的url来查看错误详情(使用curl获取错误详情时,这里的url需要用双引号引起来)。

在使用Stream Load导入时,我们强烈建议为每个任务设置一个唯一的辨识度高的label,这样就可以通过get label state命令查看对应label的导入情况,例如:

curl -u root:root http://192.168.110.101:8030/api/starrocks/get_load_state?label=car_status_20211026001

返回结果:

{"state":"VISIBLE","status":"OK","msg":"Success"}

2、数据转化

在导入本地数据文件时,可能会出现目标表中的字段与数据文件中的列不完全匹配的情况,比如数据文件中多几列、少几列、列顺序不一致或者需要生成衍生列等情况,这时我们可以在Stream Load命令中直接进行数据转换。还以表car_status为例,我们分情况展开:

  • 数据文件多几列

若我们需要导入的数据文件/opt/datafiles/car_status_2.csv中的数据多了一列status列(倒数第二列):

10582,1537436416686,0,17.0,正常,495.0

10582,1537436447865,0,22.0,正常,395.0

10582,1537436475203,0,19.0,异常,225.0

我们可以通过在-H中指定columns来进行列的过滤,Stream Load导入命令为:

curl --location-trusted -u root:root -H "label:car_status_20211026002" -H "column_separator:," -H "columns:did,event_time,load_weight,speed,status,voltage" -T /opt/datafiles/car_status_2.csv http://192.168.110.101:8030/api/starrocks/car_status/_stream_load

这里注意,在columns参数中我们指定的是数据文件中的列名及顺序。当columns参数中的列名与目标表中字段名相同,该列数据正常导入(比如参数中的did、event_time、load_weight、speed和voltage列,它们可以和car_status表对应,就会正常导入。columns参数中列的顺序可以和目标表的不一致,只需要字段名相同,目标表就可以正常获取数据)。当参数中的列在目标表中不存在,该列数据会在导入过程中被忽略掉(比如参数中的status列,其在car_status表中不存在,导入时就会被过滤掉)。

  • 数据文件中少几列(无法直接导入)

先说结论,当数据文件中的列较目标表中少时,我们没有办法直接导入。下面使用会报错的导入命令来演示出现问题后的排查步骤。

比如需要导入的数据文件/opt/datafiles/car_status_3.csv中的数据少了一列load_weight列:

10583,1537436416686,17.0,495.0

10583,1537436447865,22.0,395.0

10583,1537436475203,19.0,225.0

在Stream Load命令中,我们还将columns参数设置和数据文件中的列保持一致,那么导入命令我们写为:

curl --location-trusted -u root:root -H "label:car_status_20211026003" -H "column_separator:," -H "columns:did,event_time,speed,voltage" -T /opt/datafiles/car_status_3.csv http://192.168.110.101:8030/api/starrocks/car_status/_stream_load

执行,发现报错too many filtered rows:

{

    "TxnId": 12599,

    "Label": "car_status_20211026003",

    "Status": "Fail",

    "Message": "too many filtered rows",

    "NumberTotalRows": 3,

    "NumberLoadedRows": 0,

    "NumberFilteredRows": 3,

    "NumberUnselectedRows": 0,

    "LoadBytes": 94,

    "LoadTimeMs": 23,

    "BeginTxnTimeMs": 1,

    "StreamLoadPutTimeMs": 1,

    "ReadDataTimeMs": 0,

    "WriteDataTimeMs": 16,

    "CommitAndPublishTimeMs": 0,

    "ErrorURL": "http://192.168.110.101:8040/api/_load_error_log?file=__shard_0/error_log_insert_stmt_ba4b34ed-898f-2246-fe63-a1c3de38b99a_ba4b34ed898f2246_fe63a1c3de38b99a"

}

从Message中我们看不出具体的错误信息,这里我们就需要使用ErrorURL的信息:

curl http://192.168.110.101:8040/api/_load_error_log?file=__shard_0/error_log_insert_stmt_ba4b34ed-898f-2246-fe63-a1c3de38b99a_ba4b34ed898f2246_fe63a1c3de38b99a

得到信息示例如下:

Reason: column count mismatch, expect=5 real=4. src line: [10581,1537436416686,17.0,495.0];

此时我们就发现错误原因是列数不匹配,由于目标表car_status中有5个字段,所以导入时的预期列即为5列,而当前只设置导入4列数据,故导入失败。

总结来说:若目标表中存在某字段,但在columns参数中未指定,那么导入就会失败报错。

  • 生成衍生列

在上面我们提到,若数据文件较目标表缺少列,我们没有办法直接进行导入。还以car_status_3.csv文件为例,若我们确实需要将其导入表car_status中,并将缺少的load_weight列的值设为null,我们就需要使用衍生列的写法,比如:

curl --location-trusted -u root:root -H "label:car_status_20211026003" -H "column_separator:," -H "columns:did,event_time,speed,voltage,load_weight=null" -T /opt/datafiles/car_status_3.csv http://192.168.110.101:8030/api/starrocks/car_status/_stream_load

衍生列除了设置常量值,更多的会使用函数,对数据文件中已有的字段进行函数处理,从而得到衍生列的值。

这里咱们举一个业务中经常遇到的数据精度的例子。假设我们需要导入数据文件/opt/datafiles/car_status_4.csv,该文件中数据顺序和car_status表中的顺序一致,但第三列load_weight的数据精度较StarRocks表中的int类型高一些:

10584,1537436416686,11.0,13.0,475.0

10584,1537436447655,12.0,25.0,495.0

10584,1537436475628,11.0,1.0,465.0

观察数据,我们发现可以直接舍弃小数点后的0,那么就可以left()函数对数据文件中的字段进行转化,Stream Load语句如下:

curl --location-trusted -u root:root -H "label:car_status_20211026004" -H "column_separator:," -H "columns:did,event_time,tmp,speed,voltage,load_weight=left(tmp,2)" -T /opt/datafiles/car_status_4.csv http://192.168.110.101:8030/api/starrocks/car_status/_stream_load

通过上面的例子,我们发现使用函数对现有数据进行处理得到衍生列时,需要注意以下三点:

1、在写衍生列之前,需要先按照数据文件中列的顺序列出其所有列,需使用函数处理的列要先使用临时字段名占位,用以取值(比如例子中的tmp)。columns参数中列的顺序即为导入过程中从数据文件中的取值顺序;

2、列出衍生列,衍生列由前面的临时占位列通过函数处理得到,衍生列的列名需与目标表中的一致(前面提到过,当columns中字段名与目标表的列名匹配时,目标表才可以正常获取数据)。衍生列写法为col_name = func(tmp),不能使用col_name = func(col_name)的写法(即占位列的名称与衍生列的名称不能相同);

3、StarRocks自带的函数都支持在生成衍生列时使用,具体的函数可以参考官网文档中的“函数参考”部分。

  • 列顺序不一致

为充分使用排序键的优势,在使用StarRocks建表时我们通常会将整数型的列放在最前面,这就可能出现StarRocks中表的字段顺序与其他数据库导出的数据文件的字段顺序不一致的情况。

这种情况下,我们只需要保证Stream Load命令中columns参数指定的字段顺序与数据文件中的一致即可,不需要再做其他处理。

我们一直在强调,columns参数中字段的左右顺序即为导入过程中从数据文件中取值的左右顺序,所以这里必须要和数据文件中字段的顺序一致,不然取值就乱套了。目标表的列顺序和columns参数中的列顺序没有任何关系,只要columns参数中的字段名称与目标表中的列名称能匹配的上,就可以完成导入。这部分,我们一定要区分的清楚。

例如,我们从Vertica导出得到的数据文件为/opt/datafiles/car_status_5.csv,数据文件中字段名与目标表中一致,但是顺序不同。数据文件car_status_5.csv中字段顺序为:did、load_weight、speed、voltage、event_time,其内容为:

10585,0.0,13.0,475.0,1537436416686

10585,0.0,25.0,495.0,1537436447655

10585,0.0,1.0,465.0,1537436475628

则导入时的Stream Load命令就可以直接写为:

curl --location-trusted -u root:root -H "label:car_status_20211026005" -H "column_separator:," -H "columns:did,load_weight,speed,voltage,event_time" -T /opt/datafiles/car_status_5.csv http://192.168.110.101:8030/api/starrocks/car_status/_stream_load

  • 行的过滤

在使用Stream Load导入数据中,针对数据行的过滤有两类情况:

1、使用where参数设置筛选条件,仅导入符合条件的规范数据;

2、使用max_filter_ratio参数设置本次导入“最大容忍可过滤数据比例”,进而忽略掉一定比例的不规范数据,将剩余规范的数据进行导入。

针对第一类情况,我们举例说明。当前有数据文件/opt/datafiles/car_status_6.csv,其中的数据列与目标表中的一致,为:

10586,1537436416686,0,13.0,475.0

10586,1537436447655,0,25.0,495.0

10587,1537436475628,0,1.0,465.0

若我们仅需要导入did为10586的数据行,那么就可以使用where参数进行筛选,Stream Load命令如下:

curl --location-trusted -u root:root -H "label:car_status_20211026006" -H "column_separator:," -H "where:did=10586" -T /opt/datafiles/car_status_6.csv http://192.168.110.101:8030/api/starrocks/car_status/_stream_load

在返回值中,我们观察"NumberUnselectedRows": 1,即表示使用where参数过滤了1条数据。

针对存在不规范数据的第二类过滤情况,我们还列举一个生产中可能出现的例子,假设当前有数据文件/opt/datafiles/car_status_7.csv,其内有四行数据:

did,event_time,load_weight,speed,voltage

10587,1537436416686,0,13.0,475.0

10587,1537436447655,0,25.0,495.0

10587,1537436475628,0,1.0,465.0

我们发现,首行数据其实是数据的字段名,但是StarRocks是不会自动感知的,目前只会将其也当作普通数据处理。那么此时,首行数据就是一条不规范的数据。如果直接导入,由于数据格式与建表语句不匹配,那么这四条数据会整体导入失败(即原子性)。

我们先考虑是否可以通过where参数将其过滤掉,结论是针对car_status表是不行的,因为car_status表中所有字段的类型都是数值型,但首行的列名数据是文本型,没有办法进行隐式的转换(若表中存在字符型字段,我们就可以在Stream Load命令中通过-H "where: 列名 != '列名称'"将首行过滤掉)。

方案一行不通,那么我们就尝试通过max_filter_ratio参数设置容错率来实现导入,max_filter_ratio参数的取值范围是0~1,默认为0容忍。若希望将一批包含不规范内容的数据成功导入,那么容错比例的计算公式为:

max_filter_ratio≥质量不合格数据行数/原始文件的行数

以car_status_7.csv来计算:max_filter_ratio≥1/4=0.25

所以Stream Load命令可以写为:

curl --location-trusted -u root:root -H "label:car_status_20211026007" -H "column_separator:," -H "max_filter_ratio:0.25" -T /opt/datafiles/car_status_7.csv http://192.168.110.101:8030/api/starrocks/car_status/_stream_load

观察返回值:

{

    "TxnId": 12653,

    "Label": "car_status_20211026007",

    "Status": "Success",

    "Message": "OK",

    "NumberTotalRows": 4,

    "NumberLoadedRows": 3,

    "NumberFilteredRows": 1,

    "NumberUnselectedRows": 0,

    "LoadBytes": 140,

    "LoadTimeMs": 38,

    "BeginTxnTimeMs": 0,

    "StreamLoadPutTimeMs": 0,

    "ReadDataTimeMs": 0,

    "WriteDataTimeMs": 15,

    "CommitAndPublishTimeMs": 21,

    "ErrorURL": "http://192.168.110.102:8040/api/_load_error_log?file=__shard_2/error_log_insert_stmt_374c1b3e-ca98-33c1-52c8-4954143ae5a9_374c1b3eca9833c1_52c84954143ae5a9"

}

我们发现"NumberFilteredRows": 1,也即数据质量不合格而被过滤的行数为1。同时我们还发现此时虽然Status为Success,但ErrorURL中仍旧是有被过滤掉的不规范的数据信息的。通过url查看错误详情:

curl "http://192.168.110.102:8040/api/_load_error_log?file=__shard_2/error_log_insert_stmt_374c1b3e-ca98-33c1-52c8-4954143ae5a9_374c1b3eca9833c1_52c84954143ae5a9"

Reason: null value for not null column, column=did. src line: [];

Reason: null value for not null column, column=event_time. src line: [];

我们发现报错的原因是“向非空列导入了空值”。分析其原因,是我们创建表car_status时,为did列和event_time列设置了非空属性。而在导入过程中,数据文件car_status_7.csv首行内容从字符类型强制转换为数值型时,转换失败成为null值,null值在入库时与字段非空属性冲突,进而报错。

这里我们可以推测一下,若car_status表建表时我们没有为字段did和event_time设置非空属性,那么默认情况下,即便我们不设置容错率,本次导入也能够成功的,但导入的数据将会是“1行null值+3行数据”。

为了验证咱们的推测,我们创建临时表car_status_tmp,去除表car_status中did列和event_time列的非空属性:

CREATE TABLE IF NOT EXISTS starrocks.`car_status_tmp` (

  `did` int(11) NULL COMMENT "",

  `event_time` bigint(20) NULL COMMENT "",

  `load_weight` int(10) NULL COMMENT "",

  `speed` float NULL COMMENT "",

  `voltage` float NULL COMMENT ""

)

DUPLICATE KEY(`did`, `event_time`)

DISTRIBUTED BY HASH(`did`) BUCKETS 10;

还使用/opt/datafiles/car_status_7.csv数据文件,先在默认情况下直接导入:

curl --location-trusted -u root:root -H "label:car_status_tmp_20211026001" -H "column_separator:," -T /opt/datafiles/car_status_7.csv http://192.168.110.101:8030/api/starrocks/car_status_tmp/_stream_load

导入完成后执行查询:

mysql> select * from car_status_tmp;

+-----------+----------------------+---------------------+------------+-------------+

| did         | event_time         | load_weight      | speed     | voltage    |

+-----------+----------------------+---------------------+------------+-------------+

| NULL     |          NULL         |        NULL         |  NULL    |    NULL    |

| 10587    | 1537436416686 |           0             |    13       |     475      |

| 10587    | 1537436447655 |           0             |    25       |     495      |

| 10587    | 1537436475628 |           0             |     1        |     465      |

+-----------+----------------------+---------------------+------------+-------------+

结果符合预期,但是导入空值显然是不太严谨的。当目标表的字段没有非空属性时,我们有没有其他方式能够限制这类空值的导入?

查找官方文档,推测应该可以通过strict mode严格模式实现:开启严格模式,即可对“原始数据不为空值,在参与列类型转换后结果为空值”的情况进行限制。

在Stream Load中加入-H "strict_mode=true"开启严格模式,指定新的label,再次对文件/opt/datafiles/car_status_7.csv进行导入:

curl --location-trusted -u root:root -H "label:car_status_tmp_20211026002" -H "column_separator:," -H "strict_mode=true" -T /opt/datafiles/car_status_7.csv http://192.168.110.101:8030/api/starrocks/car_status_tmp/_stream_load

发现仍能导入成功,这里应该是不太严谨的,已向StarRocks提交issue,StarRocks测试同学回复在复测中,目前2.0.1版本中该问题应该还存在。

3、Json导入

Stream Load也支持导入本地的json文件。当json格式较为复杂时,我们也可以通过指定jsonpath实现精准导入。

我们再创建演示表sales:

CREATE TABLE IF NOT EXISTS sales (

    `category` varchar(512) NULL COMMENT "",

    `author` varchar(512) NULL COMMENT "",

    `title` varchar(512) NULL COMMENT "",

    `price` double NULL COMMENT ""

)

DISTRIBUTED BY HASH(category) BUCKETS 10;

  • 简单导入

假设现在有json文件/opt/datafiles/sales_1.json,其数据格式为:

[{"category":"C++","author":"avc","title":"C++ primer","price":89.5},

{"category":"Java","author":"avc","title":"Effective Java","price":95},

{"category":"Linux","author":"avc","title":"Linux kernel","price":195}]

不同于csv文件,在使用Stream Load导入json时,就没有分隔符的概念了。我们需要在Header中指定导入格式为json,因为sales_1.json中的数据是以数组开始,并且数组中每个对象是一条记录,这样我们就需要设置strip_outer_array属性成true,表示展平数组。导入命令如下:

curl --location-trusted -u root:root -H "label:sales_20211028001" -H "format:json" -H "strip_outer_array:true" -T /opt/datafiles/sales_1.json http://192.168.110.101:8030/api/starrocks/sales/_stream_load

  • 指定jsonpath精准导入

假设有一个相对复杂的json文件/opt/datafiles/sales_2.json,其内容为:

{

"RECORDS":[

{"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587},

{"category":"22","author":"2avc","price":895,"timestamp":1589191487},

{"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387}

]}

若只希望导入category、author、price三个属性,我们就可以指定json根节点和jsonpath。写法为:

curl --location-trusted -u root:root -H "columns:category,price,author" -H "label:sales_20211028002" -H "format:json" -H "jsonpaths:[\"$.category\",\"$.price\",\"$.author\"]" -H "strip_outer_array:true" -H "json_root:$.RECORDS" -T /opt/datafiles/sales_2.json http://192.168.110.101:8030/api/starrocks/sales/_stream_load

JsonPath表达式是提取给定JSON文档的部分内容的通用写法,这里的\用来给双引号转义,$表示根成员对象。

jsonpaths内值的名称需要与json文件中key的名称一致。和导入csv文件时相同,columns参数中字段的顺序需要和jsonpaths中的顺序一致,来确保取值正确。

最后再简单引申一点,对于带有嵌套结构的json,例如:

{"id": 100, "content": {"city": "beijing", "code" : 1}}

我们的jsonpath就可以写为:

-H "jsonpaths: [\"$.id\",\"$.content.city\",\"$.content.code\"]"

4、参数调整

当我们使用Stream Load导入较大的数据文件时,有两个主要配置需要注意:

1、最大可导入文件大小限制

Stream Load默认限制最大可导入的数据文件为10G。如果我们要导入的数据文件大小超过这个值,就需要调整BE配置文件be.conf中的streaming_load_max_mb,单位是MB,例如修改为16000M:

streaming_load_max_mb = 16000

该参数在配置文件中默认没有显式的列出。修改配置文件后,需要重启BE让配置生效。

2、导入任务超时时间

万兆网卡下,Stream Load的导入速度约为80M每秒,我们可以根据本地文件的大小估算出一个超时时间。

Stream Load导入任务在设定的timeout时间内未完成就会被系统自动取消,变成CANCELLED状态。默认的timeout时间为600秒,我们可以在Stream Load命令中设定单个命令的超时时间,例如:-H "timeout:1000"。

当需要导入的文件比较多时,若文件都比较大,我们也可以通过修改FE配置文件fe.conf中的stream_load_default_timeout_second(以秒为单位)参数,对全局的超时时间进行修改,例如修改为1500秒(这个参数在配置文件中默认没有显式列出,直接添加即可):

stream_load_default_timeout_second = 1500

注意:在单条Stream Load命令中设置超时时间的优先级是要高于FE配置文件的。

5、数据文件调整

  • 编码格式转换

StarRocks目前只支持读取UTF-8编码格式的本地数据文件,其他编码的文件在导入时可能会因为乱码出现问题,所以我们在导入前通常需要先确认文件编码格式并对其进行编码转换。

这里举一个比较有代表性的例子,我们在Windows系统上用记事本工具生成一个包含汉字内容的origin.txt文件,上传至服务器node01的/opt/datafiles/目录下,当我们使用less命令浏览时会发现乱码:

[root@node01 ~]# cd /opt/datafiles

[root@node01 datafiles]# less origin.txt

<D1><F6><CD><FB><D0>ǿգ<AC><BD><C5>̤ʵ<B5><D8>

查看其编码格式:

[root@node01 datafiles]# file --mime-encoding origin.txt

origin.txt: iso-8859-1

为其进行编码转换并生成UTF-8编码的新文件origin_utf-8.txt:

[root@node01 datafiles]# iconv -f iso-8859-1 -t utf-8 origin.txt > origin_utf-8.txt

正常来说生成的新文件应该是正常的,但是使用less查看新文件时仍然是乱码:

[root@node01 datafiles]# less origin_utf-8.txt

ÑöÍûÐÇ¿Õ£¬½Å̤ʵµØ

这里就有一个应急小技巧,我们可以将异常文件origin.txt的编码视为gbk,再次转码覆盖生成文件origin_utf-8.txt:

[root@node01 datafiles]# iconv -f gbk -t utf-8 origin.txt > origin_utf-8.txt

[root@node01 datafiles]# less origin_utf-8.txt

仰望星空,脚踏实地

[root@node01 datafiles]# file --mime-encoding origin_utf-8.txt

origin_utf-8.txt: utf-8

编码转换成功,文件可以正常显示。

这里咱们只是举了一个比较特殊的例子,通常来说,我们使用file --mime-encoding命令获取文件的编码后,直接使用iconv命令进行转换就是正常的。

  • 去除双引号

在使用工具将数据库数据导出至本地文件时,我们有时会忘记选择去掉文本或时间类型的双引号,而StarRocks在使用Stream Load导入数据时是不会区分双引号的,这样就会导致导入报错或者入库的数据中包含了双引号。

如果数据文件较大,我们编辑起来是比较艰难的,这种情况下我们可以使用如下命令将双引号替换为空:

sed -i 's/"//g' test.csv

完整语法:sed -i 's/原字符串/新字符串/g' /home/test.csv

  • 删除首行

针对数据文件首行是列名的情况,我们也可以通过删除首行内容来解决,删除命令为:

[root@node01 ~]# sed -i '1d' <file>

如果某个目录下的所有CSV数据文件首行都为列名,我们也可以批量删除,命令为:

[root@node01 datafiles]# find *.csv | xargs sed -i '1d'

  • 大文件拆分

当本地数据文件过大时,使用Stream Load导入会是一个相对耗时且重试成本较高的过程。这时我们就可以将大文件拆分为几个体积合适的小文件,通过同时起多个Stream Load导入任务来加快整体的导入速度(Stream Load的并发数不受集群大小影响)。大文件拆分的脚本示例如下:

#!/bin/bash

# split_csv.sh data.csv 10000

file_path=$1

line_number=$2

# /a/b.c.d => file_name: /a/b.c file_ext: .d

file_name=${file_path%.*}

file_ext="."${file_path##*.}

filter_cmd='sh -c "{ head -n1 '${file_path}'; cat; } > $FILE"'

echo $filter_cmd

tail -n +2 $file_path | split -d -l $line_number  -a 4  --additional-suffix $file_ext --filter="$filter_cmd"  -  ${file_name}_

例如将上面的脚本保存为split_csv.sh,然后执行:split_csv.sh data.csv 10000,就可以将data.csv按照每个文件10000行数据,分割为多个小文件,生成的小文件命名格式为:

data_0000.csv

data_0001.csv

data_0002.csv

data_0003.csv

data_0004.csv

...

6、REPLACE_IF_NOT_NULL

StarRocks目前还未支持Update语法,社区中不时有同学问起部分列导入该如何实现,这里就把这个聚合类型单独列出来。

REPLACE_IF_NOT_NULL是StarRocks中的一种聚合类型,其含义是当且仅当新导入数据是非NULL值时会发生替换行为,如果新导入的数据是NULL,那么StarRocks仍然会保留原值。该类型只对聚合模型表有用,其它模型不能指定这个。

注意:如果用在建表时REPLACE_IF_NOT_NULL列指定了NOT NULL,那么StarRocks仍然会将其转化NULL,不会向我们报错。我们可以借助这个聚合类型实现业务中“部分列导入”的效果。

还举两个简单的例子:

1)CSV示例:

CREATE TABLE IF NOT EXISTS starrocks.`car_status_rep` (

  `did` int(11) NOT NULL COMMENT "",

  `event_time` bigint(20) NOT NULL COMMENT "",

  `load_weight` int(10) REPLACE_IF_NOT_NULL NULL COMMENT "",

  `speed` float REPLACE_IF_NOT_NULL NULL COMMENT "",

  `voltage` float REPLACE_IF_NOT_NULL NULL COMMENT ""

)

DISTRIBUTED BY HASH(`did`) BUCKETS 1

PROPERTIES (

"replication_num" = "1"

);

准备CSV数据文件car_status_rep1.csv:

10581,1537436416686,0,13.0,475.0

10581,1537436447655,0,25.0,495.0

10581,1537436475628,0,\N,465.0

执行导入:

curl --location-trusted -u root:root -H "label:car_status_20211026001" -H "column_separator:," -T /opt/module/datafiles/car_status_rep1.csv http://192.168.110.101:8030/api/starrocks/car_status_rep/_stream_load

再准备car_status_rep2.csv

10581,1537436447655,\N,\N,500.0

再次导入:

curl --location-trusted -u root:root -H "label:car_status_20211026002" -H "column_separator:," -T /opt/module/datafiles/car_status_rep2.csv http://192.168.110.101:8030/api/starrocks/car_status_rep/_stream_load

观察表中数据,确认已实现部分列更新:

mysql> select * from car_status_rep;

+-------------+------------------------+---------------------+-----------+---------------+

| did           | event_time            | load_weight      | speed   | voltage       |

+-------------+------------------------+---------------------+-----------+---------------+

| 10581      | 1537436416686   |           0              |    13      |     475        |

| 10581      | 1537436447655   |           0              |    25      |     500        |

| 10581      | 1537436475628   |           0              |  NULL   |     465        |

+-------------+------------------------+---------------------+-----------+---------------+

2)Json示例:

CREATE TABLE IF NOT EXISTS sales_rep (

    `category` varchar(512) NULL COMMENT "",

    `author` varchar(512) NULL COMMENT "",

    `title` varchar(512) REPLACE_IF_NOT_NULL NULL COMMENT "",

    `price` double REPLACE_IF_NOT_NULL NULL COMMENT ""

)

DISTRIBUTED BY HASH(category) BUCKETS 1

PROPERTIES (

"replication_num" = "1"

);

准备Json数据文件sales_1.json:

[{"category":"C++","author":"avc","title":"C++ primer","price":89.5},

{"category":"Java","author":"avc","title":"Effective Java","price":95},

{"category":"Linux","author":"avc","title":"Linux kernel","price":195}]

执行导入:

curl --location-trusted -u root:root -H "label:sales_20211028001" -H "format:json" -H "strip_outer_array:true" -T /opt/module/datafiles/sales_1.json http://192.168.110.101:8030/api/starrocks/sales_rep/_stream_load

再次准备Json数据文件sales_2.json

[{"category":"C++","author":"avc","title":null,"price":100}]

执行导入,实现部分列更新:

curl --location-trusted -u root:root -H "label:sales_20211028002" -H "format:json" -H "strip_outer_array:true" -T /opt/module/datafiles/sales_2.json http://192.168.110.101:8030/api/starrocks/sales_rep/_stream_load

观察表中数据:

mysql> select * from sales_rep;

+----------------+------------+--------------------+----------+

| category      | author    | title                   | price    |

+----------------+------------+--------------------+----------+

| C++             | avc         | C++ primer      |   100    |

| Java            | avc         | Effective Java  |    95     |

| Linux           | avc         | Linux kernel     |   195    |

+----------------+------------+--------------------+----------+

7、JAVA开发Stream Load

这应该是我们业务中最常用的导入方式,在3.1章讲insert时已经介绍过,这里再给出一个简单的Java Demo(百度云地址见评论区)。不但是Java语言,在熟悉了Stream Load的原理后,我们也可以使用Python或者Go来基于Stream Load的思想来友好的进行数据导入。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐