本案例有1个生产者(provide),2个消费者(consumer-1,consume-2)

一、Maven依赖(生产者消费者一致)

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.3.1.RELEASE</version>
		<relativePath/> 
	</parent>
	<groupId>com.example</groupId>
	<artifactId>consumer</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>consumer</name>
	<description>Demo project for Spring Boot</description>
	<properties>
		<java.version>8</java.version>
		<spring-cloud.version>Hoxton.SR5</spring-cloud.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<!--关键在这里,里面包含Stream和kafka,一个就够了-->
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-binder-kafka</artifactId>
		</dependency>
	</dependencies>
	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>${spring-cloud.version}</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<excludes>
						<exclude>
							<groupId>org.projectlombok</groupId>
							<artifactId>lombok</artifactId>
						</exclude>
					</excludes>
				</configuration>
			</plugin>
		</plugins>
	</build>

</project>

注意:springboot和springcloud的版本不要太高,特别是springboot版本最好与本文案例一致,否则会出现启动失败情况

二、配置文件(application.yml)-【目前仅启动,下面会增加分组和分区配置】

1. 生产者(provide)

server:
  port: 7888

spring:
  application:
    name: producer
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092         #Kafka的消息中间件服务器
          zk-nodes: localhost:2181        #Zookeeper的节点,如果集群,后面加,号分隔
          auto-create-topics: true       #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。
      bindings:
        stream-demo:          #这里可以任意写,消费者应与之一致
            destination: custom-message-topic   #这里可以任意写,消费者应与之一致,消息发往的目的地
            content-type: application/json    #消息发送的格式,接收端不用指定格式,但是发送端要; 文本则为 text/plain

2. 消费者1(consume-1)

server:
  port: 7890

spring:
  application:
    name: consumer_1
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          zk-nodes: localhost:2181
          auto-create-topics: true
      bindings:
          stream-demo:
            destination: custom-message-topic
            content-type: application/json

3. 消费者2(consume-2)

server:
  port: 7889

spring:
  application:
    name: consumer_1
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          zk-nodes: localhost:2181
          auto-create-topics: true
      bindings:
          stream-demo:
            destination: custom-message-topic
            content-type: application/json

三、测试代码

1.生产者

StreamClient(必须要有):

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * @Description
 * @Author 李宇麒 
 * @Version V1.0.0
 * @Date 2022/3/25
 */
public interface StreamClient {
    String STREAM_DEMO = "stream-demo";

    @Output(StreamClient.STREAM_DEMO)
    MessageChannel streamDateOut();
}

TestController(测试发送):

import com.example.demo.stream.StreamClient;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @Description
 * @Author 李宇麒 
 * @Version V1.0.0
 * @Date 2022/3/25
 */
@RestController
public class TestController {

    @Resource
    private StreamClient streamClient;

    @GetMapping("/produce")
    public String produce() {
        for (int i = 0; i < 100; i++) {
            streamClient.streamDateOut().send(MessageBuilder.withPayload("aaaaaa" + i).build());
        }

        return "aaa";
    }

}

2.消费者(消费者1和2代码一致)

StreamClient(必须要有,注意与生产者有区别):

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * @Description
 * @Author 李宇麒 
 * @Version V1.0.0
 * @Date 2022/3/25
 */
public interface StreamClient {
    String STREAM_DEMO = "stream-demo";

    @Input(StreamClient.STREAM_DEMO)
    SubscribableChannel streamDateInput();	//SubscribableChannel与生产者处写的不一样,此处为接收信息
}

ReceiveData(接收信息):

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

/**
 * @Description
 * @Author 李宇麒 
 * @Version V1.0.0
 * @Date 2022/3/25
 */
@Slf4j
@EnableBinding(StreamClient.class)
public class ReceiveData {

    @StreamListener(StreamClient.STREAM_DEMO)
    public void consume(String message) {
        log.info("接收消息:{}", message);
    }
}

至此,基本收发已经配置完毕,接下来就是测试

四、运行测试

1、调用接口http://localhost:7888/produce
在这里插入图片描述
2、后台接收到数据
在这里插入图片描述

多运行几遍,是否发现有问题:两个消费者都对消息进行了消费,那这样不就会产生重复消费吗?

五、分组(解决重复消费问题)

1、概念:

组内只有1个实例消费。如果不设置group,则stream会自动为每个实例创建匿名且独立的group——于是每个实例都会参与消费。若不在一个消费组,则都会消费消息。

2、应用场景:

订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况,这时我们就可以使用Stream中的消息分组来解决。
注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。
不同组是可以全面消费的(重复消费),同一组内会发送竞争关系,只有其中一个可以消费。

3、小结:

若group相同,则组内只会有一个实例消费;若不同,则每个实例都消费同一个topic内容

4、实践测试

  • 生产者:不需要配置
  • 消费者1:
server:
  port: 7890

kafka:
  group: lyq

spring:
  application:
    name: consumer_1
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          zk-nodes: localhost:2181
          auto-create-topics: true
      bindings:
          stream-demo:
            destination: custom-message-topic
            content-type: application/json
            group: ${kafka.group}
  • 消费者2:
server:
 port: 7889

kafka:
 group: lyq

spring:
 application:
   name: consumer_1
 cloud:
   stream:
     kafka:
       binder:
         brokers: localhost:9092
         zk-nodes: localhost:2181
         auto-create-topics: true
     bindings:
         stream-demo:
           destination: custom-message-topic4
           content-type: application/json
           group: ${kafka.group}

六、结果

略.(自行测试)

多执行几次,会出现问题:重复消费是没有了,但是每一次都是同一个消费者消费

原因:启动消费者时是否注意到:
消费者1:
在这里插入图片描述
消费者2:
在这里插入图片描述
因为kafka默认其分区数量为1,而每个分区从属的消费实例最多仅能1个【具体关于kafka的分区自行百度】

七、轮询消费(分区)

我需求要多个实例情况下,每个实例都有机会消费实例,则启动时,将该主题的分区数量,设置为>=实例数量即可

配置文件:

生产者:

server:
  port: 7888

kafka:
  group: lyq

spring:
  application:
    name: producer
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092         #Kafka的消息中间件服务器
          zk-nodes: localhost:2181        #Zookeeper的节点,如果集群,后面加,号分隔
          auto-create-topics: true       #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。
          auto-add-partitions: true     # 当partition-count设置的值超过原来设置的值,true=自动创建分区
      bindings:
        stream-demo:          #这里用stream给我们提供的默认output,后面会讲到自定义output
            destination: custom-message-topic4   #消息发往的目的地
            content-type: application/json    #消息发送的格式,接收端不用指定格式,但是发送端要; 文本则为 text/plain
            producer:
              # 分区的数量(默认为1)
              partition-count: 3

消费者不需配置

启动运行即可,结果懒得演示,累了

八、配置参数

Spring Cloud Stream配置参数详解

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐