参考:Zookeeper框架Curator使用 - 扎心了,老铁 - 博客园 (cnblogs.com)

1,引入依赖

        <!--zookeeper-->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.8</version>
        </dependency>
        <!--CuratorFramework是一款连接zookeeper服务的框架-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>

2,测试

@Test
    public void method() throws Exception {
        //重试策略
        ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(1000,3,3);
        //创建客户端
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 3000, 3000, exponentialBackoffRetry);
        //启动客户端
        client.start();

        client.create().forPath("/testapi");
        //创建一个有内容的节点数据
        client.create().forPath("/a","hello world".getBytes());
        //创建一个多级目录的节点
        client.create().creatingParentsIfNeeded().forPath("/b/b1/b2","good".getBytes());
        //创建一个带序号的持久节点 PERSISTENT_SEQUENTIAL:带序号的 相当于命令行的 -s
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/c/c1","niubi".getBytes());
        //创建临时节点 设置时延5秒关闭
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/d","nb".getBytes());
        //创建临时有序节点
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/e","6666".getBytes());
        //设置时延5秒关闭
        Thread.sleep(5000);
        client.close();
    }
    @Test
    public void method2() throws Exception {
        //重试策略
        RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000,3,3);
        //创建客户端
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 3000, 3000, exponentialBackoffRetry);
        //启动客户端
        client.start();

        //修改节点数据
        client.setData().forPath("/testapi","setData".getBytes());

        //关闭客户端
        client.close();
    }
    @Test
    public void method3() throws Exception {
        //重试策略
        RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000,3,3);
        //创建客户端
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 3000, 3000, exponentialBackoffRetry);
        //启动客户端
        client.start();

        //删除单节点
        client.delete().forPath("/testapi");
        //删除多层级结构的节点
        client.delete().deletingChildrenIfNeeded().forPath("/b");
        //强制删除节点,只要客户端会话有效,那么会持续删除,直到节点删除成功
        //例如遇到一些网络异常的情况的时候
        client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/c");
        //
        client.close();
    }
    @Test
    public void method4() throws Exception {
        //重试策略
        RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000,3,3);
        //创建客户端
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 3000, 3000, exponentialBackoffRetry);
        //启动客户端
        client.start();

        //创建节点
       // client.create().creatingParentsIfNeeded().forPath("/a","good".getBytes());
        //查询节点数据
        byte[] bytes = client.getData().forPath("/a");
        System.out.println(new String(bytes));

        client.close();
    }
Watch监听机制(NodeCache,PathChildrenCache,TreeCache)
@Test
    public void method5() throws Exception {
        //重试策略
        RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000,3,3000);
        //创建客户端
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 1000, 1000, exponentialBackoffRetry);
        //启动客户端
        client.start();

        //Watch监听机制(NodeCache,PathChildrenCache,TreeCache)
        //NodeCache监听本节点
        //PathChildrenCache监听子节点
        //TreeCache监听本节点加子节点
        NodeCache nodeCache = new NodeCache(client, "/a");
        //true代表初始化时就获取节点的数据并且缓存到本地
        nodeCache.start(true);

        //获取节点数据,初始化
        System.out.println(new String(nodeCache.getCurrentData().getData()));

        //添加监听回调事件
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                //值修改了就打印一下
                System.out.println(new String(nodeCache.getCurrentData().getData()));
            }
        });

        //此处不能关闭客户端
        //阻塞进程
        System.in.read();
    }
@Test
    public void method6() throws Exception {
        //重试策略
        RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000,3,3000);
        //创建客户端
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 1000, 1000, exponentialBackoffRetry);
        //启动客户端
        client.start();
        client.create().creatingParentsIfNeeded().forPath("/q/q1/q2", "test".getBytes());
        //true代表把节点内容缓存起来
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/q/q1/q2",true);
        /*
        * POST_INITIALIZED_EVENT表示在启动时缓存子节点数据,并且提示初始化
        * NORMAL普通启动方式,在启动时缓存子节点数据
        * BUILD_INITIAL_CACHE:在启动时什么都不会输出,这种模式在执行start之前会先执行rebuild方法,而该方法不会发出任何通知
        * */
        /*
         * StartMode:初始化方式
         * POST_INITIALIZED_EVENT:异步初始化。初始化后会触发事件
         * NORMAL:异步初始化
         * BUILD_INITIAL_CACHE:同步初始化
         * */
        pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);

        List<ChildData> childDataList = pathChildrenCache.getCurrentData();
        System.out.println("当前数据节点的子节点数据列表:");
        for(ChildData cd : childDataList){
            String childData = new String(cd.getData());
            System.out.println(childData);
        }
        //添加监听
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework ient, PathChildrenCacheEvent event) throws Exception {
                if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){
                    System.out.println("子节点初始化成功");
                }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
                    System.out.println("添加子节点路径:"+event.getData().getPath());
                    System.out.println("子节点数据:"+new String(event.getData().getData()));
                }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
                    System.out.println("删除子节点:"+event.getData().getPath());
                }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
                    System.out.println("修改子节点路径:"+event.getData().getPath());
                    System.out.println("修改子节点数据:"+new String(event.getData().getData()));
                }
            }
        });
        //此处不能关闭客户端
        //阻塞进程
        System.in.read();
    }

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐