Kafka(二)实战篇(集群搭建、客户端命令、日志查看、Kafka原生API、Spring Boot Kafka)
1. Kafka 集群搭建在生产环境中为了防止单点问题,Kafka 都是以集群方式出现的。下面要搭建一个 Kafka集群,包含三个 Kafka 主机,即三个 Broker。1.1 Kafka 的下载http://kafka.apache.org/downloads1.2 安装并配置第一台主机(1) 上传并解压将下载好的 Kafka 压缩包上传至 CentOS 虚拟机,并解压。[外链图片转存失败,源
1. Kafka 集群搭建
在生产环境中为了防止单点问题,Kafka 都是以集群方式出现的。下面要搭建一个 Kafka集群,包含三个 Kafka 主机,即三个 Broker。
1.1 Kafka 的下载
http://kafka.apache.org/downloads
1.2 安装并配置第一台主机
(1) 上传并解压
将下载好的 Kafka 压缩包上传至 CentOS 虚拟机,并解压。
tar -zxvf kafka_2.11-2.2.0tgz -C /opt/apps
(2) 创建软链接
为了屏蔽版本号,创建软连接,类似于快捷方式:
ln -s kafka_2.11-2.2.0/ kafka
(3) 修改配置文件
在 kafka 安装目录下有一个 config/server.properties 文件,修改该文件。
broker.id:集群中每个kafka主机唯一标识
listeners:broker之间通信使用的,不设置也行,默认当前主机地址:9092
advertised.listeners:消费者、生产者和broker之间通信的地址,如果不设置,用的是上面的listeners配置的地址
log.dirs:kafka消息日志存放的位置
num.partitions:执行命令的时候不指定创建几个partitions,则默认取这个值
zookeeper.connect:设置zk的地址,如果用的是集群则配置的时候用逗号分割
1.3 再克隆两台 Kafka
以 kafkaOS1 为母机再克隆两台 Kafka 主机。在克隆完毕后,需要修改 server.properties中的 broker.id、listeners 与 advertised.listeners。
1.4 kafka 的启动与停止
(1) 启动 zookeeper
zkServer.sh start
(2) 启动 kafka
在命令后添加-daemon 参数,可以使 kafka 以守护进程方式启动,即不占用窗口。
bin/kafka-server-start.sh -daemon config/server.properties
(3) 停止 kafka
bin/kafka-server-stop.sh
1.5 kafka 操作
(1) 创建 topic
bin/kafka-topics.sh --create --bootstrap-server 192.168.59.151:9092 --replication-factor 3 --partitions 3 --topic test
–bootstrap-server:可以指定集群中任意一个kafka主机的地址,也可以指定多个主机地址,逗号分隔
–replication-factor:复制因子(备份几份)
–partitions:分区数量
(2) 查看 topic
bin/kafka-topics.sh --list --bootstrap-server 192.168.59.151:9092
(3) 删除 topic
bin/kafka-topics.sh --delete --bootstrap-server 192.168.59.151:9092 --topic test
(4) 发送消息
bin/kafka-console-producer.sh --broker-list 192.168.59.151:9092 --topic test
该命令会创建一个生产者,然后由其生产消息。
–broker-list:指定broker的地址,可以写多个,逗号分隔
(5) 消费消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.59.151:9092 --topic test --from-beginning
–from-beginning:指定从一开始消费消息,可以消费消费者启动前的消息(不指定 默认 消费者只能消费 当前消费者启动之后 生产者生产的消息)
2. 日志查看
我们这里说的日志不是Kafka的启动日志(运行日志),启动日志在Kafka安装目录下的logs/server.log中。消息在磁盘上都是以日志的形式保存的。我们这里说的日志是存放在/tmp/kafka_logs目录中的消息日志,即 partition 与 segment。
2.1 查看分区与备份
(1) 1 个分区 1 个备份
我们前面创建的 test 主题是 1 个分区 1 个备份,只有一个broker中有一个 test 主题的分区
。
(2) 3 个分区 1 个备份
再次创建一个主题,命名为 one,创建三个分区,但仍为一个备份。 依次查看三台broker,可以看到每台 broker 中都有一个 one 主题的分区
。
(3) 3 个分区 3 个备份
再次创建一个主题,命名为 two,创建三个分区,三个备份。依次查看三台 broker,可以看到每台 broker 中都有三份 two 主题的分区
。
2.2 查看分区与备份在 zk 中的信息
使用 zkCli.sh 命令连接上 zk,可以查看到 kafka 在 zk 的信息。
以三个分区,两个备份的主题cities为例
(1) /brokers 目录
(2) /brokers/ids 目录
存放的是 kafka 集群中各个主机的 broker-id 列表。
每个 id 的数据内容为当前主机的信息。
(3) /brokers/topics
可以看到topics下存放的是各个主题。
/brokers/topics/cities/partitions 中存放的是 cities主题下所包含的 partition。这里的 0、1、2,在/tmp/kafka-logs 目录中即为 cities-0,cities-1,cities-2。
每个分区下还可以查看状态信息。
leader:代表当前分区副本Leader是在哪个主机上
isr:同步列表,代表当前分区同步中的副本所在主机位置
直接查看主题节点内容。
partitions,可以看出:
主机2上存放的cities主题的分区是0和1
主机1上存放的cities主题的分区是1和2
主机0上存放的cities主题的分区是2和0
2.3 查看段 segment
进入test主题的分区:
(1) segment 文件
segment 是一个逻辑概念,其由两类物理文件组成,分别为“.index”文件和“.log”文件。“.log”文件中存放的是消息,而“.index”文件中存放的是“.log”文件中消息的索引。
文件名20位数字组成,代表当前文件记录消息之前,已经有多少条消息
170210.log文件表示记录的消息是从第170211条开始的
假如我们要查找第170213条消息,先根据二分法从所有segment文件中
根据文件名找到该消息应该在哪个文件名的文件中,最终找到170210的文件名
在将170213减去170210,等于3,对应170210.index文件中的2
通过2找到对应的偏移量0348
再根据偏移量0348在170210.log找到对应的消息内容
(2) 查看 segment
对于 segment 中的 log 文件,不能直接通过 cat 命令查找其内容,而是需要通过 kafka自带的一个工具查看。
bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.log --print-data-log
解释:
kafka-run-class.sh命令指定运行工具类 kafka.tools.DumpLogSegments(Java类)
(3) 特殊主题 __consumer_offsets的分区
__consumer_offsets主题默认有0-49,默认50个分区,1个副本,可以配置。
Consumer 从 partition 中取出一批消息写入到 buffer 对其进行消费,消费完消息后,会将其消费消息的 offset 提交给 broker,以让 broker 记录下哪些消息是消费过的,消费者提交的 offset 被封装为了一种特殊的消息被写入到了一个由系统创建的、名称为__consumer_offset 的特殊主题的 partitions 中。
3. Kafka API
首先在命令行创建一个名称为 cities 的主题,并创建该主题的订阅者。
3.1 使用 kafka 原生 API
3.1.1 创建工程
创建一个 Maven 的 Java 工程,命名为 kafkaDemo。创建时无需导入依赖。为了简单,后面的发布者与消费者均创建在该工程中。
3.1.2 导入依赖
<!-- kafka 依赖 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>1.1.1</version>
</dependency>
3.1.3 创建发布者 OneProducer
(1) 创建发布者类 OneProducer
public class OneProducer {
// 第一个泛型:当前生产者所生产消息的key
// 第二个泛型:当前生产者所生产的消息本身
private KafkaProducer<Integer, String> producer;
public OneProducer() {
Properties properties = new Properties();
// 指定kafka集群
properties.put("bootstrap.servers", "kafkaOS1:9092,kafkaOS2:9092,kafkaOS3:9092");
// 指定key与value的序列化器
properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
this.producer = new KafkaProducer<Integer, String>(properties);
}
public void sendMsg() {
// 创建消息记录(包含主题、消息本身) (String topic, V value)
// ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", "tianjin");
// 创建消息记录(包含主题、key、消息本身) (String topic, K key, V value)
// ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", 1, "tianjin");
// 创建消息记录(包含主题、partition、key、消息本身) (String topic, Integer partition, K key, V value)
ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", 0, 1, "tianjin");
producer.send(record);
}
}
(2) 创建测试类 OneProducerTest
public class OneProducerTest {
public static void main(String[] args) throws IOException {
OneProducer producer = new OneProducer();
producer.sendMsg();
System.in.read();
}
}
(3) 效果
多次启动,用命令方式消费:
3.1.4 创建发布者 TwoProducer
前面的方式在消息发送成功后,代码中没有任何提示,这里可以使用回调方式,即发送成功后,会触发回调方法的执行。
(1) 创建发布者类 TwoProducer
复制 OneProducer 类,仅修改 sendMsg()方法。
public class TwoProducer {
// 第一个泛型:当前生产者所生产消息的key
// 第二个泛型:当前生产者所生产的消息本身
private KafkaProducer<Integer, String> producer;
public TwoProducer() {
Properties properties = new Properties();
// 指定kafka集群
properties.put("bootstrap.servers", "kafkaOS1:9092,kafkaOS2:9092,kafkaOS3:9092");
// 指定key与value的序列化器
properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
this.producer = new KafkaProducer<Integer, String>(properties);
}
public void sendMsg() {
// 创建消息记录(包含主题、消息本身) (String topic, V value)
// ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", "tianjin");
// 创建消息记录(包含主题、key、消息本身) (String topic, K key, V value)
// ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", 1, "tianjin");
// 创建消息记录(包含主题、partition、key、消息本身) (String topic, Integer partition, K key, V value)
ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", 2, 1, "tianjin");
producer.send(record, (metadata, ex) -> {
System.out.println("topic = " + metadata.topic());
System.out.println("partition = " + metadata.partition());
System.out.println("offset = " + metadata.offset());
});
}
}
(2) 创建测试类 TwoProducerTest
public class TwoProducerTest {
public static void main(String[] args) throws IOException {
TwoProducer producer = new TwoProducer();
producer.sendMsg();
System.in.read();
}
}
(3) 效果
3.1.5 批量发送消息
(1) 创建发布者类 SomeProducerBatch
复制前面的发布者类,在其基础上进行修改。
public class SomeProducerBatch {
// 第一个泛型:当前生产者所生产消息的key
// 第二个泛型:当前生产者所生产的消息本身
private KafkaProducer<Integer, String> producer;
public SomeProducerBatch() {
Properties properties = new Properties();
// 指定kafka集群
properties.put("bootstrap.servers", "kafkaOS1:9092,kafkaOS2:9092,kafkaOS3:9092");
// 指定key与value的序列化器
properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 指定生产者每10条向broker发送一次
properties.put("batch.size", 10);
// 指定生产者每50ms向broker发送一次
properties.put("linger.ms", 50);
this.producer = new KafkaProducer<Integer, String>(properties);
}
public void sendMsg() {
for(int i=0; i<50; i++) {
ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", "city-" + i);
int k = i;
producer.send(record, (metadata, ex) -> {
System.out.println("i = " + k);
System.out.println("topic = " + metadata.topic());
System.out.println("partition = " + metadata.partition());
System.out.println("offset = " + metadata.offset());
});
}
}
}
(2) 创建测试类 ProducerBatchTest
public class ProducerBatchTest {
public static void main(String[] args) throws IOException {
SomeProducerBatch producer = new SomeProducerBatch();
producer.sendMsg();
System.in.read();
}
}
(3) 效果
producer.send方法触发了50次、回调触发了50次,但是实际上发到Kafka只有5次,控制台看不出来
3.1.6 消费者组
(1) 创建消费者类 SomeConsumer
public class SomeConsumer extends ShutdownableThread {
private KafkaConsumer<Integer, String> consumer;
public SomeConsumer() {
// 调用父类构造器,两个参数:
// 1)指定当前消费者名称
// 2)指定消费过程是否会被中断(比如消费一批数据,还没消费完,消费者取消了,那么批数据消费是否会中断)
super("KafkaConsumerTest", false);
Properties properties = new Properties();
String brokers = "kafkaOS1:9092,kafkaOS2:9092,kafkaOS3:9092";
// map中的key其实都是常量,定义在ConsumerConfig中
// 指定kafka集群
properties.put("bootstrap.servers", brokers);
// 指定消费者组ID
properties.put("group.id", "cityGroup1");
// 开启自动提交,默认为true
properties.put("enable.auto.commit", "true");
// 指定自动提交时间间隔,默认5s
properties.put("auto.commit.interval.ms", "1000");
// 指定poll一次获取消息的最大数量,默认值是500
properties.put("max.poll.records", 500);
// 指定消费者被broker认定为挂掉的时限。若broker在此时间内未收到当前消费者发送的心跳,则broker
// 认为消费者已经挂掉。默认为10s
properties.put("session.timeout.ms", "30000");
// 指定两次心跳的时间间隔,默认为3s,一般不要超过session.timeout.ms的 1/3
properties.put("heartbeat.interval.ms", "10000");
// 当kafka中没有指定offset初值时,或指定的offset不存在时,从这里读取offset的值。其取值的意义为:
// earliest:指定offset为第一条offset
// latest: 指定offset为最后一条offset
properties.put("auto.offset.reset", "earliest");
// 指定key与value的反序列化器
properties.put("key.deserializer",
"org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer<Integer, String>(properties);
}
@Override
public void doWork() {
// 订阅消费主题
consumer.subscribe(Collections.singletonList("cities"));
// 从broker拉取消息。
// 参数表示,如果缓冲区中没有可用的数据,则在轮询中等待的时间(以毫秒为单位)。
// 如果为0,立即返回缓冲区中当前可用的任何记录,否则返回空。
// > 0,表示当时间到后仍没有消息,再返回空
ConsumerRecords<Integer, String> records = consumer.poll(1000);
for(ConsumerRecord record : records) {
System.out.println("topic = " + record.topic());
System.out.println("partition = " + record.partition());
System.out.println("key = " + record.key());
System.out.println("value = " + record.value());
}
}
}
订阅消费主题,可以用正则、集合订阅多个主题:
可以指定poll一次获取消息的最大数量,默认500:
(2) 创建测试类 ConsumerTest
public class ConsumerTest {
public static void main(String[] args) {
SomeConsumer consumer = new SomeConsumer();
consumer.start();
}
}
(3) 效果
3.1.7 消费者同步手动提交
(1) 手动提交分类
手动提交又可以划分为同步提交
、异步提交
提交。这些提交方式仅仅是doWork()方法不相同,其构造器是相同的。所以下面首先在前面消费者类的基础上进行构造器的修改,然后再分别实现两种不同的提交方式。
(2) 创建消费者类 SyncManualConsumer
-
A、原理
同步提交方式是,消费者向 broker 提交 offset 后等待 broker 成功响应。若没有收到响应,则会重新提交,直到获取到响应。而在这个等待过程中,消费者是阻塞的。其严重影响了消费者的吞吐量。 -
B、 修改构造器
直接复制前面的 SomeConsumer,在其基础上进行修改。
-
C、 修改 doWork()方法
public class SyncManualConsumer extends ShutdownableThread {
private KafkaConsumer<Integer, String> consumer;
public SyncManualConsumer() {
// 两个参数:
// 1)指定当前消费者名称
// 2)指定消费过程是否会被中断
super("KafkaConsumerTest", false);
Properties properties = new Properties();
String brokers = "kafkaOS1:9092,kafkaOS2:9092,kafkaOS3:9092";
// 指定kafka集群
properties.put("bootstrap.servers", brokers);
// 指定消费者组ID
properties.put("group.id", "cityGroup1");
// 开启手动提交
properties.put("enable.auto.commit", "false");
// 指定自动提交时间间隔,默认5s
// properties.put("auto.commit.interval.ms", "1000");
// 指定一次提交10个offset
properties.put("max.poll.records", 10);
// 指定消费者被broker认定为挂掉的时限。若broker在此时间内未收到当前消费者发送的心跳,则broker
// 认为消费者已经挂掉。默认为10s
properties.put("session.timeout.ms", "30000");
// 指定两次心跳的时间间隔,默认为3s,一般不要超过session.timeout.ms的 1/3
properties.put("heartbeat.interval.ms", "10000");
// 当kafka中没有指定offset初值时,或指定的offset不存在时,从这里读取offset的值。其取值的意义为:
// earliest:指定offset为第一条offset
// latest: 指定offset为最后一条offset
properties.put("auto.offset.reset", "earliest");
// 指定key与value的反序列化器
properties.put("key.deserializer",
"org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer<Integer, String>(properties);
}
@Override
public void doWork() {
// 订阅消费主题
consumer.subscribe(Collections.singletonList("cities"));
// 从broker拉取消息。
// 参数表示,如果缓冲区中没有可用的数据,则在轮询中等待的时间(以毫秒为单位)。
// 如果为0,立即返回缓冲区中当前可用的任何记录,否则返回空。
// > 0,表示当时间到后仍没有消息,再返回空
ConsumerRecords<Integer, String> records = consumer.poll(1000);
for(ConsumerRecord record : records) {
System.out.println("topic = " + record.topic());
System.out.println("partition = " + record.partition());
System.out.println("key = " + record.key());
System.out.println("value = " + record.value());
}
// 手动同步提交
consumer.commitSync();
}
}
(4) 创建测试类 SyncManulTest
public class SyncManualTest {
public static void main(String[] args) {
SyncManualConsumer consumer = new SyncManualConsumer();
consumer.start();
}
}
3.1.8 消费者异步手动提交
(1) 原理
手动同步提交方式需要等待 broker 的成功响应,效率太低,影响消费者的吞吐量。异步提交方式是,消费者向 broker 提交 offset 后不用等待成功响应
,所以其增加了消费者的吞吐量。
(2) 创建消费者类 AsyncManualConsumer
复制前面的 SyncManualConsumer 类,在其基础上进行修改。
//com.abc.consumer.test3.AsynManualConsumer#doWork
public void doWork() {
// 订阅消费主题
consumer.subscribe(Collections.singletonList("cities"));
// 从broker拉取消息。
// 参数表示,如果缓冲区中没有可用的数据,则在轮询中等待的时间(以毫秒为单位)。
// 如果为0,立即返回缓冲区中当前可用的任何记录,否则返回空。
// > 0,表示当时间到后仍没有消息,再返回空
ConsumerRecords<Integer, String> records = consumer.poll(1000);
for(ConsumerRecord record : records) {
System.out.println("topic = " + record.topic());
System.out.println("partition = " + record.partition());
System.out.println("key = " + record.key());
System.out.println("value = " + record.value());
}
// 手动异步提交
// consumer.commitAsync();
consumer.commitAsync((offsets, ex) -> {
if(ex != null) {
System.out.print("提交失败,offsets = " + offsets);
System.out.println(", exception = " + ex);
}
});
}
(3) 创建测试类 AsyncManulTest
public class AsyncManualTest {
public static void main(String[] args) {
AsynManualConsumer consumer = new AsynManualConsumer();
consumer.start();
}
}
3.2 Spring Boot Kafka
为了简单,以下代码是将消息发布者与订阅者定义到了一个工程中的。
3.2.1 创建工程
创建一个 Spring Boot 工程,导入如下依赖。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
3.2.2 定义发布者
Spring 是通过 KafkaTemplate 来完成对 Kafka 的操作的。
(1) 修改配置文件
# 自定义属性
kafka:
topic: cities
# 配置Kafka
spring:
kafka:
bootstrap-servers: kafkaOS1:9092,kafkaOS2:9092,kafkaOS3:9092
producer: # 配置生产者
# key-serializer: org.apache.kafka.common.serialization.StringSerializer
# value-serializer: org.apache.kafka.common.serialization.StringSerializer
(2) 定义发布者处理器
Spring Kafka 通过 KafkaTemplate 完成消息的发布。
@RestController
public class SomeProducer {
@Autowired
private KafkaTemplate<String, String> template;
// 从配置文件读取自定义属性
@Value("${kafka.topic}")
private String topic;
// 由于是提交数据,所以使用Post方式
@PostMapping("/msg/send")
public String sendMsg(@RequestParam("message") String message) {
template.send(topic, message);
return "send success";
}
}
3.2.3 定义消费者
Spring 是通过监听方式实现消费者的。
(1) 修改配置文件
在配置文件中添加如下内容。注意,Spring 中要求必须为消费者指定组。
# 自定义属性
kafka:
topic: cities
# 配置Kafka
spring:
kafka:
bootstrap-servers: kafkaOS1:9092,kafkaOS2:9092,kafkaOS3:9092
consumer: # 配置消费者
group-id: group0 # 消费者组
# key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
(2) 定义消费者
Spring Kafka 是通过 KafkaListener 监听方式来完成消息订阅与接收的。当监听到有指定主题的消息时,就会触发@KafkaListener 注解所标注的方法的执行。
@Component
public class SomeConsumer {
@KafkaListener(topics = "${kafka.topic}")
public void onMsg(String message) {
System.out.println("Kafka消费者接受到消息 " + message);
}
}
演示
更多推荐
所有评论(0)