1. 基本原理

         +---------+
         |  Client |
         +----+----+
              |
+-----------------------------+
| FE          |               |
| +-----------v------------+  |
| |                        |  |
| |   Routine Load Job     |  |
| |                        |  |
| +---+--------+--------+--+  |
|     |        |        |     |
| +---v--+ +---v--+ +---v--+  |
| | task | | task | | task |  |
| +--+---+ +---+--+ +---+--+  |
|    |         |        |     |
+-----------------------------+
     |         |        |
     v         v        v
 +---+--+   +--+---+   ++-----+
 |  BE  |   |  BE  |   |  BE  |
 +------+   +------+   +------+

Client向FE提交一个Routine Load作业:

  1. FE通过JobScheduler将一个导入作业拆分成若干个Task。每个Task负责导入指定的一部分数据。Task被TaskScheduler分配到指定的BE上执行
  2. 在BE上,一个Task被视为一个普通的导入任务,通过Stream Load的导入机制进行导入。导入完成后,向FE汇报导入情况
  3. FE中的JobScheduler根据汇报结果,如果Task失败,则进行重试;如果Task成功,则继续生成后续新的Task
  4. 整个Routine Load作业通过不断的产生新的Task,来完成数据不间断的导入

2. 导入Kafka数据

2.1 创建Doris表

mysql> create table person(
    -> id bigint,
    -> name varchar(32),
    -> age int
    -> ) distributed by hash(age) buckets 5
    -> properties ('replication_num' = '1');
Query OK, 0 rows affected (0.11 sec)

mysql>

2.2 向Kafka发送数据

Kafka支持两种格式:
csv格式:csv每一个message为一行数据,且message行尾不包含换行符

json格式,json有两种格式:
格式一:message为json对象,对应Doris一行数据

{"id":1, "name":"name_1"}

格式二:message为json数组,对应Doris多行数据

[{"id":1, "name":"name_1"}, {"id":2, "name":"name_2"}, {"id":3, "name":"name_3"}]

这里我们向Kafka发送格式二的消息

[root@doris1 kafka_2.13-3.2.1]# bin/kafka-console-producer.sh --topic person_topic --bootstrap-server doris1:9092,doris2:9092,doris3:9092
>[{"id":1, "name":"name_1"}, {"id":2, "name":"name_2"}, {"id":3, "name":"name_3"}]
>

2.3 创建Doris导入作业

mysql> create routine load test_db.kafka_routine_load_label on person
-- columns terminated by ',',
columns(id, name, age = id + 10),	-- 衍生列,支持通过expr计算得出
where id > 1
properties (
'desired_concurrent_number' = '3',
'max_batch_interval' = '20',
'max_batch_rows' = '300000',
'max_batch_size' = '209715200',
'format' = 'json',
'jsonpaths' = '["$.id", "$.name"]',
'strip_outer_array' = 'true'
) from kafka (
"kafka_broker_list" = "192.168.23.61:9092,192.168.23.62:9092,192.168.23.63:9092",
"kafka_topic" = "person_topic",
"property.group.id" = "person_topic_group",
"property.client.id" = "person_topic_client",
-- "property.kafka_default_offsets" = "OFFSET_BEGINNING",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "0,0,0"
);
Query OK, 0 rows affected (0.43 sec)

mysql>

说明:

  • 导入作业名称为:kafka_routine_load_label
  • terminated by:csv格式时,数据的分隔符,默认为’\t’
  • where:对映射列和衍生列进行过滤的SQL
  • desired_concurrent_number:默认值为3,但最终的结果值 = Min(kafka partition num, desired_concurrent_number, alive_backend_num, PE配置max_routine_load_task_concurrrent_num)
  • format默认是csv。还支持json
  • strip_outer_array:当kafka的消息是列表时,对列表中的多条数据进行打平
  • property.kafka_default_offsets:指定Kafka的默认消费位置(默认是offset_end)为offset_beginning。也可以指定每个partition的offset。也可以指定每个partition从指定时间戳开始消费,格式为yyyy-MM-dd HH:mm:ss
  • 指定消费Kafka的分区,默认消费所有分区

2.4 查看作业和任务状态、结果数据

查看作业状态的命令

mysql> show routine load for test_db.kafka_routine_load_label;

查看任务运行状态的命令。只能查看当前正在运行中的任务,已结束和未开始的任务无法查看

mysql> show routine load task where JobName = "kafka_routine_load_label";

查看person表数据

mysql> select * from person;
+------+--------+------+
| id   | name   | age  |
+------+--------+------+
|    2 | name_2 |   12 |
|    3 | name_3 |   13 |
+------+--------+------+
2 rows in set (0.08 sec)

mysql> 

2.7 修改作业属性和控制作业启停

修改已经创建的作业

help alter routine load;

作业控制

  • 停止作业:stop routine load for kafka_routine_load_label;。FE会自动定期清理stop状态的toutine load
  • 暂停作业:help pause routine load;
  • 重启作业:help resume routine load;

3. Doris数据库表等操作对routine load的影响

routine load导入作业和schema change操作的关系
如果schema change完成后,列映射关系无法匹配,则会导致作业的错误数据激增,最终导致作业暂停。建议通过在routine load导入作业中显式指定列映射关系,以及通过增加Nullable列或带Default值的列来减少这类问题

routine load导入作业和partition删除的关系
如果routine load正在向一个partition导入数据,则不能进行该partition的删除,需等向该partition导入数据的任务完成活暂停routine load,才能删除该partition

删除表的Partition可能会导致后续导入任务无法找到对应的Partition,作业进入暂停

routine load导入作业和drop database/table操作的关系

当routine load导入对应的database或table被删除后,作业会自动cancel

routine load和kafka topic的关系

如果kafka集群的broker设置了auto.create.topics.enable = false, 则topic不会被自动创建,例行作业会在没有读取任何数据之前就被暂停,状态为pause

4. 作业属性设置

max_routine_load_task_concurrent_num:FE配置项,默认为5,可以运行时修改。该参数限制了一个routine load导入作业最大的子任务并发数。建议维持默认值。设置过大,可能导致同时并发的任务数过多,占用集群资源

max_routine_load_task_num_per_be:FE配置项,默认为5,可以运行时修改。该参数限制了每个BE节点最多并发执行的子任务个数。建议维持默认值。如果设置过大,可能导致并发任务数过多,占用集群资源

max_routine_load_job_num:FE配置项,默认为100,可以运行时修改。该参数限制的routine load导入作业的总数,包括need_scheduled、running、pause这些状态。超过后,不能再提交新的作业

max_consumer_num_per_group:BE配置项,默认为3。该参数表示一个子任务中最多生成几个consumer进行数据消费

push_write_mbytes_per_sec:BE配置项。默认为10,即10MB/s。该参数为导入通用参数,不限于routine load导入作业。该参数限制了导入数据写入磁盘的速度。对于SSD等高性能存储设备,可以适当增加这个限速

max_tolerable_backend_down_num: FE配置项,默认值是0。在满足某些条件下,Doris可将paused的任务重新调度变成running。该参数为0代表只有所有BE节点是alive状态才允许重新调度

period_of_auto_resume_min: FE配置项,默认是5分钟。Doris重新调度时,只会在5分钟这个周期内,最多尝试3次. 如果3次都失败则锁定当前任务,后续不再进行调度。但可通过人为干预,进行手动恢复

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐