Spring Cloud Stream的配置及使用——以RabbitMQ为例
Spring Cloud Stream配置及使用,以RabbtiMQ为例
·
1. 简介
https://docs.spring.io/spring-cloud-stream-binder-rabbit/docs/current/reference/html/spring-cloud-stream-binder-rabbit.html
英语好的可以直接看官方文档,文档里讲的更全面
By default, the RabbitMQ Binder implementation maps each destination to a TopicExchange. For each consumer group, a Queue is bound to that TopicExchange.
上图是RabbitMQ Binder(绑定器)。默认情况下,绑定器实现将每一个destination映射到一个TopicExchange。对于每一个消费者组,都有一个队列绑定到那个TopicExchange。
2. 依赖配置
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
3. 生产者配置及消息发送
3.1 yaml配置
spring:
cloud:
stream:
# 如果有一个binder的话,就不需要设置
default-binder: rabbit
binders:
rabbit1:
type: rabbit
environment:
spring:
rabbitmq:
host: 192.168.70.224
port: 5672
username: admin
password: 444944
virtual-host: GHost
rabbit:
type: rabbit
defaultCandidate: false
environment:
spring:
rabbitmq:
host: 192.168.70.167
port: 5672
username: admin
password: public
virtual-host: /
bindings:
order:
binder: rabbit
destination: order
producer:
# 默认是true
autoStartup: true
cart:
binder: rabbit
destination: cart
routingKeyExpression: han
producer:
# 默认是true
autoStartup: true
# rabbitmq发送的消息默认routingKey为destination, 如果是分区的destination, 默认值为destination-<partition>
配置说明:
- 在Spring Cloud Stream中可以配置多个binder,也就是可以配置连接多个MQ服务器
- 在RabbitMQ中,binding的名称对应的是output和input的名称,destination对应mq中的exchange名称。上述配置会生成order,cart两个exchange
- rabbitmq发送的消息默认routingKey为destination, 如果是分区的destination, 默认值为destination-,分区内容这里不涉及,具体内容看官方文档
3.2 生产者声明
public interface CartSource {
/**
* Name of the output channel.
* cart对应的是binding,
* destination对应的是rabbitmq里的exchange,kafka中的topic.
* 如果没有设置destination, rabbitmq会自动创建一个和binding同名的exchange
*/
String OUTPUT = "cart";
/**
* @return output channel
*/
@Output(CartSource.OUTPUT)
MessageChannel output();
}
cloud stream中output表示发送,这里的cart与配置中的binding名称是对应的
//必须添加,要不然无法注入。也可以加在启动类上,可以重复添加
@EnableBinding(CartSource.class)
@Component
public class CartSender {
@Autowired
private CartSource orderSource;
private static final Logger logger= LoggerFactory.getLogger(CartSender.class);
public void pushMsg(Order order){
logger.info("sending rabbitmq message:{}",order.toString());
orderSource.output().send(MessageBuilder.withPayload(order).build());
}
3.3 消息发送
@RestController
@RequestMapping("mqTest1")
public class MqTest1Controller {
@Autowired
CartSender cartSender;
@GetMapping("streamPush")
public String streamPush(){
cartSender.pushMsg(new Order());
return "hehe";
}
}
4. 消费者配置
4.1 yaml配置
spring:
cloud:
stream:
# 如果有一个binder的话,就不需要设置
default-binder: rabbit
binders:
rabbit1:
type: rabbit
environment:
spring:
rabbitmq:
host: 192.168.70.224
port: 5672
username: admin
password: 444944
virtual-host: /
rabbit:
type: rabbit
defaultCandidate: false
environment:
spring:
rabbitmq:
host: 192.168.70.167
port: 5672
username: admin
password: public
virtual-host: /
bindings:
order:
binder: rabbit
destination: order
group: myOrderQueue
conumer:
concurrency: 3
cart:
binder: rabbit
destination: cart
group: myCartQueue
conumer:
concurrency: 3
# rabbit的扩展配置 RabbitExtendedBindingProperties
rabbit:
bindings:
# order:
# consumer:
# bindingRoutingKey: order-key
cart:
consumer:
# 如果没有指定routing key,会使用默认的 #
bindingRoutingKey: cart-key
配置说明:
- 只有消费者才会创建队列,queue名称为:
<bindingName><destination>.<group>
- routingKey设置
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindingRoutingKey=myRoutingKey
如果没有设置的话,默认为#
4.2 消费者声明与消息接收
public interface CartSink {
String INPUT = "cart";
/**
* @return input channel.
*/
@Input(CartSink.INPUT)
SubscribableChannel input();
}
cloud stream中input表示接收,这里的cart与配置中的binding名称是对应的
@EnableBinding(CartSink.class)
public class CartHandler {
private static final Logger logger = LoggerFactory.getLogger(CartHandler.class);
/**
* 参数也可以是对象,会自动将消息转换为对象
* @param headers
* @param payload
*/
@StreamListener(CartSink.INPUT)
public void loggerSink(@Headers MessageHeaders headers, byte[] payload){
String cartChange=new String(payload);
logger.info("cart change:{}",cartChange);
}
}
更多推荐
已为社区贡献5条内容
所有评论(0)