RocketMQListener监听器的使用思考
前言刚开始用Spring-rocketmq时,好奇怎么只需要一个实现了RocketMQListener接口的自定义监听器就可以消费消息了:这个自定义的消息监听器里,并没有如rocketmq原生Consumer那样对消息进行ack或者nack,那他到底是怎么控制的?为此专门看了下源码,全局搜了@RocketMQMessageListener注解,搜到一些东西,分享下:源码首先ListenerCont
前言
刚开始用Spring-rocketmq时,好奇怎么只需要一个实现了RocketMQListener接口的自定义监听器就可以消费消息了:
这个自定义的消息监听器里,并没有如rocketmq原生Consumer那样对消息进行ack或者nack,那他到底是怎么控制的?为此专门看了下源码,全局搜了@RocketMQMessageListener注解,搜到一些东西,分享下:
源码
核心类:ListenerContainerConfiguration/DefaultRocketMQListenerContainer
首先ListenerContainerConfiguration实现了SmartInitializingSingleton接口,会在bean都实例化完之后,触发afterSingletonsInstantiated方法:
上面从Spring上下文中拿到了所有加了@RocketMQMessageListener的bean,咱们自定义的这个CommonMessageListener也就拿到了,接着执行registerContainer方法,其中的关键代码:
上面通过createRocketMQListenerContainer方法创建了一个DefaultRocketMQListenerContainer的BeanDefinition,交给Spring容器管理,annotation即是@RocketMQMessageListener注解参数的实例,
内部两个关键的地方:
1.DefaultRocketMQListenerContainer#setRocketMQMessageListener方法将注解中的参数值进行赋值:
2.DefaultRocketMQListenerContainer#setRocketMQListener或者DefaultRocketMQListenerContainer#setRocketMQReplyListener方法设置当前的这个CommonMessageListener设置进去:
DefaultRocketMQListenerContainer实现了InitializingBean接口,所以,最终当其作为一个bean实例化出来之后,就会调用初始化方法:
initRocketMQPushConsumer方法是关键,看名字就知道初始化了rocketmq原生的Consumer并进行队列监听,监听之前先设置MessageListener对象:
这里已经是原生的rocketmq的Consumer对象了,所以消息会到这个MessageListener中,以DefaultMessageListenerOrderly源码为例:
handleMessage方法里就会调用我们自定义的CommonMessageListener#onMessage方法:
结论
spring rocketmq底层通过DefaultRocketMQListenerContainer这个类封装原生的Consumer对象来消费消息,其内部的DefaultMessageListenerConcurrently和DefaultMessageListenerOrderly两个消息监听器会根据我们的业务是否抛出异常来决定消息是否ack!
资源分享
官方源码地址:
https://github.com/apache/rocketmq-spring.git
另外关于消息中间件,分享本人写的几个演示demo,包括了各种官方接口的使用姿势,共同学习!
更多推荐
所有评论(0)