概述

Clickhouse提供了直接对接Kafka的能力,这样我们可以方便的保存和处理kafka数据。将Kafka中数据导入ClickHouse的标准流程是:

  • 在ClickHouse中建立Kafka Engine的表,作为Kafka数据源的一个接口。这一步实际上创建了一个kafka的消费者。
  • 在ClickHouse中创建普通表(通常是MergeTree系列)用来保存Kafka中的数据。
  • 在ClickHouse中创建Materialized View, 监听Kafka中的数据,并将数据写入ClickHouse存储表中。

Kafka表引擎介绍

ClickHouse的Kafka表引擎,有以下特性:

  • 发布或者订阅数据流。
  • 容错存储机制。
  • 处理流数据。

消费的消息会被自动追踪,因此每个消息在不同的消费组里只会记录一次。如果希望获得两次数据,则使用另一个组名创建副本。

更加详细的介绍,请参考:这里

实战步骤

创建一个kafka引擎的表

创建一个kafka引擎的表,用来对接kafka的数据。如下建表语句:

CREATE TABLE dmp.events_queue
(
  event_id String COMMENT 'event id' ,
  appkey String COMMENT 'app key ',
  event_name String COMMENT 'event name'
)
ENGINE = Kafka SETTINGS kafka_broker_list = 		'192.168.1.3:9092,192.168.1.4:9092,192.168.1.5:9092',
                            kafka_topic_list = 'topic1,topic2',
                            kafka_group_name = 'group1',
                            kafka_format = 'JSONEachRow',
                            kafka_num_consumers = 2;

参数说明

  • kafka_broker_list: 这里填写Kafka服务的broker列表,用逗号分隔
  • kafka_topic_list: 这里填写Kafka topic,多个topic用逗号分隔
  • kafka_group_name:这里填写消费者group名称
  • kafka_format:Kafka数据格式, ClickHouse支持的Format,,Clickhouse支持多种格式,详见: 详见这里

可选参数:

  • kafka_skip_broken_messages:填写大于等于0的整数,表示忽略解析异常的Kafka数据的条数。如果出现了N条异常后,后台线程结束,Materialized View会被重新安排后台线程去监听数据
  • kafka_num_consumers: 单个Kafka Engine 的消费者数量,通过增加该参数,可以提高消费数据吞吐,但总数不应超过对应topic的partitions总数*。
  • kafka_row_delimiter: 消息分隔符
  • *kafka_schema*__:对于kafka_format需要schema定义的时候,其schema由该参数确定
  • kafka_max_block_size: 该参数控制Kafka数据写入目标表的Block大小,超过该数值后,就将数据刷盘。
创建一张本地表

该本地表用来把kafka对接到的数据保存起来,可供查询和使用。

CREATE TABLE dmp.events
(
  event_id String COMMENT 'event id' ,
  appkey String COMMENT 'app key ',
  event_name String COMMENT 'event name'
)
ENGINE = MergeTree()  
PARTITION BY appkey
ORDER BY (event_id) 
SETTINGS index_granularity = 8192;
创建一个物化视图

该物化视图会监听Kafka引擎的表数据,并把新添加到kafka表引擎中的数据同步到本地表中。

create materialized view events_view to dmp.events as select * from dmp.events_queue;

小结

Clickhouse支持了把kafka的数据同步到数据表中,可以通过配置的方式来做到这一点。而且能保证excactly once的语意,这种机制,为实时流的处理带来了很大的方便。

比如,我们可以实时流处理任务拆分开,一个任务进行数据处理,并把结果保存到下一个kafka,然后用clickhouse来把数据保存到数据表中进行查询。

参考资料
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐