RocketMQ,RocketMQTemplate,putUserProperty,发送带属性的消息
消费者需要利用sql92过滤消息,首先修改broker配置文件;
添加
enablePropertyFilter=true 配置;
重新启动broker;启动命令
nohup sh mqbroker -n localhost:9876 -c …/conf/broker.conf &

生产者代码编辑:

 for (int i=0;i<100;i++){
            Map<String, Object> headers = new HashMap<>();
            headers.put("KEYS","hello");
            headers.put("age",i);
            User user = new User();
            user.setAge(i);
            template.convertAndSend("testTopic:b",user,headers);
        }

KEYS传入消息的key,有需要的同学 可以传。

age 为自定义属性,用于消费者利用sql92过滤使用。
消费者代码示例:

@Slf4j
@Component
@RocketMQMessageListener(topic = "testTopic",consumerGroup = "myGroup2",consumeThreadMax = 1,selectorType= SelectorType.SQL92,selectorExpression = "age>30")
public class Consumer implements RocketMQListener<User> {
    @Override
    public void onMessage(User user) {
        log.info("myGroup2 Receive message:"+ JSON.toJSONString(user));
    }
}

原创不易,有帮助的话麻烦点个赞吧,感谢!!!!

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐