通过日志查看,每条消息都会消费两次以上,并且报Caused by: org.springframework.amqp.AmqpException: PublisherCallbackChannel is closed 错误。最开始发现存在消息重复消费现象,以为是网络等问题引起的重复消费问题,通过redis锁来控制重复消费问题,具体加锁代码如下:

Boolean lockSuccess = redisUtils.tryLock(message.getCode(), MsgEnum.YBM.name(), 5 * 60 * 1000);

加锁成功就,执行后续代码,加锁失败直接返回ack;

异常信息处理:

//消费失败redis锁释放释放 redisUtils.releaseLock(message.getMessageProperties().getMessageId(), MsgEnum.YBM.name());

这个代码加上后可以保证不会重复消费,但是通过日志发现,基本每条消息都会有两次以上消费,加锁失败的情况,说明问题根源并没有解决。后经过大牛指导,消费消息返回ack默认是需要一秒内回复的,不然会重新发送,并且消费的线程数量也适当的增加了一些,相关配置如下:

@Bean

public ThreadPoolTaskExecutor threadPoolForDongzhou() {

ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();

poolTaskExecutor.setCorePoolSize(60);

poolTaskExecutor.setMaxPoolSize(100);

poolTaskExecutor.setQueueCapacity(1024);

poolTaskExecutor.setKeepAliveSeconds(300);

poolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

return poolTaskExecutor;

}

 listener:
      simple:
        acknowledge-mode: manual
        retry:
          #60秒后重试
          initial-interval: 60000 
          #启用发布重试
          enabled: true
          #传递消息的最大尝试次数          
          max-attempts: 3 
          #尝试的最大时间间隔
          max-interval: 60000 
          #应用于先前传递重试时间间隔的乘数
          multiplier: 1.0

initial-interval 设置为60秒这样就有足够的时间来处理代码逻辑.

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐