首先:redis有一个独立的模块来支持消息多播模式,即Pub/Sub。我们先通过redis-cli来模拟一下Pub/Sub功能。先订阅一个消息,

SUBSCRIBE topic-01

然后客户端向channel:topic-01发消息

 可以看到订阅的收到了消息。

我们现在使用Spring Boot 整合Redis,实现消息的订阅,可以实现异步发送邮寄,发送手机验证码,或者异步调用等。

pom.xml

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
RedisChannelListener:实现消息的监听
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class RedisChannelListener implements MessageListener {


    private static Map<String, Consumer<String>> RULE = new HashMap<>();

    {
        RULE.put(TopicConfig.SEND_EMAIL_TOPIC, this::sendEmail);
        RULE.put(TopicConfig.SEND_PHONE_TOPIC, this::sendPhone);
        RULE.put(TopicConfig.ASYNC_CALL_TOPIC, this::async);
    }



    @Override
    public void onMessage(Message message, byte[] pattern) {
        byte[] b_channel = message.getChannel();
        byte[] b_body = message.getBody();
        try {
            String channel = new String(b_channel);
            String body = new String(b_body);
            System.out.println("channel is:" + channel + " , body is: " + body);
            RULE.get(channel).accept(body);
        } catch (Exception e) {
        }
    }

    public void async(String s) {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("AsyncService exec params is :" + s);
    }

    public void sendEmail(String s) {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("EmailService exec params is :" + s);

    }

    public void sendPhone(String s) {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("PhoneService exec params is :" + s);
    }


}

消息监听的注册

import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;


@Component
public class RedisConfig {


    @Bean
    public MessageListenerAdapter messageListenerAdapter() {
        return new MessageListenerAdapter(new RedisChannelListener());
    }

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(messageListenerAdapter, new PatternTopic(TopicConfig.SEND_EMAIL_TOPIC));
        container.addMessageListener(messageListenerAdapter, new PatternTopic(TopicConfig.SEND_PHONE_TOPIC));
        container.addMessageListener(messageListenerAdapter, new PatternTopic(TopicConfig.ASYNC_CALL_TOPIC));
        return container;
    }


}

定义的一些Topic

public class TopicConfig {

    public final static String SEND_EMAIL_TOPIC = "send_email_topic";
    public final static String SEND_PHONE_TOPIC = "send_phone_topic";
    public final static String ASYNC_CALL_TOPIC = "async_call_topic";
}

写一个单元测试类

import com.hash.me.hashmeredis.config.TopicConfig;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;

@SpringBootTest
public class MessageTest {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Test
    void pub() {
        redisTemplate.convertAndSend(TopicConfig.SEND_EMAIL_TOPIC, "send email");
        redisTemplate.convertAndSend(TopicConfig.SEND_PHONE_TOPIC, "send phone code");
        redisTemplate.convertAndSend(TopicConfig.ASYNC_CALL_TOPIC, "async call");
    }
    
}

运行一下测试用例,可以看到如下结果,可以看到消息已经被接收消费掉了

        最后Pub/Sub的生产者传递过来一个消息, Redis会直接找到相应的消费者传递过去。如果一个消费者都没有,那么消息会被直接丢弃。如果开始有三个消费者, 有一个消费者突然挂掉了,生产者会继续发送消息,另外两个消费者可以持续收到消息,但是当挂掉的消费者重新连上的时候,在断连期间生产者发送的消息,对于这个消费者来说就是彻底丢失了如果Redis 停机重启, PubSub 的消息是不会持久化的,毕竟Redis宕机就相当于一个消费者都没有,所有的消息会被直接丢弃。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐