ClickHouse实战--对接kafka
Clickhouse支持了把kafka的数据同步到数据表中,可以通过配置的方式来做到这一点。而且能保证excactly once的语意,这种机制,为实时流的处理带来了很大的方便。
·
概述
Clickhouse提供了直接对接Kafka的能力,这样我们可以方便的保存和处理kafka数据。将Kafka中数据导入ClickHouse的标准流程是:
- 在ClickHouse中建立Kafka Engine的表,作为Kafka数据源的一个接口。这一步实际上创建了一个kafka的消费者。
- 在ClickHouse中创建普通表(通常是MergeTree系列)用来保存Kafka中的数据。
- 在ClickHouse中创建Materialized View, 监听Kafka中的数据,并将数据写入ClickHouse存储表中。
Kafka表引擎介绍
ClickHouse的Kafka表引擎,有以下特性:
- 发布或者订阅数据流。
- 容错存储机制。
- 处理流数据。
消费的消息会被自动追踪,因此每个消息在不同的消费组里只会记录一次。如果希望获得两次数据,则使用另一个组名创建副本。
更加详细的介绍,请参考:这里。
实战步骤
创建一个kafka引擎的表
创建一个kafka引擎的表,用来对接kafka的数据。如下建表语句:
CREATE TABLE dmp.events_queue
(
event_id String COMMENT 'event id' ,
appkey String COMMENT 'app key ',
event_name String COMMENT 'event name'
)
ENGINE = Kafka SETTINGS kafka_broker_list = '192.168.1.3:9092,192.168.1.4:9092,192.168.1.5:9092',
kafka_topic_list = 'topic1,topic2',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 2;
参数说明
kafka_broker_list
: 这里填写Kafka服务的broker列表,用逗号分隔kafka_topic_list
: 这里填写Kafka topic,多个topic用逗号分隔kafka_group_name
:这里填写消费者group名称kafka_format
:Kafka数据格式, ClickHouse支持的Format,,Clickhouse支持多种格式,详见: 详见这里
可选参数:
kafka_skip_broken_messages
:填写大于等于0的整数,表示忽略解析异常的Kafka数据的条数。如果出现了N条异常后,后台线程结束,Materialized View会被重新安排后台线程去监听数据kafka_num_consumers
: 单个Kafka Engine 的消费者数量,通过增加该参数,可以提高消费数据吞吐,但总数不应超过对应topic的partitions总数*。kafka_row_delimiter
: 消息分隔符*kafka_schema*
__:对于kafka_format需要schema定义的时候,其schema由该参数确定kafka_max_block_size
: 该参数控制Kafka数据写入目标表的Block大小,超过该数值后,就将数据刷盘。
创建一张本地表
该本地表用来把kafka对接到的数据保存起来,可供查询和使用。
CREATE TABLE dmp.events
(
event_id String COMMENT 'event id' ,
appkey String COMMENT 'app key ',
event_name String COMMENT 'event name'
)
ENGINE = MergeTree()
PARTITION BY appkey
ORDER BY (event_id)
SETTINGS index_granularity = 8192;
创建一个物化视图
该物化视图会监听Kafka引擎的表数据,并把新添加到kafka表引擎中的数据同步到本地表中。
create materialized view events_view to dmp.events as select * from dmp.events_queue;
小结
Clickhouse支持了把kafka的数据同步到数据表中,可以通过配置的方式来做到这一点。而且能保证excactly once的语意,这种机制,为实时流的处理带来了很大的方便。
比如,我们可以实时流处理任务拆分开,一个任务进行数据处理,并把结果保存到下一个kafka,然后用clickhouse来把数据保存到数据表中进行查询。
参考资料
更多推荐
已为社区贡献2条内容
所有评论(0)