《SpringBoot2.0 实战》系列-整合Activemq实现点对点、发布订阅模式共存、消息持久化、重试等
简介Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。作用应用解耦异步通信流量削峰消息通讯安装提供docker的安装方式,其他方式网上也都有很多https://blog.csdn.net/HXNLYW/article/de...
·
简介
Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。
作用
- 应用解耦
- 异步通信
- 流量削峰
- 消息通讯
安装
提供docker的安装方式,其他方式网上也都有很多
与springboot整合
jar依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--如果参数配置pool为true的话,需要引入下列依赖包-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
配置文件
spring:
# activemq
activemq:
broker-url: tcp://xxx:61616
user: admin
password: admin
pool:
enabled: true
max-connections: 100
packages:
#注意 对象传输需开启包白名单 否则会报错- -
trust-all: true
in-memory: true
# 自定义queue名称
queue-name: gourd-queue
# 自定义topic名称
topic-name: gourd-topic
jms:
# 开启发布订阅模式
pub-sub-domain: true
配置类
配置类加上@EnableJms注解,注册重试、持久化配置bean
/**
* active消息队列配置
* @author gour.hu
*/
@Configuration
@EnableJms
public class ActiveMqConfig {
/**
* 定义存放消息的队列
* @return
*/
@Bean
public Queue queue(@Value("${spring.activemq.queue-name}")String queueName) {
return new ActiveMQQueue(queueName);
}
/**
* 发布订阅模式
* @return
*/
@Bean
public Topic topic(@Value("${spring.activemq.topic-name}")String topicName) {
return new ActiveMQTopic(topicName);
}
@Bean
public RedeliveryPolicy redeliveryPolicy(){
RedeliveryPolicy redeliveryPolicy= new RedeliveryPolicy();
// 启用指数倍数递增的方式增加延迟时间。
redeliveryPolicy.setUseExponentialBackOff(true);
// 重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效。默认5
redeliveryPolicy.setBackOffMultiplier(2);
// 最大重传次数,达到最大重连次数后抛出异常。为-1时不限制次数,为0时表示不进行重传。默认6
redeliveryPolicy.setMaximumRedeliveries(6);
// 重发时间间隔,默认为1秒
redeliveryPolicy.setInitialRedeliveryDelay(5000L);
// 启用防止冲突功能,因为消息接收时是可以使用多线程并发处理的,应该是为了重发的安全性,避开所有并发线程都在同一个时间点进行消息接收处理。
redeliveryPolicy.setUseCollisionAvoidance(true);
// 设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效
redeliveryPolicy.setMaximumRedeliveryDelay(-1);
return redeliveryPolicy;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory (@Value("${spring.activemq.broker-url}")String url,
@Value("${spring.activemq.user}")String user, @Value("${spring.activemq.password}")String password,
RedeliveryPolicy redeliveryPolicy){
ActiveMQConnectionFactory activeMQConnectionFactory =
new ActiveMQConnectionFactory(user,password,url);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
return activeMQConnectionFactory;
}
@Bean
public JmsTemplate getJmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory,
MessageConverter jacksonJmsMessageConverter,@Value("${spring.jms.pub-sub-domain}")Boolean subAdmin){
//使用CachingConnectionFactory可以提高部分性能。
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setSessionCacheSize(100);
cachingConnectionFactory.setTargetConnectionFactory(activeMQConnectionFactory);
JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
//设置deliveryMode(持久化)
jmsTemplate.setExplicitQosEnabled(true);
// 设置消息是否持久化
jmsTemplate.setDeliveryPersistent(true);
// 设置消息转换器
jmsTemplate.setMessageConverter(jacksonJmsMessageConverter);
// 设置消息是否以事务
jmsTemplate.setSessionTransacted(true);
jmsTemplate.setPubSubDomain(subAdmin);
return jmsTemplate;
}
/**
* topic模式的ListenerContainer
*
* @param activeMQConnectionFactory
* @return
*/
@Bean("jmsListenerContainerTopic")
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory activeMQConnectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer,@Value("${spring.application.name}")String applicationName) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, activeMQConnectionFactory);
// 设置消息转换器
factory.setMessageConverter(jacksonJmsMessageConverter());
factory.setPubSubDomain(true);
factory.setSessionTransacted(true);
factory.setAutoStartup(true);
//开启持久化订阅
factory.setSubscriptionDurable(true);
factory.setClientId(applicationName);
return factory;
}
/**
* queue模式的ListenerContainer
* @param activeMQConnectionFactory
* @return
*/
@Bean("jmsListenerContainerQueue")
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory activeMQConnectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, activeMQConnectionFactory);
// 设置消息转换器
factory.setMessageConverter(jacksonJmsMessageConverter());
factory.setPubSubDomain(false);
return factory;
}
/**
* 消息转换器
* @return
*/
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
}
提供者
/**
* 队列消息控制器
* @author gourd.hu
*/
@RestController
@RequestMapping("/activeMq")
@Api(tags = "activeMq", description = "队列消息控制器" )
public class ActiveMqController {
/**
* 注入存放消息的队列,用于下列方法一
*/
@Autowired
private Queue queue;
/**
* 注入springboot封装的工具类
*/
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@GetMapping("send")
@ApiOperation(value = "发送消息到队列")
public BaseResponse send(@RequestParam("msg") String msg) {
jmsMessagingTemplate.convertAndSend(queue, msg);
return BaseResponse.ok("success!");
}
@Autowired
private Topic topic;
@GetMapping("/topic")
@ApiOperation(value = "发送消息到主题")
public BaseResponse handlerActiveMq(@RequestParam("msg")String msg) {
jmsMessagingTemplate.convertAndSend(topic, msg);
return BaseResponse.ok("success!");
}
}
消费者
@SendTo 加上此注解消费方法可以有返回值,否则只能为void
/**
* 消息消费
*
* @author gourdhu
*/
@Component
@Slf4j
public class ConsumerService {
/**
* 使用JmsListener配置消费者监听的队列,其中name是接收到的消息
* @param message
* @return
*/
@JmsListener(destination = "gourd-queue",containerFactory = "jmsListenerContainerQueue")
@SendTo("SQueue")
public void handleMessage(final TextMessage message) throws JMSException {
log.info("queue-consumer收到的报文为:" + message.getText());
}
@JmsListener(destination = "gourd-topic",containerFactory = "jmsListenerContainerTopic")
@SendTo("STopic")// 加上此注解可以有返回值,否则只能为void
public void receiveTopic(final TextMessage message) throws JMSException {
log.info("topic-consumer收到的报文为:" + message.getText());
// if(true){
// throw new BusinessException("测试消息重试");
// }
}
}
至此整合完成,经测试持久化和重试都是正常的。
===============================================
代码均已上传至本人的开源项目
spring-cloud-plus:https://blog.csdn.net/HXNLYW/article/details/104635673
更多推荐
已为社区贡献3条内容
所有评论(0)