Spring-Kafka消费者端源码分析
本文按照源码分析三部曲:核心类->核心流程->核心逻辑,对spring-kafka消费端的源码实现进行分析。
核心类
MessageListener
consumer拉到消息后的监听,spring-kafka提供给使用方定义消费业务逻辑的顶层接口,业务方通过实现接口接收消息进行业务逻辑处理。
依据需要监听的参数不同,提供了3个子接口,分别为:
- AcknowledgingMessageListener:能够拿到Acknowledgment参数,Acknowledgment用于对拿到的消息记录进行ack处理。
- ConsumerAwareMessageListener:可以拿到原生的Consumer参数。
- AcknowledgingConsumerAwareMessageListener:即能拿到Consumer参数,也能拿到Acknowledgment参数。
其中AcknowledgingConsumerAwareMessageListener,默认提供了几个实现类,例如RecordMessagingMessageListenerAdapter:主要提供了原生消息ConsumerRecord到Message的转换。
对应的MessageListener还有用于批量监听消息的BatchMessageListener,继承结构与MessageListener类似
KafkaListenerAnnotationBeanPostProcessor
Spring Bean后置处理器,用于扫描bean中的@KafkaListener注解,并封装成为MethodKafkaListenerEndpoint,在bean实例化后处理。
KafkaListenerEndpoint
顶层接口,具体实现为MethodKafkaListenerEndpoint,一个@KafkaListener标注方法的方法会封装成为一个MethodKafkaListenerEndpoint。
KafkaListenerEndpointDescriptor
KafkaListenerEndpoint的内部类,KafkaListenerEndpoint的进一步包装,主要增加了KafkaListenerContainerFactory
KafkaListenerEndpointRegistrar
Helper Bean,注册容器中所有的KafkaListenerEndpoint(具体逻辑委派给了KafkaListenerEndpointRegistry)。核心方法:registerAllEndpoints,KafkaListenerEndpointRegistrar在初始化时调用。
KafkaListenerEndpointRegistry
将KafkaListenerEndpoint注册成为MessageListenerContainer,核心方法:KafkaListenerEndpointRegistry#registerListenerContainer。
MessageListenerContainer
KafkaListenerEndpoint最终会封装成为MessageListenerContainer。关键实现类:KafkaMessageListenerContainer(单线程消费)、ConcurrentMessageListenerContainer(多线程消费,线程数由concurrency指定)
其中ConcurrentMessageListenerContainer根据consumer配置的concurrent数值决定了持有多少个KafkaMessageListenerContainer。
KafkaMessageListenerContainer持有内部类ListenerConsumer,本质上是一个Runnable类。
KafkaListenerContainerFactory
创建MessageListenerContainer的工厂,关键接口方法:KafkaListenerContainerFactory#createListenerContainer。
ErrorHandler
用于捕获执行MessageListener后发生的业务异常(即业务方实现MessageListener处理时报错后执行的异常处理器)
根据异常发生后,回调异常处理器回调参数的不同,分为几个子接口:
- ConsumerAwareErrorHandler:可以拿到consumer对象
- RemainingRecordsErrorHandler:可以拿到当前失败消息记录之后的消息记录。
- ContainerAwareErrorHandler:可以拿到MessageListenerContainer对象。
默认提供了3个实现类:
- ContainerStoppingErrorHandler:当消费消息异常后,直接结束掉当前监听器容器。
- SeekToCurrentErrorHandler:移动offset到当前失败的消息记录offset。
- ConditionalDelegatingErrorHandler:提供了不同异常类型触发不同类型异常处理器的选择逻辑。
KafkaListenerErrorHandler
@KafkaListener注解可以指定KafkaListenerErrorHandler,当标注了
@KafkaListener的方法发生异常时,会执行KafkaListenerErrorHandler,如果没有提供KafkaListenerErrorHandler,则异常会上抛至ErrorHandler异常处理器处理。
spring-kafka未提供KafkaListenerErrorHandler的具体实现类
核心流程
@KafkaListener注册流程
两个关键时机:
-
时机一:当每一个bean完成初始化后。由实现了BeanPostProcessor触发postProcessAfterInitialization。
-
时机二:当所有单例bean实例化后触发。由SmartInitializingSingleton的afterSingletonsInstantiated方法触发。
关键逻辑:
- 扫描@KafkaListener注解,封装成封装成KafkaListenerEndpoint。
- 依赖KafkaListenerContainerFactory具体工厂创建具体的MessageListenerContainer实现类。
- MessageListenerContainer会设置用于监听消息进行逻辑处理的具体MessageListener(BatchMessageListener)子类实现。
consumer启动流程
KafkaListenerEndpointRegistry实现了Spring的LifeCycle接口,bean实例化后会由spring容器促发start()方法。
入口:org.springframework.kafka.config.KafkaListenerEndpointRegistry#start
关键点:
- consumer的启动是通过Spring的refresh过程中处理的实现了Lifecycle的bean进行触发。
- consumer真正的启动发生在ListenerConsumer这个任务提交到异步线程池SimpleAsyncTaskExecutor。
- 每个KafkaListenerContainer持有一个SimpleAsyncTaskExecutor,SimpleAsyncTaskExecutor只接收一个任务,等同于ListenerConsumer被一个线程一直执行。
- ConcurrentMessageListenerContainer持有了concurrency个KafkaListenerContainer。
poll消息消费流程
入口:KafkaMessageListenerContainer.ListenerConsumer#pollAndInvoke
关键点:
- poll消息过程由一个线程一直循环调用kafka的poll方法拉消息,线程任务类ListenerConsumer封装了整个消息拉取后如何回掉MessageListener的逻辑。
- 拉到的消息会按照对应业务方定义的是否是批量监听还是单条消息监听,来回调。
- 消息的offset如何ack,由不同配置来决定如何提交。
核心源码
消费端比较核心的逻辑是offset的处理:offset的提交时机,以及消息监听器处理发生异常后应该如何。
因此本部分内容重点围绕消息拉取过程中offset的处理进行分析。
这里从消息poll流程展开分析。
分析入口:KafkaMessageListenerContainer.ListenerConsumer#pollAndInvoke
主干源码
//代码有简化
protected void pollAndInvoke() {
if (!this.autoCommit && !this.isRecordAck) {
//如果配置的是不自动提交,且ackMode不是record,则进行未提交的offset处理
processCommits();
}
//poll消息速率控制:是否每次拉取消息都wait一下
idleBetweenPollIfNecessary();
if (this.seeks.size() > 0) {
processSeeks();
}
this.lastPoll = System.currentTimeMillis();
if (!isRunning()) {
return;
}
//拉取一批消息
ConsumerRecords<K, V> records = doPoll();
//拉到消息,进入消息处理逻辑
if (records != null && records.count() > 0) {
savePositionsIfNeeded(records);
//根据配置的idleInterval,是否发送ListenerContainerNoLongerIdleEvent事件
notIdle();
//回调MessageListener
invokeListener(records);
}
else {
//根据配置的idleInterval,是否发送ListenerContainerIdleEvent事件
checkIdle();
}
}
整体逻辑:
- 根据ack的配置决定是否处理未提交的offset
- 根据idleBetweenPolls配置,是否需要将当前线程wait一会,用于控制消息pool的速率。
- 执行 doPoll()拉取一批消息。
- 拉到的消息执行invokeListener回掉业务实现的MessageListener,执行业务的消费逻辑。
offset处理逻辑
Spring基于kafka offset不自动提交的配置下,存在6种Ack Mode(ContainerProperties.AckMode枚举定义),分别如下:
- RECORD:消费完一条记录后就提交。
- BATCH:每次poll一批消息前会把上一批poll的消息的offset提交
- TIME:同BATCH,但需要满足距离上一次提交的时间差是否超过了配置的ackTime间隔时间才会提交。
- COUNT:同BATCH,需要满足未提交的数量超过了配置的ackCount
- COUNT_TIME:TIME或者COUNT任意一种情况满足即可。
- MANUAL:同BATCH,但需要手动调用Acknowledgment.acknowledge()。
- MANUAL_IMMEDIATE:需要手动调用Acknowledgment.acknowledge(),调用后,立即提交当前消息的offset。
spring-kafka对offset的处理主要用了两个变量acks队列和offsets map,以及3个关键方法实现。
两个变量
//没有ack的消息记录
private final BlockingQueue<ConsumerRecord<K, V>> acks = new LinkedBlockingQueue<>();
//没有ack的offset。
//<topic,<partition,offset>>
private final Map<String, Map<Integer, Long>> offsets = new HashMap<>();
- 如果消费完的消息不需要立即提交,则会将这些消息先缓存到acks队列中。
- acks只是保存了未提交的消息记录,提交前会根据acks中的消息记录封装成对应的offset信息缓存到map中。map结构为:<topic,<partition,offset>>
三个方法
方法一:ListenerConsumer#processCommits
调用时机:消费线程执行的第一个事情就是处理未提交的offset
//如果不是自动提交,并且不是record ack模式,则处理offset
if (!this.autoCommit && !this.isRecordAck) {
processCommits();
}
private void processCommits() {
//当前未提交的消息记录数量
this.count += this.acks.size();
//从acks队列中poll出未提交的消息记录,如果是MANUAL_IMMEDIATE则立即提交,否则放到offsets map中
handleAcks();
//获取ackMode配置
AckMode ackMode = this.containerProperties.getAckMode();
if (!this.isManualImmediateAck) {
if (!this.isManualAck) {
updatePendingOffsets();
}
//配置为COUNT模式下,是否超过了配置的数量门槛
boolean countExceeded = this.isCountAck && this.count >= this.containerProperties.getAckCount();
//不是TIME、COUNT ACK模式,或者是COUNT模式达到了提交的条件
if ((!this.isTimeOnlyAck && !this.isCountAck) || countExceeded) {
if (this.isCountAck) {
this.logger.debug(() -> "Committing in " + ackMode.name() + " because count "
+ this.count
+ " exceeds configured limit of " + this.containerProperties.getAckCount());
}
//根据offsets,执行offset提交
commitIfNecessary();
this.count = 0;
}
else {
//TIME模式下的提交逻辑
timedAcks(ackMode);
}
}
}
private void handleAcks() {
//while循环poll出所有未提交的ConsumerRecord
ConsumerRecord<K, V> record = this.acks.poll();
while (record != null) {
traceAck(record);
//如果是MANUAL_IMMEDIATE则立即提交,否则放到offsets map中
processAck(record);
record = this.acks.poll();
}
}
private void processAck(ConsumerRecord<K, V> record) {
if (!Thread.currentThread().equals(this.consumerThread)) {
this.acks.put(record);
if (this.isManualImmediateAck) {
this.consumer.wakeup();
}
}
else {
if (this.isManualImmediateAck) {
try {
//MANUAL_IMMEDIATE模式下直接提交
ackImmediate(record);
}
catch (@SuppressWarnings(UNUSED) WakeupException e) {
// ignore - not polling
}
}
else {
//添加到offsets map中
addOffset(record);
}
}
}
总结:
- 未提交的消息记录都会存放在ack队列中。
- 从ack队列中取出所有未提交的消息记录,如果是MANUAL_IMMEDIATE则立即提交,否则先放到offsets map中
- 处理不是MANUAL_IMMEDIATE场景下的offset提交逻辑,如果不是Time和COUNT Ack模式,或者是COUNT模式达到了配置的提交数量,都会执行commit逻辑。time模式则走time模式下的offset提交判断逻辑。timedAcks方法。
关于RECORD模式的处理则藏在ackCurrent中。
方法二:ListenerConsumer#ackCurrent
调用时机:调用消费者Listener之后,执行ackCurrent方法。
public void ackCurrent(final ConsumerRecord<K, V> record) {
//RECORD模式的处理
if (this.isRecordAck) {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
//默认producer不存在,producer存在场景是使用了事务消息
if (this.producer == null) {
this.commitLogger.log(() -> "Committing: " + offsetsToCommit);
//直接提交
if (this.syncCommits) {
commitSync(offsetsToCommit);
}
else {
commitAsync(offsetsToCommit, 0);
}
}
else {
this.acks.add(record);
}
}
else if (this.producer != null
|| ((!this.isAnyManualAck || this.commitRecovered) && !this.autoCommit)) {
//不是手动提交,则放到ack队列中
this.acks.add(record);
}
if (this.producer != null) {
sendOffsetsToTransaction();
}
}
总结:ackCurrent主要处理RECORD模式以及非手动提交的场景,RECORD模式下直接提交offset,非手动提交的场景,则放到acks队列中,在下一次poll消息前,通过processCommits决定是否批量提交。
方法三:ConsumerAcknowledgment#acknowledge
acknowledge主要是手动ACK模式下,由用户调用了acknowledge方法后才会触发。
acknowledge实际是调用了ListenerConsumer#processAck方法,在ListenerConsumer#processCommits做了分析,如果是MANUAL,则放到ack队列中,如果是MANUAL_IMMEDIATE,直接提交offset。
异常处理逻辑
按照异常往上抛路径,自下而上进行分析
MessagingMessageListenerAdapter#invokeHandler
protected final Object invokeHandler(Object data, Acknowledgment acknowledgment, Message<?> message,
Consumer<?, ?> consumer) {
try {
return this.handlerMethod.invoke(message, data, acknowledgment, consumer);
}
catch (org.springframework.messaging.converter.MessageConversionException ex) {
throw new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " +
"be invoked with the incoming message", message.getPayload()),
new MessageConversionException("Cannot handle message", ex));
}
catch (MethodArgumentNotValidException ex) {
throw new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " +
"be invoked with the incoming message", message.getPayload()), ex);
}
catch (MessagingException ex) {
throw new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " +
"be invoked with the incoming message", message.getPayload()), ex);
}
catch (Exception ex) {
throw new ListenerExecutionFailedException("Listener method '" +
this.handlerMethod.getMethodAsString(message.getPayload()) + "' threw exception", ex);
}
}
执行@KafkaListener对应的业务方法逻辑,捕获不同异常,最终都封装成了ListenerExecutionFailedException,并抛给上一层
RecordMessagingMessageListenerAdapter#onMessage
接收到上一层抛出的ListenerExecutionFailedException后,catch住,如果有指定具体的errorHandler,则执行由errorHandler的handleError方法,如果没有,则继续向上抛异常。
这里的errorHandler是KafkaListenerErrorHandler,在@KafkaListener可以指定具体的errorHandler,因此可以方便针对不同的业务方法指定不同的异常处理器,但Spring-kafka并未提供可以直接使用的具体实现类。
public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
try {
Object result = invokeHandler(record, acknowledgment, message, consumer);
if (result != null) {
handleResult(result, record, message);
}
}
catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control
if (this.errorHandler != null) {
try {
if (message.equals(NULL_MESSAGE)) {
message = new GenericMessage<>(record);
}
Object result = this.errorHandler.handleError(message, e, consumer);
if (result != null) {
handleResult(result, record, message);
}
}
catch (Exception ex) {
throw new ListenerExecutionFailedException(createMessagingErrorMessage(// NOSONAR stack trace loss
"Listener error handler threw an exception for the incoming message",
message.getPayload()), ex);
}
}
else {
throw e;
}
}
}
ListenerConsumer#doInvokeRecordListener
catch住上一层抛出的异常后,执行errorHandler处理器,同时会根据配置决定如何处理当前失败消息的offset。
相比上一步的KafkaListenerErrorHandler,这里的errorHandler是由KafkaListenerContainerFactory指定,指定具体的ErrorHandler类,spring-kafka提供了几种ErrorHandler的具体实现,前面核心类ErrorHandler已经做了介绍。
private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> record, // NOSONAR
Iterator<ConsumerRecord<K, V>> iterator) {
try {
invokeOnMessage(record);
}
catch (RuntimeException e) {
//如果配置了,ackOnError,则会将当前失败的消息进行ack。
boolean acked = this.containerProperties.isAckOnError() && !this.autoCommit && this.producer == null;
if (acked) {
ackCurrent(record);
}
//没有异常处理器,直接抛
if (this.errorHandler == null) {
throw e;
}
try {
//执行异常处理器
invokeErrorHandler(record, iterator, e);
//决定当前失败的消息是否提交
if ((!acked && !this.autoCommit && this.errorHandler.isAckAfterHandle())
|| this.producer != null) {
if (this.isManualAck) {
this.commitRecovered = true;
}
ackCurrent(record);
if (this.isManualAck) {
this.commitRecovered = false;
}
}
}
catch (KafkaException ke) {
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
return ke;
}
catch (RuntimeException ee) {
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
return ee;
}
catch (Error er) { // NOSONAR
this.logger.error(er, "Error handler threw an error");
throw er;
}
}
return null;
}
private void invokeErrorHandler(final ConsumerRecord<K, V> record,
Iterator<ConsumerRecord<K, V>> iterator, RuntimeException e) {
//如果是RemainingRecordsErrorHandler类型的异常处理器
if (this.errorHandler instanceof RemainingRecordsErrorHandler) {
if (this.producer == null) {
//处理未提交的offset
processCommits();
}
//将失败的消息记录以及之后没有处理的都交给异常处理器处理
List<ConsumerRecord<?, ?>> records = new ArrayList<>();
records.add(record);
while (iterator.hasNext()) {
records.add(iterator.next());
}
this.errorHandler.handle(decorateException(e), records, this.consumer,
KafkaMessageListenerContainer.this.thisOrParentContainer);
}
else {
//不做任何处理,直接把当前失败的消息记录交给异常处理器执行
this.errorHandler.handle(decorateException(e), record, this.consumer);
}
}
总结:
- 当消费消息逻辑发生异常时,spring-kafka提供了两个级别的异常处理器,@KafkaListener级别的异常处理KafkaListenerErrorHandler,以及KafkaListenerContainerFactory工厂级别的异常处理ErrorHandler。
- 若不使用KafkaListenerErrorHandler,则异常会抛至工厂级别的ErrorHandler,进行统一的异常处理。
- 相比KafkaListenerErrorHandler,工厂级别的异常处理器ErrorHandler能力更强大,可以实现不同的接口,获取到想要的参数,例如当前失败的消息记录以及之后还未消费的消息记录、consumer对象等。
更多推荐
所有评论(0)