背景

项目组准备将自建的redis切到公司云平台的redis服务;自建的redis用的是哨兵模式,而云平台的提供的redis服务用的是集群模式。
切换前先分析redis用到了那些功能,redis集群模式下是否兼容。分析代码时发现,用到了RedisMessageListenerContainer,该类用于监听redis发出的消息(redis的发布订阅功能)。我们使用到了redis键过期通知的特性,来实现超时处理异步任务的。
观察RedisMessageListenerContainer,发现其入参为RedisConnectionFactory,并不是集群信息;怀疑RedisMessageListenerContainer会不会只会随机连一个节点来监听消息,另外redis键过期事件不会广播,最终导致该功能失效。
经多方查证,结论如下:

  1. redis集群模式下,消息发布(Pub/Sub)会进行广播
  2. redis键过期事件也是走的消息通道,不过为了防止消息量过大,不会进行广播。详细见【redis官方文档翻译系列】-Redis keyspace notifications
  3. jedis确实只会随机连一个节点,来监听消息。不过这也符合(Pub/Sub)功能的语义。
  4. redis的从节点的键过期,靠主节点触发键过期后发送delete指令,从节点本身不会产生键过期事件。

解决方案

基于RedisMessageListenerContainer实现对所有节点的监听。

@Slf4j
public class RedisClusterMessageListenerContainer
        implements InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle {
    private String beanName;

    private RedisClusterConfig clusterConfig;

    private List<MessageListener> listeners;

    private ThreadPoolTaskExecutor executor;

    private Topic topic;

    private volatile boolean running = false;

    private List<RedisMessageListenerContainer> containers = new ArrayList<>(6);

    public RedisClusterMessageListenerContainer(RedisClusterConfig clusterConfig, List<MessageListener> listeners,
            ThreadPoolTaskExecutor executor, Topic topic) {
        this.clusterConfig = clusterConfig;
        this.listeners = listeners;
        this.executor = executor;
        this.topic = topic;
        initContainers();
    }

    @Override
    public void setBeanName(String name) {
        this.beanName = name;
    }

    @Override
    public void destroy() throws Exception {
        containers.forEach(item -> {
            try {
                item.destroy();
            } catch (Throwable e) {
                log.warn("exception happen when destroy RedisMessageListenerContainer", e);
            }
        });
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        containers.forEach(item -> item.afterPropertiesSet());
    }

    @Override
    public void start() {
        containers.forEach(item -> item.start());
        running = true;
    }

    @Override
    public void stop() {
        containers.forEach(item -> item.stop());
        running = false;
    }

    @Override
    public boolean isRunning() {
        return this.running;
    }

    private void initContainers() {
        Stream.of(clusterConfig.getNodes().split(",")).map(node -> {
            String[] items = node.split(":");
            RedisStandaloneConfiguration configuration = new RedisStandaloneConfiguration(items[0],
                    Integer.valueOf(items[1]));
            configuration.setPassword(clusterConfig.getPassword());

            return new JedisConnectionFactory(configuration);
        }).forEach(connectionFactory -> {
            connectionFactory.afterPropertiesSet();
            RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
            listenerContainer.setConnectionFactory(connectionFactory);
            listenerContainer.setTaskExecutor(executor);
            listenerContainer.setSubscriptionExecutor(executor);
            listeners.stream().forEach(listener -> listenerContainer.addMessageListener(listener, topic));
            containers.add(listenerContainer);
        });
    }
}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐