2.3 SpringKafka消费者

2.3.1 Kafka消息监听器MessageListener

之前已经介绍了通过kafka-topic-consumer.shkafka-tool工具来消费数据。下面介绍SpringKafka消费数据的方式——kafka消息监听器。

Kafka的消息监听一般可以分为:1.单条数据监听;2.批量数据监听。GenericMessageListenerSpringKafka的消息监听器接口,也是一个函数式接口,利用接口的onMessage方法可以实现消费数据。

public interface GenericMessageListener<T> {
	void onMessage(T data);
    
	default void onMessage(T data, @Nullable Acknowledgment acknowledgment) {
		throw new UnsupportedOperationException("Container should never call this");
	}

	default void onMessage(T data, Consumer<?, ?> consumer) {
		throw new UnsupportedOperationException("Container should never call this");
	}

	default void onMessage(T data, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
		throw new UnsupportedOperationException("Container should never call this");
	}

}

基于此接口可以实现单条数据消息监听器接口MessageListenen、多条数据消息监听器接口BatchMessageListener、带ACK机制的消息监听器AcknowledgingMessageListenerBatchAcknowledgingMessageListener

MessageListener
GenericMessageListener
单条数据监听器
BatchMessageListener
批量数据监听器
AckowledgingMessageListenenr
带ACK的单条数据监听器
BatchAckowledgingMessageListener
带ACK机制的批量数据监听器

2.3.2 消息监听容器与容器工厂

消息监听器MessageListener是由消费监听器容器MessageListenerContainer接口来承载,使用setupMessageListenner()方法启动一个监听器。其中还有定义了操作消息的resume()pause()等方法。

public interface MessageListenerContainer extends SmartLifecycle {
    // 启动一个消息监听器
	void setupMessageListener(Object messageListener);
    // 获取消费者的指标信息
    Map<String, Map<MetricName, ? extends Metric>> metrics();
}

spring-kafka提供了两个容器KafkaMessageListenerContainerConcurrentMessageListenerContainer

GenericMessageListenerContainer
MessageListenerContainer
AbstractMessageListenerContainer
KafkaMessageListenerConatiner
ConcurrentMessageListenerCOntainer

消息监听器容器由容器工厂KafkaListenerContainerFactory统一创建并管理

public interface KafkaListenerContainerFactory<C extends MessageListenerContainer> {
    // 根据endpoint创建监听器容器
    C createListenerContainer(KafkaListenerEndpoint endpoint);
    // 根据topic、partition和offset的配置
	C createContainer(TopicPartitionOffset... topicPartitions);
	// 根据topic创建监听器容器
    C createContainer(String... topics);
    // 根据topic的正则表达式创建监听器容器
	C createContainer(Pattern topicPattern);
}

spring-kafka提供了监听器容器工厂ConcurrentKafkaListenerContainerFactory,其有两个重要的配置

ContainerPropertiesConsumerFactory

AbstractKafkaListenerContainerFactory
KafkaListenerContainerFactory
ConcurrentKafkaListenerContainerFactory

ContainerProperties定义了要消费消息的topic,消息处理的MessageListener等信息。

因此要实现一个消息监听器的流程如下:

KafkaProperties
ConsumerFactory
ContainerProperties
ContainerFactory
ListenerContainer
MessageListener

2.3.3 非注解式消费监听器

SpringKafka的消费者是由一个消费监听器容器ListenerConatiner去承载的,容器对应一个配置文件为ContainerPropertiesContainerProperties继承自消费者配置类ConsumerProperties,并且承载了消息监听器的设置

ContainerProperties
ConsumerProperties

首先介绍非注解式的消息监听器,类似于ProducerFactory,消费者需要创建一个ConsumerFactory

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}

然后建立监听器容器工厂ConcurrentKafkaListenerContainerFactory

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
    factory.setBatchListener(true);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
}

有了容器工厂之后,就可以通过注册bean的方式生成一个MessageListenerContainer

@Bean
public KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer(
    ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties containerProperties = new ContainerProperties("numb");
    containerProperties.setMessageListener(
        (MessageListener<String, String>) data -> System.out.println("收到消息: " + data.value()));
    return new KafkaMessageListenerContainer(consumerFactory, containerProperties);
}

在这个kafkaMessageListenerContainer中,通过ContainerProperties配置了消费的topic和messageListener。之后启动项目后,spring会将kafkaMessageListenerContainer注册到ConcurrentKafkaListenerContainerFactory中,这样获取到数据后会自动调用消息监听器进行数据处理。

测试消费者消费数据

@Test
public void test_send_and_consume() {
    ExecutorService threadPool = Executors.newCachedThreadPool();
    threadPool.submit(() -> {
        while (true) {
            kafkaTemplate.send(KafkaConsts.TOPIC_TEST, UUID.randomUUID().toString(), "kv");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("发送完成");
        }
    });
    while (true);
}

输出:

发送完成
发送成功
收到消息: kv
发送完成
发送成功
收到消息: kv

2.3.4 注解式消费监听器@KafkaListener

之前配置了容器监听器工厂ConcurrentKafkaListenerContainerFactory之后,还需要用代码配置MessageListenerContainer, 指定消费的topic、消息监听器处理等。其实上面这步完全可以通过注解@KafkaListener实现。

@Component
@Slf4j
public class MessageHandler {
    @KafkaListener(topics = KafkaConsts.TOPIC_TEST, containerFactory = "kafkaListenerContainerFactory", id = "consumer_numb"
        // , topicPartitions = { @TopicPartition(topic = "numb", partitionOffsets = {@PartitionOffset(partition = "0", initialOffset="1")})}
        )
    public void handleMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        try {
            String message = (String) record.value();
            log.info("收到消息: {}", message);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        } finally {
            // 手动提交 offset
            acknowledgment.acknowledge();
        }
    }
}
@KafkaListener的主要属性
  • id:监听器的id

  • groupId:消费组id

  • idIsGroup:是否用id作为groupId,如果置为false,并指定groupId时,消费组ID使用groupId;否则默认为true,会使用监听器的id作为groupId

  • topics:指定要监听哪些topic(与topicPattern、topicPartitions 三选一)

  • topicPattern: 匹配Topic进行监听(与topics、topicPartitions 三选一)

  • topicPartitions: 显式分区分配,可以为监听器配置明确的主题和分区(以及可选的初始偏移量)

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
  • containerFactory:指定监听器容器工厂

  • errorHandler: 监听异常处理器,配置BeanName

  • beanRef:真实监听容器的BeanName,需要在 BeanName前加 “__”

  • clientIdPrefix:消费者Id前缀

  • concurrency: 覆盖容器工厂containerFactory的并发配置

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐