@EnableBinding源码中明确声明

该注解在从3.1版本开始被弃用,推荐我们使用函数编程的方式

我将给出一个生产者和消费者的使用案例:

生产者案例:

yml配置:

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  rabbitmq:
    host: 192.168.220.101
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      bindings: #服务的整合处理
        myChannel-out-0: #这个名字是一个通道的名字
          destination: studyExchange #表示要使用Exchange名称定义
          contentType: application/json #设置消息类型,本次为json,文本则设置为“text/plain”

注意bingdings 集合中的key由 通道名-out/in-数字组成

service层代码:

package com.jx.springCloud.service.impl;

import com.jx.springCloud.service.MessageProvider;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.util.UUID;

/**
 * @author LDW
 * @date 2022/4/10 20:35
 */
@Service
public class MessageProviderImpl implements MessageProvider {

    private final StreamBridge streamBridge;

    public MessageProviderImpl(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        streamBridge.send("myChannel-out-0", MessageBuilder.withPayload(serial).build());
        System.out.println("发送消息: " + serial);
        return null;
    }
}

@Autowire注解自动注入StreamBridge的实例,上述代码的写法省去了@Autowire注解

StreamBridge的send方法第一个参数是binding的名字,第二个参数是想要发送的消息

controller层代码:

@RestController
public class SendMessageController {

    private final MessageProvider messageProvider;

    public SendMessageController(MessageProvider messageProvider) {
        this.messageProvider = messageProvider;
    }

    @GetMapping("/sendMessage")
    public String sendMessage() {
        return messageProvider.send();
    }
}

消费者案例:

yml配置:

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  rabbitmq:
    host: 192.168.220.101
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      bindings: #服务的整合处理
        myChannel-in-0: #这个名字是一个通道的名字
          destination: studyExchange #表示要使用Exchange名称定义
          contentType: application/json #设置消息类型,本次为json,文本则设置为“text/plain”

service层代码:

@Service
@Slf4j
public class StreamConsumerService {
    @Bean
    public Consumer<String> myChannel() {
        return message -> log.info("消息:"+message);
    }
}

上述代码的方法名(即Consumer的bean实例名)需要是yml配置中的通道名,应用程序启动后会自动接收生产者发送的消息

发送请求:localhost:8801/sendMessage

 

 

 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐