目录

前言

一、了解下发布订阅模式

二、通过redis-cli命令subscribe/publish体验下发布订阅

1.多个客户端订阅同一个channel

2.一个客户端订阅多个频道

三、在springboot中使用发布订阅

四、springboot中使用发布订阅的另一种写法


前言

这篇记录一下redis的发布订阅模式的简单使用,各位看到此博客的小伙伴,如有不对的地方请及时通过私信我或者评论此博客的方式指出,以免误人子弟。多谢!

一、了解下发布订阅模式

借用菜鸟教程上的描述,Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。Redis 客户端可以订阅任意数量的频道。

下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:

当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:

同样的,一个客户端也可以订阅多个频道。

二、通过redis-cli命令subscribe/publish体验下发布订阅

1.多个客户端订阅同一个channel

如上,打开三个命令窗口,其中两个窗口中执行命令subscribe channel1 订阅同一个频道channel1,另一个窗口中通过publish命令想channel1中发布消息aaa,可以看到另两个窗口中都接受到了消息。

2.一个客户端订阅多个频道

如上,打开三个窗口,在一个窗口中执行命令subscribe channel1 channel2 同时订阅两个频道,在另外两个窗口中分别通过publish命令向channel1和channel2中发送消息,可以看到窗口1中接受到了订阅频道中的aaa,bbb消息。

3.发布订阅相关的其它命令

1.psubscribe pattern [pattern ...]---可以订阅一个或多个符合给定模式的频道

2.punsubscribe channel [channel ...]---退订指定的频道

3.punsubscribe pattern [pattern ...]---退订所有给定模式的频道

三、在springboot中使用发布订阅

先说下在springboot中使用redis的发布订阅的步骤:

  1. 配置消息监听类(实现MessageListener接口,重写onMessage方法)。
  2. 添加监听容器(配置RedisMessageListenerContainer的bean)。
  3. 订阅频道。
  4. 向频道发布消息。

首先,贴一下消息监听类的代码:

@Component
public class RedisMessageListener implements MessageListener {

    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        // 获取消息
        byte[] messageBody = message.getBody();
        // 使用值序列化器转换
        Object msg = redisTemplate.getValueSerializer().deserialize(messageBody);
        // 获取监听的频道
        byte[] channelByte = message.getChannel();
        // 使用字符串序列化器转换
        Object channel = redisTemplate.getStringSerializer().deserialize(channelByte);
        // 渠道名称转换
        String patternStr = new String(pattern);
        System.out.println(patternStr);
        System.out.println("---频道---: " + channel);
        System.out.println("---消息内容---: " + msg);
    }
}

然后,在RedisConfig中配置一些监听容器并订阅频道,代码如下:

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory factory,
                                                   RedisMessageListener listener
    ) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        //订阅频道redis.news 和 redis.life  这个container 可以添加多个 messageListener
        container.addMessageListener(listener, new ChannelTopic("redis.life"));
        container.addMessageListener(listener, new ChannelTopic("redis.news"));
        return container;
    }

测试一下,使用redisTemplate的convertAndSend方法向指定频道发布消息,代码如下:

    @Test
    public void publish() {
        // 使用convertAndSend方法向频道redisChat发布消息
        redisTemplate.convertAndSend("redis.life", "aaa");
        redisTemplate.convertAndSend("redis.news", "bbb");
    }

执行测试方法publish的结果如下:

在配置监听容器添加订阅频道时,除了使用ChannelTopic外,还可以使用通配符的形式订阅一类频道,如可以通过订阅redis.*同时订阅redis.life和redis.news,代码如下:

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory factory,
                                                   RedisMessageListener listener
    ) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        container.addMessageListener(listener,  patternTopic());
        return container;
    }

    /**
     * 订阅匹配的多个频道
     */
    @Bean
    public PatternTopic patternTopic() {
        return new PatternTopic("redis.*");
    }

四、springboot中使用发布订阅的另一种写法

除了上面那种配置监听容器的方式之外,还可以使用MessageListenerAdapter作为参数,然后再自定义个一个消息接收类(MessageReceiver)的方式实现发布订阅。

首先,定义 一个消息接受类,代码如下:

@Component
public class MessageReceiver {
    public void receiveMessage(String message,String channel){
        System.out.println("---频道---: " + channel);
        System.out.println("---消息内容---: " + message);
    }
}

然后,配置一个MessageListenerAdapter 

    /**
     * 这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
     * 也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看
     * @param receiver
     * @return
     */
    @Bean
    public MessageListenerAdapter listenerAdapter(MessageReceiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

最后,修改下监听容器,RedisMessageListener替换为MessageListenerAdapter,代码如下:

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory factory,
                                                   MessageListenerAdapter adapter
    ) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        // 一次订阅多个匹配的频道
        container.addMessageListener(adapter, patternTopic());
        return container;
    }

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐