前言

理论部分请前往这个专题的其他文章查看,本文接RocketMQ消息类型文章,本文使用rocketmq-client操作RocketMQ发送接受各种消息的案例!

依赖

		<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
        </dependency>
		
		<!--或者-->
		<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
        </dependency>

这里不过多解释上文中有介绍!

普通消息

同步消息

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

import java.util.concurrent.TimeUnit;

/**
 * @description: 发送同步消息
 * 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
 * @author TAO
 * @date 2021/1/14 20:52
 */
public class SyncProducer {

    public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("producer-group");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("xxx.xxx.xxx.x:9876");
        //3.启动producer
        producer.start();

        for (int i = 0; i < 3; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message msg = new Message("first-topic", null, ("Hello World" + i).getBytes());
            //5.发送消息
            SendResult result = producer.send(msg);
            //发送状态
            SendStatus status = result.getSendStatus();
            System.out.println("发送结果:" + result);
            //线程睡1秒
            TimeUnit.SECONDS.sleep(1);
        }
        //6.关闭生产者producer
        producer.shutdown();
    }
}

异步消息

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

import java.util.concurrent.TimeUnit;

/**
 * @description: 发送异步消息
 * 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
 * @author TAO
 * @date 2021/1/14 21:39
 */
public class AsyncProducer {

    public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("producer-group");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("xxx.xxx.xxx.x:9876");
        //3.启动producer
        producer.start();

        for (int i = 0; i < 3; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message msg = new Message("first-topic", null, ("Hello World" + i).getBytes());
            //5.发送异步消息
            producer.send(msg, new SendCallback() {
                /**
                 * 发送成功回调函数
                 * @param sendResult
                 */
                public void onSuccess(SendResult sendResult) {
                    System.out.println("发送结果:" + sendResult);
                }

                /**
                 * 发送失败回调函数
                 * @param e
                 */
                public void onException(Throwable e) {
                    System.out.println("发送异常:" + e);
                }
            });
            //线程睡1秒
            TimeUnit.SECONDS.sleep(1);
        }
        //6.关闭生产者producer
        producer.shutdown();
    }
}

单向消息

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

import java.util.concurrent.TimeUnit;

/**
 * @description: 单向发送消息
 * 这种方式主要用在不特别关心发送结果的场景,例如日志发送。
 * @author TAO
 * @date 2021/1/14 21:40
 */
public class OneWayProducer {

    public static void main(String[] args) throws Exception, MQBrokerException {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("producer-group");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("xxx.xxx.xxx.x:9876");
        //集群模式
        //producer.setNamesrvAddr("192.168.0.188:9876;192.168.0.177:9876");
        //3.启动producer
        producer.start();

        for (int i = 0; i < 3; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message msg = new Message("first-topic", null, ("Hello World,单向消息" + i).getBytes());
            //5.发送单向消息
            producer.sendOneway(msg);
            //线程睡1秒
            TimeUnit.SECONDS.sleep(5);
        }
        //6.关闭生产者producer
        producer.shutdown();
    }
}

消费者
这里消费者都可以共用一个


import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;
/**
 * @description: 普通消息消费者(消费同步、异步、单向)
 * @author TAO
 * @date 2021/1/14 20:51
 */
public class OrdinaryConsumer {

    public static void main(String[] args)  throws Exception {
        //1.创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("producer-group");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("xxx.xxx.xxx.x:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("first-topic", "*");
        //consumer.subscribe("base", "Tag1");

        //消费所有"*",消费Tag1和Tag2  Tag1 || Tag2
        //consumer.subscribe("base", "*");

        //设定消费模式:负载均衡|广播模式  默认为负载均衡
        //负载均衡10条消息,每个消费者共计消费10条
        //广播模式10条消息,每个消费者都消费10条
        //consumer.setMessageModel(MessageModel.BROADCASTING);

        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            //接受消息内容
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    //System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
    }
}

默认消费模式时负载均衡,并不是广播模式,如果需要开启广播模式需要设置如下代码即可

consumer.setMessageModel(MessageModel.BROADCASTING);

对比
在这里插入图片描述

注意:
这里设置了consumerGroup这个概念,说明一下,不同的consumerGroup名称属于不同的消费者集群,如果不同的消费者集群订阅了同一个topic,那么两个消费者集群都会收到消息,至于说各消费者集群中的消费者是全部收到消息,还是轮训收到消息是由消费端订阅方式setMessageModel决定的!

效果演示
我们使用AsyncProducer作为消息生产端,发送10条topic为test的消息!开启两个消费组group-a、和group-b,group-a采用轮询方式订阅topic为test,group-b采用广播模式订阅topic为test,group-a下有三个消费者,group-b下也有三个消费者,消息接收情况如下!

group-a:
消费者1
在这里插入图片描述

消费者2
在这里插入图片描述

消费者3
在这里插入图片描述

group-b
消费者1
在这里插入图片描述

消费者2
在这里插入图片描述

消费者3
在这里插入图片描述
生产端:
在这里插入图片描述

批量消息

生产者


import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * @description: 批量消息
 * 如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下:
 * @author TAO
 * @date 2021/1/15 12:25
 */
public class BatchProducer {

    public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("producer-group");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("xxx.xxx.xxx.x:9876");
        //3.启动producer
        producer.start();
        List<Message> msgs = new ArrayList<Message>();
        //4.创建消息对象,指定主题Topic、Tag和消息体
        /**
         * 参数一:消息主题Topic
         * 参数二:消息Tag
         * 参数三:消息内容
         */
        Message msg1 = new Message("BatchTopic", "Tag1", ("Hello World" + 1).getBytes());
        Message msg2 = new Message("BatchTopic", "Tag1", ("Hello World" + 2).getBytes());
        Message msg3 = new Message("BatchTopic", "Tag1", ("Hello World" + 3).getBytes());

        msgs.add(msg1);
        msgs.add(msg2);
        msgs.add(msg3);
        //5.发送消息
        SendResult result = producer.send(msgs);
        //发送状态
        SendStatus status = result.getSendStatus();
        System.out.println("发送结果:" + result);
        //线程睡1秒
        TimeUnit.SECONDS.sleep(1);
        //6.关闭生产者producer
        producer.shutdown();
    }
}

消费者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @description: 批量消息消费者
 * @author TAO
 * @date 2021/12/9 6:48 下午
 */
public class BatchConsumer {

    public static void main(String[] args) throws Exception {
        //1.创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("producer-group");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("xxx.xxx.xxx.x:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("BatchTopic", "*");
        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            //接受消息内容
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
        System.out.println("消费者启动");
    }
}

注意

  • 批量消息发送的时候topic需要保持一致,否则无法发送消息
  • 批量发送消息能显著提高传递小消息的性能
  • 批量消息不能是延时消息
  • 批量消息必须相同的waitStoreMsgOK
  • 批量消息消息的总大小不应超过4MB。rocketmq建议每次批量消息大小大概在1MB。当消息大小超过4MB时,需要将消息进行分割

消息分割

import org.apache.rocketmq.common.message.Message;

import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * @description: 消息切割
 * @author TAO
 * @date 2021/1/15 12:31
 */
public class ListSplitter implements Iterator<List<Message>> {
    private final int SIZE_LIMIT = 1024 * 1024 * 4;
    private final List<Message> messages;
    private int currIndex;
    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }
    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }
    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; // 增加日志的开销20字节
            if (tmpSize > SIZE_LIMIT) {
                //单个消息超过了最大的限制
                //忽略,否则会阻塞分裂的进程
                if (nextIndex - currIndex == 0) {
                    //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}

生产者测试


import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

import java.util.ArrayList;
import java.util.List;

/**
 * @description: 发送大于4MB的消息
 * @author TAO
 * @date 2021/12/9 10:45 下午
 */

public class BigMsgProducer {

    public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("producer-group");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("xxx.xxx.xxx.x:9876");
        //3.启动producer
        producer.start();

        List<Message> messages = new ArrayList<Message>();

        int i = 0;
        while (i<10000){
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            String context = "你好北京,你好上海,你好深圳,你好北京,你好上海,你好深圳,你好北京,你好上海,你好深圳,你好北京,你好上海,你好深圳,你好北京,你好上海,你好深圳,你好北京,你好上海,你好深圳,你好北京,你好上海,你好深圳" +
                    "你好北京,你好上海,你好深圳,你好北京,你好上海,你好深圳,你好北京,你好上海,你好深圳,你好北京,你好上海,你好深圳,你好北京,你好上海,你好深圳,你好北京,你好上海,你好深圳,你好北京,你好上海,你好深圳";
            Message msg = new Message("BatchTopic", "Tag1", (context+ 1).getBytes());
            messages.add(msg);
            i++;
        }
        System.out.println("此次发送批量消息条数==>"+messages.size());
        int frequency = 0;
        //把大的消息分裂成若干个小的消息
        ListSplitter splitter = new ListSplitter(messages);
        while (splitter.hasNext()) {
            ++frequency;
            System.out.println("第"+frequency+"次");
            try {
                List<Message>  listItem = splitter.next();
                //5.发送消息
                SendResult result =producer.send(listItem);
                //发送状态
                SendStatus status = result.getSendStatus();
                //System.out.println("发送结果:" + result);
            } catch (Exception e) {
                e.printStackTrace();
                //处理error
            }
        }
        //6.关闭生产者producer
        producer.shutdown();
    }
}

这里构建的messages消息整体是大于4MB的所以这里消息会被分割

延迟消息

生产者

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.concurrent.TimeUnit;

/**
 * @description: 延迟消息
 * @author TAO
 * @date 2021/1/15 10:57
 */
public class DelayProducer {

    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("producer-group");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("xxx.xxx.xxx.x:9876");
        //3.启动producer
        producer.start();

        for (int i = 0; i < 10; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message msg = new Message("DelayTopic", "Tag1", ("Hello World" + i).getBytes());

            //private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
            //现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18
            //设定延迟时间
            msg.setDelayTimeLevel(2);
            //5.发送消息
            SendResult result = producer.send(msg);
            //发送状态
            SendStatus status = result.getSendStatus();
            System.out.println("发送结果:" + result);
            //线程睡1秒
            TimeUnit.SECONDS.sleep(1);
        }
        //6.关闭生产者producer
        producer.shutdown();
    }
}

消费者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @description: 延迟消息
 * @author TAO
 * @date 2021/1/15 12:13
 */
public class DelayConsumer {

    public static void main(String[] args) throws Exception {
        //1.创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay-group");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("xxx.xxx.xxx.x:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("DelayTopic", "*");
        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            //接受消息内容
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("消息ID:【" + msg.getMsgId() + "】,延迟时间:" + (System.currentTimeMillis() - msg.getStoreTimestamp()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
        System.out.println("消费者启动");
    }
}

注意
延迟消息其实可以定义为两种模式,一种是定时发送和延迟发送,其地城逻辑是不一样的,
定时发送大致如下

// 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递,例如 2021-12-10 16:21:00 投递。如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。
long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2019-08-01 16:21:00").getTime();
msg.setStartDeliverTime(timeStamp);

延迟发送如下

// 延时消息,单位毫秒(ms),在指定延迟时间(当前时间之后)进行投递,例如消息在 3 秒后投递
long delayTime = System.currentTimeMillis() + 3000;
// 设置消息需要被投递的时间
msg.setStartDeliverTime(delayTime);

但是我们使用的开源吧RocketMQ并不支持这种自定义时间,只支持内部提供好的策略如下

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

我们设置的时候只能使用1-18对应上面的时间完成延迟效果,但是在收费版的ONS中是可以的没这个会在ONS整合中演示!

顺序消息

这里为什么存在顺序消息这么回事呢,这就要知道RocketMQ的内部构造了,这个单独拎出一片文章来介绍如下RocketMQ顺序消息

事务消息

demo代码如下,后期会单独将这部分拎出来分析下源代码,看看底层是如何实现的
生产者

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.TimeUnit;

/**
 * @description: 事务消息
 * @author TAO
 * @date 2021/1/15 14:00
 */
public class TransactionProducer {
    public static void main(String[] args) throws Exception {
        //1.创建事务消息生产者producer,并制定生产者组名
        TransactionMQProducer producer = new TransactionMQProducer("producer-group");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("xxx.xxx.xxx.x:9876");

        //添加事务监听器
        producer.setTransactionListener(new TransactionListener() {
            /**
             * 在该方法中执行本地事务
             * @param msg
             * @param arg
             * @return
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                if (StringUtils.equals("TAGA", msg.getTags())) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if (StringUtils.equals("TAGB", msg.getTags())) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else if (StringUtils.equals("TAGC", msg.getTags())) {
                    return LocalTransactionState.UNKNOW;
                }
                return LocalTransactionState.UNKNOW;
            }
            /**
             * 该方法时MQ进行消息事务状态回查
             * @param msg
             * @return
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println("消息的Tag:" + msg.getTags());
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        //3.启动producer
        producer.start();

        String[] tags = {"TAGA", "TAGB", "TAGC"};

        for (int i = 0; i < 3; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message msg = new Message("TransactionTopic", tags[i], ("Hello World" + i).getBytes());
            //5.发送消息
            SendResult result = producer.sendMessageInTransaction(msg, null);
            //发送状态
            SendStatus status = result.getSendStatus();
            System.out.println("发送结果:" + result);
            //线程睡1秒
            TimeUnit.SECONDS.sleep(2);
        }
        //6.关闭生产者producer
        //producer.shutdown();
    }
}

消费者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @description: 事务消息
 * @author TAO
 * @date 2021/1/15 14:01
 */
public class TransactionConsumer {

    public static void main(String[] args) throws Exception {
        //1.创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction-group");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("xxx.xxx.xxx.x:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("TransactionTopic", "*");
        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            //接受消息内容
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
        System.out.println("生产者启动");
    }
}

消息过滤

这个其实就比较简单了,就是针对Tag进行消息筛选过滤的,共有两种形式,一种是基于TAG的另一种是基于SQL的

TAG消息过滤
生产者


import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

import java.util.concurrent.TimeUnit;

/**
 * @description: tag
 * @author TAO
 * @date 2021/1/15 12:33
 */
public class TagProducer {
    public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("xxx.xxx.xxx.x:9876;xxx.xxx.xxx.x:9876");
        //3.启动producer
        producer.start();

        for (int i = 0; i < 3; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message msg = new Message("FilterTagTopic", "Tag1", ("Hello World" + i).getBytes());
            //5.发送消息
            SendResult result = producer.send(msg);
            //发送状态
            SendStatus status = result.getSendStatus();
            System.out.println("发送结果:" + result);
            //线程睡1秒
            TimeUnit.SECONDS.sleep(1);
        }
        //6.关闭生产者producer
        producer.shutdown();
    }
}

消费者


import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class TagConsumer {
    public static void main(String[] args) throws Exception {
        //1.创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("xxx.xxx.xxx.x:9876;xxx.xxx.xxx.x:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("FilterTagTopic", "Tag1 || Tag2 ");

        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            //接受消息内容
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
        System.out.println("消费者启动");
    }
}

SQL
生产者


import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

import java.util.concurrent.TimeUnit;

/**
 * @description: sql
 * @author TAO
 * @date 2021/1/15 12:56
 */
public class SqlProducer {
    public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("xxx.xxx.xxx.x:9876;xxx.xxx.xxx.x:9876");
        //3.启动producer
        producer.start();

        for (int i = 0; i < 10; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message msg = new Message("FilterSQLTopic", "Tag1", ("Hello World" + i).getBytes());

            msg.putUserProperty("i", String.valueOf(i));
            //5.发送消息
            SendResult result = producer.send(msg);
            //发送状态
            SendStatus status = result.getSendStatus();
            System.out.println("发送结果:" + result);
            //线程睡1秒
            TimeUnit.SECONDS.sleep(2);
        }
        //6.关闭生产者producer
        producer.shutdown();
    }
}

消费者


import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @description: sql
 * @author TAO
 * @date 2021/1/15 12:56
 *
 */
public class SqlConsumer {
    public static void main(String[] args) throws Exception {
        //1.创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("xxx.xxx.xxx.x:9876;xxx.xxx.xxx.x:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("FilterSQLTopic", MessageSelector.bySql("i>5"));
        //TODO 使用SQL过滤需要broker开启开启对filter的支持enablePropertyFilter=true

        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            //接受消息内容
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
        System.out.println("消费者启动");
    }
}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐