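I've recently been working with Kafka. When a consumer throws an exception while processing a message, the message is delivered again and again, and only stops after 10 failures.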

I have alerting set up, so a single bad message floods me with 10 alerts. To change that, I followed material I found online and added this configuration:

@Bean
public ConcurrentKafkaListenerContainerFactory containerFactory(ConsumerFactory consumerFactory) {
  ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
  factory.setConsumerFactory(consumerFactory);
  // Retry at most 3 times, 5 seconds apart; after that the recoverer below logs and drops the record.
  // (log is assumed to be an SLF4J logger, e.g. from Lombok's @Slf4j)
  SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler((consumerRecord, e) -> {
    log.error("Exception, discarding this message ============ {}", consumerRecord.toString(), e);
  }, new FixedBackOff(5000, 3));
  factory.setErrorHandler(seekToCurrentErrorHandler);
  // Manual ack mode: required if the listener method takes an Acknowledgment parameter, otherwise it fails with
  // "No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment."
  factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
  return factory;
}

Troubleshooting

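At first, though, the change had no effect. I stepped through with breakpoints and eventually found that Spring had created two instances of this factory. One was mine. The other was the default one from org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration: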

  @Bean
  @ConditionalOnMissingBean(
    name = {"kafkaListenerContainerFactory"}
  )
  ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory();
    configurer.configure(factory, (ConsumerFactory)kafkaConsumerFactory.getIfAvailable(() -> {
      return new DefaultKafkaConsumerFactory(this.properties.buildConsumerProperties());
    }));
    return factory;
  }

The problem is this condition:

  @ConditionalOnMissingBean(
    name = {"kafkaListenerContainerFactory"}
  )

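My own factory was not registered under the name kafkaListenerContainerFactory. A @Bean method's bean name defaults to the method name, which here is containerFactory. The condition therefore found no bean with that name, and Spring Boot instantiated its own. Because @KafkaListener uses the bean named kafkaListenerContainerFactory by default, the listeners ran on the auto-configured factory, not mine.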
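The name-based check can be pictured with a toy registry. This is a hypothetical model, not Spring's API: the class name, the map standing in for the bean registry and the bean descriptions are all illustrative. It assumes, as in a real Boot application, that user configuration is registered before auto-configuration runs, which is what makes @ConditionalOnMissingBean work.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model (hypothetical, NOT Spring's API) of
// @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory").
public class BeanNameConditionDemo {

    static final String DEFAULT_NAME = "kafkaListenerContainerFactory";

    // Registers the user's factory under the given name first, then applies
    // the auto-configuration rule: add the default only if the NAME is missing.
    static Map<String, String> buildContext(String userBeanName) {
        Map<String, String> beans = new LinkedHashMap<>();
        beans.put(userBeanName, "custom factory (FixedBackOff(5000, 3))");
        if (!beans.containsKey(DEFAULT_NAME)) {
            beans.put(DEFAULT_NAME, "auto-configured factory (default error handling)");
        }
        return beans;
    }

    // @KafkaListener without a containerFactory attribute uses this bean name.
    static String factoryUsedByListener(Map<String, String> beans) {
        return beans.get(DEFAULT_NAME);
    }

    public static void main(String[] args) {
        Map<String, String> byMethodName = buildContext("containerFactory");
        System.out.println(byMethodName.size() + " factories, listener uses: "
                + factoryUsedByListener(byMethodName));

        Map<String, String> byExplicitName = buildContext(DEFAULT_NAME);
        System.out.println(byExplicitName.size() + " factory, listener uses: "
                + factoryUsedByListener(byExplicitName));
    }
}
```

With the method name as the bean name, two factories exist and the listener gets the auto-configured one. With the explicit name, only the custom factory exists.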

Solution

Give the bean that name explicitly:

@Bean("kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory containerFactory(ConsumerFactory consumerFactory) {
  ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
  factory.setConsumerFactory(consumerFactory);
  // Retry at most 3 times, 5 seconds apart; after that the recoverer below logs and drops the record.
  SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler((consumerRecord, e) -> {
    log.error("Exception, discarding this message ============ {}", consumerRecord.toString(), e);
  }, new FixedBackOff(5000, 3));
  factory.setErrorHandler(seekToCurrentErrorHandler);
  // Manual ack mode: required if the listener method takes an Acknowledgment parameter, otherwise it fails with
  // "No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment."
  factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
  return factory;
}

With that, it works. Testing confirms the message is retried 3 times at 5-second intervals.

Finding where the default of 10 attempts comes from in the source

Let's continue from here:

  @Bean
  @ConditionalOnMissingBean(
    name = {"kafkaListenerContainerFactory"}
  )
  ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory();
    configurer.configure(factory, (ConsumerFactory)kafkaConsumerFactory.getIfAvailable(() -> {
      return new DefaultKafkaConsumerFactory(this.properties.buildConsumerProperties());
    }));
    return factory;
  }

This creates the default factory. Step into configurer.configure:

  public void configure(ConcurrentKafkaListenerContainerFactory<Object, Object> listenerFactory, ConsumerFactory<Object, Object> consumerFactory) {
    listenerFactory.setConsumerFactory(consumerFactory);
    this.configureListenerFactory(listenerFactory);
    this.configureContainer(listenerFactory.getContainerProperties());
  }

Then step into configureListenerFactory:

public class ConcurrentKafkaListenerContainerFactoryConfigurer {
  private KafkaProperties properties;
  private MessageConverter messageConverter;
  private KafkaTemplate<Object, Object> replyTemplate;
  private KafkaAwareTransactionManager<Object, Object> transactionManager;
  private ConsumerAwareRebalanceListener rebalanceListener;
  private ErrorHandler errorHandler;
  private BatchErrorHandler batchErrorHandler;
  private AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
  private RecordInterceptor<Object, Object> recordInterceptor;

  private void configureListenerFactory(ConcurrentKafkaListenerContainerFactory<Object, Object> factory) {
    PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
    Listener properties = this.properties.getListener();
    properties.getClass();
    map.from(properties::getConcurrency).to(factory::setConcurrency);
    map.from(this.messageConverter).to(factory::setMessageConverter);
    map.from(this.replyTemplate).to(factory::setReplyTemplate);
    if (properties.getType().equals(Type.BATCH)) {
      factory.setBatchListener(true);
      factory.setBatchErrorHandler(this.batchErrorHandler);
    } else {
      factory.setErrorHandler(this.errorHandler);
    }

    map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor);
    map.from(this.recordInterceptor).to(factory::setRecordInterceptor);
  }
....
}

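This is where the ErrorHandler is set. Its value comes from a field of ConcurrentKafkaListenerContainerFactoryConfigurer, which is itself a Spring-managed bean. Unlike the other properties, it is set directly rather than through the PropertyMapper, so a null value is passed on unchanged.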

Now look at where Spring creates that bean:

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {
	...

	private final ErrorHandler errorHandler;
	...
	
	KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
			ObjectProvider<RecordMessageConverter> messageConverter,
			ObjectProvider<BatchMessageConverter> batchMessageConverter,
			ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
			ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
			ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener, ObjectProvider<ErrorHandler> errorHandler,
			ObjectProvider<BatchErrorHandler> batchErrorHandler,
			ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
			ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {
		this.properties = properties;
		this.messageConverter = messageConverter.getIfUnique();
		this.batchMessageConverter = batchMessageConverter
				.getIfUnique(() -> new BatchMessagingMessageConverter(this.messageConverter));
		this.kafkaTemplate = kafkaTemplate.getIfUnique();
		this.transactionManager = kafkaTransactionManager.getIfUnique();
		this.rebalanceListener = rebalanceListener.getIfUnique();
		this.errorHandler = errorHandler.getIfUnique();
		this.batchErrorHandler = batchErrorHandler.getIfUnique();
		this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
		this.recordInterceptor = recordInterceptor.getIfUnique();
	}

	@Bean
	@ConditionalOnMissingBean
	ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
		ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
		configurer.setKafkaProperties(this.properties);
		MessageConverter messageConverterToUse = this.properties.getListener().getType().equals(Type.BATCH) ? this.batchMessageConverter : this.messageConverter;
		configurer.setMessageConverter((MessageConverter)messageConverterToUse);
		configurer.setReplyTemplate(this.kafkaTemplate);
		configurer.setTransactionManager(this.transactionManager);
		configurer.setRebalanceListener(this.rebalanceListener);
		configurer.setErrorHandler(this.errorHandler);
		configurer.setBatchErrorHandler(this.batchErrorHandler);
		configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
		configurer.setRecordInterceptor(this.recordInterceptor);
		return configurer;
	}
	...
}

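Stepping through this initialization in the debugger shows that errorHandler is null. No ErrorHandler bean is defined, so errorHandler.getIfUnique() returns null.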

So back in configureListenerFactory, factory.setErrorHandler(this.errorHandler); also receives null.

That wraps up the factory side. Next, look at the consumer side in org.springframework.kafka.listener.KafkaMessageListenerContainer:

	@Override
	protected void doStart() {
		if (isRunning()) {
			return;
		}
		if (this.clientIdSuffix == null) { // stand-alone container
			checkTopics();
		}
		ContainerProperties containerProperties = getContainerProperties();
		checkAckMode(containerProperties);

		Object messageListener = containerProperties.getMessageListener();
		if (containerProperties.getConsumerTaskExecutor() == null) {
			SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
					(getBeanName() == null ? "" : getBeanName()) + "-C-");
			containerProperties.setConsumerTaskExecutor(consumerExecutor);
		}
		GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
		ListenerType listenerType = determineListenerType(listener);
		this.listenerConsumer = new ListenerConsumer(listener, listenerType);
		...
	}

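This line creates the listener consumer:

this.listenerConsumer = new ListenerConsumer(listener, listenerType);

Its constructor is where the container's error handler gets resolved: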

	ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
			Properties consumerProperties = propertiesFromProperties();
			checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory);
			this.autoCommit = determineAutoCommit(consumerProperties);
			...
			GenericErrorHandler<?> errHandler = KafkaMessageListenerContainer.this.getGenericErrorHandler();
			...
			this.errorHandler = determineErrorHandler(errHandler);
			...

	}

	...

	protected ErrorHandler determineErrorHandler(GenericErrorHandler<?> errHandler) {
		return errHandler != null ? (ErrorHandler) errHandler
				: this.transactionManager != null ? null : new SeekToCurrentErrorHandler();
	}

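The factory handed over null, so errHandler is null here as well, as the debugger confirms. With no transaction manager configured, determineErrorHandler falls through to the last branch.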
Only at this point does the container finally get a non-null ErrorHandler. The key, then, is new SeekToCurrentErrorHandler();.

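This is also the point where our custom configuration changes the retry count. Suppose we register our own ConcurrentKafkaListenerContainerFactory under the name kafkaListenerContainerFactory and give it a maximum number of retries. Then the call above,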

GenericErrorHandler<?> errHandler = KafkaMessageListenerContainer.this.getGenericErrorHandler();

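returns the errHandler we configured, carrying our retry settings, and determineErrorHandler uses it instead of creating the default.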
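The null hand-off and the fallback can be condensed into a few lines. This is a simplified, hypothetical model: ErrorHandler and SeekToCurrent here are stand-in types, not the Spring classes. Only the decision logic mirrors determineErrorHandler as quoted above. The default's settings are taken from SeekUtils.DEFAULT_BACK_OFF, FixedBackOff(0, 10 - 1).

```java
// Hypothetical model of how a null error handler travels from the factory to the
// container, and where the default SeekToCurrentErrorHandler appears.
public class ErrorHandlerFallbackDemo {

    interface ErrorHandler {
        String describe();
    }

    // Stand-in for SeekToCurrentErrorHandler: only its FixedBackOff settings matter here.
    static final class SeekToCurrent implements ErrorHandler {
        private final long intervalMs;
        private final long maxAttempts;

        SeekToCurrent(long intervalMs, long maxAttempts) {
            this.intervalMs = intervalMs;
            this.maxAttempts = maxAttempts;
        }

        @Override
        public String describe() {
            return "SeekToCurrent(interval=" + intervalMs + "ms, retries=" + maxAttempts + ")";
        }
    }

    // Mirrors determineErrorHandler: a handler from the factory wins; otherwise,
    // without a transaction manager, a default one is created.
    static ErrorHandler determineErrorHandler(ErrorHandler fromFactory, boolean hasTransactionManager) {
        if (fromFactory != null) {
            return fromFactory;
        }
        return hasTransactionManager ? null : new SeekToCurrent(0, 10 - 1);
    }

    public static void main(String[] args) {
        // No ErrorHandler bean: getIfUnique() yields null and the factory passes it on.
        System.out.println(determineErrorHandler(null, false).describe());
        // Our own factory, registered as kafkaListenerContainerFactory.
        System.out.println(determineErrorHandler(new SeekToCurrent(5000, 3), false).describe());
    }
}
```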

Here is what the SeekToCurrentErrorHandler constructors do:

	public SeekToCurrentErrorHandler() {
		this(null, SeekUtils.DEFAULT_BACK_OFF);
	}

	public SeekToCurrentErrorHandler(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff) {
		super(recoverer, backOff);
	}

Where does the constant SeekUtils.DEFAULT_BACK_OFF come from?

public final class SeekUtils {

	/**
	 * The number of times a topic/partition/offset can fail before being rejected.
	 */
	public static final int DEFAULT_MAX_FAILURES = 10;

	/**
	 * The default back off - a {@link FixedBackOff} with 0 interval and
	 * {@link #DEFAULT_MAX_FAILURES} - 1 retry attempts.
	 */
	public static final FixedBackOff DEFAULT_BACK_OFF = new FixedBackOff(0, DEFAULT_MAX_FAILURES - 1);
	...
}
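That is where the 10 comes from. DEFAULT_MAX_FAILURES is 10, and DEFAULT_BACK_OFF is a FixedBackOff with a 0 ms interval and DEFAULT_MAX_FAILURES - 1 = 9 retries. That makes 1 initial delivery plus 9 redeliveries, or 10 attempts in total, back to back with no delay.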

Back in the constructor, it calls the constructor of its superclass, FailedRecordProcessor:

	protected FailedRecordProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff) {
		this.failureTracker = new FailedRecordTracker(recoverer, backOff, this.logger);
		this.classifier = configureDefaultClassifier();
	}

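This passes the retry configuration (recoverer and BackOff) to failureTracker, a FailedRecordTracker, which is the main character from here on.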

When the listener throws, org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer#invokeErrorHandler is called:

		private void invokeErrorHandler(final ConsumerRecord<K, V> record,
				Iterator<ConsumerRecord<K, V>> iterator, RuntimeException e) {

			if (this.errorHandler instanceof RemainingRecordsErrorHandler) {
				if (this.producer == null) {
					processCommits();
				}
				List<ConsumerRecord<?, ?>> records = new ArrayList<>();
				records.add(record);
				while (iterator.hasNext()) {
					records.add(iterator.next());
				}
				this.errorHandler.handle(decorateException(e), records, this.consumer,
						KafkaMessageListenerContainer.this.thisOrParentContainer);
			}
			else {
				this.errorHandler.handle(decorateException(e), record, this.consumer);
			}
		}

It then calls the error handler's handle method.
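We already know the ErrorHandler is org.springframework.kafka.listener.SeekToCurrentErrorHandler. It is a RemainingRecordsErrorHandler (through ContainerAwareErrorHandler), so the first branch above runs. That branch passes the failed record, together with the rest of the polled records, to this overload: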

	@Override
	public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
			Consumer<?, ?> consumer, MessageListenerContainer container) {

		SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(),
				getSkipPredicate(records, thrownException), this.logger, getLogLevel());
	}
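"Seek to current" means the failed record and everything after it in the poll are not lost. The consumer is repositioned so the next poll returns them again. Below is a simplified sketch based on my reading of SeekUtils.doSeeks, the helper that seekOrRecover delegates to in Spring Kafka 2.5/2.6. The Rec type and method names are hypothetical, and partitions are plain strings instead of TopicPartition. The failed record comes first. It is dropped from the seeks only when the skip predicate reports that its retries are exhausted.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Simplified model (hypothetical types) of how the seek positions are computed.
public class SeekToCurrentDemo {

    static final class Rec {
        final String partition;
        final long offset;

        Rec(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Returns partition -> offset to seek to. Only the first (failed) record can
    // be skipped, and only if the skipper says its retries are exhausted.
    static Map<String, Long> seekPositions(List<Rec> records, Predicate<Rec> skipper) {
        Map<String, Long> seeks = new LinkedHashMap<>();
        boolean first = true;
        for (Rec r : records) {
            boolean skipThis = first && skipper.test(r);
            if (!skipThis) {
                // Keep the first-seen offset per partition; a poll returns each
                // partition's records in offset order, so this is the lowest one.
                seeks.putIfAbsent(r.partition, r.offset);
            }
            first = false;
        }
        return seeks;
    }

    public static void main(String[] args) {
        List<Rec> polled = List.of(new Rec("orders-0", 5), new Rec("orders-0", 6), new Rec("orders-1", 3));
        System.out.println("retry:   " + seekPositions(polled, r -> false));
        System.out.println("recover: " + seekPositions(polled, r -> true));
    }
}
```

While retries remain, the partition is sought back to the failed offset 5, so it is redelivered. Once the record is recovered, the seek starts at offset 6 and the bad record is left behind.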

Now look at getSkipPredicate:

	protected BiPredicate<ConsumerRecord<?, ?>, Exception> getSkipPredicate(List<ConsumerRecord<?, ?>> records,
			Exception thrownException) {

		if (getClassifier().classify(thrownException)) {
			return this.failureTracker::skip;
		}
		.....
	}

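As long as the classifier treats the exception as retryable, it returns a method reference to failureTracker's skip method. SeekUtils.seekOrRecover then calls that method for the failed record.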

The key method is skip():

	boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
		if (this.noRetries) {
			attemptRecovery(record, exception, null);
			return true;
		}
		Map<TopicPartition, FailedRecord> map = this.failures.get();
		if (map == null) {
			this.failures.set(new HashMap<>());
			map = this.failures.get();
		}
		TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
		FailedRecord failedRecord = map.get(topicPartition);
		if (failedRecord == null || failedRecord.getOffset() != record.offset()) {
			failedRecord = new FailedRecord(record.offset(), this.backOff.start());
			map.put(topicPartition, failedRecord);
		}
		else {
			failedRecord.getDeliveryAttempts().incrementAndGet();
		}
		long nextBackOff = failedRecord.getBackOffExecution().nextBackOff();
		if (nextBackOff != BackOffExecution.STOP) {
			try {
				Thread.sleep(nextBackOff);
			}
			catch (@SuppressWarnings("unused") InterruptedException e) {
				Thread.currentThread().interrupt();
			}
			return false;
		}
		else {
			attemptRecovery(record, exception, topicPartition);
			map.remove(topicPartition);
			if (map.isEmpty()) {
				this.failures.remove();
			}
			return true;
		}
	}

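The key line is long nextBackOff = failedRecord.getBackOffExecution().nextBackOff();. It does not return a retry count. It returns how long to wait before the next retry, or -1 (BackOffExecution.STOP) once the maximum number of attempts has been exceeded. For FixedBackOff, it looks like this: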

    public long nextBackOff() {
      ++this.currentAttempts;
      return this.currentAttempts <= FixedBackOff.this.getMaxAttempts() ? FixedBackOff.this.getInterval() : -1L;
    }

When it returns -1, skip() calls attemptRecovery, which by default logs the error.
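attemptRecovery essentially invokes the recoverer, a BiConsumer functional interface. No recoverer is passed in when using the no-argument SeekToCurrentErrorHandler(). In that case, the FailedRecordTracker constructor installs a default recoverer that only logs: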

FailedRecordTracker(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
			LogAccessor logger) {

		Assert.notNull(backOff, "'backOff' cannot be null");
		if (recoverer == null) {
			this.recoverer = (rec, thr) -> {
				Map<TopicPartition, FailedRecord> map = this.failures.get();
				FailedRecord failedRecord = null;
				if (map != null) {
					failedRecord = map.get(new TopicPartition(rec.topic(), rec.partition()));
				}
				logger.error(thr, "Backoff "
					+ (failedRecord == null
						? "none"
						: failedRecord.getBackOffExecution())
					+ " exhausted for " + ListenerUtils.recordToString(rec));
			};
		}
		else {
			this.recoverer = recoverer;
		}
		this.noRetries = backOff.start().nextBackOff() == BackOffExecution.STOP;
		this.backOff = backOff;
	}

And indeed, this log message is printed after the last retry.
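Putting skip() and the recoverer together, the sketch below counts deliveries for a record that always fails. It is a simplified, single-threaded model, not the real FailedRecordTracker. The class and method names are hypothetical, and it leaves out the ThreadLocal map, Thread.sleep, the deliveryAttempts counter and the exception classifier. The noRetries shortcut is not modeled separately: with 0 retries, the first nextBackOff() already returns STOP, which leads to the same immediate recovery. With the default FixedBackOff(0, 9), it reports 10 deliveries and a single recoverer call. With FixedBackOff(5000, 3), it reports 4 deliveries (the sleeps are skipped).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Simplified, single-threaded model of FailedRecordTracker#skip (hypothetical types).
public class FailedRecordTrackerDemo {

    static final long STOP = -1L; // same value as BackOffExecution.STOP

    // Mirrors a FixedBackOff execution: the interval until maxAttempts is exceeded.
    static final class Execution {
        private final long interval;
        private final long maxAttempts;
        private long currentAttempts;

        Execution(long interval, long maxAttempts) {
            this.interval = interval;
            this.maxAttempts = maxAttempts;
        }

        long nextBackOff() {
            currentAttempts++;
            return currentAttempts <= maxAttempts ? interval : STOP;
        }
    }

    static final class FailedRecord {
        final long offset;
        final Execution execution;

        FailedRecord(long offset, Execution execution) {
            this.offset = offset;
            this.execution = execution;
        }
    }

    private final Map<String, FailedRecord> failures = new HashMap<>(); // keyed by topic-partition
    private final long interval;
    private final long maxAttempts;
    private final BiConsumer<Long, Exception> recoverer;

    FailedRecordTrackerDemo(long interval, long maxAttempts, BiConsumer<Long, Exception> recoverer) {
        this.interval = interval;
        this.maxAttempts = maxAttempts;
        this.recoverer = recoverer;
    }

    // true = give up on the record (recoverer already ran); false = redeliver it.
    boolean skip(String topicPartition, long offset, Exception exception) {
        FailedRecord failed = failures.get(topicPartition);
        if (failed == null || failed.offset != offset) {
            // A new failing offset on this partition: start a fresh back-off execution.
            failed = new FailedRecord(offset, new Execution(interval, maxAttempts));
            failures.put(topicPartition, failed);
        }
        long next = failed.execution.nextBackOff();
        if (next != STOP) {
            return false; // the real tracker sleeps `next` ms here
        }
        recoverer.accept(offset, exception);
        failures.remove(topicPartition);
        return true;
    }

    // Redelivers an always-failing record until skip() gives up.
    // Returns {deliveries, recoverer invocations}.
    static int[] run(long maxAttempts) {
        int[] recovered = {0};
        FailedRecordTrackerDemo tracker =
                new FailedRecordTrackerDemo(0, maxAttempts, (offset, ex) -> recovered[0]++);
        int deliveries = 0;
        boolean givenUp = false;
        while (!givenUp) {
            deliveries++; // the listener is invoked and throws
            givenUp = tracker.skip("orders-0", 42L, new IllegalStateException("listener failed"));
        }
        return new int[] {deliveries, recovered[0]};
    }

    public static void main(String[] args) {
        int[] dflt = run(10 - 1);
        int[] custom = run(3);
        System.out.println("default: " + dflt[0] + " deliveries, recovered " + dflt[1] + "x");
        System.out.println("custom:  " + custom[0] + " deliveries, recovered " + custom[1] + "x");
    }
}
```

Keying the state by partition and offset matters: if a different offset fails on the same partition, the tracker starts a fresh back-off rather than continuing the old count.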
