前言

项目采用Flink、Kafka、Clickhouse的实时数仓架构,数据由Kafka接入,经过Flink处理,写入Clickhouse。

初期直接由Flink写入Clickhouse,经常会出现分区太多合并不来等意外情况,效果不是很好。而且数据也需要共享到其他地方,故直接写入CK对我们来说不是很好的选择。

于是先将数据写入Kafka,之后由Kafka对数据进行分发。

从Kafka写入CK有很多种方法:

  • 使用Flink Connector
  • 使用github上开源的kafka ck组件
  • 使用CK的Kafka表引擎

为了便于维护、降低成本,决定探索下使用CK的Kafka表引擎来写入数据。

Kafka表引擎

简介

CK的Kafka表引擎就是集成了Kafka,在CK端可以查询、写入Kafka。

原理就是CK作为Kafka的生产者或者消费者来生产消费数据,实现数据的同步。

建表

建表语法

和其他表引擎类似,只不过ENGINE为Kafka()

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = 
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_row_delimiter = 'delimiter_symbol',]
    [kafka_schema = '',]
    [kafka_num_consumers = N,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_thread_per_consumer = 0]
参数说明
必选参数
  • kafka_broker_list – 以逗号分隔的 brokers 列表 (localhost:9092)。
  • kafka_topic_list – topic 列表 (my_topic)。
  • kafka_group_name – Kafka 消费组名称 (group1)。如果不希望消息在集群中重复,请在每个分片中使用相同的组名。
  • kafka_format – 消息体格式。使用与 SQL 部分的 FORMAT 函数相同表示方法,例如 JSONEachRow。了解详细信息,请参考 Formats 部分。
可选参数:
  • kafka_row_delimiter - 每个消息体(记录)之间的分隔符。

  • kafka_schema – 如果解析格式需要一个 schema 时,此参数必填。

  • kafka_num_consumers – 单个表的消费者数量。默认值是:1,如果一个消费者的吞吐量不足,则指定更多的消费者。消费者的总数不应该超过 topic 中分区的数量,因为每个分区只能分配一个消费者。

  • kafka_max_block_size轮询的最大批处理大小(以消息为单位)(默认值:max_block_size)。

  • kafka_skip_broken_messagesKafka消息解析器对每个块的架构不兼容消息的容忍度。默认值:0。如果kafka_skip_broken_messages = N,则引擎会跳过N条无法解析的Kafka消息(消息等于一行数据)。

  • kafka_commit_every_batch写入整个块后,提交每个消耗和处理的批处理,而不是单个提交(默认值:0)。

  • kafka_thread_per_consumer为每个消费者者提供独立的线程(默认值:0)。启用后,每个消费者将并行并行地刷新数据(否则,来自多个消费者的行将被压缩以形成一个块)。

其他参数
  • format_csv_delimiter当格式类型为CSV相关的时候,可以通过这个参数来指定CSV的分隔符
建表示例
CREATE TABLE IF NOT EXISTS test_ck_sync1 ( \
	sys_time Datetime comment '',\
	num UInt32 comment ''\
)\
ENGINE = Kafka()\
SETTINGS \
	kafka_broker_list = '10.8.14.227:9092', \
	kafka_topic_list='test_ck_sync1', \
	kafka_group_name='ck_test_ck_sync1', \
	kafka_format = 'CSV',\
	kafka_max_block_size = 200000,\
	kafka_skip_broken_messages = 1000,\
	kafka_row_delimiter = '\n',\
	format_csv_delimiter = '|';

虚拟列

Kafka表引擎还提供了虚拟类,来获取一些Kafka的元数据信息。如下:

  • _topic — Kafka topic
  • _key — 消息的key
  • _offset — 偏移量
  • _timestamp — 消息的时间戳
  • _partition — 分区信息

从CK写入Kafka

从CK写入Kafka时,CK作为Kafka的生产者。

直接向Kafka表引擎的表中插入数据即可,会自动写入到Kafka中。

这个应用较少,就不介绍了

从Kafka写入CK

从Kafka写入CK时,CK作为Kafka的消费者,消费者组是我们创建表时指定的group。

有以下几点需注意的:

  • 同一个消费者组每条消息只能消费一次。Kafka中每个消费者组对每条消息只能消费一次,所以Kafka表引擎的表只能对一条数据查询一次,再次执行查询就会发现没有了。
  • 消费者组的分区会自动分配。如果我们使用同一个消费者组创建了多个表,多个表会平分主题的各个分区。表个数不应该大于分区数,不然就会有空闲的。
  • Kafka当增加分区时,CK会自动重分配分区。

由上我们可知,只能对一条消息消费一次。我们可以采用物化视图+结果表的方式来将Kafka数据持久化到CK。

Kafka数据持久化到CK

当加入物化视图后,在后台收集数据。这样就可以连续接收来自Kafka的消息,用SELECT将其转换为所需的格式,写入结果表。

实现步骤如下:

  • 创建一个Kafka表引擎的表
  • 创建一个结果表,用于存储最终数据
  • 创建一个物化视图,将Kafka的数据写入结果表
实现
# 创建Kafka表引擎的表
CREATE TABLE IF NOT EXISTS test_ck_sync1 ( \
	sys_time Datetime comment '',\
	num UInt32 comment ''\
)\
ENGINE = Kafka()\
SETTINGS \
	kafka_broker_list = '10.8.14.227:9092', \
	kafka_topic_list='test_ck_sync1', \
	kafka_group_name='ck_test_ck_sync1', \
	kafka_format = 'CSV',\
	kafka_max_block_size = 200000,\
	kafka_skip_broken_messages = 1000,\
	kafka_row_delimiter = '\n',\
	format_csv_delimiter = '|';


# 创建结果表
CREATE TABLE IF NOT EXISTS test_ck_sync1_res( \
	sys_time Datetime comment '',\
	num UInt32 comment ''\
)\
ENGINE = MergeTree()\
ORDER BY tuple()\
PARTITION BY toYYYYMMDD(sys_time);


# 创建物化视图写入
create materialized view test_ck_sync1_mv to test_ck_sync1_res as select sys_time,num from test_ck_sync1;

创建完成后,往Kafka的topic中写入数据,在CK中查询就可以看到数据被写入了。

如果没有数据写入,可以查看拼写是否错误、网络是否通、查看日志等。

参数配置&优化

默认的参数是相对性能较好的一个配置,为了更好的性能, 我们还需要对参数进行调整。

以下是笔者在使用过程中查文档、看文章总结出来的,如有误,请指正。

有以下三个地方可以配置参数:

  • 在建表语句上配置
  • config.xml中 或者 在config.d目录下新建xml进行配置。效果一样
  • 在用户级别的配置users.xml中配置
在建表语句上配置

建表语句上配置的参数,就是上文说到的那些参数说明。挑几个对性能会有影响的简单说下:

  • kafka_num_consumers – 单个表的消费者数量。kafka是基于分区来消费数据的,各个消费者平分所有分区。适当的调整消费者数量,可以提高性能。
  • kafka_max_block_size轮询的最大批处理大小(以消息为单位)(默认值:max_block_size)。这个就是每次poll的数据条数,适当调整可以增加吞吐量
  • kafka_skip_broken_messages = N,则引擎会跳过N条无法解析的Kafka消息(消息等于一行数据)。
  • kafka_commit_every_batch写入整个块后,提交每个消耗和处理的批处理,而不是单个提交(默认值:0)。
  • kafka_thread_per_consumer为每个消费者者提供独立的线程(默认值:0)。这个个人理解是单表写入ck的线程数,是每个消费者写入一个文件 或者 所有消费者合并后写入一个文件。
在config.xml中配置

这块的配置直接加载config.xml中,或者在config.d目录下xxx.xml,然后将配置写入xml中。

CK是C语言实现的,Kafka表引擎基于librdkafka库实现,这块的配置可以参考librdkafka。配置属性在librdkafka中是以.分隔的,ck中应该用_分隔。

这个配置可以应用于所有kafka表引擎也可以应用于特定的主题。全局应用时,直接将属性写到kafka标签下;应用到特定主题时,可以写到kafka_主题名标签下。

以下是一个配置示例:

在config.d目录下新建kafka.xml,然后写入如下内容:

<yandex>
        <kafka>
            	<!-- 每次拉取,每个分区拉取的最大字节数 -->
                <max_partition_fetch_bytes>16000000</max_partition_fetch_bytes>
            	<!-- 每次拉取,所有分区拉取的最大字节数 -->
                <fetch_max_bytes>52428800</fetch_max_bytes>
            	<!-- 每次拉取的最小字节数 -->
                <fetch_min_bytes>16000000</fetch_min_bytes>
            	<!-- 每次拉取的最大等待时间 -->
                <fetch_wait_max_ms>30000</fetch_wait_max_ms>
            	<!-- 写入时的数据条数批大小 -->
                <batch_num_messages>50000</batch_num_messages>
            	<!-- 写入时的批size字节-->
                <batch_size>32000000</batch_size>
        </kafka>
</yandex>

注意:这里配置是需要重启CK的,不然不生效。

这里的参数大家可以根据需要,从文档中查找参数,然后配置合适的值,来提高性能。

在users.xml中配置
max_insert_block_size

要插入到表中的块的大小。此设置仅适用于服务器形成块的情况。

个人理解是写入分区文件时的块大小。

stream_flush_interval_ms

适用于在超时的情况下或线程生成流式传输的表 max_insert_block_size 行。

默认值为7500。

值越小,数据被刷新到表中的频率就越高。

个人理解这两个配置就是写入文件时的大小和时间的两个阈值。

注意:这两个配置不仅影响Kafka,配置时要慎重

总结

目前在测试、使用中发现使用CK的Kafka表引擎导入CK,整体性能还可以。

参数配置合适的话,也不会产生很多小文件。

维护方面也较为轻松。

可以一试。

参考

Clickhouse Kafka表引擎 https://clickhouse.tech/docs/en/engines/table-engines/integrations/kafka/

ClickHouse Kafka Engine Tutorial https://altinity.com/blog/2020/5/21/clickhouse-kafka-engine-tutorial

ClickHouse Kafka Engine FAQ https://altinity.com/blog/clickhouse-kafka-engine-faq

librdkafka/CONFIGURATION.md https://github.com/edenhill/librdkafka/blame/master/CONFIGURATION.md

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐