Spring Boot 整合Redis实现Pub/Sub
首先:redis有一个独立的模块来支持消息多播模式,即Pub/Sub。我们先通过redis-cli来模拟一下Pub/Sub功能。先订阅一个消息,SUBSCRIBE topic-01然后客户端向channel:topic-01发消息可以看到订阅的收到了消息。我们现在使用Spring Boot 整合Redis,实现消息的订阅,可以实现异步发送邮寄,发送手机验证码,或者异步调用等。pom.xml<
·
首先:redis有一个独立的模块来支持消息多播模式,即Pub/Sub。我们先通过redis-cli来模拟一下Pub/Sub功能。先订阅一个消息,
SUBSCRIBE topic-01
然后客户端向channel:topic-01发消息
可以看到订阅的收到了消息。
我们现在使用Spring Boot 整合Redis,实现消息的订阅,可以实现异步发送邮寄,发送手机验证码,或者异步调用等。
pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
RedisChannelListener:实现消息的监听
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
public class RedisChannelListener implements MessageListener {
private static Map<String, Consumer<String>> RULE = new HashMap<>();
{
RULE.put(TopicConfig.SEND_EMAIL_TOPIC, this::sendEmail);
RULE.put(TopicConfig.SEND_PHONE_TOPIC, this::sendPhone);
RULE.put(TopicConfig.ASYNC_CALL_TOPIC, this::async);
}
@Override
public void onMessage(Message message, byte[] pattern) {
byte[] b_channel = message.getChannel();
byte[] b_body = message.getBody();
try {
String channel = new String(b_channel);
String body = new String(b_body);
System.out.println("channel is:" + channel + " , body is: " + body);
RULE.get(channel).accept(body);
} catch (Exception e) {
}
}
public void async(String s) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("AsyncService exec params is :" + s);
}
public void sendEmail(String s) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("EmailService exec params is :" + s);
}
public void sendPhone(String s) {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("PhoneService exec params is :" + s);
}
}
消息监听的注册
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
@Component
public class RedisConfig {
@Bean
public MessageListenerAdapter messageListenerAdapter() {
return new MessageListenerAdapter(new RedisChannelListener());
}
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(messageListenerAdapter, new PatternTopic(TopicConfig.SEND_EMAIL_TOPIC));
container.addMessageListener(messageListenerAdapter, new PatternTopic(TopicConfig.SEND_PHONE_TOPIC));
container.addMessageListener(messageListenerAdapter, new PatternTopic(TopicConfig.ASYNC_CALL_TOPIC));
return container;
}
}
定义的一些Topic
public class TopicConfig {
public final static String SEND_EMAIL_TOPIC = "send_email_topic";
public final static String SEND_PHONE_TOPIC = "send_phone_topic";
public final static String ASYNC_CALL_TOPIC = "async_call_topic";
}
写一个单元测试类
import com.hash.me.hashmeredis.config.TopicConfig;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
@SpringBootTest
public class MessageTest {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Test
void pub() {
redisTemplate.convertAndSend(TopicConfig.SEND_EMAIL_TOPIC, "send email");
redisTemplate.convertAndSend(TopicConfig.SEND_PHONE_TOPIC, "send phone code");
redisTemplate.convertAndSend(TopicConfig.ASYNC_CALL_TOPIC, "async call");
}
}
运行一下测试用例,可以看到如下结果,可以看到消息已经被接收消费掉了
最后Pub/Sub的生产者传递过来一个消息, Redis会直接找到相应的消费者传递过去。如果一个消费者都没有,那么消息会被直接丢弃。如果开始有三个消费者, 有一个消费者突然挂掉了,生产者会继续发送消息,另外两个消费者可以持续收到消息,但是当挂掉的消费者重新连上的时候,在断连期间生产者发送的消息,对于这个消费者来说就是彻底丢失了如果Redis 停机重启, PubSub 的消息是不会持久化的,毕竟Redis宕机就相当于一个消费者都没有,所有的消息会被直接丢弃。
更多推荐
已为社区贡献1条内容
所有评论(0)