CuratorCache

CuratorCache会试图将来自节点的数据保存在本地缓存中。 可以缓存指定的单个节点,也可以缓存以指定节点为根的整个子树(默认缓存方案)。可以给CuratorCache实例注册监听器,当相关节点发生更改时会接收到通知, 将响应更新、创建、删除等事件。

很多基于ZooKeeper的框架,就是使用CuratorCache来缓存相关节点的数据,比如ElasticJob,这样大部分节点的数据可以通过内存来获取,提高了性能,并且可以很方便地监听相关节点的事件。

CuratorCache接口源码:

public interface CuratorCache extends Closeable, CuratorCacheAccessor
{
    /**
     * 缓存构建选项
     */
    enum Options
    {
        /**
         * 通常,以指定节点为根的整个子树都会被缓存(默认缓存方案)
         * 这个选项只会缓存指定的节点(即单节点缓存)
         */
        SINGLE_NODE_CACHE,

        /**
         * 通过org.apache.curator.framework.api.GetDataBuilder.decompressed()解压数据
         */
        COMPRESSED_DATA,

        /**
         * 通常,当通过close()关闭缓存时,会通过CuratorCacheStorage.clear()清除storage
         * 此选项可防止清除storage
         */
        DO_NOT_CLEAR_ON_CLOSE
    }

    /**
     * 使用标准standard实例返回具有给定选项的给定路径的CuratorCache实例
     */
    static CuratorCache build(CuratorFramework client, String path, Options... options)
    {
        return builder(client, path).withOptions(options).build();
    }

    /**
     * 启动CuratorCache构建器
     */
    static CuratorCacheBuilder builder(CuratorFramework client, String path)
    {
        return new CuratorCacheBuilderImpl(client, path);
    }

    /**
     * 启动CuratorCacheBridge构建器,CuratorCacheBridge是一个facade
     * 如果持久Watcher可用,则使用CuratorCache,否则使用TreeCache(使用ZooKeeper3.5.x)
     */
    static CuratorCacheBridgeBuilder bridgeBuilder(CuratorFramework client, String path)
    {
        return new CuratorCacheBridgeBuilderImpl(client, path);
    }

    /**
     * 启动缓存
     * 这将导致从缓存的根节点开始进行完全刷新
     * 并为找到的所有节点生成事件
     */
    void start();

    /**
     * 关闭缓存
     * 停止响应事件
     */
    @Override
    void close();

    /**
     * 返回监听器容器,以便可以注册监听器以收到缓存更改的通知
     */
    Listenable<CuratorCacheListener> listenable();

    /**
     * 从storage中返回指定路径的节点数据
     */
    @Override
    Optional<ChildData> get(String path);

    /**
     * 返回当前storage中的条目数
     */
    @Override
    int size();

    /**
     * 返回storage中条目的流
     */
    @Override
    Stream<ChildData> stream();
}

CuratorCache接口通过build静态方法返回的CuratorCache实例是其实现类的实例(最终会调用CuratorCacheBuilderImpl类的build方法):

    @Override
    public CuratorCache build()
    {
        return new CuratorCacheImpl(client, storage, path, options, exceptionHandler);
    }

而只需要使用CuratorCache接口即可(创建的实例就是CuratorCacheImpl类的实例),而CuratorCacheImpl类中可用的方法基本上是实现CuratorCache接口中的方法。
在这里插入图片描述

CuratorCacheListener

CuratorCacheListenerCuratorCache事件的监听器。

CuratorCacheListener接口源码(函数式接口):

@FunctionalInterface
public interface CuratorCacheListener
{
    /**
     * 描述事件的枚举类型
     */
    enum Type
    {
        /**
         * 一个新节点被添加到缓存中
         */
        NODE_CREATED,

        /**
         * 已在缓存中的节点被更改
         */
        NODE_CHANGED,

        /**
         * 已在缓存中的节点被删除
         */
        NODE_DELETED
    }

    /**
     * 在创建、更改或删除节点时被调用
     * 从而调用监听器
     */
    void event(Type type, ChildData oldData, ChildData data);

    /**
     * 当缓存启动时,会跟踪初始节点,当它们完成加载到缓存中时,将调用此方法
     */
    default void initialized()
    {
        // NOP
    }

    /**
     * 返回允许特定类型和特殊用途监听器的构建器
     */
    static CuratorCacheListenerBuilder builder()
    {
        return new CuratorCacheListenerBuilderImpl();
    }
}

通过CuratorCacheListenerBuilderImpl的实例可以给CuratorCache添加各种监听器。
在这里插入图片描述

演示

SINGLE_NODE_CACHE(单节点)

package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import java.util.Optional;

public class Application {

    private static final String SERVER_PROXY = "192.168.31.175:9000";
    private static final int CONNECTION_TIMEOUT_MS = 40000;
    private static final int SESSION_TIMEOUT_MS = 10000;
    private static final String NAMESPACE = "MyNamespace";

    public static void main(String[] args) throws Exception {
        // 重试策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // 创建CuratorFramework实例
        CuratorFramework curator = CuratorFrameworkFactory.builder()
                .connectString(SERVER_PROXY)
                .retryPolicy(retryPolicy)
                .connectionTimeoutMs(CONNECTION_TIMEOUT_MS)
                .sessionTimeoutMs(SESSION_TIMEOUT_MS)
                .namespace(NAMESPACE)
                .build();

        curator.start();
        assert curator.getState().equals(CuratorFrameworkState.STARTED);
        curator.blockUntilConnected();

        if(curator.checkExists().forPath("/father") != null) {
            curator.delete().deletingChildrenIfNeeded().forPath("/father");
        }
        
        // 创建CuratorCache实例,基于路径/father/son/grandson1(这里说的路径都是基于命名空间下的路径)
        // 缓存构建选项是SINGLE_NODE_CACHE
        CuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1",
                CuratorCache.Options.SINGLE_NODE_CACHE);

        // 创建一系列CuratorCache监听器,都是通过lambda表达式指定
        CuratorCacheListener listener = CuratorCacheListener.builder()
                // 初始化完成时调用
                .forInitialized(() -> System.out.println("[forInitialized] : Cache initialized"))
                // 添加或更改缓存中的数据时调用
                .forCreatesAndChanges(
                        (oldNode, node) -> System.out.printf("[forCreatesAndChanges] : Node changed: Old: [%s] New: [%s]\n",
                                oldNode, node)
                )
                // 添加缓存中的数据时调用
                .forCreates(childData -> System.out.printf("[forCreates] : Node created: [%s]\n", childData))
                // 更改缓存中的数据时调用
                .forChanges(
                        (oldNode, node) -> System.out.printf("[forChanges] : Node changed: Old: [%s] New: [%s]\n",
                                oldNode, node)
                )
                // 删除缓存中的数据时调用
                .forDeletes(childData -> System.out.printf("[forDeletes] : Node deleted: data: [%s]\n", childData))
                // 添加、更改或删除缓存中的数据时调用
                .forAll((type, oldData, data) -> System.out.printf("[forAll] : type: [%s] [%s] [%s]\n", type, oldData, data))
                .build();

        // 给CuratorCache实例添加监听器
        cache.listenable().addListener(listener);

        // 启动CuratorCache
        cache.start();

        // 创建节点/father/son/grandson1
        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/father/son/grandson1", "data".getBytes());
        
        // 创建节点/father/son/grandson1/test
        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/father/son/grandson1/test", "test".getBytes());
        
        // 创建节点/father/son/grandson1/test/test2
        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/father/son/grandson1/test/test2", "test2".getBytes());
       
        // 更改节点/father/son/grandson1的数据
        curator.setData()
                .forPath("/father/son/grandson1", "new data".getBytes());

        // 更改节点/father/son/grandson1/test的数据
        curator.setData()
                .forPath("/father/son/grandson1/test", "new test".getBytes());

        // 删除节点/father/son/grandson1
        curator.delete()
                .deletingChildrenIfNeeded()
                .forPath("/father/son/grandson1");

        Thread.sleep(10000000);
    }
}

输出:

// 初始化完成
[forInitialized] : Cache initialized

// 创建节点/father/son/grandson1,即添加缓存数据
[forCreatesAndChanges] : Node changed: Old: [null] New: [ChildData{path='/father/son/grandson1', stat=198306,198306,1640848445204,1640848445204,0,1,0,0,4,1,198307
, data=[100, 97, 116, 97]}]
[forCreates] : Node created: [ChildData{path='/father/son/grandson1', stat=198306,198306,1640848445204,1640848445204,0,1,0,0,4,1,198307
, data=[100, 97, 116, 97]}]
[forAll] : type: [NODE_CREATED] [null] [ChildData{path='/father/son/grandson1', stat=198306,198306,1640848445204,1640848445204,0,1,0,0,4,1,198307
, data=[100, 97, 116, 97]}]

// 更改节点/father/son/grandson1,即更改缓存数据
[forCreatesAndChanges] : Node changed: Old: [ChildData{path='/father/son/grandson1', stat=198306,198306,1640848445204,1640848445204,0,1,0,0,4,1,198307
, data=[100, 97, 116, 97]}] New: [ChildData{path='/father/son/grandson1', stat=198306,198309,1640848445204,1640848445218,1,1,0,0,8,1,198307
, data=[110, 101, 119, 32, 100, 97, 116, 97]}]
[forChanges] : Node changed: Old: [ChildData{path='/father/son/grandson1', stat=198306,198306,1640848445204,1640848445204,0,1,0,0,4,1,198307
, data=[100, 97, 116, 97]}] New: [ChildData{path='/father/son/grandson1', stat=198306,198309,1640848445204,1640848445218,1,1,0,0,8,1,198307
, data=[110, 101, 119, 32, 100, 97, 116, 97]}]
[forAll] : type: [NODE_CHANGED] [ChildData{path='/father/son/grandson1', stat=198306,198306,1640848445204,1640848445204,0,1,0,0,4,1,198307
, data=[100, 97, 116, 97]}] [ChildData{path='/father/son/grandson1', stat=198306,198309,1640848445204,1640848445218,1,1,0,0,8,1,198307
, data=[110, 101, 119, 32, 100, 97, 116, 97]}]

// 删除节点/father/son/grandson1,即删除缓存数据
[forDeletes] : Node deleted: data: [ChildData{path='/father/son/grandson1', stat=198306,198309,1640848445204,1640848445218,1,1,0,0,8,1,198307
, data=[110, 101, 119, 32, 100, 97, 116, 97]}]
[forAll] : type: [NODE_DELETED] [ChildData{path='/father/son/grandson1', stat=198306,198309,1640848445204,1640848445218,1,1,0,0,8,1,198307
, data=[110, 101, 119, 32, 100, 97, 116, 97]}] [null]

很显然/father/son/grandson1的子节点并没有被缓存(因为缓存构建选项是SINGLE_NODE_CACHE),因此子节点的相关操作并没有被监听。

默认(子树)

如果使用默认选项构建缓存,即:

        CuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1");

输出:

// 初始化完成
[forInitialized] : Cache initialized

// 创建节点/father/son/grandson1,即添加缓存数据
[forCreatesAndChanges] : Node changed: Old: [null] New: [ChildData{path='/father/son/grandson1', stat=198324,198324,1640854493222,1640854493222,0,1,0,0,4,1,198325
, data=[100, 97, 116, 97]}]
[forCreates] : Node created: [ChildData{path='/father/son/grandson1', stat=198324,198324,1640854493222,1640854493222,0,1,0,0,4,1,198325
, data=[100, 97, 116, 97]}]
[forAll] : type: [NODE_CREATED] [null] [ChildData{path='/father/son/grandson1', stat=198324,198324,1640854493222,1640854493222,0,1,0,0,4,1,198325
, data=[100, 97, 116, 97]}]

// 创建节点/father/son/grandson1/test,即添加缓存数据
[forCreatesAndChanges] : Node changed: Old: [null] New: [ChildData{path='/father/son/grandson1/test', stat=198325,198325,1640854493226,1640854493226,0,0,0,0,4,0,198325
, data=[116, 101, 115, 116]}]
[forCreates] : Node created: [ChildData{path='/father/son/grandson1/test', stat=198325,198325,1640854493226,1640854493226,0,0,0,0,4,0,198325
, data=[116, 101, 115, 116]}]
[forAll] : type: [NODE_CREATED] [null] [ChildData{path='/father/son/grandson1/test', stat=198325,198325,1640854493226,1640854493226,0,0,0,0,4,0,198325
, data=[116, 101, 115, 116]}]

// 创建节点/father/son/grandson1/test/test2,即添加缓存数据
[forCreatesAndChanges] : Node changed: Old: [null] New: [ChildData{path='/father/son/grandson1/test/test2', stat=198326,198326,1640854493230,1640854493230,0,0,0,0,5,0,198326
, data=[116, 101, 115, 116, 50]}]
[forCreates] : Node created: [ChildData{path='/father/son/grandson1/test/test2', stat=198326,198326,1640854493230,1640854493230,0,0,0,0,5,0,198326
, data=[116, 101, 115, 116, 50]}]
[forAll] : type: [NODE_CREATED] [null] [ChildData{path='/father/son/grandson1/test/test2', stat=198326,198326,1640854493230,1640854493230,0,0,0,0,5,0,198326
, data=[116, 101, 115, 116, 50]}]

// 更改节点/father/son/grandson1,即更改缓存数据
[forCreatesAndChanges] : Node changed: Old: [ChildData{path='/father/son/grandson1', stat=198324,198324,1640854493222,1640854493222,0,1,0,0,4,1,198325
, data=[100, 97, 116, 97]}] New: [ChildData{path='/father/son/grandson1', stat=198324,198327,1640854493222,1640854493237,1,1,0,0,8,1,198325
, data=[110, 101, 119, 32, 100, 97, 116, 97]}]
[forChanges] : Node changed: Old: [ChildData{path='/father/son/grandson1', stat=198324,198324,1640854493222,1640854493222,0,1,0,0,4,1,198325
, data=[100, 97, 116, 97]}] New: [ChildData{path='/father/son/grandson1', stat=198324,198327,1640854493222,1640854493237,1,1,0,0,8,1,198325
, data=[110, 101, 119, 32, 100, 97, 116, 97]}]
[forAll] : type: [NODE_CHANGED] [ChildData{path='/father/son/grandson1', stat=198324,198324,1640854493222,1640854493222,0,1,0,0,4,1,198325
, data=[100, 97, 116, 97]}] [ChildData{path='/father/son/grandson1', stat=198324,198327,1640854493222,1640854493237,1,1,0,0,8,1,198325
, data=[110, 101, 119, 32, 100, 97, 116, 97]}]

// 更改节点/father/son/grandson1/test,即更改缓存数据
[forCreatesAndChanges] : Node changed: Old: [ChildData{path='/father/son/grandson1/test', stat=198325,198325,1640854493226,1640854493226,0,1,0,0,4,1,198326
, data=[116, 101, 115, 116]}] New: [ChildData{path='/father/son/grandson1/test', stat=198325,198328,1640854493226,1640854493240,1,1,0,0,8,1,198326
, data=[110, 101, 119, 32, 116, 101, 115, 116]}]
[forChanges] : Node changed: Old: [ChildData{path='/father/son/grandson1/test', stat=198325,198325,1640854493226,1640854493226,0,1,0,0,4,1,198326
, data=[116, 101, 115, 116]}] New: [ChildData{path='/father/son/grandson1/test', stat=198325,198328,1640854493226,1640854493240,1,1,0,0,8,1,198326
, data=[110, 101, 119, 32, 116, 101, 115, 116]}]
[forAll] : type: [NODE_CHANGED] [ChildData{path='/father/son/grandson1/test', stat=198325,198325,1640854493226,1640854493226,0,1,0,0,4,1,198326
, data=[116, 101, 115, 116]}] [ChildData{path='/father/son/grandson1/test', stat=198325,198328,1640854493226,1640854493240,1,1,0,0,8,1,198326
, data=[110, 101, 119, 32, 116, 101, 115, 116]}]

// 删除节点/father/son/grandson1/test/test2,即删除缓存数据
[forDeletes] : Node deleted: data: [ChildData{path='/father/son/grandson1/test/test2', stat=198326,198326,1640854493230,1640854493230,0,0,0,0,5,0,198326
, data=[116, 101, 115, 116, 50]}]
[forAll] : type: [NODE_DELETED] [ChildData{path='/father/son/grandson1/test/test2', stat=198326,198326,1640854493230,1640854493230,0,0,0,0,5,0,198326
, data=[116, 101, 115, 116, 50]}] [null]

// 删除节点/father/son/grandson1/test,即删除缓存数据
[forDeletes] : Node deleted: data: [ChildData{path='/father/son/grandson1/test', stat=198325,198328,1640854493226,1640854493240,1,1,0,0,8,1,198326
, data=[110, 101, 119, 32, 116, 101, 115, 116]}]
[forAll] : type: [NODE_DELETED] [ChildData{path='/father/son/grandson1/test', stat=198325,198328,1640854493226,1640854493240,1,1,0,0,8,1,198326
, data=[110, 101, 119, 32, 116, 101, 115, 116]}] [null]

// 删除节点/father/son/grandson1,即删除缓存数据
[forDeletes] : Node deleted: data: [ChildData{path='/father/son/grandson1', stat=198324,198327,1640854493222,1640854493237,1,1,0,0,8,1,198325
, data=[110, 101, 119, 32, 100, 97, 116, 97]}]
[forAll] : type: [NODE_DELETED] [ChildData{path='/father/son/grandson1', stat=198324,198327,1640854493222,1640854493237,1,1,0,0,8,1,198325
, data=[110, 101, 119, 32, 100, 97, 116, 97]}] [null]

很显然/father/son/grandson1的子节点也被缓存了,因此子节点的相关操作可以被监听。

NodeCacheListener

是一种桥接监听器,可以将旧式监听器NodeCacheListener(因为NodeCache类已经标记@Deprecated注解)与CuratorCache重用。

    @Override
    public CuratorCacheListenerBuilder forNodeCache(NodeCacheListener listener)
    {
        listeners.add(new NodeCacheListenerWrapper(listener));
        return this;
    }

通过lambda表达式指定NodeCacheListener实例,该实例不接收任何参数。

        CuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1",
                CuratorCache.Options.SINGLE_NODE_CACHE);

        CuratorCacheListener listener = CuratorCacheListener.builder()
                .forNodeCache(() -> System.out.println("forNodeCache"))
                .build();

输出:

// 创建节点/father/son/grandson1
forNodeCache
// 更改节点/father/son/grandson1
forNodeCache
// 删除节点/father/son/grandson1
forNodeCache

使用默认选项构建缓存:

        CuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1");
// 创建节点/father/son/grandson1
forNodeCache
// 创建节点/father/son/grandson1/test
forNodeCache
// 创建节点/father/son/grandson1/test/test2
forNodeCache
// 更改节点/father/son/grandson1
forNodeCache
// 更改节点/father/son/grandson1/test
forNodeCache
// 删除节点/father/son/grandson1/test/test2
forNodeCache
// 删除节点/father/son/grandson1/test
forNodeCache
// 删除节点/father/son/grandson1
forNodeCache

很显然该监听器监听缓存中的所有节点(不需要知道这些节点发生事件的数据)。

PathChildrenCacheListener

是一种桥接监听器,可以将旧式监听器PathChildrenCacheListener(因为PathChildrenCache类已经标记@Deprecated注解)与CuratorCache重用。该监听器需要指定根路径,以便仅为此路径的子级而不是路径本身桥接事件。

        CuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1",
                CuratorCache.Options.SINGLE_NODE_CACHE);

        CuratorCacheListener listener = CuratorCacheListener.builder()
                .forPathChildrenCache("/father/son/grandson1", curator, (client, event) -> {
                    System.out.println(event);
                })
                .build();

输出:

// 初始化完成
PathChildrenCacheEvent{type=INITIALIZED, data=null}

使用默认选项构建缓存:

        CuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1");

输出:

// 初始化完成
PathChildrenCacheEvent{type=INITIALIZED, data=null}

// 创建节点/father/son/grandson1/test
PathChildrenCacheEvent{type=CHILD_ADDED, data=ChildData{path='/father/son/grandson1/test', stat=198507,198507,1640856689919,1640856689919,0,0,0,0,4,0,198507
, data=[116, 101, 115, 116]}}

// 创建节点/father/son/grandson1/test/test2
PathChildrenCacheEvent{type=CHILD_ADDED, data=ChildData{path='/father/son/grandson1/test/test2', stat=198508,198508,1640856689922,1640856689922,0,0,0,0,5,0,198508
, data=[116, 101, 115, 116, 50]}}

// 更改节点/father/son/grandson1/test
PathChildrenCacheEvent{type=CHILD_UPDATED, data=ChildData{path='/father/son/grandson1/test', stat=198507,198510,1640856689919,1640856689932,1,1,0,0,8,1,198508
, data=[110, 101, 119, 32, 116, 101, 115, 116]}}

// 删除节点/father/son/grandson1/test/test2
PathChildrenCacheEvent{type=CHILD_REMOVED, data=ChildData{path='/father/son/grandson1/test/test2', stat=198508,198508,1640856689922,1640856689922,0,0,0,0,5,0,198508
, data=[116, 101, 115, 116, 50]}}

// 删除节点/father/son/grandson1/test
PathChildrenCacheEvent{type=CHILD_REMOVED, data=ChildData{path='/father/son/grandson1/test', stat=198507,198510,1640856689919,1640856689932,1,1,0,0,8,1,198508
, data=[110, 101, 119, 32, 116, 101, 115, 116]}}

很显然该监听器监听缓存中指定根节点的所有子节点,并且不包括根节点。

TreeCacheListener

是一种桥接监听器,可以将旧式监听器TreeCacheListener(因为TreeCache类已经标记@Deprecated注解)与CuratorCache重用。

        CuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1",
                CuratorCache.Options.SINGLE_NODE_CACHE);

        CuratorCacheListener listener = CuratorCacheListener.builder()
                .forTreeCache(curator, (client, event) -> {
                    System.out.println(event);
                })
                .build();

输出:

// 初始化完成
TreeCacheEvent{type=INITIALIZED, data=null}

// 创建节点/father/son/grandson1
TreeCacheEvent{type=NODE_ADDED, data=ChildData{path='/father/son/grandson1', stat=198523,198523,1640857107063,1640857107063,0,1,0,0,4,1,198524
, data=[100, 97, 116, 97]}}

// 更改节点/father/son/grandson1
TreeCacheEvent{type=NODE_UPDATED, data=ChildData{path='/father/son/grandson1', stat=198523,198526,1640857107063,1640857107072,1,1,0,0,8,1,198524
, data=[110, 101, 119, 32, 100, 97, 116, 97]}}

// 删除节点/father/son/grandson1
TreeCacheEvent{type=NODE_REMOVED, data=ChildData{path='/father/son/grandson1', stat=198523,198526,1640857107063,1640857107072,1,1,0,0,8,1,198524
, data=[110, 101, 119, 32, 100, 97, 116, 97]}}

使用默认选项构建缓存:

        CuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1");

输出:

// 初始化完成
TreeCacheEvent{type=INITIALIZED, data=null}

// 创建节点/father/son/grandson1
TreeCacheEvent{type=NODE_ADDED, data=ChildData{path='/father/son/grandson1', stat=198540,198540,1640857267893,1640857267893,0,1,0,0,4,1,198541
, data=[100, 97, 116, 97]}}

// 创建节点/father/son/grandson1/test
TreeCacheEvent{type=NODE_ADDED, data=ChildData{path='/father/son/grandson1/test', stat=198541,198541,1640857267896,1640857267896,0,0,0,0,4,0,198541
, data=[116, 101, 115, 116]}}

// 创建节点/father/son/grandson1/test/test2
TreeCacheEvent{type=NODE_ADDED, data=ChildData{path='/father/son/grandson1/test/test2', stat=198542,198542,1640857267899,1640857267899,0,0,0,0,5,0,198542
, data=[116, 101, 115, 116, 50]}}

// 更改节点/father/son/grandson1
TreeCacheEvent{type=NODE_UPDATED, data=ChildData{path='/father/son/grandson1', stat=198540,198543,1640857267893,1640857267907,1,1,0,0,8,1,198541
, data=[110, 101, 119, 32, 100, 97, 116, 97]}}

// 更改节点/father/son/grandson1/test
TreeCacheEvent{type=NODE_UPDATED, data=ChildData{path='/father/son/grandson1/test', stat=198541,198544,1640857267896,1640857267910,1,1,0,0,8,1,198542
, data=[110, 101, 119, 32, 116, 101, 115, 116]}}

// 删除节点/father/son/grandson1/test/test2
TreeCacheEvent{type=NODE_REMOVED, data=ChildData{path='/father/son/grandson1/test/test2', stat=198542,198542,1640857267899,1640857267899,0,0,0,0,5,0,198542
, data=[116, 101, 115, 116, 50]}}

// 删除节点/father/son/grandson1/test
TreeCacheEvent{type=NODE_REMOVED, data=ChildData{path='/father/son/grandson1/test', stat=198541,198544,1640857267896,1640857267910,1,1,0,0,8,1,198542
, data=[110, 101, 119, 32, 116, 101, 115, 116]}}

// 删除节点/father/son/grandson1
TreeCacheEvent{type=NODE_REMOVED, data=ChildData{path='/father/son/grandson1', stat=198540,198543,1640857267893,1640857267907,1,1,0,0,8,1,198541
, data=[110, 101, 119, 32, 100, 97, 116, 97]}}

很显然该监听器监听缓存中的所有节点(需要知道这些节点发生事件的数据)。

获取缓存数据

        if(curator.checkExists().forPath("/") != null) {
            curator.delete().deletingChildrenIfNeeded().forPath("/");
        }

        CuratorCache cache = CuratorCache.build(curator, "/");

        cache.start();
    
        // 创建一系列节点
        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/father/son/grandson1", "data".getBytes());

        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/father/son/grandson1/test", "test".getBytes());

        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/father/son/grandson1/test/test2", "test2".getBytes());

        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/kaven", "kaven".getBytes());

        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/jojo", "jojo".getBytes());

        // 获取指定路径的节点数据
        Optional<ChildData> childData = cache.get("/father/son/grandson1");
        if(childData.isPresent()) System.out.println(childData);

        // 使用stream将缓存的节点按数据值(字符串)从小到大,如果数据值相等按路径(字符串)从小到大排序
        // 收集为List类型
        List<ChildData> childDataList = cache.stream()
                .sorted((childData1, childData2) -> {
                    int dataCompare = new String(childData1.getData()).compareTo(new String(childData2.getData()));
                    if(dataCompare != 0) return dataCompare;
                    else return childData1.getPath().compareTo(childData2.getPath());
                })
                .collect(Collectors.toList());

        System.out.println(childDataList.size());
        // 遍历在缓存中获得的排序后的节点数据
        childDataList.forEach(child -> {
            System.out.print(child.getPath() + " : ");
            if(child.getData().length != 0) System.out.println(new String(child.getData()));
            // 如果节点没有存储数据,输出[blank]
            else System.out.println("[blank]");
        });

输出:

Optional[ChildData{path='/father/son/grandson1', stat=198753,198753,1640920752605,1640920752605,0,1,0,0,4,1,198754
, data=[100, 97, 116, 97]}]
8
/ : [blank]
/father : [blank]
/father/son : [blank]
/father/son/grandson1 : data
/jojo : jojo
/kaven : kaven
/father/son/grandson1/test : test
/father/son/grandson1/test/test2 : test2

输出符合预期。

Curator框架的数据缓存与监听CuratorCache就介绍就到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐