Routine Load(例行导入)是StarRocks自带的一种可以从Kafka中持续不断的导入数据的方式,我们可以方便的在StarRocks中通过SQL来控制导入任务的暂停、继续及停止。

关于Routine Load,StarRocks官方文档描述的非常详细,这里仅结合自己的一些实践和理解简单再梳理一下。官网相关文档地址:

Routine Load @ RoutineLoad @ StarRocks Docshttps://docs.starrocks.com/zh-cn/main/loading/RoutineLoad

ROUTINE LOAD @ ROUTINE LOAD @ StarRocks Docshttps://docs.starrocks.com/zh-cn/main/sql-reference/sql-statements/data-manipulation/ROUTINE%20LOAD

介绍 @ Json_loading @ StarRocks Docshttps://docs.starrocks.com/zh-cn/main/loading/Json_loading

一、执行流程

首先,从导入执行的流程来说,当我们创建一个Routine Load任务后,该任务会被JobScheduler拆分成若干个Task,每个Task被TaskScheduler分配到指定的BE上执行。在BE上,一个Task被视为一个普通的导入任务, 通过Stream Load的导入机制进行导入。

为了方便理解,我们可以将上面的执行流程直观的理解为一个个不断被调度执行的Stream Load任务。在默认参数下,一个Routine Load任务会被拆分为若干个Task,Task被调度后,开始对Kafka进行为期3秒①的数据消费并在内存中攒批,3秒过后这批数据通过Stream Load的方式导入对应数据表中,任务完成后向FE汇报,然后间隔10秒②后Task被再次调度,如此循环进行。从Task被调度到本次Stream Load任务完成,整个过程的超时时间默认限制为15FE收到任务汇报结果后,会继续生成后续新的Task,或者对失败的Task进行重试(最多重试3次,都失败则任务暂停)。整个Routine Load作业通过不断的产生新的Task,来完成数据不间断的导入。

上面提到了三个时间参数都可以结合业务情况来修改:

①:数据消费最长时间,通过fe.conf中的routine_load_task_consume_second参数设置,默认为3s。生产建议设置为5秒或10秒。

②:子任务调度周期,在Routine Load语句中设置,参数为max_batch_interval,默认为10秒。生产建议设置为15秒或30秒。缩短任务调度周期可以加速数据消费,但是更小的任务调度周期也可能会带来更多的CPU资源消耗,还可能会导致Compaction的问题。

③:任务的执行超时时间,由fe.conf中的routine_load_task_timeout_second控制,默认为15s。生产建议设置为25秒或50秒。

其中①和③,在修改fe.conf后需要重启FE才会生效。在生产环境中,我们可以同时使用ADMIN SET语句临时修改配置,让配置即时生效:

mysql> ADMIN SET FRONTEND CONFIG ("key" = "value");

这种方式可以立刻生效(重启后失效),但注意,ADMIN SET的方式只会对当前客户端访问的FE节点生效,我们需要用客户端连接不同的FE节点,都执行一下。

关于Routine Load的主要参数配置,咱们在文章末尾会集中整理,这里先不展开。

二、导入示例

大致了解执行流程后,我们还搭建环境举几个简单的例子。环境说明:

节点IP

部署服务

端口

版本

说明

192.168.110.101

[node01]

FE

9030

2.0.0-GA

query_port

BE

用户名密码均为root

Broker

Broker名称:hdfs_broker

mysql-client

5.7.36

Zookeeper

2181

Zookeeper version: 3.4.13

Kafka

9092

kafka_2.13-2.8.1

认证

使用mysql-client访问StarRocks:

[root@node01 ~]# mysql -h192.168.110.101 -P9030 -uroot -proot

2.1 CSV文本格式数据

为了方便,这里首先还使用Stream Load章节的starrocks.car_status表来演示:

mysql> use starrocks;

mysql> CREATE TABLE IF NOT EXISTS starrocks.`car_status` (

  `did` int(11) NOT NULL COMMENT "",

  `event_time` bigint(20) NOT NULL COMMENT "",

  `load_weight` int(10) NULL COMMENT "",

  `speed` float NULL COMMENT "",

  `voltage` float NULL COMMENT ""

)

DUPLICATE KEY(`did`, `event_time`)

DISTRIBUTED BY HASH(`did`) BUCKETS 10

PROPERTIES (

"replication_num" = "1"

);

在Kafka中创建主题starrocks_topic,例如:

kafka-topics.sh --zookeeper 192.168.110.101:2181 --create --replication-factor 1 --partitions 1 --topic starrocks_topic

准备九条用来导入的数据,我们创建生产者将其生产至Kafka的starrocks_topic主题:

kafka-console-producer.sh  --broker-list  192.168.110.101:9092  --topic starrocks_topic

粘贴输入:

10581,1537436416680,0,13.0,475.0

10581,1537436447651,0,25.0,495.1

10581,1537436475622,0,\N,465.2

10581,1537436416683,0,13.0,475.3

10581,1537436447654,0,25.0,495.4

10581,1537436475625,0,\N,465.5

10581,1537436416686,0,13.0,475.6

10581,1537436447657,0,25.0,495.7

10581,1537436475628,0,\N,465.8

在StarRocks中创建Routine Load任务:

mysql> CREATE ROUTINE LOAD starrocks.routine_load_010901 ON car_status

COLUMNS TERMINATED BY ",",

COLUMNS (did, event_time, load_weight, speed, voltage)

PROPERTIES

(

"desired_concurrent_number"="1",

"max_error_number"="1000",

"max_batch_interval"="15"

)

FROM KAFKA

(

    "kafka_broker_list"= " 192.168.110.101:9092",

"kafka_topic" = "starrocks_topic",

"kafka_partitions" = "0",

"kafka_offsets" = "OFFSET_BEGINNING",

"property.group.id"="kafka_to_starrocks"

);

2.1.1 任务参数

Routine Load任务的各个参数在官网都有详细的介绍,这里单列出几个强调一下:

1PROPERTIES部分

1)desired_concurrent_number

desired_concurrent_number用来设置我们期望的并发度。一个例行导入作业会被分成多个子任务(Task)执行,这个参数指定一个作业最多有多少任务可以同时执行,必须大于0,默认为3。这个并发度并不是实际的并发度,实际的并发度取值规则为:

min(kafka_topic_partition_num, desired_concurrent_number, alive_backend_num, max_routine_load_task_concurrent_num)

即会取“需消费Kafka主题的分区数”、“期望并发度”、“集群中当前可用的BE数”和“fe.conf中max_routine_load_task_concurrent_num”四者的最小值。

2)max_batch_interval

max_batch_interval是子任务的调度时间,即任务多久执行一次,单位是秒,范围为5到60,默认为10秒。

3)max_batch_rows

max_batch_rows用来定义错误检测窗口范围,窗口的范围是10 * max-batch-rows。max_batch_rows的默认值为200000,通常默认即可。

因为例行导入任务是面向Kafka数据流的,由于数据流的无边界性,我们没有办法像其他导入任务一样,通过一个错误比例来计算错误率。所以这里提供了一种新的计算方式来计算数据流中的错误比例。即在Routine Load任务中,我们设定一个采样窗口,窗口的大小为 max_batch_rows * 10。在一个采样窗口内,如果错误行数超过 max_error_number,则作业被暂停。如果没有超过,则下一个窗口重新开始计算错误行数。

举例来说,默认的max_batch_rows为200000,则窗口大小为2000000。设max_error_number为20000,即业务预期每2000000行的错误行为20000,也即错误率为1%。注意,因为不是每批次任务正好消费200000行,所以这里的窗口会允许10%的统计误差,即窗口实际范围是[2000000, 2200000]。

4)max_error_number

max_error_number即为在上文提到的采样窗口内允许的最大错误行数,该参数必须大于等于0,默认是 0,即不允许有错误行。注意:被where条件过滤掉的行不算错误行。

2FROM KAFKA部分

1)kafka_partitions与kafka_offsets

指定需要订阅的kafka partition,以及对应的每个partition的起始offset。offset可以指定从大于等于0的具体 offset,或者:

a)OFFSET_BEGINNING: 从有数据的位置开始订阅;

b)OFFSET_END: 从末尾开始订阅。

如果没有指定,则默认从OFFSET_END开始订阅topic下的所有partition。例如:

"kafka_broker_list" = "broker1:9092,broker2:9092",

"kafka_partitions" = "0,1,2,3",

"kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"

2)property.group.id

Routine Load中的property用来自定义kafka参数,功能等同于kafka shell中 "--property" 参数。

property.group.id用来指定消费者组,区别于Kafka中的消费者组概念,这里指定的消费者组仅用于在Kafka中生成对应的消费者组信息。StarRocks的每个Routine Load任务都在内部记录了自己的offset,来确保任务中每个offset的数据只导入一次(在消费过程中,StarRocks也会把消费到的offset提交到消费组里来更新Kafka的offset)。因为offset是由StarRocks自己管理,且每个任务管理自己的offset,那么对于消费同一主题不同的Routine Load任务,即便我们设为同一个消费者组,StarRocks也是会按照各自任务中指定的分区及offset信息重新消费。

建议为每个Routine Load任务设置各自的消费者组,以便在多次消费时复用连接,避免在Kafka中随机生成较多的消费者组信息。

注意:StarRocks并不能分辨Kafka中的数据是否是上游系统重试的数据,若要实现Exactly-Once,还需要在上游系统中保证数据不会重复。

2.1.2 任务指标

Routine Load任务创建后常驻后台,我们可以使用SHOW语句查看任务情况:

mysql> SHOW ROUTINE LOAD FOR routine_load_010901\G

*************************** 1. row ***************************

                  Id: 11026

                Name: routine_load_010901

          CreateTime: 2022-01-09 10:47:03

           PauseTime: NULL

             EndTime: NULL

              DbName: default_cluster:starrocks

           TableName: car_status

               State: RUNNING

      DataSourceType: KAFKA

      CurrentTaskNum: 1

 JobProperties: {"partitions":"*","rowDelimiter":"\t","columnToColumnExpr":"did,event_time,load_weight,speed,voltage","maxBatchIntervalS":"15","whereExpr":"*","timezone":"Asia/Shanghai","format":"csv","columnSeparator":"','","json_root":"","strict_mode":"false","jsonpaths":"","desireTaskConcurrentNum":"1","maxErrorNum":"1000","strip_outer_array":"false","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}

DataSourceProperties: {"topic":"starrocks_topic","currentKafkaPartitions":"0","brokerList":"192.168.110.101:9092"}

    CustomProperties: {"group.id":"kafka_to_starrocks"}

           Statistic: {"receivedBytes":282,"errorRows":0,"committedTaskNum":1,"loadedRows":9,"loadRowsRate":0,"abortedTaskNum":0,"totalRows":9,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":3154}

            Progress: {"0":"8"}

ReasonOfStateChanged:

        ErrorLogUrls:

            OtherMsg:

主要强调三个指标:

State:为RUNNING即表示任务正常进行中,在任务出现问题时,状态为变为PAUSED。

ErrorLogUrls:当任务出现错误时,可以根据这里的信息排查错误原因。

Progress:对于Kafka数据源,显示该主题下当前任务在每个分区的消费进度,并将该进度推给Kafka作为当前消费者组的offset。如 {"0":"8"} 表示Kafka分区0已消费数据位移为8。

目前这里存在一个定义上的差异问题。当前Progress的定义是已消费数据的位移,例如9条数据消费完,Progress会从OFFSET_ZERO开始变为8,即位移是指向了已消费的最后一条记录

但是,按照Kafka Offset的定义,9条数据消费完,实际的offset应为9,即应指向下一条要消费的记录

而在当前的设计下,Routine Load任务会把Progress视为Offset推给Kafka,所以9条数据消费完后,我们在Kafka中查看Offset,会发现Offset也是8,而非Kafka自己理论上的9:

kafka-consumer-groups.sh --describe --bootstrap-server 192.168.110.101:9092 --group kafka_to_starrocks

CURRENT-OFFSET与Routine Load任务中的Progress会保持一致,但是因为上面说的定义上的差异,CURRENT-OFFSET比Kafka自己定义上的OFFSET会少1,从而导致上面的示例中LAG也错误的显示还剩余1条数据未被消费。

两边的定义存在差异是不太友好的,这里已经提了issue,研发老大们回复说后续这里会与Kafka中的定义对齐,待发版后我也会同步修改文档。

目前的逻辑整体上也没什么问题,但会有一个重复消费的风险点。假设我们因为某些原因Stop掉原有的Routine Load任务(比如为了修改一些任务参数),修改后我们肯定得指定原本消费的offset重新创建任务继续消费,上面解释过了,我们查到的offset是少了1的,所以新建任务语句时我们指定的offset就需要比用上面命令查到的+1,这才是Kafka中正确的offset,不然就会导致最后那条数据被重复消费一次。有点绕了,那就还以上面的例子解释一下,9掉数据消费完,Progress为8,推给Kafka后从Kafka里查到的offset自然也是不太友好的8。但是Routine Load任务语句中的kafka_offsets指定的偏移量是Kafka定义上的offset,前面9条数据消费完后,实际上这里的Offset得是9,这样才能继续正确消费第10条数据,如果我们在Routine Load创建语句的kafka_offsets参数中直接指定上面查到的8,那Kafka就会把第9条数据再次消费一次。

2.1.3 任务控制语法

在查看当前任务情况时,有几个查看语句也简单介绍一下:

mysql> SHOW ROUTINE LOAD;

完整语法为:

mysql> SHOW ROUTINE LOAD FOR [database.][job_name];

可以查看当前数据库中所有正在进行的Routine Load任务,已结束的任务无法查看(暂停状态的可以)。

mysql> SHOW ALL ROUTINE LOAD;

查看当前数据库中所有例行导入任务,包括已停止或取消的任务。

StarRocks中没有删除Routine Load任务的命令,在执行stop语句停止任务后,该任务还能通过SHOW ALL ROUTINE LOAD语句查看。这些已经终止的任务默认会保留三天(fe.conf中的label_keep_max_second参数),然后被定时任务清理掉。

StarRocks中还内置了一个简单的proc系统,我们也可以使用show proc命令来查看Routine Load任务情况:

mysql> show proc "/routine_loads/routine_load_010901"\G

[在stop的任务被定时任务清理前,该命令可以查看使用该任务名称的所有任务]

mysql> SHOW ROUTINE LOAD FOR routine_load_010901\G

[只能查看当前正在运行的任务]

查看任务及子任务运行状态:

mysql> show proc "/routine_loads/routine_load_010901/11026"\G

[11026为使用楼上的语句查看到的Routine Load任务的Id]

=

mysql> show routine load task where jobname = "routine_load_010901";

暂停Routine Load任务(任务可以被重新执行):

mysql> PAUSE ROUTINE LOAD FOR routine_load_010901;

执行后任务状态为:State: PAUSED

恢复暂停状态的Routine Load任务:

mysql> RESUME ROUTINE LOAD FOR routine_load_010901;

终止Routine Load任务(任务无法恢复):

mysql> STOP ROUTINE LOAD FOR routine_load_010901;

执行后任务状态为:State: STOPPED,该任务无法被恢复,会等待定时任务将其清理。

2.2 Json文本格式

前面咱们演示了导入Kafka中默认的CSV文本格式的数据,StarRocks要求CSV数据每一个Message为一行,且行尾不包含换行符。除此之外,StarRocks也支持从Kafka中导入Json文本格式的数据,我们也来举个例子。

在starrocks库中再创建一个演示表sales:

mysql> CREATE TABLE IF NOT EXISTS sales (

    `category` varchar(512) NULL COMMENT "",

    `author` varchar(512) NULL COMMENT "",

    `title` varchar(512) NULL COMMENT "",

    `price` double NULL COMMENT ""

)

DISTRIBUTED BY HASH(category) BUCKETS 10

PROPERTIES (

"replication_num" = "1"

);

在Kafka中创建主题starrocks_topic_json,例如:

kafka-topics.sh --zookeeper 192.168.110.101:2181 --create --replication-factor 1 --partitions 1 --topic starrocks_topic_json

准备用来导入的json数据,我们创建生产者将其生产至Kafka的starrocks_topic_json主题:

kafka-console-producer.sh  --broker-list  192.168.110.101:9092  --topic starrocks_topic_json

粘贴输入:

{"RECORDS":[{"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587},{"category":"22","author":"2avc","price":895,"timestamp":1589191487},{"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387}]}

在StarRocks中创建Routine Load任务:

mysql> CREATE ROUTINE LOAD starrocks.routine_load_010902 ON sales

COLUMNS (category, author, title, price)

PROPERTIES

(

"desired_concurrent_number"="1",

"format" = "json",

"jsonpaths" = "[\"$.category\",\"$.price\",\"$.author\"]",

"json_root"="$.RECORDS",

"strip_outer_array" = "true"

)

FROM KAFKA

(

    "kafka_broker_list"= "192.168.110.101:9092",

"kafka_topic" = "starrocks_topic_json",

"kafka_partitions" = "0",

"kafka_offsets" = "OFFSET_BEGINNING",

"property.group.id"="kafka_to_starrocks_json"

);

在对Json数据导入时,注意根据json情况指定format、jsonpaths与json_root(若有),并配置"strip_outer_array" = "true"将其展开为组数。这里也注意,jsonpaths部分是大小写敏感的。其他部分同CSV导入,官网文档也有多个完整的示例,就不再赘述了。

三、集群参数

在文档开头部分,我们已经介绍了几个时间的参数。当同时进行的Routine Load任务较多时,我们主要留意调整这两个参数:fe.conf的max_routine_load_task_num_per_be和be.conf中的routine_load_thread_pool_size。

这两个参数,可以粗估所有Routine Load任务总共拆分为了多少task,然后平均分到每个be上,设置一个值,例如都设为50。

Routine Load作为业务中使用度非常高的导入方式,这里将涉及的主要参数也集中整理了一下,同时也包括了几个导入任务通用的参数,如下表:

配置项

默认值

说明

max_routine_load_job_num

100

FE配置项,最大的Routine Load作业数,包括 NEED_SCHEDULED, RUNNING, PAUSE这些状态。超过后,不能在提交新的作业。

max_routine_load_task_concurrent_num

5

FE配置项,每个Routine Load作业最大并发执行的task数

max_routine_load_task_num_per_be

5

FE配置项,每个BE最大并发执行的Routine Load task数,需要小于等于BE的配置routine_load_thread_pool_size。

max_routine_load_batch_size

524288000

FE配置项,每个routine load task导入的最大数据量,默认500M

routine_load_task_consume_second

3

FE配置项,每个routine load task消费数据的最大时间,默认为3s

routine_load_task_timeout_second

15

FE配置项,每个routine load task超时时间,默认15s

max_tolerable_backend_down_num

0

FE配置项,如果故障的BE节点数超过该阈值,则不能自动恢复Routine Load作业。

period_of_auto_resume_min

5

FE配置项,默认是5分钟。StarRocks重新调度,只会在5分钟这个周期内,最多尝试3次。如果3次都失败则锁定当前任务,后续不在进行调度。但可通过人为干预,进行手动恢复。

max_running_txn_num_per_db

100

FE配置项,每个数据库并发导入的任务数,一般不建议调大。

max_consumer_num_per_group

3

BE配置项,默认为3。该参数表示一个子任务中最多生成几个consumer进行数据消费。对于Kafka数据源,一个consumer可能消费一个或多个kafka partition。假设一个任务需要消费6个kafka partition,则会生成3个consumer,每个consumer消费2个partition。如果只有2个partition,则只会生成2个consumer,每个consumer消费1个partition。

routine_load_thread_pool_size

10

BE配置项,Routine Load的线程池数目。

push_write_mbytes_per_sec

10

BE配置项。BE上单个Tablet的写入速度限制。默认是10,即10MB/s。根据Schema以及系统的不同,通常BE对单个Tablet的最大写入速度大约在10-30MB/s之间。可以适当调整这个参数来控制导入速度。

inc_rowset_expired_sec

1800

BE配置项。过期版本回收时间。导入生效的数据,存储引擎保留的时间,用于增量克隆。当磁盘空间还剩余较大但却出现报错disk on backend 10003 exceed limit usage时,可以适当调小,比如调整为600,同时尽快给磁盘扩容,建议磁盘利用率控制在50%以内。

备注:上表中所有的FE配置项都可以在运行时修改。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐