rabbitMq每条消息都要消费两次以上,并且报org.springframework.amqp.AmqpException: PublisherCallbackChannel is closed
rabbitMq每条消息都要消费两次以上,并且报org.springframework.amqp.AmqpException: PublisherCallbackChannel is closed
通过日志查看,每条消息都会消费两次以上,并且报Caused by: org.springframework.amqp.AmqpException: PublisherCallbackChannel is closed 错误。最开始发现存在消息重复消费现象,以为是网络等问题引起的重复消费问题,通过redis锁来控制重复消费问题,具体加锁代码如下:
Boolean lockSuccess = redisUtils.tryLock(message.getCode(), MsgEnum.YBM.name(), 5 * 60 * 1000);
加锁成功就,执行后续代码,加锁失败直接返回ack;
异常信息处理:
//消费失败redis锁释放释放 redisUtils.releaseLock(message.getMessageProperties().getMessageId(), MsgEnum.YBM.name());
这个代码加上后可以保证不会重复消费,但是通过日志发现,基本每条消息都会有两次以上消费,加锁失败的情况,说明问题根源并没有解决。后经过大牛指导,消费消息返回ack默认是需要一秒内回复的,不然会重新发送,并且消费的线程数量也适当的增加了一些,相关配置如下:
@Bean
public ThreadPoolTaskExecutor threadPoolForDongzhou() {
ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
poolTaskExecutor.setCorePoolSize(60);
poolTaskExecutor.setMaxPoolSize(100);
poolTaskExecutor.setQueueCapacity(1024);
poolTaskExecutor.setKeepAliveSeconds(300);
poolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return poolTaskExecutor;
}
listener:
simple:
acknowledge-mode: manual
retry:
#60秒后重试
initial-interval: 60000
#启用发布重试
enabled: true
#传递消息的最大尝试次数
max-attempts: 3
#尝试的最大时间间隔
max-interval: 60000
#应用于先前传递重试时间间隔的乘数
multiplier: 1.0
initial-interval 设置为60秒这样就有足够的时间来处理代码逻辑.
更多推荐
所有评论(0)