kafka概述

一、kafka概述

1.1 定义

Kafka 是一个分布式的基于发布 / 订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。

1.2 消息队列

1.2.1 传统消息队列的应用场景

img

使用消息队列的好处:

  1. 解耦

    允许独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

  2. 可恢复性

    系统的一部分组件失效时,不会影响整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

  3. 缓冲

    有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。

  4. 灵活性和峰值处理能力

    使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

  5. 异步通信

    很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

1.2.2 消息队列的两种形式

  1. 点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除。)

    消息生产者生产消息发送到 Queue 中,然后消费者从 Queue 中取出并且消费消息。消息被消费以后,Queue 中不再有存储,所以消费者不可能消费到已经被消费的消息。Queue 支持存在多个消费者,但对于一个消息而言,只有一个消费者可以消费。

    img

  2. 发布 / 订阅模式(一对多,消费者消费数据之后不会清除消息)

    消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 topic 中的消息会被所有订阅者消费。

    img

1.3 Kafka 基础架构

img

  1. Producer:

    消息生产者,就是向 Kafka broker 发消息的客户端。

  2. Consumer:

    消息消费者,向 Kafka broker 取消息的客户端。

  3. Consumer Group(CG):

    消费者组,由多个 Consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

  4. Broker:

    一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。

  5. Topic:

    可以理解为一个队列,生产者和消费者面向的都是一个 topic。

  6. Partiton:

    为了实现拓展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 Partition,每个 partition 都是一个有序的队列。

  7. Replication:

    副本,为保证集群中某个节点发生故障时,该节点上的 partition 数据不丢失,且 Kafka 仍然可以继续工作,Kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。

  8. leader:

    每个分区多个副本的 ” 主 “,生产者发送数据的对象,以及消费者消费数据时的对象都是 leader。

  9. follower:

    每个分区多个副本的 “从”,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的 leader。

二、kafka安装部署

2.1安装部署

2.1.1.jar包下载

网址:http://kafka.apache.org/downloads.html

!不要下载最新版,在win10下有问题,下载2.8.1如下图

下载Binary版本:

oQQtyR.png

2.1.2.解压到指定的文件夹下

2.1.3.创建两个文件夹以供后续使用

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aiBYooS2-1644458525646)(https://images1.tqwba.com/20201029/5trqdowih4q.png)]

2.1.4. 修改配置文件

(1)修改zookeeper.properties 文件

修改 kafka_2.12-2.8.1\config\zookeeper.properties 文件 大概第16行

注意文件分隔符是\\

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WL7WD6Hz-1644458525651)(https://images1.tqwba.com/20201029/twfv43l23jb.png)]

(2) 修改server.properties 文件

修改 kafka_2.12-2.8.1\config\server.properties 文件 大概第60行

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ql4BDbuW-1644458525655)(https://images1.tqwba.com/20201029/xlzkbjusutq.png)]

2.2启动

2.2.1.启动 kafka 内置的 zookeeper

运行 cmd 命令:

!如果报错 The input line is too long,将文件路径缩小即可,如直接放在C盘下

!如果报错Unable to access datadir,请把修改配置文件时的两个路径均修改为相对路径

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rOwCHlMa-1644458525657)(https://images1.tqwba.com/20201029/iksbqbox45t.png)]

不关闭当前窗口

2.2.2.启动 kafka 服务

运行 cmd 命令:

.\bin\windows\kafka-server-start.bat .\config\server.properties

img

不关闭当前窗口

2.2.3.创建一个名为 test1 的 topic 测试主题 kafka

运行 cmd 命令:

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1

img
该窗口可关闭

2.2.4.创建消息生产者生产消息

运行 cmd 命令:

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test1

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0kvDgrbJ-1644458525662)(https://images1.tqwba.com/20201029/vnifjx1ckrw.png)]

不关闭当前窗口

2.2.5.创建消息消费者接收消息

运行 cmd 命令:

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1 --from-beginning

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-lyzsgoOg-1644458525663)(https://images1.tqwba.com/20201029/tjyef1mgnsh.png)]

不关闭当前窗口

2.2.6.测试消息发送和接收

img

测试成功

三、kafka架构深入理解

3.1 Kafka 工作流程

3.1.1 写入方式

producer采用推(push)模式将消息发布到broker,每条消息都被追加(append)到分区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞吐率)。

3.1.2 分区(Partition)

Kafka集群有多个消息代理服务器(broker-server)组成,发布到Kafka集群的每条消息都有一个类别,用主题(topic)来表示。通常,不同应用产生不同类型的数据,可以设置不同的主题。一个主题一般会有多个消息的订阅者,当生产者发布消息到某个主题时,订阅了这个主题的消费者都可以接收到生成者写入的新消息。

afka集群为每个主题维护了分布式的分区(partition)日志文件,物理意义上可以把主题(topic)看作进行了分区的日志文件(partition log)。主题的每个分区都是一个有序的、不可变的记录序列,新的消息会不断追加到日志中。分区中的每条消息都会按照时间顺序分配到一个单调递增的顺序编号,叫做偏移量(offset),这个偏移量能够唯一地定位当前分区中的每一条消息。

消息发送时都被发送到一个topic,其本质就是一个目录,而topic是由一些Partition Logs(分区日志)组成,其组织结构如下图所示:

下图中的topic有3个分区,每个分区的偏移量都从0开始,不同分区之间的偏移量都是独立的,不会相互影响。
img

这里写图片描述

我们可以看到,每个Partition中的消息都是有序的,生产的消息被不断追加到Partition log上,其中的每一个消息都被赋予了一个唯一的offset值。发布到Kafka主题的每条消息包括键值和时间戳。消息到达服务器端的指定分区后,都会分配到一个自增的偏移量。原始的消息内容和分配的偏移量以及其他一些元数据信息最后都会存储到分区日志文件中。消息的键也可以不用设置,这种情况下消息会均衡地分布到不同的分区。

演示

1.启动zookeeper
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
2 启动服务
.\bin\windows\kafka-server-start.bat .\config\server.properties
3 创建主题

创建主题:

.\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic first

这里主要解释一下–replication-factor 1 和 --partitions 1的含义:

–replication-factor 1表示的意思是给主题first的副本数为1
–partition 1的意思是将主题first分为1个分区,在实际运用中我们可以选择多个分区,分区的好处是为了避免给kafka集群中的节点服务器造成过大的压力,比如说没有分区的时候,一个主题位于一个服务器上面,如果该主题中的消息数量过大的话,那么会增加服务器的压力,通过分区的这种方式将同一个topic可以分配到不同的服务器当中,来去缓解服务器端的压力。

通过上面的命令我们就可以创建一个名为first的主题

4 查看主题

.\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --describe --topic first

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1O0paKTp-1644458525669)(C:\Users\dell\AppData\Roaming\Typora\typora-user-images\image-20211130095949006.png)]

5 查看主题列表

.\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --list

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-h4w3xhSU-1644458525670)(C:\Users\dell\AppData\Roaming\Typora\typora-user-images\image-20211130102339901.png)]

3.2 Kafka 存储机制

img

  • 每一个partion(文件夹)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件里。

    但每一个段segment file消息数量不一定相等,这样的特性方便old segment file高速被删除。(默认情况下每一个文件大小为1G)

  • 每一个partiton仅仅须要支持顺序读写即可了。segment文件生命周期由服务端配置參数决定。

这样做的优点就是能高速删除无用文件。有效提高磁盘利用率。

3.2.1 数据分片

由于生产者生产的消息不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment。每个 segment 对应两个文件,“.index” 文件和 “.log 文件”。这些文件位于一个文件夹下,该文件夹命名规则为:topic 名称 + 分区序号。

比如创建一个名为firstTopic的topic,其中有3个partition,那么在 kafka 的数据目录(/tmp/kafka-log)中就有 3 个目录,firstTopic-0~3
多个分区在集群中多个broker上的分配方法
1.将所有 N Broker 和待分配的 i 个 Partition 排序
2.将第 i 个 Partition 分配到第(i mod n)个 Broker 上

img

3.2.2 log分段:

每个分片目录中,kafka 通过分段的方式将 数据 分为多个 LogSegment,一个 LogSegment 对应磁盘上的一个日志文件(00000000000000000000.log)和一个索引文件(如上:00000000000000000000.index),其中日志文件是用来记录消息的。索引文件是用来保存消息的索引。每个LogSegment 的大小可以在server.properties 中log.segment.bytes=107370 (设置分段大小,默认是1gb)选项进行设置。

img

“.index” 文件存储大量的索引信息,“.log” 文件存储大量的数据,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。

3.2.3 日志的清除策略以及压缩策略

日志的清理策略有两个

1 根据消息的保留时间,当消息在 kafka 中保存的时间超过了指定的时间,就会触发清理过程
2根据 topic 存储的数据大小,当 topic 所占的日志文件大小大于一定的阀值,则可以开始删除最旧的消息。
通过 log.retention.bytes 和 log.retention.hours 这两个参数来设置,当其中任意一个达到要求,都会执行删除。默认的保留时间是:7 天
kafka会启动一个后台线程,定期检查是否存在可以删除的消息。

日志压缩策略
Kafka 还提供了“日志压缩(Log Compaction)”功能,通过这个功能可以有效的减少日志文件的大小,缓解磁盘紧张的情况,在很多实际场景中,消息的 key 和 value 的值之间的对应关系是不断变化的,就像数据库中的数据会不断被修改一样,消费者只关心 key 对应的最新的 value。因此,我们可以开启 kafka 的日志压缩功能,服务端会在后台启动Cleaner线程池,定期将相同的key进行合并,只保留最新的 value 值。

img

3.3 Kafka 生产者

在 Kafka 中,我们把产生消息的那一方称为生产者,比如我们经常回去淘宝购物,你打开淘宝的那一刻,你的登陆信息,登陆次数都会作为消息传输到 Kafka 后台,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会作为一个个消息传递给 Kafka 后台,然后淘宝会根据你的爱好做智能推荐,致使你的钱包从来都禁不住诱惑,那么这些生产者产生的消息是怎么传到 Kafka 应用程序的呢?发送过程是怎么样的呢?

尽管消息的产生非常简单,但是消息的发送过程还是比较复杂的
我们从创建一个ProducerRecord 对象开始,ProducerRecord 是 Kafka 中的一个核心类,它代表了一组 Kafka 需要发送的 key/value 键值对,它由记录要发送到的主题名称(Topic Name),可选的分区号(Partition Number)以及可选的键值对构成。 在发送 ProducerRecord 时,我们需要将键值对对象由序列化器转换为字节数组,这样它们才能够在网络上传输。然后消息到达了分区器。

如果发送过程中指定了有效的分区号,那么在发送记录时将使用该分区。如果发送过程中未指定分区,则将使用key 的 hash 函数映射指定一个分区。如果发送的过程中既没有分区号也没有,则将以循环的方式分配一个分区。选好分区后,生产者就知道向哪个主题和分区发送数据了。

img

ProducerRecord 还有关联的时间戳,如果用户没有提供时间戳,那么生产者将会在记录中使用当前的时间作为时间戳。Kafka 最终使用的时间戳取决于 topic 主题配置的时间戳类型。
然后,这条消息被存放在一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。由一个独立的线程负责把它们发到 Kafka Broker 上。

Kafka Broker 在收到消息时会返回一个响应,如果写入成功,会返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量,上面两种的时间戳类型也会返回给用户。如果写入失败,会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败的话,就返回错误消息。

Kafka 对于数据的读写是以分区为粒度的,分区可以分布在多个主机(Broker)中,这样每个节点能够实现独立的数据写入和读取,并且能够通过增加新的节点来增加 Kafka 集群的吞吐量,通过分区部署在多个 Broker 来实现负载均衡的效果

  1. 分区的原因

    (1)方便在集群中扩展,每个 partition 可以通过调整以适应它们的机器,而一个 topic 又可以有多个 partition 组成,因此整个集群就可以适应任意大小的数据了。

    (2)可以提高并发,因为可以以 partition 为单位读写了。

  2. 分区的原则

    我们需要将 producer 发送的数据封装成一个 ProducerRecord 对象。

    • topic:string 类型,NotNull
    • partition:int 类型,可选
    • timestamp:long 类型,可选
    • key:string类型,可选
    • value:string 类型,可选
    • headers:array 类型,Nullable

    (1)指明 partition 的情况下,直接将指明的值作为 partition 值;

    (2)没有指明 partition 值但有 key 的情况下,将 key 值的 hash 值与 topic 的 partition 数进行取余得到 partition 值;

    (3)既没有 partition 又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin (轮询)算法

3.3.1 数据可靠性保证

为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 中的每个 partition 收到 producer 发送的数据后,都需要向 producer 发送 ack (acknowledgement 确认收到),如果 producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。

img

3.4 Kafka 消费者

3.4.1概念

Kafka消费者对象订阅主题并接收Kafka的消息,然后验证消息并保存结果。Kafka消费者消费者组的一部分。一个消费者组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。消费者组的设计是对消费者进行的一个横向伸缩,用于解决消费者消费数据的速度跟不上生产者生产数据的速度的问题,通过增加消费者,让它们分担负载,分别处理部分分区的消息。

3.4.2 消费方式

1.消费位移确认

Kafka消费者消费位移确认有自动提交与手动提交两种策略。在创建KafkaConsumer对象时,通过参数enable.auto.commit设定,true表示自动提交(默认)。自动提交策略由消费者协调器(ConsumerCoordinator)每隔${auto.commit.interval.ms}毫秒执行一次偏移量的提交。手动提交需要由客户端自己控制偏移量的提交。
(1)自动提交。在创建一个消费者时,默认是自动提交偏移量,当然我们也可以显示设置为自动。例如,我们创建一个消费者,该消费者自动提交偏移量

(2)手动提交。在有些场景我们可能对消费偏移量有更精确的管理,以保证消息不被重复消费以及消息不被丢失。假设我们对拉取到的消息需要进行写入数据库处理,或者用于其他网络访问请求等等复杂的业务处理,在这种场景下,所有的业务处理完成后才认为消息被成功消费,这种场景下,我们必须手动控制偏移量的提交。

2 以时间戳查询消息

Kafka 在0.10.1.1 版本增加了时间戳索引文件,因此我们除了直接根据偏移量索引文件查询消息之外,还可以根据时间戳来访问消息。consumer-API 提供了一个offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)方法,该方法入参为一个Map 对象,Key 为待查询的分区,Value 为待查询的时间戳,该方法会返回时间戳大于等于待查询时间的第一条消息对应的偏移量和时间戳。需要注意的是,若待查询的分区不存在,则该方法会被一直阻塞。

3 消费速度控制

提供 pause(Collection partitions)和resume(Collection
partitions)方法,分别用来暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区向客户端返回数据操作。通过这两个方法可以对消费速度加以控制,结合业务使用。

3.5 Kafka 高效读取数据

1.kafka本身是分布式集群,同时采用分区技术,并发度高。
2.顺序写磁盘,kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100k/s。
3.零复制技术

在这里插入图片描述

零拷贝是文件只需要经过Page Cache就可以直接发送出去了,这样就极大的增加了发送数据的效率。

应用Page Cache,kafka将数据直接持久化到Page Cache中,其实就是内存中,这样有几个优点:1,I/O Scheduler 可以将多个小块的写组装成大块的写操作,降低了I/O次数。

四、kafka API

4.1 Producer API

4.1.1 消息发送流程

Kafka 的 producer 发送信息采用的是异步发送的方式。在消息发送的过程中,涉及到两个线程,一个是 main 线程,一个是 Sender 线程,以及一个线程共享变量—— RecordAccumulator 。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka broker。

img

4.1.2 异步发送 API

1.导入依赖。

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.6.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>

2.编写代码。

需要用到的类:

KafkaProducer: 需要一个生产者对象,用来发送数据。
ProducerConfig: 获取所需一系类配置参数。
ProducerRecord: 每条数据都要封装成一个 ProducerRecord 对象。

(1)不带回调函数的 API

public class MyProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {

        String server = "162.14.109.33:9092";
        // 1.创建kafka生产者的配置信息
        Properties properties = new Properties();
        // 2.指定连接的Kafka集群
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,server);
        // 3.ACK应答级别
        //properties.put("acks", "all");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // 4.重试次数
        properties.put("retries", 0);
        // 5.批次大小
        properties.put("batch.size", 16384);
        // 6.等待时间
        properties.put("linger.ms", 10000);
        // 7.RecordAccumulator 缓冲区大小
        properties.put("buffer.memory", 33554432);
        // 8.key,value的序列化
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 9.创建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // 10.发送数据

        //异步
//这个生产者写一条消息的时候,先是写到某个缓冲区,
// 这个缓冲区里的数据还没写到broker集群里的某个分区的时候,
// 它就返回到client去了。虽然效率快,但是不能保证消息一定被发送出去了。

        producer.send(new ProducerRecord<>("test2", "fmy","这是生产者异步发送的消息!"));


//同步
//这个生产者写一条消息的时候,它就立马发送到某个分区去。
// follower还需要从leader拉取消息到本地,follower再向leader发送确认,
// leader再向客户端发送确认。由于这一套流程之后,客户端才能得到确认,所以很慢。
//        Future<RecordMetadata> demo = producer.send(new ProducerRecord<>("demo", "neu", "这里是生产者同步发送的消息!"));
//        RecordMetadata recordMetadata = demo.get();
//        System.out.println("得到ack");
        // 11. 关闭资源
        producer.close();

    }
}

(2)带回调函数的 API

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

public class CallBackProducer {
    public static void main(String[] args) {

        String server = "162.14.109.33:9092";
        // 1.创建配置信息
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 2.创建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // 3.发送数据
        producer.send(new ProducerRecord<>("test2", "fmy","这是带回调方法的生产者发送的消息!"), (metadata, exception) -> {
            if (exception == null) {
                System.out.println("元数据分区:"+metadata.partition() + ",偏移量:" + metadata.offset());
            } else {
                exception.printStackTrace();
            }
        });
        // 4.关闭资源
        producer.close();
    }
}

(3)自定义分区器

public class MyPartitioner implements Partitioner
{
    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster)
    {
        return 1;
    }

    @Override
    public void close()
    {

    }

    @Override
    public void configure(Map<String, ?> map)
    {

    }
}

在生产者中加入自定义分区器

public class PartitionProducer
{
    public static void main(String[] args) {

        String server = "162.14.109.33:9092";
        // 1.创建配置信息
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 添加分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.fmy.kafka.config.MyPartitioner");

        // 2.创建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // 3.发送数据
        producer.send(new ProducerRecord<>("test2", "fmy","这是带回调方法的生产者发送的消息!"), (RecordMetadata metadata, Exception exception)-> {
            if (exception == null) {
                System.out.println("已收到ack,这里是回调方法");
                System.out.println("元数据分区:"+metadata.partition() + ",偏移量:" + metadata.offset());
            } else {
                exception.printStackTrace();
            }
        });

        // 4.关闭资源
        producer.close();
    }
}

4.1.3 同步发送 API

同步发送的意思是,一条消息发送后,会阻塞当前线程,直至返回 ack。由于 send 方法返回的是一个 Future 对象,根据 Future 对象的特点,我们也可以实现同步发送的效果,只需在调用 Future 对象的 get 方法即可。

//异步
//这个生产者写一条消息的时候,先是写到某个缓冲区,
// 这个缓冲区里的数据还没写到broker集群里的某个分区的时候,
// 它就返回到client去了。虽然效率快,但是不能保证消息一定被发送出去了。
//        producer.send(new ProducerRecord<>("test2", "fmy","这是生产者异步发送的消息!"));

//同步
//这个生产者写一条消息的时候,它就立马发送到某个分区去。
// follower还需要从leader拉取消息到本地,follower再向leader发送确认,
// leader再向客户端发送确认。由于这一套流程之后,客户端才能得到确认,所以很慢。
        Future<RecordMetadata> demo = producer.send(new ProducerRecord<>("demo", "neu", "这里是生产者同步发送的消息!"));
        RecordMetadata recordMetadata = demo.get();

4.2 Consumer API

4.2.1 自动提交 offset

  1. 编写代码。

    需要用到的类:

    KafkaConsumer: 需要创建一个消费者对象,用来消费数据。
    ConsumerConfig: 获取所需的一些列配置参数。
    ConsumerRecord: 每条数据都要封装成一个 ConsumerRecord 对象。

public class MyConsumer
{
    public static void main(String[] args) {

        String server = "162.14.109.33:9092";
        /* 1.创建消费者配置信息 */

        Properties properties = new Properties();
        /* 2.给配置信息赋值 */

        /* 连接的集群 */
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,server);

//        /* 开启自动提交 */
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        /* 自动提交的延时 */
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

        /* 关闭自动提交 */
//        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        /* key,value的反序列化 */
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        /* 消费者组 */
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "bigData");

        /* 3.创建消费者 */
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        /* 4.订阅主题 */
        consumer.subscribe(Collections.singletonList("test2"));

        /* 5.获取数据 */
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
            /* 解析并打印consumerRecords */
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println("分区"+consumerRecord.partition()+"偏移量:"+consumerRecord.offset());
                System.out.println("key:"+consumerRecord.key() + ",value:" + consumerRecord.value());
            }


            /* 同步提交,当前线程会阻塞直到 offset 提交成功 */
//            consumer.commitSync();


            /* 异步提交 */
//            consumer.commitAsync((Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception)-> {
//                if (exception != null) {
//                    System.err.println("Commit failed for" + offsets);
//                }
//            });
        }
    }
}

4.2.2 手动提交 offset

虽然自动提交 offset 十分简便,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。因此 Kafka 提供了手动提交 offset 的 API。
  手动提交 offset 的方法有两种:分别是 commitSync(同步提交)commitAsync(异步提交)。两者的相同点是,都会将本次拉取的一批数据最高的偏移量提交。不同点是,commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试;而 commitAsync 则没有失败重试机制,故有可能提交失败。

1.同步提交 offset

            /* 同步提交,当前线程会阻塞直到offset 提交成功 */
            consumer.commitSync();

2.异步提交 offset

            /* 异步提交 */
            consumer.commitAsync((Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception)-> {
                if (exception != null) {
                    System.err.println("Commit failed for" + offsets);
                }
            });

3.数据漏消费和重复消费分析

无论是同步提交还是异步提交 offset,都有可能会造成数据漏消费或重复消费。先提交 offset 后消费,有可能造成数据的漏消费;先消费后提交 offset,有可能造成数据的重复消费。

4.2.3 自定义存储 offset

Kafka 0.9 版本以前,offset 存储在 Zookeeper,0.9 版本后,默认将 offset 存储在 Kafka 的一个内置的 topic 中。除此之外,Kafka 还可以选择自定义存储 offset。
  offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalance
  当有新的消费者加入消费者组、已有的消费者退出消费者组或者所订阅的消费者主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance。
  消费者发生 Rebalance 后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset 位置继续消费。
  要实现自定义存储 offset,需要借助 ConsumerRebalanceListener。其中提交和获取 offset 的方法,需要根据所选的 offset 存储系统自行实现。

public class CustomerConsumer {
    private static Map<TopicPartition, Long> currentOffset = new HashMap<>();

    public static void main(String[] args) {

        String server = "162.14.109.33:9092";
        //创建配置信息
        Properties properties = new Properties();
        //Kafka 集群
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        //消费者组,只要 group.id 相同,就属于同一个消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "bigData");
        //关闭自动提交 offset
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //Key 和 Value 的反序列化类
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        //创建一个消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        //消费者订阅主题
        consumer.subscribe(Collections.singletonList("test2"), new ConsumerRebalanceListener() {

            //该方法会在 Rebalance 之前调用
            @Override
            public void
            onPartitionsRevoked(Collection<TopicPartition> partitions) {
                commitOffset(currentOffset);
            }

            //该方法会在 Rebalance 之后调用
            @Override
            public void
            onPartitionsAssigned(Collection<TopicPartition> partitions) {
                currentOffset.clear();
                for (TopicPartition partition : partitions) {
                    consumer.seek(partition, getOffset(partition));
                    //定位到最近提交的 offset 位置继续消费
                }
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));//消费者拉取数据
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());
            }
            commitOffset(currentOffset);//异步提交
        }
    }

    //获取某分区的最新 offset
    private static long getOffset(TopicPartition partition) {
        return 0;
    }

    //提交该消费者所有分区的 offset
    private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
    }
}

4.3 自定义拦截器

4.3.1 拦截器原理

Producer 拦截器(Interceptor)是在 Kafka 0.10 版本引入的,主要用于实现客户端的定制化控制逻辑。拦截器使得用户在消息发送前以及 producer 回调逻辑前有机会对消息做一些定制化需求。同时,producer 允许用户指定多个 Interceptor 按序作用于同一消息从而形成一个拦截链。
  Interceptor 的实现接口是 org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

1.onsend(ProducerRecord)

该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算

2.onAcknowledgement(RecordMetadata,Exception)

该方法会在消息从 RecordAccumulator 成功发送到 Kafka Broker 之后,或者在发送过程中失败时调用。并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在 producer 的 IO 线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢 producer 的消息发送效率。

3.close()

关闭 interceptor,主要用于执行一些资源清理工作。

4.configure(configs)

获取配置信息和初始化数据时调用。

4.3.2 拦截器案例

1.需求

实现一个简单的双 Interceptor 组成的拦截器链。第一个 Interceptor 会在消息发送前将时间戳信息添加到消息 value 的最前部;第二个 Interceptor 会在消息发送后更新成功发送消息和失败发送消息个数。

2.分析

img

3.实现流程

(1)编写时间戳拦截器

//时间拦截器
//在消息发送前将时间戳信息加到消息value的最前部
public class TimeInterceptor implements ProducerInterceptor<String, String>
{
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord)
    {
        // 1.取出数据
        String value = producerRecord.value();
        // 2.创建一个新的ProducerRecord对象,并返回//将
        return new ProducerRecord<>(producerRecord.topic(), producerRecord.partition(), producerRecord.key(),
                System.currentTimeMillis() + "," + value);
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e)
    {

    }

    @Override
    public void close()
    {

    }

    @Override
    public void configure(Map<String, ?> map)
    {

    }
}

(2)编写计数拦截器

//计数拦截器
//在消息发送后更新成功发送消息或发送失败的消息数
public class CounterInterceptor implements ProducerInterceptor<String,String>
{
    int success = 0;
    int error = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord)
    {
        return producerRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e)
    {
        if (recordMetadata != null) {
            success++;
        } else {
            error++;
        }
    }

    @Override
    public void close()
    {
        System.out.println("success:" + success);
        System.out.println("error:" + error);
    }

    @Override
    public void configure(Map<String, ?> map)
    {

    }
}

(3)编写 Producer 主程序

public class InterceptorProducer
{
    public static void main(String[] args) {
        String server = "162.14.109.33:9092";
        // 1.创建配置信息
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,server);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // 添加拦截器
        ArrayList<String> interceptors = new ArrayList<>();
        interceptors.add("com.fmy.kafka.interceptor.TimeInterceptor");
        interceptors.add("com.fmy.kafka.interceptor.CounterInterceptor");
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        // 2.创建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // 3.发送数据
        for (int i = 0; i < 5; i++)
        {
            producer.send(new ProducerRecord<>("test2", "fmy", "这是带拦截器的生产者发送的消息!"));
        }

        // 4.关闭资源
        producer.close();
    }
}

五、kafka事务

事务是一系列的生产者生产消息和消费者提交偏移量的操作在一个事务中,或者说是一个原子操作,生产消息和提交偏移量同时成功或者失败。  

为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的TransactionID获得原来的PID。
为了管理Transaction,Kafka引入了一个新的组件Transaction Coordinator,Producer就是通过和 Transaction Coordinator交互获得Transaction ID对应的任务状态。Transaction Coordinator还负责将事务所有写入Kafka的一个内部Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
上述事务机制主要是从Producer方面考虑,对于Consumer而言,事务的保证就会相对较弱,尤其时无法保证Commit 的信息被精确消费。这是由于Consumer可以通过offset访问任意信息,而且不同的Segment File生命周期不同,同一事务的消息可能会出现重启后被删除的情况。

5.1 事务场景

  1. 最简单的需求是producer发的多条消息组成一个事务这些消息需要对consumer同时可见或者同时不可见 。
  2. producer可能会给多个topic,多个partition发消息,这些消息也需要能放在一个事务里面,这就形成了一个典型的分布式事务。
  3. kafka的应用场景经常是应用先消费一个topic,然后做处理再发到另一个topic,这个consume-transform-produce过程需要放到一个事务里面,比如在消息处理或者发送的过程中如果失败了,消费位点也不能提交。
  4. producer或者producer所在的应用可能会挂掉,新的producer启动以后需要知道怎么处理之前未完成的事务 。
  5. 流式处理的拓扑可能会比较深,如果下游只有等上游消息事务提交以后才能读到,可能会导致rt非常长吞吐量也随之下降很多,所以需要实现read committed和read uncommitted两种事务隔离级别。

5.2 几个关键概念和推导

  1. 因为producer发送消息可能是分布式事务,所以引入了常用的2PC,所以有事务协调者(Transaction Coordinator)。Transaction Coordinator和之前为了解决脑裂和惊群问题引入的Group Coordinator在选举和failover上面类似。

  2. 事务管理中事务日志是必不可少的,kafka使用一个内部topic来保存事务日志,这个设计和之前使用内部topic保存位点的设计保持一致。事务日志是Transaction Coordinator管理的状态的持久化,因为不需要回溯事务的历史状态,所以事务日志只用保存最近的事务状态。

  3. 因为事务存在commit和abort两种操作,而客户端又有read committed和read uncommitted两种隔离级别,所以消息队列必须能标识事务状态,这个被称作Control Message。

  4. producer挂掉重启或者漂移到其它机器需要能关联的之前的未完成事务所以需要有一个唯一标识符来进行关联,这个就是Transactional Id,一个producer挂了,另一个有相同Transactional Id的producer能够接着处理这个事务未完成的状态。注意不要把TransactionalId和数据库事务中常见的transaction id搞混了,kafka目前没有引入全局序,所以也没有transaction id,这个Transactional Id是用户提前配置的。

  5. TransactionalId能关联producer,也需要避免两个使用相同TransactionalId的producer同时存在,所以引入了producer epoch来保证对应一个TransactionalId只有一个活跃的producer epoch

5.3 事务语义

5.3.1 多分区原子写入

事务能够保证Kafka topic下每个分区的原子写入。事务中所有的消息都将被成功写入或者丢弃。例如,处理过程中发生了异常并导致事务终止,这种情况下,事务中的消息都不会被Consumer读取。现在我们来看下Kafka是如何实现原子的“读取-处理-写入”过程的。

首先,我们来考虑一下原子“读取-处理-写入”周期是什么意思。简而言之,这意味着如果某个应用程序在某个topic tp0的偏移量X处读取到了消息A,并且在对消息A进行了一些处理(如B = F(A))之后将消息B写入topic tp1,则只有当消息A和B被认为被成功地消费并一起发布,或者完全不发布时,整个读取过程写入操作是原子的。

现在,只有当消息A的偏移量X被标记为消耗时,消息A才被认为是从topic tp0消耗的,消费到的数据偏移量(record offset)将被标记为提交偏移量(Committing offset)。在Kafka中,我们通过写入一个名为offsets topic的内部Kafka topic来记录offset commit。消息仅在其offset被提交给offsets topic时才被认为成功消费。

由于offset commit只是对Kafkatopic的另一次写入,并且由于消息仅在提交偏移量时被视为成功消费,所以跨多个主题和分区的原子写入也启用原子“读取-处理-写入”循环:提交偏移量X到offset topic和消息B到tp1的写入将是单个事务的一部分,所以整个步骤都是原子的。

5.3.2 粉碎“僵尸实例”

我们通过为每个事务Producer分配一个称为transactional.id的唯一标识符来解决僵尸实例的问题。在进程重新启动时能够识别相同的Producer实例。
API要求事务性Producer的第一个操作应该是在Kafka集群中显示注册transactional.id。 当注册的时候,Kafka broker用给定的transactional.id检查打开的事务并且完成处理。 Kafka也增加了一个与transactional.id相关的epoch。Epoch存储每个transactional.id内部元数据。

一旦这个epoch被触发,任何具有相同的transactional.id和更旧的epoch的Producer被视为僵尸,并被围起来, Kafka会拒绝来自这些Procedure的后续事务性写入。

5.3.3 读事务消息

现在,让我们把注意力转向数据读取中的事务一致性。

Kafka Consumer只有在事务实际提交时才会将事务消息传递给应用程序。也就是说,Consumer不会提交作为整个事务一部分的消息,也不会提交属于中止事务的消息。

值得注意的是,上述保证不足以保证整个消息读取的原子性,当使用Kafka consumer来消费来自topic的消息时,应用程序将不知道这些消息是否被写为事务的一部分,因此他们不知道事务何时开始或结束;此外,给定的Consumer不能保证订阅属于事务一部分的所有Partition,并且无法发现这一点,最终难以保证作为事务中的所有消息被单个Consumer处理。

简而言之:Kafka保证Consumer最终只能提供非事务性消息或提交事务性消息。它将保留来自未完成事务的消息,并过滤掉已中止事务的消息。

5.4.4 事务处理Java API

producer提供了五个事务方法:

1.initTransactions 方法用来初始化事务,这个方法能够执行的前提是配置了transactionalId,如果没有则会报出IllegalStateException;
2.beginTransaction 方法用来开启事务;
3.sendOffsets 方法为消费者提供在事务内的位移提交的操作;
4.commitTransaction 方法用来提交事务;
5.abortTransaction 方法用来中止事务,类似于事务回滚。
5.4.4.1 api分类

在一个原子操作中,根据包含的操作类型,可以分为三种情况,前两种情况是事务引入的场景,最后一种情况没有使用价值。
1.只有Producer生产消息;
2.消费消息和生产消息并存,这个是事务场景中最常用的情况,就是我们常说的“consume-transform-produce ”模式
3.只有consumer消费消息,这种操作其实没有什么意义,跟使用手动提交效果一样,而且也不是事务属性引入的目的,所以一般不会使用这种情况

5.4.4.2 事务配置

1、创建消费者代码,需要:

  • 将配置中的自动提交属性(auto.commit)进行关闭
  • 而且在代码里面也不能使用手动提交commitSync( )或者commitAsync( )
  • 设置isolation.level

2、创建生成者,代码如下,需要:

  • 配置transactional.id属性
  • 配置enable.idempotence属性
5.4.4.3 “只有写”应用程序示例
package com.kafka.demo.transaction;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

public class TransactionProducer {
    private static Properties getProps(){
        Properties props =  new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("retries", 2); // 重试次数
        props.put("batch.size", 100); // 批量发送大小
        props.put("buffer.memory", 33554432); // 缓存大小,根据本机内存大小配置
        props.put("linger.ms", 1000); // 发送频率,满足任务一个条件发送
        props.put("client.id", "producer-syn-2"); // 发送端id,便于统计
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("transactional.id","producer-1"); // 每台机器唯一
        props.put("enable.idempotence",true); // 设置幂等性
        return props;
    }
    public static void main(String[] args) {
        KafkaProducer<String, String> producer = new KafkaProducer<>(getProps());
        // 初始化事务
        producer.initTransactions();
        try {
            Thread.sleep(2000);
            // 开启事务
            producer.beginTransaction();
            // 发送消息到producer-syn
            producer.send(new ProducerRecord<String, String>("producer-syn","test3"));
            // 发送消息到producer-asyn
            Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>("producer-asyn","test4"));
            // 提交事务
            producer.commitTransaction();
        }catch (Exception e){
            e.printStackTrace();
                // 终止事务
            producer.abortTransaction();
        }
    }
}
5.4.4.4 消费-生产并存(consume-Transform-Produce)

在一个事务中,既有生产消息操作又有消费消息操作,即常说的Consume-tansform-produce模式。如下实例代码

package com.kafka.demo.transaction;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;

public class consumeTransformProduce {
    private static Properties getProducerProps(){
        Properties props =  new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("retries", 3); // 重试次数
        props.put("batch.size", 100); // 批量发送大小
        props.put("buffer.memory", 33554432); // 缓存大小,根据本机内存大小配置
        props.put("linger.ms", 1000); // 发送频率,满足任务一个条件发送
        props.put("client.id", "producer-syn-2"); // 发送端id,便于统计
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("transactional.id","producer-2"); // 每台机器唯一
        props.put("enable.idempotence",true); // 设置幂等性
        return props;
    }

    private static Properties getConsumerProps(){
        Properties props =  new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test_3");
        props.put("session.timeout.ms", 30000);       // 如果其超时,将会可能触发rebalance并认为已经死去,重新选举Leader
        props.put("enable.auto.commit", "false");      // 开启自动提交
        props.put("auto.commit.interval.ms", "1000"); // 自动提交时间
        props.put("auto.offset.reset","earliest"); // 从最早的offset开始拉取,latest:从最近的offset开始消费
        props.put("client.id", "producer-syn-1"); // 发送端id,便于统计
        props.put("max.poll.records","100"); // 每次批量拉取条数
        props.put("max.poll.interval.ms","1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("isolation.level","read_committed"); // 设置隔离级别
        return props;
    }
    public static void main(String[] args) {
        // 创建生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerProps());
        // 创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerProps());
        // 初始化事务
        producer.initTransactions();
        // 订阅主题
        consumer.subscribe(Arrays.asList("consumer-tran"));
        for(;;){
            // 开启事务
            producer.beginTransaction();
            // 接受消息
            ConsumerRecords<String, String> records = consumer.poll(500);
            // 处理逻辑
            try {
                Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
                for(ConsumerRecord record : records){
                    // 处理消息
                    System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                    // 记录提交的偏移量
                    commits.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset()));
                    // 产生新消息
                    Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<>("consumer-send",record.value()+"send"));
                }
                // 提交偏移量
                producer.sendOffsetsToTransaction(commits,"group0323");
                // 事务提交
                producer.commitTransaction();

            }catch (Exception e){
                e.printStackTrace();
                producer.abortTransaction();
            }
        }
    }
}

5.5 事务工作原理

img

5.5.1 事务协调器和事务日志

在Kafka 0.11.0中与事务API一起引入的组件是上图右侧的事务Coordinator和事务日志。

事务Coordinator是每个KafkaBroker内部运行的一个模块。事务日志是一个内部的Kafka Topic。每个Coordinator拥有事务日志所在分区的子集,即, 这些borker中的分区都是Leader。

每个transactional.id都通过一个简单的哈希函数映射到事务日志的特定分区,事务日志文件__transaction_state-0。这意味着只有一个Broker拥有给定的transactional.id。

通过这种方式,我们利用Kafka可靠的复制协议和Leader选举流程来确保事务协调器始终可用,并且所有事务状态都能够持久存储

值得注意的是,事务日志只保存事务的最新状态而不是事务中的实际消息。消息只存储在实际的Topic的分区中。事务可以处于诸如“Ongoing”,“prepare commit”和“Completed”之类的各种状态中。正是这种状态和关联的元数据存储在事务日志中。

5.5.2 事务数据流

数据流在抽象层面上有四种不同的类型。

A. producer和事务coordinator的交互
  执行事务时,Producer向事务协调员发出如下请求:

  1. initTransactions API向coordinator注册一个transactional.id。 此时,coordinator使用该transactional.id关闭所有待处理的事务,并且会避免遇到僵尸实例,由具有相同的transactional.id的Producer的另一个实例启动的任何事务将被关闭和隔离。每个Producer会话只发生一次。
  2. 当Producer在事务中第一次将数据发送到分区时,首先向coordinator注册分区。
  3. 当应用程序调用commitTransaction或abortTransaction时,会向coordinator发送一个请求以开始两阶段提交协议。

B. Coordinator和事务日志交互
  随着事务的进行,Producer发送上面的请求来更新Coordinator上事务的状态。事务Coordinator会在内存中保存每个事务的状态,并且把这个状态写到事务日志中(这是以三种方式复制的,因此是持久保存的)。

事务Coordinator是读写事务日志的唯一组件。如果一个给定的Borker故障了,一个新的Coordinator会被选为新的事务日志的Leader,这个事务日志分割了这个失效的代理,它从传入的分区中读取消息并在内存中重建状态。

C.Producer将数据写入目标Topic所在分区
  在Coordinator的事务中注册新的分区后,Producer将数据正常地发送到真实数据所在分区。这与producer.send流程完全相同,但有一些额外的验证,以确保Producer不被隔离。

D.Topic分区和Coordinator的交互

  1. 在Producer发起提交(或中止)之后,协调器开始两阶段提交协议。
  2. 在第一阶段,Coordinator将其内部状态更新为“prepare_commit”并在事务日志中更新此状态。一旦完成了这个事务,无论发生什么事,都能保证事务完成。
  3. Coordinator然后开始阶段2,在那里它将事务提交标记写入作为事务一部分的Topic分区。
  4. 这些事务标记不会暴露给应用程序,但是在read_committed模式下被Consumer使用来过滤掉被中止事务的消息,并且不返回属于开放事务的消息(即那些在日志中但没有事务标记与他们相关联)。
  5. 一旦标记被写入,事务协调器将事务标记为“完成”,并且Producer可以开始下一个事务。

5.6 事务相关配置

5.6.1 Broker configs

(1) transactional.id.timeout.ms:

在ms中,事务协调器在生产者TransactionalId提前过期之前等待的最长时间,并且没有从该生产者TransactionalId接收到任何事务状态更新。默认是604800000(7天)。这允许每周一次的生产者作业维护它们的id

(2) max.transaction.timeout.ms

事务允许的最大超时。如果客户端请求的事务时间超过此时间,broke将在InitPidRequest中返回InvalidTransactionTimeout错误。这可以防止客户机超时过大,从而导致用户无法从事务中包含的主题读取内容。

默认值为900000(15分钟)。这是消息事务需要发送的时间的保守上限。

(3) transaction.state.log.replication.factor

事务状态topic的副本数量。默认值:3

(4) transaction.state.log.num.partitions

事务状态主题的分区数。默认值:50

(5) transaction.state.log.min.isr

事务状态主题的每个分区ISR最小数量。默认值:2

(6) transaction.state.log.segment.bytes
事务状态主题的segment大小。默认值:104857600字节

5.6.2 Producer configs

  1. enable.idempotence:开启幂等

  2. transaction.timeout.ms:事务超时时间

事务协调器在主动中止正在进行的事务之前等待生产者更新事务状态的最长时间。

这个配置值将与InitPidRequest一起发送到事务协调器。如果该值大于max.transaction.timeout。在broke中设置ms时,请求将失败,并出现InvalidTransactionTimeout错误。

默认是60000。这使得交易不会阻塞下游消费超过一分钟,这在实时应用程序中通常是允许的。

  1. transactional.id

用于事务性交付的TransactionalId。这支持跨多个生产者会话的可靠性语义,因为它允许客户端确保使用相同TransactionalId的事务在启动任何新事务之前已经完成。如果没有提供TransactionalId,则生产者仅限于幂等交付。

5.6.3 Consumer configs

  1. isolation.level
  • read_uncommitted:以偏移顺序使用已提交和未提交的消息。
  • read_committed:仅以偏移量顺序使用非事务性消息或已提交事务性消息。为了维护偏移排序,这个设置意味着我们必须在使用者中缓冲消息,直到看到给定事务中的所有消息。

5.7 事务性能以及如何优化

5.7.1 Producer打开事务之后的性能

让我们把注意力转向事务如何执行。首先,事务只造成中等的写入放大。

额外的写入在于:

  1. 对于每个事务,我们都有额外的RPC向Coordinator注册分区。
  2. 在完成事务时,必须将一个事务标记写入参与事务的每个分区。同样,事务Coordinator在单个RPC中批量绑定到同一个Borker的所有标记,所以我们在那里保存RPC开销。但是在事务中对每个分区进行额外的写操作是无法避免的。
  3. 最后,我们将状态更改写入事务日志。这包括写入添加到事务的每批分区,“prepare_commit”状态和“complete_commit”状态。

我们可以看到,开销与作为事务一部分写入的消息数量无关。所以拥有更高吞吐量的关键是每个事务包含更多的消息。

实际上,对于Producer以最大吞吐量生产1KB记录,每100ms提交消息导致吞吐量仅降低3%。较小的消息或较短的事务提交间隔会导致更严重的降级。

增加事务时间的主要折衷是增加了端到端延迟。回想一下,Consum阅读事务消息不会传递属于公开传输的消息。因此,提交之间的时间间隔越长,消耗的应用程序就越需要等待,从而增加了端到端的延迟。

5.7.2 Consumer打开之后的性能

Consumer在开启事务的场景比Producer简单得多,它需要做的是:

  • 过滤掉属于中止事务的消息。
  • 不返回属于公开事务一部分的事务消息。

因此,当以read_committed模式读取事务消息时,事务Consumer的吞吐量没有降低。这样做的主要原因是我们在读取事务消息时保持零拷贝读取。

此外,Consumer不需要任何缓冲等待事务完成。相反,Broker不允许提前抵消包括公开事务。

因此,Consumer是非常轻巧和高效的。感兴趣的读者可以在本文档(链接2)中了解Consumer设计的细节。

六、SpringBoot集成kafka

6.1 配置Maven依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

6.2 项目具体代码

6.2.1 yml配置
spring:

    kafka:
        # kafka服务器地址(可以多个)
        bootstrap-servers: localhost:9092
        producer:
            # key/value的序列化
            key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
            # 返回数据形式
            # acks: all
            # 批量抓取
            batch-size: 65536
            # 缓存容量
            buffer-memory: 524288
            # 服务器地址
            bootstrap-servers: localhost:9092
        consumer:
            # key/value的反序列化
            key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            # 指定一个默认的组名
            group-id: kafka2
            # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
            # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
            # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
            auto-offset-reset: earliest
6.2.2 生产者
  1. 步骤
    a.创建一个生产者对象kafkaProducer
    b.调用send反射消息(ProducerRecor,封装是key-value键值对)
    c.调用Future.get()表示获取服务器的响应
    d.关闭生产者
  2. 代码
package com.kafka.demo.controller;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

@RestController
public class KafkaSyncProducerController {

    @Autowired
    private KafkaTemplate<Integer, String> template;

    @RequestMapping("send/sync/{massage}")
    public String send(@PathVariable String massage) {
        final ListenableFuture<SendResult<Integer, String>> future = this.template.send("test1", 0, 0, massage);

        try {
            final SendResult<Integer, String> sendResult = future.get();
            final RecordMetadata metadata = sendResult.getRecordMetadata();

            System.out.println(metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        return "success";
    }
}


6.2.3 消费者
package com.kafka.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    @KafkaListener(topics = "test1")
    public void onMassage(ConsumerRecord<Integer, String> record) {
        System.out.println("收到的消息"
                + "\t" + record.topic()
                + "\t" + record.partition()
                + "\t" + record.offset()
                + "\t" + record.key()
                + "\t" + record.value());
    }
}


6.2.4
package com.kafka.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐