一、kafka API简介

kafka包含5个核心api:

  1. Producer API用来让应用程序发送流数据到kafka集群中的主题。
  2. Consumer API用来让应用程序读取kafka集群主题中的流数据。
  3. Streams API 用来从输入主题到输出主题转换流数据。
  4. Connect API 用来实现连接器,持续的从数据源或应用程序推送数据到kafka,或从kafka中拉取数据。
  5. AdminClient API用来管理和检查主题、broker和其他kafka对象。

Kafka 通过独立于语言的协议公开其所有功能,该协议具有多种编程语言的客户端。 然而,只有 Java 客户端作为主要 Kafka 项目的一部分进行维护,其他客户端可作为独立的开源项目使用。

二、引入kafka java客户端依赖

要使用Producer、Consumer和AdminClient,你可以使用以下 maven 依赖项:

        <dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>2.1.0</version>
		</dependency>

而Streams需引入以下依赖:

        <dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-streams</artifactId>
			<version>2.1.0</version>
		</dependency>

三、发送消息

kafka封装了一套二进制通信协议,用于对外提供各种各样的服务,对于producer而言,用户可以使用任意编程语言按照该协议的格式进行编程,从而实现向kafka发送消息。这组协议本质上为不同的协议类型分别定义了专属的紧凑二进制字节数组格式,然后通过socket发送给合适的broker,之后等待broker处理完成后返还响应给producer。

每个producer都是独立进行工作的,与其他producer之间没有关联,目前producer的首要功能就是向某个topic的某个分区发送一条消息,所以它首先需要确认到底向topic的哪个分区写入消息,这就是分区器(partitioner)的事情。kafka producer提供了一个默认的分区器,对于每条待发送的消息,如果该消息指定了key,那么该partitioner会根据key的哈希值来选择目标分区,若没有,会使用轮询的方式确认目标分区。当然,producer的API赋予了用户自行指定目标分区的权力。

在确认了目标分区后,producer要做的第二件事就是要寻找这个分区对应的leader,只有leader才能响应客户端发送过来的请求,而剩下的从节点中有一部分会同步该消息。因此在发送消息时,producer有不等待任何副本的响应便返回成功,或者只等待leader响应写入操作之后再返回成功。

Java版本producer工作流程:
在这里插入图片描述
代码演示:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerTest {

    public static void main(String[] args) {
        Properties props = new Properties();
        // 必须
        props.put("bootstrap.servers","121.5.240.148:9092");
        // 被发送到broker的任何消息的格式都必须是字节数组
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        // 非必须参数配置
        // acks=0表明producer完全不管发送结果;
        // acks=all或-1表明producer会等待ISR所有节点均写入后的响应结果;
        // acks=1,表明producer会等待leader写入后的响应结果
        props.put("acks","-1");
        // 发生可重试异常时的重试次数
        props.put("retries",3);
         // producer会将发往同一分区的多条消息封装进一个batch中,
        // 当batch满了的时候,发送其中的所有消息,不过并不总是等待batch满了才发送消息;
        props.put("batch.size",323840);
         // 控制消息发送延时,默认为0,即立即发送,无需关心batch是否已被填满。
        props.put("linger.ms",10);
        // 指定了producer用于缓存消息的缓冲区大小,单位字节,默认32MB
        // producer启动时会首先创建一块内存缓冲区用于保存待发送的消息,然后由另一个专属线程负责从缓冲区中读取消息执行真正的发送
        props.put("buffer.memory",33554432);
        // 设置producer能发送的最大消息大小
        props.put("max.request.size",10485760);
        // 设置是否压缩消息,默认none
        props.put("compression.type","lz4");
        // 设置消息发送后,等待响应的最大时间
        props.put("request.timeout.ms",30);

        Producer<String,String> producer = new KafkaProducer<String, String>(props);
        for(int i = 0;i<5;i++){
            producer.send(new ProducerRecord<>("my-replicated-topic","key"+i,"value"+i));
        }

        producer.close();

    }
}

kafka producer发送消息的主方法是send方法,在底层完全地实现了异步化发送,并且通过Java提供的Future同时实现了同步发送和异步发送+回调两种发送方式。而上述代码使用的是第三种方式,即发送之后便不再理会发送结果,这种方式在实际中是不被推荐使用的。

如果发送时连接不上,需要修改kafka配置文件重启,如下:
在这里插入图片描述

异步发送:
实际上所有的写入操作默认都是异步的,send方法会返回一个Java Future对象供用户稍后获取发送结果,这就是所谓的回调机制。具体代码如下:

producer.send(record,new Callback(){
  @Override
  public void onCompletion(RecordMetadata metadata,Exception exception){
    if(exception == null){
      System.out.println("消息发送成功");
    } else {
      System.out.println("消息发送失败");
    }    
  }
});  

上面的代码中,Callback就是发送消息后的回调类,其onCompletion方法的两个输入参数metadata和exception不会同时非空,当消息发送成功时,exception为null,当消息发送失败时,metadata就是null。Callback实际上是一个Java接口,因此可创建自定义的Callback实现类来处理消息发送后的逻辑。

同步发送:
同步发送和异步发送其实就是通过Java的Future来区分的,调用Future.get()等待返回结果,即是同步发送,具体代码如下:

ProducerRecord<String, String> record = new ProducerRecord<>("my-replicated-topic","key"+i,"value"+i);
 RecordMetadata recordMetadata = producer.send(record).get();

使用Future.get()方法会一直等待下去直到broker将结果返回给producer程序,当结果返回时,get()方法要么返回发送结果,要么抛出异常交由producer自行处理。如果没有错误,get将返回对应的RecordMetadata实例。

kafka发送异常
当前kafka的错误类型包含了两类:可重试异常和不可重试异常,常见的可重试异常如下:

  • LeaderNotAvailableException:分区对应的leader不可用,通常出现在leader换届选举时,因此是瞬时的异常,重试之后可自行恢复。
  • NotControllerException:表明controller当前不可用,在经历新一轮的选举,重试之后可自行恢复。
  • NetworkException:网络瞬时异常,可重试。

所有可重试的异常都继承自or.apache.kafka.common.errors.RetriableException,对于这些可重试的异常,如果在producer程序中配置了重试次数,那么只要在规定的重试次数内自行恢复了,便不会出现在onCompletion的exception中。若超过了重试次数仍没成功,就会被封装到exception中,此时就需要producer程序自行处理这种异常。

没有继承自RetriableException的其他异常都属于不可重试异常,这类异常表明了一些非常严重或kafka无法处理的问题。

四、消息分区机制

producer发送过程中需要确定将消息发送到topic的哪一个分区,默认的分区器会尽力确保具有相同key的所有消息都被发送到相同的分区上;若没有指定key,会以轮询的方式来确保消息在topic的分区上均匀分配。

1、自定义分区机制

自定义分区器需要实现org.apache.kafka.clients.producer.Partitioner接口,分区逻辑写在partition()方法中,例如:

public class AuditPartitioner implements Partitioner {

    private Random random;

    @Override
    public void configure(Map<String, ?> map) {
        // 该方法实现必要资源的初始化工作
        random = new Random();
    }

    @Override
    public int partition(String topic, Object keyObj, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String key = (String)keyObj;
        // 获取该topic可用的所有分区
        List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);
        int partitionCount = partitionInfoList.size();
        int auditPartition = partitionCount -1;

        return key == null || !key.contains("audit") ? random.nextInt(auditPartition) : auditPartition;
    }

    @Override
    public void close() {
         // 该方法实现必要资源的清理
    }

}

使用自定义分区器:

props.put("partitioner.class","xx.xx.AuditPartitioner");

五、消息序列化

网络中发送数据都是以字节的方式,序列化器负责在producer发送消息前将其转换成字节数组,与之相反,解序列化器负责将consumer接收到的字节数组转换成相应的对象数据。

1、自定义序列化器

自定义序列化器需要实现org.apache.kafka.common.serialization.Serializer接口,在serializer方法中实现序列化逻辑,例如:

首先定义一个POJO对象

public class User{
  private String firstName;
  private String lastName;
  private int age;
  private String address;

  public User(String firstName,String lastName,int age,String address){
  this.firstName=firstName;
  this.lastName=lastName;
  this.age=age;
  this.address=address;
  }
  
}  

由于要用jackson-mapper-asl包中的ObjectMapper来将对象转成字节数组,因此需要将其依赖引入:

<dependency>
  <groupId>org.codehaus.jackson</groupId>
  <artifactId>jackson-mapper-asl</artifactId>
  <version>1.9.13</version>
</dependency>  
  

接下来创建serializer

public class UserSerializer implements Serializer{
  private ObjectMapper objectMapper;
  
  @Override
  public void configure(Map config,boolean isKey){
      objectMapper = new ObjectMapper();
  }

  @Override
  public byte[] serialize(String topic,Object data){
    byte[] res = null;
    try{
      res = objectMapper.writeValueAsString(data).getBytes("utf-8");
    }catch(Exception e){
      logger.warn("failed to serialize the object: {}", data, e);
    }
    return res;
  }

   @Override
   public void close(){}
}           

使用自定义序列化器:

props.put("value.serializer","xx.xx.UserSerializer");

六、producer拦截器

producer拦截器使得用户在消息发送前和producer回调逻辑执行前可对消息做一些定制化处理,允许使用多个拦截器构成拦截器链。拦截器的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor。interceptor可能运行在多个线程中,因此在具体实现时用户需要确保线程安全。下面以一个简单的双interceptor组成的拦截链为例。第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部,第二个interceptor会在消息发送后更新成功发送消息或失败发送消息数。

public class TimeStampPrependerInterceptor implements ProducerInterceptor<String,String> {
    /**
     * producer确保在消息被序列化前调用该方法
     * 可以在该方法中对消息做任何操作
     */
    @Override
    public ProducerRecord onSend(ProducerRecord record) {
        return new ProducerRecord(record.topic(),record.partition(),record.timestamp(),record.key(),System.currentTimeMillis() +","+record.value().toString());
    }

    /**
     * 该方法会在消息被应答之前或消息发送时调用
     */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}
public class CounterInterceptor implements ProducerInterceptor<String,String> {

    private int errorCounter = 0;
    private int successCounter = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        if (e == null) {
            successCounter++;
        }else{
            errorCounter++;
        }
    }

    @Override
    public void close() {
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

使用自定义interceptor:

List<String> interceptors = new ArrayList<>();
interceptors.add("xx.xx.TimeStampPrependerInterceptor");
interceptors.add("xx.xx.CounterInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);

七、消息的可靠发送

Java版本的producer采用异步发送机制,send方法将消息放入缓冲区,由一个专属I/O线程负责从缓冲区中提取消息并封装进消息batch中,然后发送出去。这个过程存在着数据丢失的窗口,即若I/O线程发送之前producer崩溃,则存储缓冲区中的消息会全部丢失。producer的另一个问题就是消息的乱序,假设现发送record1和record2两条消息,由于某些原因导致record1未发送成功,同时kafka又配置了重试机制,那么producer重试record1成功后,record1在日志中的位置可能反而位于record2之后。

1、无消息丢失配置

// 该配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 将阻塞多长时间。此外这些方法被阻止,也可能是因为缓冲区已满或元数据不可用。在用户提供的序列化程序或分区器中的锁定不会计入此超时。默认为60000ms。
max.block.ms=60000

acks=all
retries=Integer.MAX_VALUE

// 该参数设置为1使producer在某个broker发送响应之前将无法再给broker发送请求,可防止topic同分区下的消息乱序问题,
max.in.flight.requests.per.connection=1

// 设置不允许非ISR中的副本被选举为leader,从而避免broker端因日志问题造成消息的丢失
unclean.leader.electionenable=false

replication.factor=3
// 用于控制某条消息至少被写入到ISR中的多少个副本才算成功
min.insync.replicas=2

enable.auto.commit=false

使用带回调机制的send发送消息
Callback逻辑中显式立即关闭producer

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐