Spring-Kafka系列(3)—— SpringKafka消费者监听MessageListener
之前已经介绍了通过和工具来消费数据。下面介绍SpringKafka消费数据的方式——kafka消息监听器。Kafka的消息监听一般可以分为:1.单条数据监听;2.批量数据监听。是的消息监听器接口,也是一个函数式接口,利用接口的方法可以实现消费数据。基于此接口可以实现单条数据消息监听器接口、多条数据消息监听器接口、带ACK机制的消息监听器和MessageListenerGenericMessageL
2.3 SpringKafka消费者
2.3 SpringKafka消费者
2.3.1 Kafka消息监听器MessageListener
之前已经介绍了通过kafka-topic-consumer.sh
和kafka-tool
工具来消费数据。下面介绍SpringKafka消费数据的方式——kafka消息监听器。
Kafka的消息监听一般可以分为:1.单条数据监听;2.批量数据监听。GenericMessageListener
是SpringKafka
的消息监听器接口,也是一个函数式接口,利用接口的onMessage
方法可以实现消费数据。
public interface GenericMessageListener<T> {
void onMessage(T data);
default void onMessage(T data, @Nullable Acknowledgment acknowledgment) {
throw new UnsupportedOperationException("Container should never call this");
}
default void onMessage(T data, Consumer<?, ?> consumer) {
throw new UnsupportedOperationException("Container should never call this");
}
default void onMessage(T data, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
throw new UnsupportedOperationException("Container should never call this");
}
}
基于此接口可以实现单条数据消息监听器接口MessageListenen
、多条数据消息监听器接口BatchMessageListener
、带ACK机制的消息监听器AcknowledgingMessageListener
和BatchAcknowledgingMessageListener
2.3.2 消息监听容器与容器工厂
消息监听器MessageListener
是由消费监听器容器MessageListenerContainer
接口来承载,使用setupMessageListenner()
方法启动一个监听器。其中还有定义了操作消息的resume()
、pause()
等方法。
public interface MessageListenerContainer extends SmartLifecycle {
// 启动一个消息监听器
void setupMessageListener(Object messageListener);
// 获取消费者的指标信息
Map<String, Map<MetricName, ? extends Metric>> metrics();
}
spring-kafka提供了两个容器KafkaMessageListenerContainer
和ConcurrentMessageListenerContainer
。
消息监听器容器由容器工厂KafkaListenerContainerFactory
统一创建并管理
public interface KafkaListenerContainerFactory<C extends MessageListenerContainer> {
// 根据endpoint创建监听器容器
C createListenerContainer(KafkaListenerEndpoint endpoint);
// 根据topic、partition和offset的配置
C createContainer(TopicPartitionOffset... topicPartitions);
// 根据topic创建监听器容器
C createContainer(String... topics);
// 根据topic的正则表达式创建监听器容器
C createContainer(Pattern topicPattern);
}
spring-kafka提供了监听器容器工厂ConcurrentKafkaListenerContainerFactory
,其有两个重要的配置
ContainerProperties
和ConsumerFactory
ContainerProperties
定义了要消费消息的topic
,消息处理的MessageListener
等信息。
因此要实现一个消息监听器的流程如下:
2.3.3 非注解式消费监听器
SpringKafka的消费者是由一个消费监听器容器ListenerConatiner
去承载的,容器对应一个配置文件为ContainerProperties
,ContainerProperties
继承自消费者配置类ConsumerProperties
,并且承载了消息监听器的设置
首先介绍非注解式的消息监听器,类似于ProducerFactory
,消费者需要创建一个ConsumerFactory
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}
然后建立监听器容器工厂ConcurrentKafkaListenerContainerFactory
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
有了容器工厂之后,就可以通过注册bean
的方式生成一个MessageListenerContainer
@Bean
public KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer(
ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProperties = new ContainerProperties("numb");
containerProperties.setMessageListener(
(MessageListener<String, String>) data -> System.out.println("收到消息: " + data.value()));
return new KafkaMessageListenerContainer(consumerFactory, containerProperties);
}
在这个kafkaMessageListenerContainer
中,通过ContainerProperties
配置了消费的topic和messageListener。之后启动项目后,spring会将kafkaMessageListenerContainer
注册到ConcurrentKafkaListenerContainerFactory
中,这样获取到数据后会自动调用消息监听器进行数据处理。
测试消费者消费数据
@Test
public void test_send_and_consume() {
ExecutorService threadPool = Executors.newCachedThreadPool();
threadPool.submit(() -> {
while (true) {
kafkaTemplate.send(KafkaConsts.TOPIC_TEST, UUID.randomUUID().toString(), "kv");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发送完成");
}
});
while (true);
}
输出:
发送完成
发送成功
收到消息: kv
发送完成
发送成功
收到消息: kv
2.3.4 注解式消费监听器@KafkaListener
之前配置了容器监听器工厂ConcurrentKafkaListenerContainerFactory
之后,还需要用代码配置MessageListenerContainer
, 指定消费的topic、消息监听器处理等。其实上面这步完全可以通过注解@KafkaListener
实现。
@Component
@Slf4j
public class MessageHandler {
@KafkaListener(topics = KafkaConsts.TOPIC_TEST, containerFactory = "kafkaListenerContainerFactory", id = "consumer_numb"
// , topicPartitions = { @TopicPartition(topic = "numb", partitionOffsets = {@PartitionOffset(partition = "0", initialOffset="1")})}
)
public void handleMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
try {
String message = (String) record.value();
log.info("收到消息: {}", message);
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
// 手动提交 offset
acknowledgment.acknowledge();
}
}
}
@KafkaListener的主要属性
-
id:监听器的id
-
groupId:消费组id
-
idIsGroup:是否用id作为groupId,如果置为false,并指定groupId时,消费组ID使用groupId;否则默认为true,会使用监听器的id作为groupId
-
topics:指定要监听哪些topic(与topicPattern、topicPartitions 三选一)
-
topicPattern: 匹配Topic进行监听(与topics、topicPartitions 三选一)
-
topicPartitions: 显式分区分配,可以为监听器配置明确的主题和分区(以及可选的初始偏移量)
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
-
containerFactory:指定监听器容器工厂
-
errorHandler: 监听异常处理器,配置BeanName
-
beanRef:真实监听容器的BeanName,需要在 BeanName前加 “__”
-
clientIdPrefix:消费者Id前缀
-
concurrency: 覆盖容器工厂containerFactory的并发配置
更多推荐
所有评论(0)