Environment

  • IDEA 2022.2
  • JDK 8.0.312
  • macOS 13 beta 4
  • Spring Boot 2.7.2

Background

For data push, we use WebSocket to push data from the server to the client. Because WebSocket connections are long-lived, each connection is pinned to a single server in the cluster; so once new data is produced, it needs to be broadcast across the cluster so that it reaches every user who needs it. The broadcast itself could be implemented with Redis or with a message queue.
In our case, due to constraints on the producing side, the data is sent to Kafka, so the consuming side has no choice but to consume it from Kafka as well.
The problem is that Kafka does not support a broadcast mode out of the box, so we have to implement one ourselves.

Solution analysis

Random consumer groups

In Kafka, consumers within the same consumer group compete for messages: each message is delivered to only one consumer in the group.
It follows that if every service instance in the cluster puts its consumer in a different group, we get a cluster-wide broadcast.
If those group IDs are random, however, monitoring becomes difficult.
On some cloud platforms, such as Alibaba Cloud, consumer groups must be created in advance, which does not play well with random group IDs. Even on platforms that do allow ad-hoc groups, consumption can no longer be monitored or accounted for per group, which fails the project's operational requirements.

assign mode

In Kafka, assign mode means giving up consumer-group coordination and subscribing to partitions directly. If all consumers assign the same specified partitions, each of them receives every message in those partitions, which also achieves broadcast consumption.
![assign mode: consumers read the specified partitions directly](https://img-blog.csdnimg.cn/img_convert/62235f34bbacd8a38fa0e0c28bbb05a0.png)

Forwarding to a message queue that supports broadcast

Since Kafka's support for broadcast is not that friendly, forwarding the messages to another message queue, such as RocketMQ, is also a workable option.
The forwarding can be implemented by hand, or delegated to a cloud product; Alibaba Cloud's Connector, currently in public beta, supports data synchronization between Kafka and other cloud services.
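
As a rough illustration of the hand-rolled variant, here is a minimal sketch (the topic names, the class name, and the rocketmq-spring-boot-starter dependency are assumptions, not part of the original project): a single competing-consumer group drains the Kafka topic and republishes each record to a RocketMQ topic, whose subscribers can then use RocketMQ's native BROADCASTING mode.

```java
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaToRocketMqForwarder {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    // A normal (non-random) group: exactly one instance forwards each record,
    // so the broadcast happens on the RocketMQ side, not here.
    @KafkaListener(topics = "broadcast_test", groupId = "forwarder_group")
    public void forward(String data) {
        rocketMQTemplate.convertAndSend("broadcast_test_rmq", data);
    }
}
```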

Implementation

Dependencies (pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>kafka-broadcast</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-broadcast</name>
    <description>kafka-broadcast</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

Random consumer groups

At startup, just make sure each listener's groupId is unique and random. The SpEL template below is evaluated once when the application instance starts, so every instance lands in its own consumer group.
A reference implementation:

    @KafkaListener(topics = "broadcast_test",groupId = "broadcast_test_group" + "#{T(java.util.UUID).randomUUID()})")
    public void consume(String data) {
        System.out.println("消费者1消费到消息:" + data);
    }
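
Tying this back to the requirement: once every instance receives every record, each instance only needs to push to the WebSocket sessions connected to it. A minimal sketch (the class name and the session registry are assumptions, and it presumes spring-boot-starter-websocket on the classpath; a real WebSocket handler would register sessions into the map):

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;

@Component
public class PushListener {

    // Sessions connected to *this* instance; populated by the WebSocket handler.
    private final Map<String, WebSocketSession> localSessions = new ConcurrentHashMap<>();

    @KafkaListener(topics = "broadcast_test",
            groupId = "broadcast_test_group#{T(java.util.UUID).randomUUID()}")
    public void onMessage(String data) throws IOException {
        for (WebSocketSession session : localSessions.values()) {
            if (session.isOpen()) {
                session.sendMessage(new TextMessage(data));
            }
        }
    }
}
```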

assign mode

There is relatively little material about assign mode online. With the raw kafka-clients API, you can enumerate the partitions under a topic and hand them to consumer.assign(...), which takes a collection of TopicPartition, instead of going through group subscription.
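
A minimal sketch of that approach (the broker address and topic name are placeholders): every instance assigns all partitions of the topic itself, so every instance polls every record.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AssignModeConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "broadcast_test_group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Enumerate all partitions of the topic and assign them directly,
            // bypassing group subscription and rebalancing entirely.
            List<TopicPartition> partitions = consumer.partitionsFor("broadcast_test").stream()
                    .map(info -> new TopicPartition(info.topic(), info.partition()))
                    .collect(Collectors.toList());
            consumer.assign(partitions);

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(r -> System.out.println("Consumed: " + r.value()));
            }
        }
    }
}
```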
With spring-kafka, though, we usually just use @KafkaListener.
Can assign mode be used through this annotation?
After digging through the source, it turns out it can:
the annotation has the following attribute:

	/**
	 * The topicPartitions for this listener when using manual topic/partition
	 * assignment.
	 * <p>
	 * Mutually exclusive with {@link #topicPattern()} and {@link #topics()}.
	 * @return the topic names or expressions (SpEL) to listen to.
	 */
	TopicPartition[] topicPartitions() default {};

That is, this attribute specifies the partitions to consume in assign mode. So is configuring it all we need?
Looking at the source of the org.springframework.kafka.listener.KafkaMessageListenerContainer class:

@Override
	protected void doStart() {
		if (isRunning()) {
			return;
		}
		if (this.clientIdSuffix == null) { // stand-alone container
			checkTopics();
		}
		ContainerProperties containerProperties = getContainerProperties();
		checkAckMode(containerProperties);

		Object messageListener = containerProperties.getMessageListener();
		AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();
		if (consumerExecutor == null) {
			consumerExecutor = new SimpleAsyncTaskExecutor(
					(getBeanName() == null ? "" : getBeanName()) + "-C-");
			containerProperties.setConsumerTaskExecutor(consumerExecutor);
		}
		GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
		ListenerType listenerType = determineListenerType(listener);
        // create the consumer
		this.listenerConsumer = new ListenerConsumer(listener, listenerType);
		setRunning(true);
		this.startLatch = new CountDownLatch(1);
		this.listenerConsumerFuture = consumerExecutor
				.submitListenable(this.listenerConsumer);
		try {
			if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
				this.logger.error("Consumer thread failed to start - does the configured task executor "
						+ "have enough threads to support all containers and concurrency?");
				publishConsumerFailedToStart();
			}
		}
		catch (@SuppressWarnings(UNUSED) InterruptedException e) {
			Thread.currentThread().interrupt();
		}
	}
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
			Properties consumerProperties = propertiesFromProperties();
			checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory);
			this.autoCommit = determineAutoCommit(consumerProperties);
			this.consumer =
					KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
							this.consumerGroupId,
							this.containerProperties.getClientId(),
							KafkaMessageListenerContainer.this.clientIdSuffix,
							consumerProperties);

			this.clientId = determineClientId();
			this.transactionTemplate = determineTransactionTemplate();
			this.genericListener = listener;
			this.consumerSeekAwareListener = checkConsumerSeekAware(listener);
			this.commitCurrentOnAssignment = determineCommitCurrent(consumerProperties,
					KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties());
            // decide whether to subscribe or assign
			subscribeOrAssignTopics(this.consumer);
			if (listener instanceof BatchMessageListener) {
				this.listener = null;
				this.batchListener = (BatchMessageListener<K, V>) listener;
				this.isBatchListener = true;
				this.wantsFullRecords = this.batchListener.wantsPollResult();
				this.pollThreadStateProcessor = setUpPollProcessor(true);
			}
			else if (listener instanceof MessageListener) {
				this.listener = (MessageListener<K, V>) listener;
				this.batchListener = null;
				this.isBatchListener = false;
				this.wantsFullRecords = false;
				this.pollThreadStateProcessor = setUpPollProcessor(false);
			}
			else {
				throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
						+ "'BatchMessageListener', or the variants that are consumer aware and/or "
						+ "Acknowledging"
						+ " not " + listener.getClass().getName());
			}
			this.listenerType = listenerType;
			this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
					|| listenerType.equals(ListenerType.CONSUMER_AWARE);
			this.commonErrorHandler = determineCommonErrorHandler();
			Assert.state(!this.isBatchListener || !this.isRecordAck,
					"Cannot use AckMode.RECORD with a batch listener");
			if (this.containerProperties.getScheduler() != null) {
				this.taskScheduler = this.containerProperties.getScheduler();
				this.taskSchedulerExplicitlySet = true;
			}
			else {
				ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
				threadPoolTaskScheduler.initialize();
				this.taskScheduler = threadPoolTaskScheduler;
			}
			this.monitorTask = this.taskScheduler.scheduleAtFixedRate(this::checkConsumer, // NOSONAR
					Duration.ofSeconds(this.containerProperties.getMonitorInterval()));
			if (this.containerProperties.isLogContainerConfig()) {
				this.logger.info(toString());
			}
			Map<String, Object> props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties();
			this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull()
					|| checkDeserializer(findDeserializerClass(props, consumerProperties, false));
			this.checkNullValueForExceptions = this.containerProperties.isCheckDeserExWhenValueNull()
					|| checkDeserializer(findDeserializerClass(props, consumerProperties, true));
			this.syncCommitTimeout = determineSyncCommitTimeout();
			if (this.containerProperties.getSyncCommitTimeout() == null) {
				// update the property so we can use it directly from code elsewhere
				this.containerProperties.setSyncCommitTimeout(this.syncCommitTimeout);
				if (KafkaMessageListenerContainer.this.thisOrParentContainer != null) {
					KafkaMessageListenerContainer.this.thisOrParentContainer
							.getContainerProperties()
							.setSyncCommitTimeout(this.syncCommitTimeout);
				}
			}
			this.maxPollInterval = obtainMaxPollInterval(consumerProperties);
			this.micrometerHolder = obtainMicrometerHolder();
			this.deliveryAttemptAware = setupDeliveryAttemptAware();
			this.subBatchPerPartition = setupSubBatchPerPartition();
			this.lastReceivePartition = new HashMap<>();
			this.lastAlertPartition = new HashMap<>();
			this.wasIdlePartition = new HashMap<>();
		}
private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> subscribingConsumer) {
            // if the topicPartitions attribute is not configured
			if (KafkaMessageListenerContainer.this.topicPartitions == null) {
				ConsumerRebalanceListener rebalanceListener = new ListenerConsumerRebalanceListener();
				Pattern topicPattern = this.containerProperties.getTopicPattern();
				if (topicPattern != null) {
					subscribingConsumer.subscribe(topicPattern, rebalanceListener);
				}
				else {
					subscribingConsumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), // NOSONAR
							rebalanceListener);
				}
			}
			else {
				List<TopicPartitionOffset> topicPartitionsToAssign =
						Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
				this.definedPartitions = new LinkedHashMap<>(topicPartitionsToAssign.size());
				for (TopicPartitionOffset topicPartition : topicPartitionsToAssign) {
					this.definedPartitions.put(topicPartition.getTopicPartition(),
							new OffsetMetadata(topicPartition.getOffset(), topicPartition.isRelativeToCurrent(),
									topicPartition.getPosition()));
				}
                // assign the specified partitions directly
				subscribingConsumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
			}
		}

From these snippets we can see that when the topicPartitions attribute of @KafkaListener is set, assign mode is used. Note that, per the Javadoc quoted above, topicPartitions is mutually exclusive with topics and topicPattern, so they must not be set together. The implementation is therefore:

 @KafkaListener(topics = "broadcat_test",groupId = "broadcat_test_group",
            topicPartitions = {@TopicPartition(topic = "broadcat_test_sanjiu", partitions = "0")})
    public void consume(String data) {
        System.out.println("消费者1消费到消息:" + data);
    }
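
One caveat: assigning only partition 0 means messages produced to other partitions are never seen. Below is a sketch that covers the whole topic, assuming it has three partitions (0 through 2); because the assignment is static, adding partitions later requires updating this list and redeploying, which is exactly the scaling drawback noted in the summary below.

```java
@KafkaListener(groupId = "broadcast_test_group",
        topicPartitions = {@TopicPartition(topic = "broadcast_test", partitions = {"0", "1", "2"})})
public void consumeAllPartitions(String data) {
    System.out.println("Consumer received: " + data);
}
```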

Summary

  1. Random consumer groups

    1. Pros: simple
    2. Cons: hard to monitor and account for afterwards
  2. assign mode

    1. Pros: simple
    2. Cons: gives up consumer-group coordination, dynamic partition scaling, and so on, greatly reducing the value of using Kafka
  3. Forwarding to another message queue

    1. Pros: the broadcast is handled by a queue that supports it natively
    2. Cons: introduces a new piece of middleware, adding architectural complexity and project risk

    Slicenfer
    2022/07/29
