spring boot操作activemq队列及主题示例
工作过程:1. 搭建消息代理服务器在linux虚拟机上安装activemq,2. 新建spring boot项目,作为producer(1)配置JMS连接工厂ConnectionFactory,让它知道如何连接到ActiveMQ。ActiveMQConnection是ActiveMQ自带的连接工厂。@Beanpublic ConnectionFactory connectionFactory().
工作过程:
1. 搭建消息代理服务器
在linux虚拟机上安装activemq,
2. 新建spring boot项目,作为producer
(1)配置JMS连接工厂ConnectionFactory,让它知道如何连接到ActiveMQ。ActiveMQConnection是ActiveMQ自带的连接工厂。
@Bean
public ConnectionFactory connectionFactory(){
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.211.106:61616");
return factory;
}
2)配置需要消息传递的目的地。目的地可以是一个队列queue,也可以是一个主题topic。
@Bean
public Queue queue() {
return new ActiveMQQueue(queueName);
}
@Bean
public Topic topic(){
return new ActiveMQTopic(topicName);
}
(3)使用JmsTemplate编写一个定时发送程序,配置JmsTemplate,并设置MessageConverter
@Bean // Serialize message content to json using TextMessage
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
一个jmsTemplate可以同时支持queue和topic的消息发送,只需要指定不同的destination即可
(4)调用jmsTemplate.convertAndSend来发送Email对象,这里使用了@Schedule循环发送Email给queue和topic
3. 新建springboot项目,作为consumer
创建消息驱动的POJO接收消息
(1)仍要配置JMS连接工厂ConnectionFactory
(2)配置Topic模式消息监听器。对Topic模式配置JmsListenreContainerFactory,设置好ConnectionFactory及MessageConverter,并且还要调用setPubSubDomain
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// This provides all boot's default to this factory, including the message converter
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(true);
factory.setMessageConverter(jacksonJmsMessageConverter());
// You could still override some of Boot's default if necessary.
return factory;
}
(3)配置Queue模式消息监听器。对Queue模式配置JmsListenreContainerFactory,设置好ConnectionFactory及MessageConverter
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// This provides all boot's default to this factory, including the message converter
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jacksonJmsMessageConverter());
// You could still override some of Boot's default if necessary.
return factory;
}
(4)在方法上使用注解@JmsListener来接收消息,其需要配置监听的目的地及listenerContainer
Activemq的效果:
Queue模式:
Topic模式:
结果:
应该先启动两个consumer之后,再启动producer
生产者:
消费者1:
消费者2:
源代码
生产者源码:https://gitee.com/constfafa/activemq-producer
消费者1源码:https://gitee.com/constfafa/activemq-consumer
消费者2源码:https://gitee.com/constfafa/activemq-consumer2
参考
更多推荐
所有评论(0)