Pitfall: custom retry settings on KafkaListenerContainerFactory not taking effect, plus a look at the Kafka retry mechanism in the source code
Scenario
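I recently started working with Kafka. When the consumer throws an exception while processing a message, Kafka keeps redelivering the same message and only gives up after 10 failures.
Because I have alerting set up, a single failure like this fires off 10 alerts in a row. To change that, I followed material found online and added the following configuration: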
@Bean
public ConcurrentKafkaListenerContainerFactory containerFactory( ConsumerFactory consumerFactory) {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
// Retry at most 3 times, 5000 ms apart
SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler((consumerRecord, e) -> {
log.error("Exception, discarding this message ============, {}", consumerRecord.toString(), e);
}, new FixedBackOff(5000, 3));
factory.setErrorHandler(seekToCurrentErrorHandler);
// Set the offset commit mode; otherwise the listener, which takes an Acknowledgment argument, fails with: No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
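Troubleshooting
After starting the application and stepping through breakpoints, I eventually found that Spring had created two ConcurrentKafkaListenerContainerFactory instances: mine and a default one. The default one comes from
org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration: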
@Bean
@ConditionalOnMissingBean(
name = {"kafkaListenerContainerFactory"}
)
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory();
configurer.configure(factory, (ConsumerFactory)kafkaConsumerFactory.getIfAvailable(() -> {
return new DefaultKafkaConsumerFactory(this.properties.buildConsumerProperties());
}));
return factory;
}
The problem lies in this condition:
@ConditionalOnMissingBean(
name = {"kafkaListenerContainerFactory"}
)
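My own factory bean was not registered under this name (a @Bean method's bean name defaults to the method name, here containerFactory), so the condition found no bean called kafkaListenerContainerFactory and Spring Boot instantiated its own. And because @KafkaListener looks up the container factory named kafkaListenerContainerFactory unless its containerFactory attribute says otherwise, the listeners used that default factory, not mine.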
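To make the name-based condition concrete, here is a dependency-free Java sketch that models it with a plain map. It does not use Spring: ConditionalOnMissingBeanDemo, buildContext and factoryUsedByListener are hypothetical names, and the strings merely stand in for factory beans. The only Spring behavior it encodes is that the auto-configuration checks for a bean named kafkaListenerContainerFactory and that a @KafkaListener without a containerFactory attribute resolves that same name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Dependency-free model of name-based conditional registration.
// It imitates, but does not use, Spring's @ConditionalOnMissingBean(name = ...).
final class ConditionalOnMissingBeanDemo {

    // The name the auto-configuration checks and that @KafkaListener resolves by default.
    static final String DEFAULT_NAME = "kafkaListenerContainerFactory";

    // User configuration is processed first, auto-configuration afterwards.
    static Map<String, String> buildContext(String userBeanName) {
        Map<String, String> beans = new LinkedHashMap<>();
        beans.put(userBeanName, "custom factory (FixedBackOff 5000 ms, 3 retries)");
        // The condition only looks at the bean NAME, not at the bean type.
        if (!beans.containsKey(DEFAULT_NAME)) {
            beans.put(DEFAULT_NAME, "default factory (10 attempts)");
        }
        return beans;
    }

    // A listener without a containerFactory attribute uses the default name.
    static String factoryUsedByListener(Map<String, String> beans) {
        return beans.get(DEFAULT_NAME);
    }

    public static void main(String[] args) {
        Map<String, String> original = buildContext("containerFactory");
        // factories: 2, listener uses: default factory (10 attempts)
        System.out.println("factories: " + original.size() + ", listener uses: " + factoryUsedByListener(original));

        Map<String, String> fixed = buildContext(DEFAULT_NAME);
        // factories: 1, listener uses: custom factory (FixedBackOff 5000 ms, 3 retries)
        System.out.println("factories: " + fixed.size() + ", listener uses: " + factoryUsedByListener(fixed));
    }
}
```

With the original bean name both factories exist and the listener silently picks the default one; with the fixed name the condition backs off and only the custom factory remains.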
Fix
Give the bean exactly that name:
@Bean("kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory containerFactory(ConsumerFactory consumerFactory) {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
// Retry at most 3 times, 5000 ms apart
SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler((consumerRecord, e) -> {
log.error("Exception, discarding this message ============, {}", consumerRecord.toString(), e);
}, new FixedBackOff(5000, 3));
factory.setErrorHandler(seekToCurrentErrorHandler);
// Set the offset commit mode; otherwise the listener, which takes an Acknowledgment argument, fails with: No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
After that it works as expected. In testing, the message was indeed retried 3 times at 5-second intervals.
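Renaming the bean is not the only way out. Below is a sketch of two alternatives for the Spring Boot 2.x / spring-kafka 2.x versions whose source is quoted in this post: point the listener at the custom factory through the containerFactory attribute of @KafkaListener, or, because KafkaAnnotationDrivenConfiguration injects ObjectProvider<ErrorHandler> and hands getIfUnique() to the configurer, declare a single ErrorHandler bean and let the auto-configured factory pick it up. The topic demo-topic, the classes DemoListener and KafkaRetryConfig, and the bean method kafkaErrorHandler are hypothetical. This is a configuration fragment that needs spring-kafka and SLF4J on the classpath; with option 2 you would also have to set spring.kafka.listener.ack-mode=manual_immediate if the listener takes an Acknowledgment. On spring-kafka 2.8 and later, SeekToCurrentErrorHandler is deprecated in favor of DefaultErrorHandler.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.util.backoff.FixedBackOff;

// Option 1: keep the bean name "containerFactory" and reference it explicitly.
@Component
class DemoListener {
    // "demo-topic" is a hypothetical topic name.
    @KafkaListener(topics = "demo-topic", containerFactory = "containerFactory")
    public void onMessage(String message, Acknowledgment ack) {
        // ... business logic; an exception thrown here goes to that factory's error handler
        ack.acknowledge(); // needed because that factory uses AckMode.MANUAL_IMMEDIATE
    }
}

// Option 2: no custom factory at all; Boot's auto-configured factory picks up
// this bean through ObjectProvider<ErrorHandler>.getIfUnique().
@Configuration
class KafkaRetryConfig {
    private static final Logger log = LoggerFactory.getLogger(KafkaRetryConfig.class);

    @Bean
    SeekToCurrentErrorHandler kafkaErrorHandler() {
        // 3 retries, 5000 ms apart, then log and drop the record.
        return new SeekToCurrentErrorHandler(
                (record, e) -> log.error("Retries exhausted, discarding {}", record, e),
                new FixedBackOff(5000L, 3L));
    }
}
```

Use one option or the other: option 2 only matters when the listener runs on the auto-configured factory.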
Tracing in the source where the default of 10 attempts is set
Start again from this bean method:
@Bean
@ConditionalOnMissingBean(
name = {"kafkaListenerContainerFactory"}
)
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory();
configurer.configure(factory, (ConsumerFactory)kafkaConsumerFactory.getIfAvailable(() -> {
return new DefaultKafkaConsumerFactory(this.properties.buildConsumerProperties());
}));
return factory;
}
This creates the default factory.
Stepping into configurer.configure:
public void configure(ConcurrentKafkaListenerContainerFactory<Object, Object> listenerFactory, ConsumerFactory<Object, Object> consumerFactory) {
listenerFactory.setConsumerFactory(consumerFactory);
this.configureListenerFactory(listenerFactory);
this.configureContainer(listenerFactory.getContainerProperties());
}
Stepping into configureListenerFactory:
public class ConcurrentKafkaListenerContainerFactoryConfigurer {
private KafkaProperties properties;
private MessageConverter messageConverter;
private KafkaTemplate<Object, Object> replyTemplate;
private KafkaAwareTransactionManager<Object, Object> transactionManager;
private ConsumerAwareRebalanceListener rebalanceListener;
private ErrorHandler errorHandler;
private BatchErrorHandler batchErrorHandler;
private AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
private RecordInterceptor<Object, Object> recordInterceptor;
private void configureListenerFactory(ConcurrentKafkaListenerContainerFactory<Object, Object> factory) {
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
Listener properties = this.properties.getListener();
properties.getClass();
map.from(properties::getConcurrency).to(factory::setConcurrency);
map.from(this.messageConverter).to(factory::setMessageConverter);
map.from(this.replyTemplate).to(factory::setReplyTemplate);
if (properties.getType().equals(Type.BATCH)) {
factory.setBatchListener(true);
factory.setBatchErrorHandler(this.batchErrorHandler);
} else {
factory.setErrorHandler(this.errorHandler);
}
map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor);
map.from(this.recordInterceptor).to(factory::setRecordInterceptor);
}
....
}
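Here the factory's ErrorHandler is set from the configurer's errorHandler field. Unlike most of the other properties, it is set directly rather than through the PropertyMapper (which skips null values), so a null is passed through as is. The configurer, ConcurrentKafkaListenerContainerFactoryConfigurer, is itself a Spring-managed bean.
Now look at where Spring creates that bean: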
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {
...
private final ErrorHandler errorHandler;
...
KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
ObjectProvider<RecordMessageConverter> messageConverter,
ObjectProvider<BatchMessageConverter> batchMessageConverter,
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener, ObjectProvider<ErrorHandler> errorHandler,
ObjectProvider<BatchErrorHandler> batchErrorHandler,
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {
this.properties = properties;
this.messageConverter = messageConverter.getIfUnique();
this.batchMessageConverter = batchMessageConverter
.getIfUnique(() -> new BatchMessagingMessageConverter(this.messageConverter));
this.kafkaTemplate = kafkaTemplate.getIfUnique();
this.transactionManager = kafkaTransactionManager.getIfUnique();
this.rebalanceListener = rebalanceListener.getIfUnique();
this.errorHandler = errorHandler.getIfUnique();
this.batchErrorHandler = batchErrorHandler.getIfUnique();
this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
this.recordInterceptor = recordInterceptor.getIfUnique();
}
@Bean
@ConditionalOnMissingBean
ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
configurer.setKafkaProperties(this.properties);
MessageConverter messageConverterToUse = this.properties.getListener().getType().equals(Type.BATCH) ? this.batchMessageConverter : this.messageConverter;
configurer.setMessageConverter((MessageConverter)messageConverterToUse);
configurer.setReplyTemplate(this.kafkaTemplate);
configurer.setTransactionManager(this.transactionManager);
configurer.setRebalanceListener(this.rebalanceListener);
configurer.setErrorHandler(this.errorHandler);
configurer.setBatchErrorHandler(this.batchErrorHandler);
configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
configurer.setRecordInterceptor(this.recordInterceptor);
return configurer;
}
...
}
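As the debugger screenshot in the original post showed, errorHandler is null at this point: no ErrorHandler bean is defined, so errorHandler.getIfUnique() returns null.
So back in the method above, factory.setErrorHandler(this.errorHandler) also receives null.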
That wraps up the configuration side. Next, look at the consumer side, in the doStart() method of org.springframework.kafka.listener.KafkaMessageListenerContainer:
protected void doStart() {
if (isRunning()) {
return;
}
if (this.clientIdSuffix == null) { // stand-alone container
checkTopics();
}
ContainerProperties containerProperties = getContainerProperties();
checkAckMode(containerProperties);
Object messageListener = containerProperties.getMessageListener();
if (containerProperties.getConsumerTaskExecutor() == null) {
SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
(getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setConsumerTaskExecutor(consumerExecutor);
}
GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
ListenerType listenerType = determineListenerType(listener);
this.listenerConsumer = new ListenerConsumer(listener, listenerType);
...
}
Creating the listener consumer:
this.listenerConsumer = new ListenerConsumer(listener, listenerType);
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
Properties consumerProperties = propertiesFromProperties();
checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory);
this.autoCommit = determineAutoCommit(consumerProperties);
...
GenericErrorHandler<?> errHandler = KafkaMessageListenerContainer.this.getGenericErrorHandler();
...
this.errorHandler = determineErrorHandler(errHandler);
...
}
...
protected ErrorHandler determineErrorHandler(GenericErrorHandler<?> errHandler) {
return errHandler != null ? (ErrorHandler) errHandler
: this.transactionManager != null ? null : new SeekToCurrentErrorHandler();
}
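The errHandler it obtains from the factory is also null (a debugger screenshot in the original post showed this). With no transaction manager either, determineErrorHandler falls through to the last branch.
Only here does the container finally get a non-null ErrorHandler.
So the key is new SeekToCurrentErrorHandler().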
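This is also the point where our custom configuration decides the retry count. If we define our own ConcurrentKafkaListenerContainerFactory (and the listener actually uses it) with a maximum number of retries, then
GenericErrorHandler<?> errHandler = KafkaMessageListenerContainer.this.getGenericErrorHandler();
returns the errHandler we configured, which carries our retry settings.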
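The order of the checks in determineErrorHandler can be summarized with a small dependency-free model. ErrorHandlerFallbackDemo and its string-valued "handlers" are hypothetical stand-ins, not spring-kafka types; only the order of the checks is taken from the source quoted above.

```java
// Dependency-free model of the checks in determineErrorHandler(...).
// Strings stand in for handler objects; none of these are spring-kafka types.
final class ErrorHandlerFallbackDemo {

    static final String DEFAULT_HANDLER = "new SeekToCurrentErrorHandler(): 10 attempts, 0 ms apart";

    static String determine(String handlerFromFactory, Object transactionManager) {
        if (handlerFromFactory != null) {
            return handlerFromFactory; // a handler set on the factory always wins
        }
        if (transactionManager != null) {
            return null; // transactional container: no default handler at this point
        }
        return DEFAULT_HANDLER; // nothing configured: the 10-attempt default
    }

    public static void main(String[] args) {
        // Auto-configured factory: the configurer handed it a null ErrorHandler.
        System.out.println(determine(null, null));
        // The factory the listener actually uses carries our handler.
        System.out.println(determine("SeekToCurrentErrorHandler with FixedBackOff(5000, 3)", null));
    }
}
```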
Here is what its no-arg constructor does:
public SeekToCurrentErrorHandler() {
this(null, SeekUtils.DEFAULT_BACK_OFF);
}
public SeekToCurrentErrorHandler(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff) {
super(recoverer, backOff);
}
Let's see where the constant SeekUtils.DEFAULT_BACK_OFF comes from:
public final class SeekUtils {
/**
* The number of times a topic/partition/offset can fail before being rejected.
*/
public static final int DEFAULT_MAX_FAILURES = 10;
/**
* The default back off - a {@link FixedBackOff} with 0 interval and
* {@link #DEFAULT_MAX_FAILURES} - 1 retry attempts.
*/
public static final FixedBackOff DEFAULT_BACK_OFF = new FixedBackOff(0, DEFAULT_MAX_FAILURES - 1);
...
}
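Where the 10 comes from
Here, finally, is the origin of the 10. DEFAULT_MAX_FAILURES is 10, and DEFAULT_BACK_OFF is a FixedBackOff with a 0 ms interval and DEFAULT_MAX_FAILURES - 1 = 9 retries: 1 initial delivery plus 9 immediate retries gives the 10 failures seen at the start. By the same count, FixedBackOff(5000, 3) means 1 delivery plus 3 retries, 4 deliveries in total.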
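The counting can be checked with a dependency-free re-implementation of the FixedBackOff nextBackOff() logic that this post quotes further down (return the interval while currentAttempts <= maxAttempts, otherwise -1). FixedBackOffDemo, Execution, totalDeliveries and totalBackOffMillis are hypothetical names. The sketch assumes, as in the skip() method discussed below, that every failed delivery, including the first one, consumes exactly one nextBackOff() call.

```java
// Dependency-free re-implementation of FixedBackOff's nextBackOff() as quoted in this post.
final class FixedBackOffDemo {

    static final long STOP = -1L; // mirrors BackOffExecution.STOP

    static final class Execution {
        private final long interval;
        private final long maxAttempts;
        private long currentAttempts;

        Execution(long interval, long maxAttempts) {
            this.interval = interval;
            this.maxAttempts = maxAttempts;
        }

        long nextBackOff() {
            currentAttempts++;
            return currentAttempts <= maxAttempts ? interval : STOP;
        }
    }

    // Deliveries of a record that fails every time: 1 initial delivery
    // plus one redelivery per nextBackOff() result that is not STOP.
    static long totalDeliveries(long interval, long maxAttempts) {
        Execution execution = new Execution(interval, maxAttempts);
        long deliveries = 1;
        while (execution.nextBackOff() != STOP) {
            deliveries++;
        }
        return deliveries;
    }

    // Total time spent sleeping between deliveries.
    static long totalBackOffMillis(long interval, long maxAttempts) {
        return (totalDeliveries(interval, maxAttempts) - 1) * interval;
    }

    public static void main(String[] args) {
        int defaultMaxFailures = 10; // SeekUtils.DEFAULT_MAX_FAILURES
        // default: 10 deliveries
        System.out.println("default: " + totalDeliveries(0, defaultMaxFailures - 1) + " deliveries");
        // custom: 4 deliveries, 15000 ms of back-off
        System.out.println("custom: " + totalDeliveries(5000, 3) + " deliveries, "
                + totalBackOffMillis(5000, 3) + " ms of back-off");
    }
}
```

So "3 retries" in FixedBackOff(5000, 3) really means 4 processing attempts and about 15 seconds of waiting before the recoverer runs.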
Continuing from there, it calls the constructor of its superclass FailedRecordProcessor:
protected FailedRecordProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff) {
this.failureTracker = new FailedRecordTracker(recoverer, backOff, this.logger);
this.classifier = configureDefaultClassifier();
}
This hands the retry configuration to failureTracker, which is the main character from here on.
When consumption fails, org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer#invokeErrorHandler is called:
private void invokeErrorHandler(final ConsumerRecord<K, V> record,
Iterator<ConsumerRecord<K, V>> iterator, RuntimeException e) {
if (this.errorHandler instanceof RemainingRecordsErrorHandler) {
if (this.producer == null) {
processCommits();
}
List<ConsumerRecord<?, ?>> records = new ArrayList<>();
records.add(record);
while (iterator.hasNext()) {
records.add(iterator.next());
}
this.errorHandler.handle(decorateException(e), records, this.consumer,
KafkaMessageListenerContainer.this.thisOrParentContainer);
}
else {
this.errorHandler.handle(decorateException(e), record, this.consumer);
}
}
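It then calls the errorHandler's handle method.
We already know the ErrorHandler is org.springframework.kafka.listener.SeekToCurrentErrorHandler. It implements RemainingRecordsErrorHandler (through ContainerAwareErrorHandler), so the first branch above is taken and the failed record plus the rest of the current poll are passed together to this handle overload: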
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
Consumer<?, ?> consumer, MessageListenerContainer container) {
SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(),
getSkipPredicate(records, thrownException), this.logger, getLogLevel());
}
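Before looking at the skip predicate, a simplified model of what "seek to current" means may help. The records list holds the failed record followed by the rest of the current poll, as invokeErrorHandler builds it. As far as I can tell from spring-kafka's SeekUtils, the skip predicate is consulted only for that first (failed) record, and each partition is then sought back to its earliest record that was not skipped, so the next poll() delivers those records again. SeekToCurrentDemo, Rec and seekTargets are hypothetical names, partitions are plain strings instead of TopicPartition, and the predicate here is side-effect free, whereas the real one counts attempts and sleeps.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Simplified model of seek-to-current; not the spring-kafka implementation.
final class SeekToCurrentDemo {

    // Minimal stand-in for ConsumerRecord: only the coordinates matter here.
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }

        String partitionKey() {
            return topic + "-" + partition;
        }
    }

    // records = [failed record, remaining records of the same poll].
    // Returns partition -> offset to seek to, so the next poll() starts there again.
    static Map<String, Long> seekTargets(List<Rec> records, Predicate<Rec> skipFailedRecord) {
        Map<String, Long> targets = new LinkedHashMap<>();
        for (int i = 0; i < records.size(); i++) {
            Rec r = records.get(i);
            if (i == 0 && skipFailedRecord.test(r)) {
                continue; // retries exhausted: recovered, so not redelivered
            }
            targets.putIfAbsent(r.partitionKey(), r.offset); // earliest remaining offset per partition
        }
        return targets;
    }

    public static void main(String[] args) {
        List<Rec> batch = List.of(new Rec("demo", 0, 5), new Rec("demo", 0, 6), new Rec("demo", 1, 3));
        System.out.println(seekTargets(batch, r -> false)); // {demo-0=5, demo-1=3}
        System.out.println(seekTargets(batch, r -> true));  // {demo-0=6, demo-1=3}
    }
}
```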
Now look at getSkipPredicate:
protected BiPredicate<ConsumerRecord<?, ?>, Exception> getSkipPredicate(List<ConsumerRecord<?, ?>> records,
Exception thrownException) {
if (getClassifier().classify(thrownException)) {
return this.failureTracker::skip;
}
.....
}
It returns failureTracker::skip as the predicate, so the decision ends up in the tracker's skip method.
The key method, skip():
boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
if (this.noRetries) {
attemptRecovery(record, exception, null);
return true;
}
Map<TopicPartition, FailedRecord> map = this.failures.get();
if (map == null) {
this.failures.set(new HashMap<>());
map = this.failures.get();
}
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
FailedRecord failedRecord = map.get(topicPartition);
if (failedRecord == null || failedRecord.getOffset() != record.offset()) {
failedRecord = new FailedRecord(record.offset(), this.backOff.start());
map.put(topicPartition, failedRecord);
}
else {
failedRecord.getDeliveryAttempts().incrementAndGet();
}
long nextBackOff = failedRecord.getBackOffExecution().nextBackOff();
if (nextBackOff != BackOffExecution.STOP) {
try {
Thread.sleep(nextBackOff);
}
catch (@SuppressWarnings("unused") InterruptedException e) {
Thread.currentThread().interrupt();
}
return false;
}
else {
attemptRecovery(record, exception, topicPartition);
map.remove(topicPartition);
if (map.isEmpty()) {
this.failures.remove();
}
return true;
}
}
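Here is a simplified model of the bookkeeping in skip(), written without Spring or Kafka: failures are keyed by topic and partition, a different offset on the same partition starts a fresh back-off, and once the back-off is exhausted the recoverer is called and the entry is dropped. FailedRecordTrackerDemo and its (String, Long) recoverer signature are hypothetical; the real tracker keeps its map in a ThreadLocal, sleeps for nextBackOff milliseconds, has a noRetries shortcut, and works with ConsumerRecord and TopicPartition, all of which are left out here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Simplified model of FailedRecordTracker.skip(...); not the spring-kafka class.
// Left out: the ThreadLocal map, Thread.sleep(nextBackOff), the noRetries shortcut.
final class FailedRecordTrackerDemo {

    static final long STOP = -1L; // mirrors BackOffExecution.STOP

    // Per-partition state: which offset is failing and how many back-offs it has used.
    static final class FailedRecord {
        final long offset;
        long attempts;

        FailedRecord(long offset) {
            this.offset = offset;
        }
    }

    private final long interval;
    private final long maxAttempts;
    private final BiConsumer<String, Long> recoverer; // (partition key, offset) when giving up
    private final Map<String, FailedRecord> failures = new HashMap<>();

    FailedRecordTrackerDemo(long interval, long maxAttempts, BiConsumer<String, Long> recoverer) {
        this.interval = interval;
        this.maxAttempts = maxAttempts;
        this.recoverer = recoverer;
    }

    // true: skip (recover) the record; false: seek back and redeliver it.
    boolean skip(String topic, int partition, long offset) {
        String key = topic + "-" + partition;
        FailedRecord failed = failures.get(key);
        if (failed == null || failed.offset != offset) {
            failed = new FailedRecord(offset); // new failing offset: start a fresh back-off
            failures.put(key, failed);
        }
        failed.attempts++; // like FixedBackOff's nextBackOff() incrementing currentAttempts
        long nextBackOff = failed.attempts <= maxAttempts ? interval : STOP;
        if (nextBackOff != STOP) {
            return false; // the real tracker sleeps nextBackOff ms here
        }
        recoverer.accept(key, offset); // back-off exhausted: hand over to the recoverer
        failures.remove(key);
        return true;
    }

    public static void main(String[] args) {
        List<String> recovered = new ArrayList<>();
        FailedRecordTrackerDemo tracker =
                new FailedRecordTrackerDemo(5000, 3, (key, off) -> recovered.add(key + "@" + off));
        int deliveries = 0;
        boolean skipped = false;
        while (!skipped) { // offset 42 of partition demo-0 fails on every delivery
            deliveries++;
            skipped = tracker.skip("demo", 0, 42);
        }
        System.out.println(deliveries + " deliveries, recovered " + recovered); // 4 deliveries, recovered [demo-0@42]
    }
}
```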
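The key line is long nextBackOff = failedRecord.getBackOffExecution().nextBackOff();
Each call counts one more attempt and returns the back-off interval, or -1 (BackOffExecution.STOP) once the maximum number of attempts is exceeded. For FixedBackOff: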
public long nextBackOff() {
++this.currentAttempts;
return this.currentAttempts <= FixedBackOff.this.getMaxAttempts() ? FixedBackOff.this.getInterval() : -1L;
}
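If it returns -1, skip() calls attemptRecovery, which ends with the exception being logged.
attemptRecovery essentially just invokes recoverer, a BiConsumer functional interface. The recoverer is chosen in the FailedRecordTracker constructor: if none is passed in, a default one that logs "Backoff ... exhausted for ..." is used; with our configuration it is the log.error lambda we passed to SeekToCurrentErrorHandler.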
FailedRecordTracker(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
LogAccessor logger) {
Assert.notNull(backOff, "'backOff' cannot be null");
if (recoverer == null) {
this.recoverer = (rec, thr) -> {
Map<TopicPartition, FailedRecord> map = this.failures.get();
FailedRecord failedRecord = null;
if (map != null) {
failedRecord = map.get(new TopicPartition(rec.topic(), rec.partition()));
}
logger.error(thr, "Backoff "
+ (failedRecord == null
? "none"
: failedRecord.getBackOffExecution())
+ " exhausted for " + ListenerUtils.recordToString(rec));
};
}
else {
this.recoverer = recoverer;
}
this.noRetries = backOff.start().nextBackOff() == BackOffExecution.STOP;
this.backOff = backOff;
}
And indeed, this error is logged after the last retry.
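The two decisions made in this constructor can be modeled in a few lines. TrackerConstructorDemo, chooseRecoverer and noRetries are hypothetical names, and a List stands in for the logger so the effect is observable. The noRetries check assumes a FixedBackOff, whose first nextBackOff() returns STOP exactly when maxAttempts is 0, i.e. FixedBackOff(interval, 0); in that case skip() recovers the record on its first failure without retrying.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Dependency-free model of the two decisions in the FailedRecordTracker constructor.
final class TrackerConstructorDemo {

    static final long STOP = -1L; // mirrors BackOffExecution.STOP

    // recoverer == null ? default "Backoff ... exhausted" logger : the recoverer passed in.
    static BiConsumer<String, Exception> chooseRecoverer(BiConsumer<String, Exception> custom, List<String> errorLog) {
        if (custom != null) {
            return custom; // e.g. the log.error(...) lambda given to SeekToCurrentErrorHandler
        }
        return (record, ex) -> errorLog.add("Backoff exhausted for " + record); // simplified message
    }

    // noRetries = backOff.start().nextBackOff() == STOP, evaluated for a FixedBackOff:
    // the first call (currentAttempts = 1) returns STOP only when maxAttempts is 0.
    // The interval value itself does not matter for this check, so 0 is used.
    static boolean noRetries(long maxAttempts) {
        long firstNextBackOff = 1 <= maxAttempts ? 0L : STOP;
        return firstNextBackOff == STOP;
    }

    public static void main(String[] args) {
        List<String> errorLog = new ArrayList<>();
        chooseRecoverer(null, errorLog).accept("demo-0@42", new RuntimeException("boom"));
        System.out.println(errorLog);                          // [Backoff exhausted for demo-0@42]
        System.out.println(noRetries(0) + " " + noRetries(9)); // true false
    }
}
```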