kafka每个partition都有自己的offset,消费端处理完后要向kafka服务器提交offset。
spring-kafka组件有下面几种AckMode提交模式:都是针对.enable-auto-commit设置为false才适用,如果true是kafka根据自身配置来提交的。默认AckMode是BATCH
在这里插入图片描述
在这里插入图片描述

模式 描述
MANUAL poll()拉取一批消息,处理完业务后,手动调用Acknowledgment.acknowledge()先将offset存放到map本地缓存,在下一次poll之前从缓存拿出来批量提交
MANUAL_IMMEDIATE 每处理完业务手动调用Acknowledgment.acknowledge()后立即提交
RECORD 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
BATCH 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
TIME 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
COUNT 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
COUNT_TIME TIME或COUNT满足其中一个时提交

MANUAL和MANUAL_IMMEDIATE模式要将spring.kafka.consumer.enable-auto-commit设置为false

@KafkaListener注解

@KafkaListener(
            topics = "test",
            groupId = "testGroup"
    )
    public void listener(ConsumerRecord<String, String> record, Acknowledgment ack) {
    // 处理业务
    ack.acknowledge();
    }

如果spring.kafka.consumer.enable-auto-commit=true,方法参数不能包含Acknowledgment ack,否则会报错。

@KafkaListener(
            topics = "test",
            groupId = "testGroup"
    )
    public void listener(ConsumerRecord<String, String> record) {
    // 处理业务
    // 不用手动提交
    }

org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer#run
项目启动后这个run方法会不停从kafka服务器broke拉取(poll)消息回来消费

while (isRunning()) {
				try {
					if (!this.autoCommit && !this.isRecordAck) {
						processCommits();
					}
					processSeeks();
					ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
					this.lastPoll = System.currentTimeMillis();

					if (records != null && this.logger.isDebugEnabled()) {
						this.logger.debug("Received: " + records.count() + " records");
					}
					if (records != null && records.count() > 0) {
						if (this.containerProperties.getIdleEventInterval() != null) {
							lastReceive = System.currentTimeMillis();
						}
						invokeListener(records);
					}

其中this.consumer.poll()方法拉取消息,invokeListener方法是执行处理消息逻辑。

private void invokeListener(final ConsumerRecords<K, V> records) {
			if (this.isBatchListener) {
				invokeBatchListener(records);
			}
			else {
				invokeRecordListener(records);
			}
		}
private void invokeRecordListener(final ConsumerRecords<K, V> records) {
			if (this.transactionTemplate != null) {
				innvokeRecordListenerInTx(records);
			}
			else {
				doInvokeWithRecords(records);
			}
		}
private void doInvokeWithRecords(final ConsumerRecords<K, V> records) throws Error {
			Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
			while (iterator.hasNext()) {
				final ConsumerRecord<K, V> record = iterator.next();
				if (this.logger.isTraceEnabled()) {
					this.logger.trace("Processing " + record);
				}
				doInvokeRecordListener(record, null);
			}
		}

最终会走到doInvokeRecordListener方法

private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> record,
				@SuppressWarnings("rawtypes") Producer producer) throws Error {
			try {
				if (this.acknowledgingMessageListener != null) {
					this.acknowledgingMessageListener.onMessage(record,
							this.isAnyManualAck
									? new ConsumerAcknowledgment(record)
									: null);
				}
				else {
					this.listener.onMessage(record);
				}
				ackCurrent(record, producer);
			}

最终会调用onMessage方法,其中第二个参数就是Acknowledgment,有可能是null,看设置的auto.commit值决定。

public void ackCurrent(final ConsumerRecord<K, V> record, @SuppressWarnings("rawtypes") Producer producer) {
			if (this.isRecordAck) {
				Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
						Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
								new OffsetAndMetadata(record.offset() + 1));
				if (producer == null) {
					if (this.logger.isDebugEnabled()) {
						this.logger.debug("Committing: " + offsetsToCommit);
					}
					if (this.containerProperties.isSyncCommits()) {
						this.consumer.commitSync(offsetsToCommit);
					}
					else {
						this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
					}
				}
				else {
					this.acks.add(record);
				}
			}
			else if (!this.isAnyManualAck && !this.autoCommit) {
				this.acks.add(record);
			}
			if (producer != null) {
				try {
					sendOffsetsToTransaction(producer);
				}
				catch (Exception e) {
					this.logger.error("Send offsets to transaction failed", e);
				}
			}
		}

当我们设置了手动提交,也就是MANUAL或者MANUAL_IMMEDIATE,是通过调用ack.acknowledge()方法

@Override
			public void acknowledge() {
				Assert.state(ListenerConsumer.this.isAnyManualAck,
						"A manual ackmode is required for an acknowledging listener");
				for (ConsumerRecord<K, V> record : getHighestOffsetRecords(this.records)) {
					processAck(record);
				}
			}
private void processAck(ConsumerRecord<K, V> record) {
			if (!Thread.currentThread().equals(this.consumerThread)) {
				try {
					this.acks.put(record);
				}
				catch (InterruptedException e) {
					Thread.currentThread().interrupt();
					throw new KafkaException("Interrupted while storing ack", e);
				}
			}
			else {
				if (this.isManualImmediateAck) {
					try {
						ackImmediate(record);
					}
					catch (WakeupException e) {
						// ignore - not polling
					}
				}
				else {
					addOffset(record);
				}
			}
		}
private void processAck(ConsumerRecord<K, V> record) {
			if (!Thread.currentThread().equals(this.consumerThread)) {
				try {
					this.acks.put(record);
				}
				catch (InterruptedException e) {
					Thread.currentThread().interrupt();
					throw new KafkaException("Interrupted while storing ack", e);
				}
			}
			else {
				if (this.isManualImmediateAck) {
					try {
						ackImmediate(record);
					}
					catch (WakeupException e) {
						// ignore - not polling
					}
				}
				else {
					addOffset(record);
				}
			}
		}

MANUAL_IMMEDIATE模式直接提交offset,MANUAL模式则先把要提交的offset放到map中,然后返回。

再看回最上面的run方法,其中processCommits()就是从map中拿出要提交的offset,然后批量提交。也就是在下一次poll之前做了提交,相当于处理完了上一次poll回来的所有消息后,然后再一起提交。

他们区别:MANUAL_IMMEDIATE是消费完一个消息就提交,MANUAL是处理完一批消息,在下一次拉取消息之前批量提交。拉取批量消息可以通过max.poll.record设置最大,默认是500条。前提是消息大小满足最大限制,否则一批也拉取不到最大的500条。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐