springCloudStream集成rabbitmq

SpringCloudStream框架封装出了三个最基础的概念来对各种消息中间件提供统一的抽象:

  • Destination Binders:负责集成外部消息系统的组件。
  • Destination Binding:由Binders创建的,负责沟通外部消息系统、消息发送者和消息消费者的桥梁。
  • Message:消息发送者与消息消费者沟通的简单数据结构。

简单使用案例

引入依赖
RabbitMQ的SpringCloudStream支持是由Spring社区官网提供的,所以这也是相当成熟的一种集成方案。但是要注意,SpringCloudStream框架集成的版本通常是比RabbitMQ产品本身落后几个版本的,使用时需要注意。

  <properties>
        <spring.cloud.version>Hoxton.SR12</spring.cloud.version>
    </properties>
 <dependencyManagement>
        <dependencies>
			<dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring.cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
</dependencyManagement>

 <dependencies>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
 </dependencies>

依赖的版本通常建议使用SpringCloud的整体版本控制。

基础使用方法

spring.rabbitmq.host=id
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=baseDemo
spring.cloud.stream.bindings.output.destination=mqExchange
spring.cloud.stream.bindings.input.destination=mqExchange
spring.cloud.stream.bindings.input.group=stream
spring.cloud.stream.bindings.input.content-type=text/plain

声明Sink 消息消费者

@Component
@EnableBinding(Sink.class)
public class MqConsumer {
	private Logger logger = LoggerFactory.getLogger(MqConsumer.class);
	@StreamListener(Sink.INPUT)
	public void process(Object message) {
		System.out.println("received message : " + message);
		logger.info("received message : {}", message);
	}
}

声明Source 生产者

@Component
@EnableBinding(Source.class)
public class MqProduct {

}

测试类

@RestController
@RequestMapping("api/test")
@Api(value = "测试用例", tags = "测试用例")
public class testController {
	@Autowired
	private final Source source;
	@GetMapping("/send")
	@ApiOperation(value = "测试mq")
	@ApiOperationSupport(order = 11, author = "lsx")
	public R send(String message) {
		MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message);
		source.output().send(messageBuilder.build());
		return R.data( "message sended : "+message) ;
	}
}

这样就能接收到消息,非常简单。当然这只是简单的案例,在开发当中不会这么去用。

开发中的使用

定义Binders,每个Binders就相当于一个消息中间件,可以指定多个Binders,每一个Binders代表一个消息中间件服务,比如一个Binders指向rabbitmq,另一个Binders指向其他的mq产品,例如kafka
application.properties

#spring.cloud.stream.binders.<bindername>.<prop>=value
spring.cloud.stream.binders.bxbinder.type=rabbit
#spring.cloud.stream.binders.bxbinder.environment.spring.rabbitmq.addresses=192.168.232.128:5672,192.168.232.129:5672,192.168.232.130:5672
spring.cloud.stream.binders.bxbinder.environment.spring.rabbitmq.host=ip
spring.cloud.stream.binders.bxbinder.environment.spring.rabbitmq.port=5672
spring.cloud.stream.binders.bxbinder.environment.spring.rabbitmq.username=admin
spring.cloud.stream.binders.bxbinder.environment.spring.rabbitmq.password=admin
spring.cloud.stream.binders.bxbinder.environment.spring.rabbitmq.virtual-host=baseDemo
# 如果配置多个binders 需要指定默认的binders
# spring.cloud.stream.default-binder=bxbinder
#=====================配置binding 一个binding对应一个队列或者交换机
# bindings后面的bxExchangeOrder、inputOrder 是接口中定义的名称
#destination的值表示创建对应名称交换机或者队列
spring.cloud.stream.bindings.bxExchangeOrder.destination=bxExchangeOrder
spring.cloud.stream.bindings.inputOrder.destination=bxExchangeOrder
#指定队列名。如果没有指定会生成一个很长的默认名字的队列。 此时生成的队列名就是destination的值+group的值
spring.cloud.stream.bindings.inputOrder.group=bxOutput
spring.cloud.stream.bindings.bxExchangeGoods.destination=bxExchangeGoods
spring.cloud.stream.bindings.inputGoods.destination=bxExchangeGoods
spring.cloud.stream.bindings.inputGoods.group=bxOutput

定义交换机和队列

public interface BxExchange {
	String OUTPUTORDER = "bxExchangeOrder";

	String OUTPUTGOODS = "bxExchangeGoods";

	//@Output 里面的值要和 配置文件中bindings.后面的值 相同 
	@Output(OUTPUTORDER)
	MessageChannel outputOrder();

	@Output(OUTPUTGOODS)
	MessageChannel outputGoods();
}

public interface BxQueue {
	//@Input 里面的值要和 配置文件中bindings.后面的值相同
	@Input("inputOrder")
	SubscribableChannel inputOrder();

	@Input("inputGoods")
	SubscribableChannel inputGoods();

}

定义消费者和发送工具类

public class MqConsumer {
	private Logger logger = LoggerFactory.getLogger(MqConsumer.class);

	@StreamListener("inputOrder")
	public void processOrder(Object message) {
		System.out.println("received order message : " + message);
	}

	@StreamListener("inputGoods")
	public void processGoods(Object message) {
		System.out.println("received goods message : " + message);
	}
}

@Component
public class MqUtil {
	@Resource
	private MessageChannel bxExchangeOrder;

	@Resource
	private MessageChannel bxExchangeGoods;

	public boolean sendMessageOnOrder(JSONObject json){
		MessageBuilder<JSONObject> messageBuilder = MessageBuilder.withPayload(json);
		boolean b =  bxExchangeOrder.send(messageBuilder.build());
		return b;
	}

	public boolean sendMessageOnGoods(JSONObject json){
		MessageBuilder<JSONObject> messageBuilder = MessageBuilder.withPayload(json);
		boolean b =  bxExchangeGoods.send(messageBuilder.build());
		return b;
	}
}

config

@Configuration
@EnableBinding({BxQueue.class, BxExchange.class})
public class StreamMqConfig {
	@Bean
	public MqConsumer mqConsumer(){
		return new MqConsumer();
	}
}

测试

@RestController
@RequestMapping("api/test")
public class testController {
	@Autowired
	private MqUtil mqUtil;
	
	@GetMapping("/send")
	public R send(Object message) {
		GenEntity genEntity = new GenEntity().setCodeName("测试").
			setDatabaseName("测试").setTableName("中国").setPrefix("xmkf");
		JSONObject genJson = (JSONObject) JSONObject.toJSON(genEntity);
		boolean b = mqUtil.sendMessageOnOrder(genJson);
		return R.data("message sended : " + b);
	}
}

使用原生消息转发机制

springcloudStream其实自身实现了一套事件驱动的流程。这种流程,对于各种不同的MQ产品都是一样的。但是,毕竟每个MQ产品的实现机制和功能特性是不一样的,所以,SCStream还是提供了一套针对各个MQ产品的兼容机制。
在RabbitMQ的实现中,所有个性化的属性配置实现都是以spring.cloud.stream.rabbit开头,支持对binder、producer、consumer进行单独配置

#绑定到已经存在的交换机 和 队列
spring.cloud.stream.bindings.bxExchangeOrder.destination=rabbitExchange
spring.cloud.stream.bindings.inputOrder.destination=rabbitExchange
spring.cloud.stream.bindings.inputOrder.group=rabbitQueue
#不自动声明exchange(自动声明的exchange都是topic)
spring.cloud.stream.rabbit.bindings.bxExchangeOrder.producer.bind-queue=false
spring.cloud.stream.rabbit.bindings.bxExchangeOrder.producer.exchange-type=topic
spring.cloud.stream.rabbit.bindings.bxExchangeOrder.producer.routing-key-expression=headers.routingKey
#不自动创建队列
spring.cloud.stream.rabbit.bindings.inputOrder.consumer.bind-queue=false
#队列名只声明组名(前面不带destination前缀)
spring.cloud.stream.rabbit.bindings.inputOrder.consumer.queue-name-group-only=true
#交换机和队列通过routing-key 进行绑定
spring.cloud.stream.rabbit.bindings.inputOrder.consumer.binding-routing-key=bbbb

消息发送
MessageBuilder<JSONObject> messageBuilder = MessageBuilder.withPayload(json);
boolean b = bxExchangeOrder.send(messageBuilder.setHeader("routingKey", "bbbb").build());

Headers路由

上面的代码都通过setHeader方法放入路由键 其实就是一种Headers路由。也可以根据业务要求,往head中存放其他的值。
如何获取head中的值

@StreamListener("inputOrder")
	public void processOrder(Message<Object> message) {
		Object payload = message.getPayload();
		String routingKey = message.getHeaders().get("routingKey")+"";
		System.out.println("received order message : " + payload + "\n" + "received order routingKey:"+ routingKey);
	}

分组消费策略

对于实现相同组消费者 只消费一次消息,不同组消费相同的消息

#==================原生rabbitmq的配置 使用已有的交换机和队列
#绑定到已经存在的交换机 和 队列
spring.cloud.stream.bindings.bxExchangeOrder.destination=rabbitExchange
spring.cloud.stream.bindings.inputOrder.destination=rabbitExchange
spring.cloud.stream.bindings.inputOrder.group=rabbitQueue
#不自动声明exchange(自动声明的exchange都是topic)
spring.cloud.stream.rabbit.bindings.bxExchangeOrder.producer.bind-queue=false
spring.cloud.stream.rabbit.bindings.bxExchangeOrder.producer.exchange-type=topic
spring.cloud.stream.rabbit.bindings.bxExchangeOrder.producer.routing-key-expression=headers.routingKey
#不自动创建队列
spring.cloud.stream.rabbit.bindings.inputOrder.consumer.bind-queue=false
#队列名只声明组名(前面不带destination前缀)
spring.cloud.stream.rabbit.bindings.inputOrder.consumer.queue-name-group-only=true
#交换机和队列通过routing-key 进行绑定
spring.cloud.stream.rabbit.bindings.inputOrder.consumer.binding-routing-key=bbbb
#====================分组消费=================
spring.cloud.stream.bindings.inputGoods.destination=rabbitExchange
spring.cloud.stream.bindings.inputGoods.group=rabbitQueue1
spring.cloud.stream.rabbit.bindings.inputGoods.consumer.queue-name-group-only=true
spring.cloud.stream.rabbit.bindings.inputGoods.consumer.binding-routing-key=bbbb

消费者

public class MqConsumer {
	private Logger logger = LoggerFactory.getLogger(MqConsumer.class);

	@StreamListener("inputOrder")
	public void processOrder(Message<Object> message) {
		Object payload = message.getPayload();
		String routingKey = message.getHeaders().get("routingKey")+"";
		System.out.println("received order message : " + payload + "\n" + "received order routingKey:"+ routingKey);
	}

	@StreamListener("inputGoods")
	public void processGoods(Message<Object> message) {
		Object payload = message.getPayload();
		String routingKey = message.getHeaders().get("routingKey")+"";
		System.out.println("received goods message : " + payload + "\n" + "received goods routingKey:"+ routingKey);
	}

}

其中inputOrder和inputGoods就是不同的分组然后,再开一个实例表示一组中有两个消费者
在这里插入图片描述
修改端口防止冲突
发送消息,可以发现相同组的消费者只会消费一次,不同的组的消费者会消费同一个消息
这是其中一个实例的消费,另一个实例没有
在这里插入图片描述

消费确认机制

最近项目中有用到stream继承rabbitmq,补充一下消息确认消费
配置文件:

spring:
  cloud:
    stream:
      default:
        consumer:
          # 重试次数
          max-attempts: 5
      rabbit:
        default:
          consumer:
          	# 手动确认
            acknowledge-mode: manual
      # 指定使用哪一个binders
      default-binder: demobinder
      binders:
        # 定义一个binder 表示rabbitmq
        demobinder:
          type: rabbit
          # 配置rabbitmq 环境配置
          environment:
            spring:
              rabbitmq:
                host: ip
                port: 5672
                username: xxxx
                password: xxxx
                virtual-host: xxxx
      # 绑定交换机和队列
      bindings:
        hkUpcallbackExchange:
          destination: hkUpcallback
        hkDowncallbackExchange:
          destination: hkDowncallback
        hkUpcallbackQueue:
          destination: hkUpcallback
          group: hkOutput
        hkDowncallbackQueue:
          destination: hkDowncallback
          group: hkOutput

配置信息类

public interface HkExchange {
	String HK_UP_EXCHANGE = "hkUpcallbackExchange";

	String HK_DOWN_EXCHANGE = "hkDowncallbackExchange";

	//@Output 里面的值要和 配置文件中bindings.后面的值 相同
	@Output(HK_UP_EXCHANGE)
	MessageChannel outputHkUp();

	@Output(HK_DOWN_EXCHANGE)
	MessageChannel outputHkDown();

}
public interface HkQueue {
	String HK_UP_QUEUE = "hkUpcallbackQueue";

	String HK_DOWN_QUEUE = "hkDowncallbackQueue";

	@Input(HK_UP_QUEUE)
	SubscribableChannel inputHkUp();

	@Input(HK_DOWN_QUEUE)
	SubscribableChannel inputHkDown();

}

@Conditional(MqConditional.class)
@EnableBinding({HkExchange.class, HkQueue.class})
public class StreamMqConfig {
	@Bean
	public HkMqConsumer hkMqConsumer(){
		return new HkMqConsumer();
	}
}

StreamMqConfig 添加了@Conditional(MqConditional.class)为了避免在开发环境中也开启mq,如果开发环境开启mq的话每次调试信息发送总会发到其他开发的小伙伴机器上去。

public class MqConditional implements Condition {
	@Override
	public boolean matches(ConditionContext conditionContext, AnnotatedTypeMetadata annotatedTypeMetadata) {
		String[] activeProfiles = conditionContext.getEnvironment().getActiveProfiles();
		if ("dev".equals(activeProfiles[0])) return false;
		return true;
	}
}

消息发送和接收

@Slf4j
//消费者
public class HkMqConsumer {
	@StreamListener(HkQueue.HK_UP_QUEUE)
	public void processUp(JSONObject message, @Header(AmqpHeaders.CHANNEL) Channel channel,
						  @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) {
		System.out.println("received Up message : " + message);
		try {
			channel.basicAck(deliveryTag, false);
		} catch (IOException e) {
			e.printStackTrace();

		}
	}
	@StreamListener(HkQueue.HK_DOWN_QUEUE)
	public void processDown(JSONObject message,@Header(AmqpHeaders.CHANNEL) Channel channel,
							@Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) {
		System.out.println("received down message : " + message);
		try {
			channel.basicAck(deliveryTag, false);
		} catch (IOException e) {
			e.printStackTrace();

		}
	}
}

@Component
@Data
public class MqUtil {
	@Lazy
	@Autowired
	private MessageChannel hkUpcallbackExchange;
	@Lazy
	@Autowired
	private MessageChannel hkDowncallbackExchange;
	public R sendMessage(String exchangeName,JSONObject json) {
		try {
			Class<? extends MqUtil> aClass = this.getClass();
			Field field = aClass.getDeclaredField(exchangeName);
			field.setAccessible(true);
			Object o = field.get(this);
			MessageChannel channel = (MessageChannel)o;
			MessageBuilder<JSONObject> messageBuilder = MessageBuilder.withPayload(json);
			if (channel.send(messageBuilder.build())){
				return R.success("mq发送成功");
			}else {
				return R.success("mq发送失败");
			}
		} catch (NoSuchFieldException | IllegalAccessException e) {
			e.printStackTrace();
			return R.fail("反射异常,找不到对应属性");
		} catch (NoUniqueBeanDefinitionException en) {
			en.printStackTrace();
			return R.fail("dev不配置mq 请使用test环境");
		} catch (Exception ex){
			ex.printStackTrace();
			return R.fail("其他异常:"+ex.getMessage());
		}

	}
}

封装了mq的发送 mqUtil.sendMessage(HkExchange.HK_UP_EXCHANGE, genJson); 后续发送只需要调api就好,避免重复写发送代码

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐