ClickHouse Kafka表引擎使用详解
前言项目采用Flink、Kafka、Clickhouse的实时数仓架构,数据由Kafka接入,经过Flink处理,写入Clickhouse。初期直接由Flink写入Clickhouse,经常会出现分区太多合并不来等意外情况,效果不是很好。而且数据也需要共享到其他地方,故直接写入CK对我们来说不是很好的选择。于是先将数据写入Kafka,之后由Kafka对数据进行分发。从Kafka写入CK有很多种方法
前言
项目采用Flink、Kafka、Clickhouse的实时数仓架构,数据由Kafka接入,经过Flink处理,写入Clickhouse。
初期直接由Flink写入Clickhouse,经常会出现分区太多合并不来等意外情况,效果不是很好。而且数据也需要共享到其他地方,故直接写入CK对我们来说不是很好的选择。
于是先将数据写入Kafka,之后由Kafka对数据进行分发。
从Kafka写入CK有很多种方法:
- 使用Flink Connector
- 使用github上开源的kafka ck组件
- 使用CK的Kafka表引擎
为了便于维护、降低成本,决定探索下使用CK的Kafka表引擎来写入数据。
Kafka表引擎
简介
CK的Kafka表引擎就是集成了Kafka,在CK端可以查询、写入Kafka。
原理就是CK作为Kafka的生产者或者消费者来生产消费数据,实现数据的同步。
建表
建表语法
和其他表引擎类似,只不过ENGINE为Kafka()
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE =
SETTINGS
kafka_broker_list = 'host:port',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_row_delimiter = 'delimiter_symbol',]
[kafka_schema = '',]
[kafka_num_consumers = N,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0,]
[kafka_thread_per_consumer = 0]
参数说明
必选参数
kafka_broker_list
– 以逗号分隔的 brokers 列表 (localhost:9092
)。kafka_topic_list
– topic 列表 (my_topic
)。kafka_group_name
– Kafka 消费组名称 (group1
)。如果不希望消息在集群中重复,请在每个分片中使用相同的组名。kafka_format
– 消息体格式。使用与 SQL 部分的FORMAT
函数相同表示方法,例如JSONEachRow
。了解详细信息,请参考Formats
部分。
可选参数:
-
kafka_row_delimiter
- 每个消息体(记录)之间的分隔符。 -
kafka_schema
– 如果解析格式需要一个 schema 时,此参数必填。 -
kafka_num_consumers
– 单个表的消费者数量。默认值是:1
,如果一个消费者的吞吐量不足,则指定更多的消费者。消费者的总数不应该超过 topic 中分区的数量,因为每个分区只能分配一个消费者。 -
kafka_max_block_size
轮询的最大批处理大小(以消息为单位)(默认值:max_block_size)。 -
kafka_skip_broken_messages
Kafka消息解析器对每个块的架构不兼容消息的容忍度。默认值:0。如果kafka_skip_broken_messages = N,则引擎会跳过N条无法解析的Kafka消息(消息等于一行数据)。 -
kafka_commit_every_batch
写入整个块后,提交每个消耗和处理的批处理,而不是单个提交(默认值:0)。 -
kafka_thread_per_consumer
为每个消费者者提供独立的线程(默认值:0)。启用后,每个消费者将并行并行地刷新数据(否则,来自多个消费者的行将被压缩以形成一个块)。
其他参数
format_csv_delimiter
当格式类型为CSV相关的时候,可以通过这个参数来指定CSV的分隔符
建表示例
CREATE TABLE IF NOT EXISTS test_ck_sync1 ( \
sys_time Datetime comment '',\
num UInt32 comment ''\
)\
ENGINE = Kafka()\
SETTINGS \
kafka_broker_list = '10.8.14.227:9092', \
kafka_topic_list='test_ck_sync1', \
kafka_group_name='ck_test_ck_sync1', \
kafka_format = 'CSV',\
kafka_max_block_size = 200000,\
kafka_skip_broken_messages = 1000,\
kafka_row_delimiter = '\n',\
format_csv_delimiter = '|';
虚拟列
Kafka表引擎还提供了虚拟类,来获取一些Kafka的元数据信息。如下:
_topic
— Kafka topic_key
— 消息的key_offset
— 偏移量_timestamp
— 消息的时间戳_partition
— 分区信息
从CK写入Kafka
从CK写入Kafka时,CK作为Kafka的生产者。
直接向Kafka表引擎的表中插入数据即可,会自动写入到Kafka中。
这个应用较少,就不介绍了
从Kafka写入CK
从Kafka写入CK时,CK作为Kafka的消费者,消费者组是我们创建表时指定的group。
有以下几点需注意的:
- 同一个消费者组每条消息只能消费一次。Kafka中每个消费者组对每条消息只能消费一次,所以Kafka表引擎的表只能对一条数据查询一次,再次执行查询就会发现没有了。
- 消费者组的分区会自动分配。如果我们使用同一个消费者组创建了多个表,多个表会平分主题的各个分区。表个数不应该大于分区数,不然就会有空闲的。
- Kafka当增加分区时,CK会自动重分配分区。
由上我们可知,只能对一条消息消费一次。我们可以采用物化视图+结果表的方式来将Kafka数据持久化到CK。
Kafka数据持久化到CK
当加入物化视图后,在后台收集数据。这样就可以连续接收来自Kafka的消息,用SELECT将其转换为所需的格式,写入结果表。
实现步骤如下:
- 创建一个Kafka表引擎的表
- 创建一个结果表,用于存储最终数据
- 创建一个物化视图,将Kafka的数据写入结果表
实现
# 创建Kafka表引擎的表
CREATE TABLE IF NOT EXISTS test_ck_sync1 ( \
sys_time Datetime comment '',\
num UInt32 comment ''\
)\
ENGINE = Kafka()\
SETTINGS \
kafka_broker_list = '10.8.14.227:9092', \
kafka_topic_list='test_ck_sync1', \
kafka_group_name='ck_test_ck_sync1', \
kafka_format = 'CSV',\
kafka_max_block_size = 200000,\
kafka_skip_broken_messages = 1000,\
kafka_row_delimiter = '\n',\
format_csv_delimiter = '|';
# 创建结果表
CREATE TABLE IF NOT EXISTS test_ck_sync1_res( \
sys_time Datetime comment '',\
num UInt32 comment ''\
)\
ENGINE = MergeTree()\
ORDER BY tuple()\
PARTITION BY toYYYYMMDD(sys_time);
# 创建物化视图写入
create materialized view test_ck_sync1_mv to test_ck_sync1_res as select sys_time,num from test_ck_sync1;
创建完成后,往Kafka的topic中写入数据,在CK中查询就可以看到数据被写入了。
如果没有数据写入,可以查看拼写是否错误、网络是否通、查看日志等。
参数配置&优化
默认的参数是相对性能较好的一个配置,为了更好的性能, 我们还需要对参数进行调整。
以下是笔者在使用过程中查文档、看文章总结出来的,如有误,请指正。
有以下三个地方可以配置参数:
- 在建表语句上配置
- 在
config.xml
中 或者 在config.d
目录下新建xml进行配置。效果一样 - 在用户级别的配置
users.xml
中配置
在建表语句上配置
建表语句上配置的参数,就是上文说到的那些参数说明。挑几个对性能会有影响的简单说下:
kafka_num_consumers
– 单个表的消费者数量。kafka是基于分区来消费数据的,各个消费者平分所有分区。适当的调整消费者数量,可以提高性能。kafka_max_block_size
轮询的最大批处理大小(以消息为单位)(默认值:max_block_size)。这个就是每次poll的数据条数,适当调整可以增加吞吐量- kafka_skip_broken_messages = N,则引擎会跳过N条无法解析的Kafka消息(消息等于一行数据)。
kafka_commit_every_batch
写入整个块后,提交每个消耗和处理的批处理,而不是单个提交(默认值:0)。kafka_thread_per_consumer
为每个消费者者提供独立的线程(默认值:0)。这个个人理解是单表写入ck的线程数,是每个消费者写入一个文件 或者 所有消费者合并后写入一个文件。
在config.xml中配置
这块的配置直接加载config.xml中,或者在config.d
目录下xxx.xml,然后将配置写入xml中。
CK是C语言实现的,Kafka表引擎基于librdkafka
库实现,这块的配置可以参考librdkafka。配置属性在librdkafka中是以.
分隔的,ck中应该用_
分隔。
这个配置可以应用于所有kafka表引擎也可以应用于特定的主题。全局应用时,直接将属性写到kafka
标签下;应用到特定主题时,可以写到kafka_主题名
标签下。
以下是一个配置示例:
在config.d目录下新建kafka.xml
,然后写入如下内容:
<yandex>
<kafka>
<!-- 每次拉取,每个分区拉取的最大字节数 -->
<max_partition_fetch_bytes>16000000</max_partition_fetch_bytes>
<!-- 每次拉取,所有分区拉取的最大字节数 -->
<fetch_max_bytes>52428800</fetch_max_bytes>
<!-- 每次拉取的最小字节数 -->
<fetch_min_bytes>16000000</fetch_min_bytes>
<!-- 每次拉取的最大等待时间 -->
<fetch_wait_max_ms>30000</fetch_wait_max_ms>
<!-- 写入时的数据条数批大小 -->
<batch_num_messages>50000</batch_num_messages>
<!-- 写入时的批size字节-->
<batch_size>32000000</batch_size>
</kafka>
</yandex>
注意:这里配置是需要重启CK的,不然不生效。
这里的参数大家可以根据需要,从文档中查找参数,然后配置合适的值,来提高性能。
在users.xml中配置
max_insert_block_size
要插入到表中的块的大小。此设置仅适用于服务器形成块的情况。
个人理解是写入分区文件时的块大小。
stream_flush_interval_ms
适用于在超时的情况下或线程生成流式传输的表 max_insert_block_size 行。
默认值为7500。
值越小,数据被刷新到表中的频率就越高。
个人理解这两个配置就是写入文件时的大小和时间的两个阈值。
注意:这两个配置不仅影响Kafka,配置时要慎重
总结
目前在测试、使用中发现使用CK的Kafka表引擎导入CK,整体性能还可以。
参数配置合适的话,也不会产生很多小文件。
维护方面也较为轻松。
可以一试。
参考
Clickhouse Kafka表引擎 https://clickhouse.tech/docs/en/engines/table-engines/integrations/kafka/
ClickHouse Kafka Engine Tutorial https://altinity.com/blog/2020/5/21/clickhouse-kafka-engine-tutorial
ClickHouse Kafka Engine FAQ https://altinity.com/blog/clickhouse-kafka-engine-faq
librdkafka/CONFIGURATION.md https://github.com/edenhill/librdkafka/blame/master/CONFIGURATION.md
更多推荐
所有评论(0)