SpringBoot整合Kafka并实现广播模式
随机消费者组优点简单缺点后期监控统计困难assign模式优点简单缺点放弃集群特性,topic分区动态扩容特性等,对于kafka的使用意义大大降低转发到其他消息队列优点可以使用支持广播的消息队列完成缺点引入新的中间件,增加架构复杂度以及项目风险Slicenfer2022/07/29。...
环境说明
- IDEA 2022.2
- JDK 8.0.312
- Mac OS 13 beta 4
- SpringBoot 2.7.2
需求背景
在进行数据推送的时候,使用到了WebSocket
技术实现从服务端向客户端推送的机制,然而因为长连接的机制原因,连接会固定在一台服务器上,这时候数据产出后,需要在集群中广播以实现将数据推送给所有需要的用户。这个广播可以使用Redis
以及MQ
来实现。
这里因为数据产出侧的限制,会将数据发送到kafka,所以接入方也只好使用Kafka来消费数据。
但是有一个问题就是,kafka不支持广播模式,需要自己想办法实现。
方案分析
随机消费组
在Kafka中,同一个消费者组内的消费者是竞争消费,一个消息只能被一个消费者消费。
从这个角度出发,我们就可以将集群中的每个服务的消费者都在不同的组内,即可实现集群内的广播。
而这个消费者组随机的话,则会导致监控困难。
在一些云平台,比如阿里云,消费者组需要提前创建,这对于随机消费组来说就不太友好。即使云平台上支持随机消费者组,那样就无法监控统计到消费情况,这显然也不符合项目的运维要求。
assign模式
在Kafka中,assign模式是指放弃消费者集群,直接订阅分区,所有消费者都订阅指定分区,也可以实现分区内消息的广播消费。
![企业微信截图_16469847773808.png](https://img-blog.csdnimg.cn/img_convert/62235f34bbacd8a38fa0e0c28bbb05a0.png#clientId=u3532dcd9-9025-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=334&id=ua84935e3&margin=[object Object]&name=企业微信截图_16469847773808.png&originHeight=667&originWidth=576&originalType=binary&ratio=1&rotation=0&showTitle=false&size=231219&status=done&style=none&taskId=ub4e19171-6e81-4706-b48c-beedec921f4&title=&width=288)
转发到支持广播的消息队列
既然Kafka对于广播的支持不那么友好,那么将消息转发到其他其他消息队列,比如RocketMQ
也不失为一种方案。
这种转发可以自己编码实现,也可以借由云平台的产品。目前阿里云公测的Connector支持Kafka和其他云服务之间数据同步。
代码实现
依赖pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>kafka-broadcast</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka-broadcast</name>
<description>kafka-broadcast</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
随机消费者组
启动时,保证每个Listener的groupId是唯一且随机的即可。
代码实现可参考:
@KafkaListener(topics = "broadcast_test",groupId = "broadcast_test_group" + "#{T(java.util.UUID).randomUUID()})")
public void consume(String data) {
System.out.println("消费者1消费到消息:" + data);
}
assign模式
assign模式在网上的资料偏少,原生kafka-client中可以遍历topic下的partition,然后
consume.assign(partition);
即可。
在spring-kafka下,我们一般都直接使用@KafkaListener
。
那么在这个注解里,可不可以使用assign呢?
经过一番代码查找,发现是可行的
在这个注解里有这么一个属性
/**
* The topicPartitions for this listener when using manual topic/partition
* assignment.
* <p>
* Mutually exclusive with {@link #topicPattern()} and {@link #topics()}.
* @return the topic names or expressions (SpEL) to listen to.
*/
TopicPartition[] topicPartitions() default {};
意思是这个属性设置了assign模式需要订阅的分区,那么是不是配了这个就可以了呢?
翻看代码
在org.springframework.kafka.listener.KafkaMessageListenerContainer
类中
@Override
protected void doStart() {
if (isRunning()) {
return;
}
if (this.clientIdSuffix == null) { // stand-alone container
checkTopics();
}
ContainerProperties containerProperties = getContainerProperties();
checkAckMode(containerProperties);
Object messageListener = containerProperties.getMessageListener();
AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();
if (consumerExecutor == null) {
consumerExecutor = new SimpleAsyncTaskExecutor(
(getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setConsumerTaskExecutor(consumerExecutor);
}
GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
ListenerType listenerType = determineListenerType(listener);
// 新建消费者
this.listenerConsumer = new ListenerConsumer(listener, listenerType);
setRunning(true);
this.startLatch = new CountDownLatch(1);
this.listenerConsumerFuture = consumerExecutor
.submitListenable(this.listenerConsumer);
try {
if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
this.logger.error("Consumer thread failed to start - does the configured task executor "
+ "have enough threads to support all containers and concurrency?");
publishConsumerFailedToStart();
}
}
catch (@SuppressWarnings(UNUSED) InterruptedException e) {
Thread.currentThread().interrupt();
}
}
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
Properties consumerProperties = propertiesFromProperties();
checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory);
this.autoCommit = determineAutoCommit(consumerProperties);
this.consumer =
KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
this.consumerGroupId,
this.containerProperties.getClientId(),
KafkaMessageListenerContainer.this.clientIdSuffix,
consumerProperties);
this.clientId = determineClientId();
this.transactionTemplate = determineTransactionTemplate();
this.genericListener = listener;
this.consumerSeekAwareListener = checkConsumerSeekAware(listener);
this.commitCurrentOnAssignment = determineCommitCurrent(consumerProperties,
KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties());
// 判断是订阅还是监听
subscribeOrAssignTopics(this.consumer);
if (listener instanceof BatchMessageListener) {
this.listener = null;
this.batchListener = (BatchMessageListener<K, V>) listener;
this.isBatchListener = true;
this.wantsFullRecords = this.batchListener.wantsPollResult();
this.pollThreadStateProcessor = setUpPollProcessor(true);
}
else if (listener instanceof MessageListener) {
this.listener = (MessageListener<K, V>) listener;
this.batchListener = null;
this.isBatchListener = false;
this.wantsFullRecords = false;
this.pollThreadStateProcessor = setUpPollProcessor(false);
}
else {
throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
+ "'BatchMessageListener', or the variants that are consumer aware and/or "
+ "Acknowledging"
+ " not " + listener.getClass().getName());
}
this.listenerType = listenerType;
this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
|| listenerType.equals(ListenerType.CONSUMER_AWARE);
this.commonErrorHandler = determineCommonErrorHandler();
Assert.state(!this.isBatchListener || !this.isRecordAck,
"Cannot use AckMode.RECORD with a batch listener");
if (this.containerProperties.getScheduler() != null) {
this.taskScheduler = this.containerProperties.getScheduler();
this.taskSchedulerExplicitlySet = true;
}
else {
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.initialize();
this.taskScheduler = threadPoolTaskScheduler;
}
this.monitorTask = this.taskScheduler.scheduleAtFixedRate(this::checkConsumer, // NOSONAR
Duration.ofSeconds(this.containerProperties.getMonitorInterval()));
if (this.containerProperties.isLogContainerConfig()) {
this.logger.info(toString());
}
Map<String, Object> props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties();
this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull()
|| checkDeserializer(findDeserializerClass(props, consumerProperties, false));
this.checkNullValueForExceptions = this.containerProperties.isCheckDeserExWhenValueNull()
|| checkDeserializer(findDeserializerClass(props, consumerProperties, true));
this.syncCommitTimeout = determineSyncCommitTimeout();
if (this.containerProperties.getSyncCommitTimeout() == null) {
// update the property so we can use it directly from code elsewhere
this.containerProperties.setSyncCommitTimeout(this.syncCommitTimeout);
if (KafkaMessageListenerContainer.this.thisOrParentContainer != null) {
KafkaMessageListenerContainer.this.thisOrParentContainer
.getContainerProperties()
.setSyncCommitTimeout(this.syncCommitTimeout);
}
}
this.maxPollInterval = obtainMaxPollInterval(consumerProperties);
this.micrometerHolder = obtainMicrometerHolder();
this.deliveryAttemptAware = setupDeliveryAttemptAware();
this.subBatchPerPartition = setupSubBatchPerPartition();
this.lastReceivePartition = new HashMap<>();
this.lastAlertPartition = new HashMap<>();
this.wasIdlePartition = new HashMap<>();
}
private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> subscribingConsumer) {
// 如果没有配置topicPartitions属性
if (KafkaMessageListenerContainer.this.topicPartitions == null) {
ConsumerRebalanceListener rebalanceListener = new ListenerConsumerRebalanceListener();
Pattern topicPattern = this.containerProperties.getTopicPattern();
if (topicPattern != null) {
subscribingConsumer.subscribe(topicPattern, rebalanceListener);
}
else {
subscribingConsumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), // NOSONAR
rebalanceListener);
}
}
else {
List<TopicPartitionOffset> topicPartitionsToAssign =
Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
this.definedPartitions = new LinkedHashMap<>(topicPartitionsToAssign.size());
for (TopicPartitionOffset topicPartition : topicPartitionsToAssign) {
this.definedPartitions.put(topicPartition.getTopicPartition(),
new OffsetMetadata(topicPartition.getOffset(), topicPartition.isRelativeToCurrent(),
topicPartition.getPosition()));
}
// 监听指定分区
subscribingConsumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
}
}
从这几块代码可以看出,如果@KafkaListener
中的属性topicPartitions
有值,则使用assign模式。
所以代码实现为
@KafkaListener(topics = "broadcat_test",groupId = "broadcat_test_group",
topicPartitions = {@TopicPartition(topic = "broadcat_test_sanjiu", partitions = "0")})
public void consume(String data) {
System.out.println("消费者1消费到消息:" + data);
}
总结
-
随机消费者组
- 优点:简单
- 缺点:后期监控统计困难
-
assign模式
- 优点:简单
- 缺点:放弃集群特性,topic分区动态扩容特性等,对于kafka的使用意义大大降低
-
转发到其他消息队列
- 优点:可以使用支持广播的消息队列完成
- 缺点:引入新的中间件,增加架构复杂度以及项目风险
Slicenfer
2022/07/29
更多推荐
所有评论(0)