redis集群模式下key过期事件监听
项目组准备将自建的redis切到公司云平台的redis服务;自建的redis用的是哨兵模式,而云平台的提供的redis服务用的是集群模式。切换前先分析redis用到了那些功能,redis集群模式下是否兼容。分析代码时发现,用到了RedisMessageListenerContainer,该类用于监听redis发出的消息(redis的发布订阅功能)。我们使用到了redis键过期通知的特性,来实现超时
·
背景
项目组准备将自建的redis切到公司云平台的redis服务;自建的redis用的是哨兵模式,而云平台的提供的redis服务用的是集群模式。
切换前先分析redis用到了那些功能,redis集群模式下是否兼容。分析代码时发现,用到了RedisMessageListenerContainer,该类用于监听redis发出的消息(redis的发布订阅功能)。我们使用到了redis键过期通知的特性,来实现超时处理异步任务的。
观察RedisMessageListenerContainer,发现其入参为RedisConnectionFactory,并不是集群信息;怀疑RedisMessageListenerContainer会不会只会随机连一个节点来监听消息,另外redis键过期事件不会广播,最终导致该功能失效。
经多方查证,结论如下:
- redis集群模式下,消息发布(Pub/Sub)会进行广播
- redis键过期事件也是走的消息通道,不过为了防止消息量过大,不会进行广播。详细见【redis官方文档翻译系列】-Redis keyspace notifications
- jedis确实只会随机连一个节点,来监听消息。不过这也符合(Pub/Sub)功能的语义。
- redis的从节点的键过期,靠主节点触发键过期后发送delete指令,从节点本身不会产生键过期事件。
解决方案
基于RedisMessageListenerContainer实现对所有节点的监听。
@Slf4j
public class RedisClusterMessageListenerContainer
implements InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle {
private String beanName;
private RedisClusterConfig clusterConfig;
private List<MessageListener> listeners;
private ThreadPoolTaskExecutor executor;
private Topic topic;
private volatile boolean running = false;
private List<RedisMessageListenerContainer> containers = new ArrayList<>(6);
public RedisClusterMessageListenerContainer(RedisClusterConfig clusterConfig, List<MessageListener> listeners,
ThreadPoolTaskExecutor executor, Topic topic) {
this.clusterConfig = clusterConfig;
this.listeners = listeners;
this.executor = executor;
this.topic = topic;
initContainers();
}
@Override
public void setBeanName(String name) {
this.beanName = name;
}
@Override
public void destroy() throws Exception {
containers.forEach(item -> {
try {
item.destroy();
} catch (Throwable e) {
log.warn("exception happen when destroy RedisMessageListenerContainer", e);
}
});
}
@Override
public void afterPropertiesSet() throws Exception {
containers.forEach(item -> item.afterPropertiesSet());
}
@Override
public void start() {
containers.forEach(item -> item.start());
running = true;
}
@Override
public void stop() {
containers.forEach(item -> item.stop());
running = false;
}
@Override
public boolean isRunning() {
return this.running;
}
private void initContainers() {
Stream.of(clusterConfig.getNodes().split(",")).map(node -> {
String[] items = node.split(":");
RedisStandaloneConfiguration configuration = new RedisStandaloneConfiguration(items[0],
Integer.valueOf(items[1]));
configuration.setPassword(clusterConfig.getPassword());
return new JedisConnectionFactory(configuration);
}).forEach(connectionFactory -> {
connectionFactory.afterPropertiesSet();
RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
listenerContainer.setConnectionFactory(connectionFactory);
listenerContainer.setTaskExecutor(executor);
listenerContainer.setSubscriptionExecutor(executor);
listeners.stream().forEach(listener -> listenerContainer.addMessageListener(listener, topic));
containers.add(listenerContainer);
});
}
}
更多推荐
已为社区贡献5条内容
所有评论(0)