Kafka-为什么用消息队列、消息队列解决的实际问题、MQ的几种类型、kafka的基本概念及单播多播使用、kafka的topic和partition、2个面试题
kafka集群、listeners配置、broker&topic&partition(leader、follower)关系、消费问题(同步、异步、ack应答机制、缓冲区、消费者poll长轮询)
kafka吞吐量高的原因、kafka的HA(高可用)、Controller、Rebalance、kafka线上问题优化、springboot整合kafka基本使用、eagle后台管理

kafka集群

在一台服务器上,已经安装好的kafka中,到config配置目录下多创建两个server.properties文件,一共启动2个ekafka的服务。

在这里插入图片描述

注意:什么时候配置listeners

上一篇博客中介绍到将kafka下载解压缩后,直接启动就完事了,但是需要注意的是配置文件中有个listenersadvertised.listeners的存在。

其实如果启动一个kafka服务,只是一个单机使用的,且不涉及到外网访问,那么什么都没必要去设置。但是如果是集群之类的,那么就需要做一些相应的配置了。

  • listeners:其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。一般公司内网的kafka集群只需要配置这个就行了。

    listeners=PLAINTEXT://内网ip:9092
    
  • advertised.listeners: Advertise的含义表示宣称的、公布的,就是组监听器是 Broker 用于对外发布的。涉及到外网kafka集群时,还需要配置这个。只要是涉及到外网访问的,必须要设置这个,否则Consumer或Producer连接不到kafka的服务

    advertised.listeners=PLAINTEXT://外网ip:9092
    

配置

我这里涉及到外网访问,需要把 advertised.listeners配置一下。

  • server.properties
broker.id=0
# 指定监听所有的ip, 监听所有ip的9092端口
listeners=PLAINTEXT://0.0.0.0:9092
# 注册到zookeeper的ip和端口
advertised.listeners=PLAINTEXT://xxx.xxx.xx.xx:9092
log.dir=/env/kafka/data/kafka-logs
  • server1.properties
broker.id=1
# 指定监听所有的ip, 监听所有ip的9093端口
listeners=PLAINTEXT://0.0.0.0:9093
# 注册到zookeeper的ip和端口
advertised.listeners=PLAINTEXT://xxx.xxx.xx.xx:9093
log.dir=/env/kafka/data/kafka-logs-1

注意:以上listeners写的是 0.0.0.0,那么必须要把 advertised.listeners配置一下,advertised.listeners是对外的地址和端口,是要注册到Zookeeper上面的,如果没有配置advertised.listeners,那么advertised.listeners的值就是 listeners的值,Zookeeper就没法知道broker的具体地址是什么了。

按照0.0.0.0配置之后,其他的所有ip都能访问到此broker的端口上。

在启动kafka服务器前,我们要先确保Zookeeper已经启动了,如果没有启动,请先启动Zookeeper的服务

./zookeeper-server-start.sh -daemon ../config/zookeeper.properties

然后进入到kafka安装目录的bin目录下,执行命令启动2台服务器。守护进程方式启动

./kafka-server-start.sh -daemon ../config/server.properties
./kafka-server-start.sh -daemon ../config/server1.properties

启动完之后,可以到zookeeper的服务中看一下有没有注册好broker们。

在这里插入图片描述

可以看到已经注册了两个broker,id分别是 0 和 1

集群中broker、topic、分区和副本

创建一个topic主题,我们相当于有两个broker,然后创建2个分区,2个副本

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic my-topic

创建完毕后,查看一下topic的详细信息

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-topic

在这里插入图片描述

  • Topic:主题名称
  • PartitionCount:分区数量
  • ReplicationFactor: 副本数量
  • Config: 详细配置
    • Partition:分区号
    • Leader: Leader所在的broker id
    • Replicas: 当前副本存在的broker节点id,上图第二行:Partition0的副本们所在的broker的id们(谁在前面谁是Leader,如 0,1,那么id为0的节点的Partition就是Leader)
    • Isr: 可以同步的broker节点和已经同步的broker节点,存放在 Isr的集合中。当Leader挂掉时,新的Leader会在Isr中产生

在这里插入图片描述

那么副本是一个什么样的概念呢?

副本是为了主题中的分区创建多个备份,这些备份在kafka集群的多个broker中,会有一个副本作为leader,其他的副本都是follower,注意:在一个broker节点服务器中,同一个topic中的 Partition,不可能存在Leader和Follower都在一个broker服务器中。因为其他的备份都是被保存在其他的broker服务器上的

下图,可以看到,两个broker的数据文件中,都存在my-topic-0my-topic-1。这就是数据同步,副本的体现。

在这里插入图片描述

leader、follower、Isr

  • leader: kafka的读写操作,都是发生在Leader上的。leader负责把数据同步给follower。当leader挂了,经过选举后,从多个follower中选举出新的leader。
  • follower: 接收leader同步的数据
  • Isr: 可以同步和已同步的节点会被保存到Isr的集合中。细节:如果Isr中的某个节点的性能较差,那么它会被踢出Isr集合。

一个kafka集群中包括:

  • 多个broker
  • 每个broker中又存在多个topic
  • 每个topic中,可以设置多个partition分区(解决数据文件较大问题)
  • 每个partition又可以设置多个副本,保证系统的高可靠(当一个leader挂了,其他的follower前赴后继提供服务)

kafka集群消费问题

创建一个生产者,这个生产者是向一个kafka集群中发送消息

./kafka-console-producer.sh --broker-list xxx.xxx.xx.xx:9092,xxx.xxx.xx.xx:9093 --topic my-topic

创建消费者,监听kafka集群中 my-topic的主题

./kafka-console-consumer.sh --bootstrap-server xxx.xxx.xx.xx:9092,xxx.xxx.xx.xx:9093 --from-beginning --topic my-topic

然后在消费者中,发送一些信息,完全没问题,可以接收到

在这里插入图片描述

那么假如是两个消费者且在同一个消费组中,消费同一个topic的partition中的消息呢?

./kafka-console-consumer.sh --bootstrap-server xxx.xxx.xx.xx:9092,xxx.xxx.xx.xx:9093 --from-beginning --consumer-property group.id=group1  --topic my-topic

第一个消费者正常消费消息。

在这里插入图片描述

第二个消费者,无法正常消费消息。

在这里插入图片描述

继续生产消息,发现只有一个消费者能够正常接收到消息。

在这里插入图片描述

这种其实也就是前一篇博客上说的一种消息,单播消息。在一个消费者组中,消费同一个topic的partition时,只有一个消费者能消费到这个消息。

在这里插入图片描述

每个broker中有多个partition。一个partition只能被一个消费者组里的某一个消费者消费,从而保证消费顺序。一个消费者可以消费多个partition。

消费组中的消费者数量不能比topic中的partition数量多,否则多出来的消费者消费不到消息。

消息的同步、异步发送

同步发送消息

在这里插入图片描述

如果生产者发送消息到kafka服务器后没有接收到kafka服务器返回的ack码,生产者会被阻塞,阻塞3s的时间,如果还没有收到 ack,会进行重试,重试3次。

异步发送消息

在这里插入图片描述

异步发送,生产者发送消息后就可以执行后面的业务,broker在收到消息后异步调用生产者的callback回调方法。异步发送可能会由于网络不稳定造成消息丢失。

生产者如何保证数据不丢失(ack机制)

在同步发消息的场景下:生产者将消息发送到broker后,ack会有3种不同的选择:

ack=0 : 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容		易丢失数据

ack=1(默认) : 需要等待leader成功将数据写入本地的log,但是不需要等待所有follower是否写入成功。就可以继续发送下一条消息。这种情况,如果follower没有备份成功,此时leader又挂掉了,那么消息会丢失。

ack=-1或者all,需要等待 min.insync.replicas(默认值1,推荐配置>=2),这个参数配置的副本个数都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。 一般除非是金融级别的系统,或者是跟钱有关系的场景会使用。

当ack=0时,kafka集群中不需要任何broker收到消息,立即返回ack给生产者,效率最高,最容易丢失数据

ack=1(默认),leader完成存盘(将数据存入本地)后,返回ack给生产者,性能和安全是均衡的

ack=-1或all,当一定数量的副本(注意包括leader)完成了存盘操作时,返回ack,效率最低安全性能最高。

在生产者的配置中,可以配置生产者多少时间内如果没有接收到ack,会进行重试机制,重新发送一次信息,也可以配置重试的次数。

发送消息的缓冲机制

生产者会有一个缓冲区(默认32m),会把消息放入到缓冲区中。然后生产者中有一个本地线程,从缓冲区中一次拉取16k的消息(如果数据没有达到16k,10ms后也会将数据发送),然后发送给kafka服务器。

在这里插入图片描述

  • 生产者默认创建一个消息缓冲区,用来存放要发送的消息,大小是32m
  • 生产者本地线程会去缓冲区中一次拉取16k的数据发送到broker
  • 如果线程拉取不到16k数据,间隔10ms也会把消息发送到broker

生产者的实现细节

生产者在发送消息的时候怎么知道自己的消息要发到哪个分区(多分区场景下)里呢?有几种方式:

  • 指定具体的partition,比如0或者别的分区
  • 发送消息时传入一个key,然后根据hash算法算出一个partition
  • 轮询方式传入多个partition中

消费者offset的手动提交和自动提交

无论是手动提交还是自动提交,都需要把所属的消费者+消费的主题+消费的某个分区以及消费的偏移量offset提交给_consumer_offsets主题中

  • 自动提交:自动提交就是当消费者将消息从kafka中间件poll下来之后,直接去提交一个offset到kafka的_consumer_offsets的50分区中的某个分区中。假如消费者还没来得及消费,然后出现突然宕机,,但是offset已经提交了,那么这个消息就消费不到了。

    poll() 是消费者的一个拉取消息的长轮询

  • 手动提交:可以在消费消息后再去提交offset

手动提交分为同步提交和异步提交

  • 同步提交 : 同步提交offset,当前的线程会被阻塞,直到offset提交成功

    在消费完消息后调用同步提交方法,当集群返回ack前一直进行阻塞,返回ack后标识提交成功。再执行之后的逻辑

    consumer.commitSync();	// 会进行线程阻塞
    
  • 异步提交 : 当前的线程不会阻塞,可以继续处理后续的程序逻辑’

    消息消费完之后,不需要等待ack,直接执行之后的逻辑,然后设置一个回调方法,供集群调用

    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception  e) {
         if(e !=null) {
             // 提交出现错误了,及时处理
         }
        }
    });
    

消费者长轮询poll消息的一些配置

  • 消费者和broker建立一个长连接,poll消息,可以设置一次poll多少条数据。假如300条数据。

    max.poll.records=300
    
  • 可以设置如果两次poll消息的时间超过了某个时间间隔,那么kafka认为其消费能力过弱,将其踢出消费组,将分区分配给其他的消费者

    max.poll.interval.ms=300000
    
  • 可以设置长轮询的时间,假如是1000ms。那么如果一次poll到300条数据,就开始进行消费。

  • 如果一次没有poll到300条,且现在时间不超过1000ms,那么继续poll,要么拉取到300条数据,要么到1000ms时间就开始消费。

  • 也就是有两种情况,一种是在规定时间内poll到了指定的数据条数,一种是没在指定时间内poll到指定条数,然后都开始进行消费数据。

其他配置,健康检查

消费者需要每隔一段时间都去给kafka发送心跳,如果在一段时间内kafka没有收到消费者的心跳,那么会把该消费者踢出消费组,进行rebalance,将分区分配给其他消费者。

session.timeout.ms=3000

消费者的消费方式

只是作为了解,后面使用springboot,都是注解声明式的。不用硬编码。

指定分区消费

消费者可以指定消费主题下的哪个分区中的消息

java实现

consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)))

指定偏移量offset开始消费

consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);

从指定的时间点开始消费

一个场景:比如我们需要消费2个小时内的消息。

这种场景不太常见,遇见了查阅下资料即可。

新消费组的offset规则

当消费组是一个新创建的消费组,或者指定了offset的消费方式后,这个offset不存在该如何消费?

  • latest(默认) : 只消费最新的消息
  • earliest: 第一次从头开始消费,以后按照消费offset记录继续消费,注意这里不是每次都从头开始消费,只是第一次从头消费。
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐