SpringCloud集成RocketMQ

前言

RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件,2017年成为 Apache 的顶级项目;以其高性能、低延时和高可靠等特性近年来已经也被越来越多的企业使用; 今天这个文章就介绍一下在SpringCloud里如何集成RocketMQ作为消息中间件的使用,本文介绍的是通过SpringCloud Stream方式进行集成;

引入依赖包

<!-- stream-rocketmq -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

生产者Producer

加注解

在主启动类上加上@EnableBinding(Source.class)

@SpringBootApplication
@EnableBinding(Source.class)
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}

Stream生产者配置

spring:
  cloud:
    # Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
    stream:
      rocketmq:
        # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
        binder:
          name-server: 192.168.56.101:9876 # RocketMQ Namesrv 地址
      # Binding 配置项,对应 BindingProperties Map
      bindings:
        output:
          destination: TOPIC-01 # 目的地。这里使用 RocketMQ Topic

生产者发送消息

在Controller类里, 通过IOC,获取Source,作为消费者,产生消息,然后使用Source发送消息

    @Autowired
    private Source source;

    @GetMapping("/produce")
    public boolean produce(String msg) {
        MyMessage message = new MyMessage(msg).setId(new Random().nextInt());
        Message<MyMessage > springMessage = MessageBuilder.withPayload(message)
                .build();

        return source.output().send(springMessage);
    }

消费者(Consumer)

加注解

在主启动类上加上@EnableBinding(Sink.class)

@EnableBinding(Sink.class)
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

Stream消费者配置

spring:
  cloud:
    # Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
    stream:
      rocketmq:
        # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
        binder:
          name-server: 192.168.56.101:9876 # RocketMQ Namesrv 地址
      # Binding 配置项,对应 BindingProperties Map
      bindings:
        input:
  # Topic和生产者一样,这样才能接收到生产者的Message
          destination: TOPIC-01 # 目的地。这里使用 RocketMQ Topic
  # Group组设置, 可以按自己的需要进行名称设置
          group: group-001

消费者处理消息

定义Component;在方法上,添加 @StreamListener 注解,声明对应的 Input Binding。该方法就可以处理对应队列中的消息了,类似于以前的onMessage这样的方式

@Component
public class MyConsumer {

    private Logger logger = LoggerFactory.getLogger(MyConsumer.class);

    @StreamListener(Sink.INPUT)
    public void onMessage(@Payload MyMessage message) {
        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
    }
}

自定义SOURCE

我们在做生产者的时候,使用

@EnableBinding(Source.class)

来进行Source的定义,在生产者stream的配置里,默认为output;我们可以使用自己定义的source来扩展stream里的生产者配置

定义接口

public interface MySource {
    @Output("logger-output")
    MessageChannel logger();

    @Output("data-output")
    MessageChannel data();
}

可以将@EnableBinding改为

@EnableBinding(Source.class, MySource.class)

生产者Stream配置扩展

spring:
  cloud:
    # Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
    stream:
      rocketmq:
        # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
        binder:
          name-server: 192.168.56.101:9876 # RocketMQ Namesrv 地址
      # Binding 配置项,对应 BindingProperties Map
      bindings:
        logger-output:
          destination: TOPIC-LOGGER-01 # 目的地。这里使用 RocketMQ Topic
          content-type: application/json # 内容格式。这里使用 JSON
        data-output:
          destination: TOPIC-DATA-01 # 目的地。这里使用 RocketMQ Topic
          content-type: application/json # 内容格式。这里使用 JSON

发送消息

  @Autowired
    private MySource mySource;

    @GetMapping("/produce/logger")
    public boolean produceLogger(String msg) {
        MyMessage message = new MyMessage(msg).setId(new Random().nextInt());
        Message<MyMessage > springMessage = MessageBuilder.withPayload(message)
                .build();

        return mySource.logger().send(springMessage);
    }

   @GetMapping("/produce/data")
    public boolean produceData(String msg) {
        MyMessage message = new MyMessage(msg).setId(new Random().nextInt());
        Message<MyMessage > springMessage = MessageBuilder.withPayload(message)
                .build();

        return mySource.data().send(springMessage);
    }

自定义SINK

我们在做消费者的时候,使用

@EnableBinding(Sink.class)

来进行Sink的定义,在消费者stream的配置里,默认为input;我们可以使用自己定义的Sink来扩展stream里的消费者配置

定义接口

public interface MySink {		  
    String LOGGER_INPUT = "logger-input";
    String DATA_INPUT = "data-input";

    @Input(LOGGER_INPUT )
    SubscribableChannel logger();

    @Input(DATA_INPUT )
    SubscribableChannel data();
}

可以将@EnableBinding改为

@EnableBinding(Sink.class, MySink.class)

消费者Stream配置扩展

spring:
  cloud:
    # Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
    stream:
      rocketmq:
        # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
        binder:
          name-server: 192.168.56.101:9876 # RocketMQ Namesrv 地址
      # Binding 配置项,对应 BindingProperties Map
      bindings:
        logger-input:
  # Topic和生产者一样,这样才能接收到生产者的Message
          destination: TOPIC-LOGGER-01 # 目的地。这里使用 RocketMQ Topic
  # Group组设置, 可以按自己的需要进行名称设置
          group: group-001
        data-input:
  # Topic和生产者一样,这样才能接收到生产者的Message
          destination: TOPIC-DATA-01 # 目的地。这里使用 RocketMQ Topic
  # Group组设置, 可以按自己的需要进行名称设置
          group: group-001

处理消息

@Component
public class MyConsumer {
    private Logger logger = LoggerFactory.getLogger(MyConsumer.class);

    @StreamListener(MySink.LOGGER_INPUT)
    public void onLogger(@Payload MyMessage message) {
        logger.info("[onLogger][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
    }

    @StreamListener(MySink.DATA_INPUT)
    public void onData(@Payload MyMessage message) {
        logger.info("[onData][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
    }
}

结束语

通过上面的集成方式,我们使用springcloud的stream框架,分别完成了RocketMQ的消息生产者和用来处理消息的消费者;通过springcloud stream的这种集成方式,在下一个文章里,我们将基于本文中的实现过程,深入到springcloud的stream框架里去进行stream的认知。

谢谢大家继续关注

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐