1. Kafka 集群搭建

在生产环境中为了防止单点问题,Kafka 都是以集群方式出现的。下面要搭建一个 Kafka集群,包含三个 Kafka 主机,即三个 Broker。

1.1 Kafka 的下载

http://kafka.apache.org/downloads

1.2 安装并配置第一台主机

(1) 上传并解压

将下载好的 Kafka 压缩包上传至 CentOS 虚拟机,并解压。
tar -zxvf kafka_2.11-2.2.0tgz -C /opt/apps
在这里插入图片描述
(2) 创建软链接

为了屏蔽版本号,创建软连接,类似于快捷方式:
ln -s kafka_2.11-2.2.0/ kafka
在这里插入图片描述

(3) 修改配置文件

在 kafka 安装目录下有一个 config/server.properties 文件,修改该文件。
在这里插入图片描述
broker.id:集群中每个kafka主机唯一标识
listeners:broker之间通信使用的,不设置也行,默认当前主机地址:9092
advertised.listeners:消费者、生产者和broker之间通信的地址,如果不设置,用的是上面的listeners配置的地址
在这里插入图片描述
log.dirs:kafka消息日志存放的位置
num.partitions:执行命令的时候不指定创建几个partitions,则默认取这个值
在这里插入图片描述
zookeeper.connect:设置zk的地址,如果用的是集群则配置的时候用逗号分割

1.3 再克隆两台 Kafka

以 kafkaOS1 为母机再克隆两台 Kafka 主机。在克隆完毕后,需要修改 server.properties中的 broker.id、listeners 与 advertised.listeners。
在这里插入图片描述
在这里插入图片描述

1.4 kafka 的启动与停止

(1) 启动 zookeeper

zkServer.sh start
在这里插入图片描述

(2) 启动 kafka

在命令后添加-daemon 参数,可以使 kafka 以守护进程方式启动,即不占用窗口。
bin/kafka-server-start.sh -daemon config/server.properties
在这里插入图片描述

(3) 停止 kafka

bin/kafka-server-stop.sh
在这里插入图片描述

1.5 kafka 操作

(1) 创建 topic

bin/kafka-topics.sh --create --bootstrap-server 192.168.59.151:9092 --replication-factor 3 --partitions 3 --topic test

–bootstrap-server:可以指定集群中任意一个kafka主机的地址,也可以指定多个主机地址,逗号分隔
–replication-factor:复制因子(备份几份)
–partitions:分区数量

在这里插入图片描述

(2) 查看 topic

bin/kafka-topics.sh --list --bootstrap-server 192.168.59.151:9092
在这里插入图片描述

(3) 删除 topic

bin/kafka-topics.sh --delete --bootstrap-server 192.168.59.151:9092 --topic test
在这里插入图片描述

(4) 发送消息

bin/kafka-console-producer.sh --broker-list 192.168.59.151:9092 --topic test
该命令会创建一个生产者,然后由其生产消息。

–broker-list:指定broker的地址,可以写多个,逗号分隔

在这里插入图片描述

(5) 消费消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.59.151:9092 --topic test --from-beginning

–from-beginning:指定从一开始消费消息,可以消费消费者启动前的消息(不指定 默认 消费者只能消费 当前消费者启动之后 生产者生产的消息)

在这里插入图片描述

2. 日志查看

我们这里说的日志不是Kafka的启动日志(运行日志),启动日志在Kafka安装目录下的logs/server.log中。消息在磁盘上都是以日志的形式保存的。我们这里说的日志是存放在/tmp/kafka_logs目录中的消息日志,即 partitionsegment

2.1 查看分区与备份

(1) 1 个分区 1 个备份

我们前面创建的 test 主题是 1 个分区 1 个备份,只有一个broker中有一个 test 主题的分区
在这里插入图片描述

(2) 3 个分区 1 个备份

再次创建一个主题,命名为 one,创建三个分区,但仍为一个备份。 依次查看三台broker,可以看到每台 broker 中都有一个 one 主题的分区
在这里插入图片描述

(3) 3 个分区 3 个备份

再次创建一个主题,命名为 two,创建三个分区,三个备份。依次查看三台 broker,可以看到每台 broker 中都有三份 two 主题的分区
在这里插入图片描述

2.2 查看分区与备份在 zk 中的信息

使用 zkCli.sh 命令连接上 zk,可以查看到 kafka 在 zk 的信息。

以三个分区,两个备份的主题cities为例

(1) /brokers 目录
在这里插入图片描述

(2) /brokers/ids 目录

存放的是 kafka 集群中各个主机的 broker-id 列表。
在这里插入图片描述
每个 id 的数据内容为当前主机的信息。
在这里插入图片描述

(3) /brokers/topics

可以看到topics下存放的是各个主题。
在这里插入图片描述

/brokers/topics/cities/partitions 中存放的是 cities主题下所包含的 partition。这里的 0、1、2,在/tmp/kafka-logs 目录中即为 cities-0,cities-1,cities-2。
在这里插入图片描述

每个分区下还可以查看状态信息。
在这里插入图片描述

在这里插入图片描述

leader:代表当前分区副本Leader是在哪个主机上
isr:同步列表,代表当前分区同步中的副本所在主机位置

直接查看主题节点内容。
在这里插入图片描述

partitions,可以看出:
主机2上存放的cities主题的分区是0和1
主机1上存放的cities主题的分区是1和2
主机0上存放的cities主题的分区是2和0
在这里插入图片描述
在这里插入图片描述

2.3 查看段 segment

进入test主题的分区:
在这里插入图片描述
(1) segment 文件

segment 是一个逻辑概念,其由两类物理文件组成,分别为“.index”文件和“.log”文件。“.log”文件中存放的是消息,而“.index”文件中存放的是“.log”文件中消息的索引。

文件名20位数字组成,代表当前文件记录消息之前,已经有多少条消息

在这里插入图片描述

170210.log文件表示记录的消息是从第170211条开始的
假如我们要查找第170213条消息,先根据二分法从所有segment文件中
根据文件名找到该消息应该在哪个文件名的文件中,最终找到170210的文件名
在将170213减去170210,等于3,对应170210.index文件中的2
通过2找到对应的偏移量0348
再根据偏移量0348在170210.log找到对应的消息内容

(2) 查看 segment

对于 segment 中的 log 文件,不能直接通过 cat 命令查找其内容,而是需要通过 kafka自带的一个工具查看。

bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.log --print-data-log

解释:

kafka-run-class.sh命令指定运行工具类 kafka.tools.DumpLogSegments(Java类)
在这里插入图片描述

在这里插入图片描述

(3) 特殊主题 __consumer_offsets的分区

__consumer_offsets主题默认有0-49,默认50个分区,1个副本,可以配置。
在这里插入图片描述
Consumer 从 partition 中取出一批消息写入到 buffer 对其进行消费,消费完消息后,会将其消费消息的 offset 提交给 broker,以让 broker 记录下哪些消息是消费过的,消费者提交的 offset 被封装为了一种特殊的消息被写入到了一个由系统创建的、名称为__consumer_offset 的特殊主题的 partitions 中。

3. Kafka API

首先在命令行创建一个名称为 cities 的主题,并创建该主题的订阅者。

3.1 使用 kafka 原生 API

3.1.1 创建工程

创建一个 Maven 的 Java 工程,命名为 kafkaDemo。创建时无需导入依赖。为了简单,后面的发布者与消费者均创建在该工程中。
在这里插入图片描述

3.1.2 导入依赖

<!-- kafka 依赖 -->
<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka_2.12</artifactId>
	<version>1.1.1</version>
</dependency>

3.1.3 创建发布者 OneProducer

(1) 创建发布者类 OneProducer

public class OneProducer {
    // 第一个泛型:当前生产者所生产消息的key
    // 第二个泛型:当前生产者所生产的消息本身
    private KafkaProducer<Integer, String> producer;

    public OneProducer() {
        Properties properties = new Properties();
        // 指定kafka集群
        properties.put("bootstrap.servers", "kafkaOS1:9092,kafkaOS2:9092,kafkaOS3:9092");
        // 指定key与value的序列化器
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        this.producer = new KafkaProducer<Integer, String>(properties);
    }

    public void sendMsg() {
        // 创建消息记录(包含主题、消息本身)  (String topic, V value)
        // ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", "tianjin");
        // 创建消息记录(包含主题、key、消息本身)  (String topic, K key, V value)
        // ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", 1, "tianjin");
        // 创建消息记录(包含主题、partition、key、消息本身)  (String topic, Integer partition, K key, V value)
        ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", 0, 1, "tianjin");
        producer.send(record);
    }
}

(2) 创建测试类 OneProducerTest

public class OneProducerTest {

    public static void main(String[] args) throws IOException {
        OneProducer producer = new OneProducer();
        producer.sendMsg();
        System.in.read();
    }
}

(3) 效果

多次启动,用命令方式消费:
在这里插入图片描述

3.1.4 创建发布者 TwoProducer

前面的方式在消息发送成功后,代码中没有任何提示,这里可以使用回调方式,即发送成功后,会触发回调方法的执行。

(1) 创建发布者类 TwoProducer

复制 OneProducer 类,仅修改 sendMsg()方法。

public class TwoProducer {
    // 第一个泛型:当前生产者所生产消息的key
    // 第二个泛型:当前生产者所生产的消息本身
    private KafkaProducer<Integer, String> producer;

    public TwoProducer() {
        Properties properties = new Properties();
        // 指定kafka集群
        properties.put("bootstrap.servers", "kafkaOS1:9092,kafkaOS2:9092,kafkaOS3:9092");
        // 指定key与value的序列化器
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        this.producer = new KafkaProducer<Integer, String>(properties);
    }

    public void sendMsg() {
        // 创建消息记录(包含主题、消息本身)  (String topic, V value)
        // ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", "tianjin");
        // 创建消息记录(包含主题、key、消息本身)  (String topic, K key, V value)
        // ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", 1, "tianjin");
        // 创建消息记录(包含主题、partition、key、消息本身)  (String topic, Integer partition, K key, V value)
        ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", 2, 1, "tianjin");
        producer.send(record, (metadata, ex) -> {
            System.out.println("topic = " + metadata.topic());
            System.out.println("partition = " + metadata.partition());
            System.out.println("offset = " + metadata.offset());
        });
    }
}

(2) 创建测试类 TwoProducerTest

public class TwoProducerTest {

    public static void main(String[] args) throws IOException {
        TwoProducer producer = new TwoProducer();
        producer.sendMsg();
        System.in.read();
    }
}

(3) 效果

在这里插入图片描述

在这里插入图片描述

3.1.5 批量发送消息

(1) 创建发布者类 SomeProducerBatch

复制前面的发布者类,在其基础上进行修改。

public class SomeProducerBatch {
    // 第一个泛型:当前生产者所生产消息的key
    // 第二个泛型:当前生产者所生产的消息本身
    private KafkaProducer<Integer, String> producer;

    public SomeProducerBatch() {
        Properties properties = new Properties();
        // 指定kafka集群
        properties.put("bootstrap.servers", "kafkaOS1:9092,kafkaOS2:9092,kafkaOS3:9092");
        // 指定key与value的序列化器
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 指定生产者每10条向broker发送一次
        properties.put("batch.size", 10);
        // 指定生产者每50ms向broker发送一次
        properties.put("linger.ms", 50);

        this.producer = new KafkaProducer<Integer, String>(properties);
    }

    public void sendMsg() {
        for(int i=0; i<50; i++) {
            ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", "city-" + i);
            int k = i;
            producer.send(record, (metadata, ex) -> {
                System.out.println("i = " + k);
                System.out.println("topic = " + metadata.topic());
                System.out.println("partition = " + metadata.partition());
                System.out.println("offset = " + metadata.offset());
            });
        }
    }
}

(2) 创建测试类 ProducerBatchTest

public class ProducerBatchTest {

    public static void main(String[] args) throws IOException {
        SomeProducerBatch producer = new SomeProducerBatch();
        producer.sendMsg();
        System.in.read();
    }
}

(3) 效果

producer.send方法触发了50次、回调触发了50次,但是实际上发到Kafka只有5次,控制台看不出来
在这里插入图片描述

在这里插入图片描述

3.1.6 消费者组

(1) 创建消费者类 SomeConsumer

public class SomeConsumer extends ShutdownableThread {

    private KafkaConsumer<Integer, String> consumer;

    public SomeConsumer() {
        // 调用父类构造器,两个参数:
        // 1)指定当前消费者名称
        // 2)指定消费过程是否会被中断(比如消费一批数据,还没消费完,消费者取消了,那么批数据消费是否会中断)
        super("KafkaConsumerTest", false);
	
        Properties properties = new Properties();
        String brokers = "kafkaOS1:9092,kafkaOS2:9092,kafkaOS3:9092";
        // map中的key其实都是常量,定义在ConsumerConfig中
        // 指定kafka集群
        properties.put("bootstrap.servers", brokers);
        // 指定消费者组ID
        properties.put("group.id", "cityGroup1");
        // 开启自动提交,默认为true
        properties.put("enable.auto.commit", "true");
        
        // 指定自动提交时间间隔,默认5s
        properties.put("auto.commit.interval.ms", "1000");
        // 指定poll一次获取消息的最大数量,默认值是500
        properties.put("max.poll.records", 500);
        
        // 指定消费者被broker认定为挂掉的时限。若broker在此时间内未收到当前消费者发送的心跳,则broker
        // 认为消费者已经挂掉。默认为10s
        properties.put("session.timeout.ms", "30000");
        // 指定两次心跳的时间间隔,默认为3s,一般不要超过session.timeout.ms的 1/3
        properties.put("heartbeat.interval.ms", "10000");
        // 当kafka中没有指定offset初值时,或指定的offset不存在时,从这里读取offset的值。其取值的意义为:
        // earliest:指定offset为第一条offset
        // latest: 指定offset为最后一条offset
        properties.put("auto.offset.reset", "earliest");
        // 指定key与value的反序列化器
        properties.put("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        this.consumer = new KafkaConsumer<Integer, String>(properties);
    }

    @Override
    public void doWork() {
        // 订阅消费主题
        consumer.subscribe(Collections.singletonList("cities"));
        // 从broker拉取消息。
        // 参数表示,如果缓冲区中没有可用的数据,则在轮询中等待的时间(以毫秒为单位)。
        // 如果为0,立即返回缓冲区中当前可用的任何记录,否则返回空。
        // > 0,表示当时间到后仍没有消息,再返回空
        ConsumerRecords<Integer, String> records = consumer.poll(1000);
        for(ConsumerRecord record : records) {
            System.out.println("topic = " + record.topic());
            System.out.println("partition = " + record.partition());
            System.out.println("key = " + record.key());
            System.out.println("value = " + record.value());
        }
    }
}

订阅消费主题,可以用正则、集合订阅多个主题:
在这里插入图片描述

可以指定poll一次获取消息的最大数量,默认500:
在这里插入图片描述

(2) 创建测试类 ConsumerTest

public class ConsumerTest {
    public static void main(String[] args) {
        SomeConsumer consumer = new SomeConsumer();
        consumer.start();
    }
}

(3) 效果
在这里插入图片描述

3.1.7 消费者同步手动提交

(1) 手动提交分类

手动提交又可以划分为同步提交异步提交提交。这些提交方式仅仅是doWork()方法不相同,其构造器是相同的。所以下面首先在前面消费者类的基础上进行构造器的修改,然后再分别实现两种不同的提交方式。

(2) 创建消费者类 SyncManualConsumer

  • A、原理
    同步提交方式是,消费者向 broker 提交 offset 后等待 broker 成功响应。若没有收到响应,则会重新提交,直到获取到响应。而在这个等待过程中,消费者是阻塞的。其严重影响了消费者的吞吐量。

  • B、 修改构造器
    直接复制前面的 SomeConsumer,在其基础上进行修改。
    在这里插入图片描述

  • C、 修改 doWork()方法
    在这里插入图片描述

public class SyncManualConsumer extends ShutdownableThread {
    private KafkaConsumer<Integer, String> consumer;

    public SyncManualConsumer() {
        // 两个参数:
        // 1)指定当前消费者名称
        // 2)指定消费过程是否会被中断
        super("KafkaConsumerTest", false);

        Properties properties = new Properties();
        String brokers = "kafkaOS1:9092,kafkaOS2:9092,kafkaOS3:9092";
        // 指定kafka集群
        properties.put("bootstrap.servers", brokers);
        // 指定消费者组ID
        properties.put("group.id", "cityGroup1");

        // 开启手动提交
        properties.put("enable.auto.commit", "false");
        // 指定自动提交时间间隔,默认5s
        // properties.put("auto.commit.interval.ms", "1000");
        // 指定一次提交10个offset
        properties.put("max.poll.records", 10);

        // 指定消费者被broker认定为挂掉的时限。若broker在此时间内未收到当前消费者发送的心跳,则broker
        // 认为消费者已经挂掉。默认为10s
        properties.put("session.timeout.ms", "30000");
        // 指定两次心跳的时间间隔,默认为3s,一般不要超过session.timeout.ms的 1/3
        properties.put("heartbeat.interval.ms", "10000");
        // 当kafka中没有指定offset初值时,或指定的offset不存在时,从这里读取offset的值。其取值的意义为:
        // earliest:指定offset为第一条offset
        // latest: 指定offset为最后一条offset
        properties.put("auto.offset.reset", "earliest");
        // 指定key与value的反序列化器
        properties.put("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        this.consumer = new KafkaConsumer<Integer, String>(properties);
    }

    @Override
    public void doWork() {
        // 订阅消费主题
        consumer.subscribe(Collections.singletonList("cities"));
        // 从broker拉取消息。
        // 参数表示,如果缓冲区中没有可用的数据,则在轮询中等待的时间(以毫秒为单位)。
        // 如果为0,立即返回缓冲区中当前可用的任何记录,否则返回空。
        // > 0,表示当时间到后仍没有消息,再返回空
        ConsumerRecords<Integer, String> records = consumer.poll(1000);
        for(ConsumerRecord record : records) {
            System.out.println("topic = " + record.topic());
            System.out.println("partition = " + record.partition());
            System.out.println("key = " + record.key());
            System.out.println("value = " + record.value());
        }
        // 手动同步提交
        consumer.commitSync();
    }
}

(4) 创建测试类 SyncManulTest

public class SyncManualTest {
    public static void main(String[] args) {
        SyncManualConsumer consumer = new SyncManualConsumer();
        consumer.start();
    }
}

3.1.8 消费者异步手动提交

(1) 原理

手动同步提交方式需要等待 broker 的成功响应,效率太低,影响消费者的吞吐量。异步提交方式是,消费者向 broker 提交 offset 后不用等待成功响应,所以其增加了消费者的吞吐量。

(2) 创建消费者类 AsyncManualConsumer

复制前面的 SyncManualConsumer 类,在其基础上进行修改。
在这里插入图片描述

//com.abc.consumer.test3.AsynManualConsumer#doWork
public void doWork() {
    // 订阅消费主题
    consumer.subscribe(Collections.singletonList("cities"));
    // 从broker拉取消息。
    // 参数表示,如果缓冲区中没有可用的数据,则在轮询中等待的时间(以毫秒为单位)。
    // 如果为0,立即返回缓冲区中当前可用的任何记录,否则返回空。
    // > 0,表示当时间到后仍没有消息,再返回空
    ConsumerRecords<Integer, String> records = consumer.poll(1000);
    for(ConsumerRecord record : records) {
        System.out.println("topic = " + record.topic());
        System.out.println("partition = " + record.partition());
        System.out.println("key = " + record.key());
        System.out.println("value = " + record.value());
    }
    // 手动异步提交
    // consumer.commitAsync();
    consumer.commitAsync((offsets, ex) -> {
        if(ex != null) {
            System.out.print("提交失败,offsets = " + offsets);
            System.out.println(", exception = " + ex);
        }
    });
}

(3) 创建测试类 AsyncManulTest

public class AsyncManualTest {
    public static void main(String[] args) {
        AsynManualConsumer consumer = new AsynManualConsumer();
        consumer.start();
    }
}

3.2 Spring Boot Kafka

为了简单,以下代码是将消息发布者与订阅者定义到了一个工程中的。

3.2.1 创建工程

创建一个 Spring Boot 工程,导入如下依赖。
在这里插入图片描述

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

3.2.2 定义发布者

Spring 是通过 KafkaTemplate 来完成对 Kafka 的操作的。

(1) 修改配置文件

# 自定义属性
kafka:
  topic: cities

# 配置Kafka
spring:
  kafka:
    bootstrap-servers: kafkaOS1:9092,kafkaOS2:9092,kafkaOS3:9092
    producer:   # 配置生产者
      # key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # value-serializer: org.apache.kafka.common.serialization.StringSerializer

(2) 定义发布者处理器

Spring Kafka 通过 KafkaTemplate 完成消息的发布。

@RestController
public class SomeProducer {
    @Autowired
    private KafkaTemplate<String, String> template;

    // 从配置文件读取自定义属性
    @Value("${kafka.topic}")
    private String topic;

    // 由于是提交数据,所以使用Post方式
    @PostMapping("/msg/send")
    public String sendMsg(@RequestParam("message") String message) {
        template.send(topic, message);
        return "send success";
    }
}

3.2.3 定义消费者

Spring 是通过监听方式实现消费者的。

(1) 修改配置文件

在配置文件中添加如下内容。注意,Spring 中要求必须为消费者指定组。

# 自定义属性
kafka:
  topic: cities

# 配置Kafka
spring:
  kafka:
    bootstrap-servers: kafkaOS1:9092,kafkaOS2:9092,kafkaOS3:9092

    consumer:   # 配置消费者
      group-id: group0  # 消费者组
      # key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

(2) 定义消费者

Spring Kafka 是通过 KafkaListener 监听方式来完成消息订阅与接收的。当监听到有指定主题的消息时,就会触发@KafkaListener 注解所标注的方法的执行。

@Component
public class SomeConsumer {

    @KafkaListener(topics = "${kafka.topic}")
    public void onMsg(String message) {
        System.out.println("Kafka消费者接受到消息 " + message);
    }

}
演示

在这里插入图片描述

在这里插入图片描述

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐