新建maven项目,引入依赖:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.2.1</version>
</dependency>

java kafka生产者

public class JavaProducer {
    
    // kafka地址
    public static final String bootstrapServer = "localhost:9092";
    // topic主题
    public static final String topic = "test";
    
    public static void main(String[] args) {
        
        Properties properties = new Properties();
        // 指定key和消息体value的编码方式
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers",bootstrapServer);
        
        // 创建并配置生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // 创建消息,并指定分区
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,"test message 033");
        // 发送消息
        kafkaProducer.send(producerRecord);
        // 关闭生产者客户端
        kafkaProducer.close();
        
        
    }
    
}

一个消息的生产主要包括四个步骤:

1.配置和创建生产者实例

2.配置和创建消息

3.发送消息

4.关闭生产者客户端实例

Kafka生产者配置项

kafka生产者有三个必填配置

  • bootstrap.servers:指定broker地址清单
  • key.serializer:key的序列化方式,消费者对应的需要配置反序列化方式
  • value.serializer:value的序列化方式,消费者对应的需要配置反序列化方式

还有一些非必填配置,参照org.apache.kafka.clients.producer.ProducerConfig类。

消息的创建

消息主要包括以下属性,其中topic和value是必填项,其余是选填项

public class ProducerRecord<K, V> {

    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;
    ………
}

对应的,ProducerRecord也提供了多个构造方法

img

消息的发送

生产者实例和消息实例都构建完成之后,就可以发送了

发送消息主要由三种模式:发后即忘(fire-and-forget)、同步(sync)、异步(async)

上面kafkaProducer.send(producerRecord);就是发后即忘,他只管发送消息,至于有没有发送成功不关心,这就有消息丢失的可能。

事实上send方法并非是void类型的,而是Future<RecordMetadata>类型,并且提供了两个重载方法

img

所以同步发送模式就可以利用返回的Future对象实现:

Future<RecordMetadata> send = kafkaProducer.send(producerRecord);
RecordMetadata recordMetadata = send.get();

异步发送方式则可以利用send的重载方法,指定一个callback回调函数

kafkaProducer.send(producerRecord, (metadata, exception) -> {
    if (exception != null) {
        // 异常处理
    } else {
        System.out.println(metadata);
    }
});
RecordMetadata

RecordMetadata对象包含了消息的一些元数据信息:

public final class RecordMetadata {

    /**
     * Partition value for record without partition assigned
     */
    public static final int UNKNOWN_PARTITION = -1;

    private final long offset;
    // The timestamp of the message.
    // If LogAppendTime is used for the topic, the timestamp will be the timestamp returned by the broker.
    // If CreateTime is used for the topic, the timestamp is the timestamp in the corresponding ProducerRecord if the
    // user provided one. Otherwise, it will be the producer local time when the producer record was handed to the
    // producer.
    private final long timestamp;
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final TopicPartition topicPartition;
    
    //…………
}
关闭生产者客户端示例

通常情况下,一个KafkaProducer不会只发送单条消息。在发送完所有的消息后,调用KafkaProducer#close()方法关闭KafkaProducer实例来回收资源。close()方法会阻塞等待所有发送请求完成后再关闭KafkaProducer

KafkaProducer还提供了一个带超时时间的重载方法,如果使用这个重载方法,则只会等待指定的超时时间,如果超过了这个时间,即使还有消息未发送完成,也会强行退出。我们一般使用无参的close()方法。

java kafka消费者

public class JavaConsumer {

    public static final String bootstrapServer = "localhost:9092";
    public static final String topic = "test";
    public static final String group_id = "test-group2";

    public static final AtomicBoolean isRunning = new AtomicBoolean(true);

    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("bootstrap.servers",bootstrapServer);
        properties.put("group.id",group_id);
        // 创建消费者客户端
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 订阅主题
        kafkaConsumer.subscribe(Collections.singletonList(topic));
        // 循环消费消息
        try{
            while (isRunning.get()){
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    // 处理consumerRecord
                }
            }
        }catch (Exception e){
            // 处理异常
        }finally {
            kafkaConsumer.close();
        }
        
    }
}

消费者的消费逻辑主要包括以下几个步骤:

1.配置消费者客户端参数并创建消费者实例

2.订阅主题

3.拉取消息并消费

4.提交消费位移

5.关闭消费者实例

配置消费者客户端

必填参数:

  • bootstrap.servers:指定broker地址清单
  • key.deserialize:key反序列化方式,与生产者序列化方式对应
  • value.deserializer:value反序列化方式,与生产者序列化方式对应
  • group.id:消费者组名

更多配置参org.apache.kafka.clients.consumer.ConsumerConfig

订阅主题与分区

调用KafkaConsumer#subscribe()方法来订阅主题。

一个消费者可以订阅一个或多个主题。

KafkaConsumer#subscribe()有四个重载方法,可以以集合的方式或者正则表达式的方式来订阅主题。

img

kafkaConsumer.subscribe(Arrays.asList("topic1"));
kafkaConsumer.subscribe(Pattern.compile("topic*"));

消费者还可以调用KafkaConsumer#assign(Collection<TopicPartition> partitions)方法来直接订阅某些主题的特定分区

取消订阅则使用KafkaConsumer#unsubscribe()方法

消息的消费

kafka消费消息是一个不断轮询的过程,在上面的代码中可以看出,消费者消费消息就是重复的调用poll()方法,poll()方法返回的则是所订阅的主题(分区)上的一组消息。

    @Override
    public ConsumerRecords<K, V> poll(final Duration timeout) {
        return poll(time.timer(timeout), true);
    }
poll()`方法接收一个超时时间参数`timeout`,在消费者的缓冲区里没有可用数据时会发生阻塞,阻塞时间为`timeout
ConsumerRecord

消费者消费到的消息类型为ConsumerRecord,相对于ProducerRecord,ConsumerRecord的内容更加丰富一些

public class ConsumerRecord<K, V> {
    public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
    public static final int NULL_SIZE = -1;

    /**
     * @deprecated checksums are no longer exposed by this class, this constant will be removed in Apache Kafka 4.0
     *             (deprecated since 3.0).
     */
    @Deprecated
    public static final int NULL_CHECKSUM = -1;

    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final TimestampType timestampType;
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Optional<Integer> leaderEpoch;
    
    //………………
}
位移提交
控制或关闭消费

KafkaConsumer#pause():暂停某些分区在拉取操作时返回数据给客户端

KafkaConsumer#resume():恢复某些分区向客户端返回数据

KafkaConsumer#paused():返回被暂停的分区集合

KafkaConsumer#close():关闭消费者

img

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐