一、Curator 客户端使用

Curator是 Netflix公司开源的一套ZooKeeper客户端框架,和 ZkClient一样它解决了非常底层的细节开发工作,包括连接、重连、反复注册Watcher的问题以及 NodeExistsException异常等。

Curator还为 ZooKeeper客户端框架提供了一些比较普遍的、开箱即用的、分布式开发用的解决方案,例如Recipe、共享锁服务、Master选举机制和分布式计算器等,帮助开发者避免了“重复造轮子”的无效开发工作。

Curator 客户端使用更加方便,功能更加强大,目前应用更加广泛。

官方网站:https://curator.apache.org/

添加依赖:

  • curator-framework:是对ZooKeeper的底层API的一些封装。
  • curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。
    <!-- Zookeeper 原生API客户端 -->
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.6.3</version>
    </dependency>
    
    <!-- Curator 客户端   -->
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>5.1.0</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>5.1.0</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

1、创建一个客户端实例

在使用 curator-framework包操作 ZooKeeper前,首先要创建一个客户端实例(CuratorFramework类型的对象)。

有两种方法:

  • 使用工厂类CuratorFrameworkFactory的静态 newClient()方法。
  • 使用工厂类CuratorFrameworkFactory的静态 builder构造者方法。
public class CuratorClientConnectUtils {

    private static String CONNECT_STR = "192.168.xxx.xxx:2181"; //集群模式下用逗号隔开

    private static class InnerClass{
        private static CuratorFramework client = clientConnect2();
    }

    public static CuratorFramework getInstance(){
        return InnerClass.client;
    }


    public static void main(String[] args){
        //CuratorFramework client = clientConnect1();
        CuratorFramework client = clientConnect2();
        //启动客户端,必须要有
        client.start();
        System.out.println("==CuratorFramework==" + client);
    }

    /**
     * 使用工厂类CuratorFrameworkFactory的静态newClient()方法。
     * @throws Exception
     */
    public static CuratorFramework clientConnect1() {
        // 重试策略, 失败重连3次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 3);
        //创建客户端实例
        CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECT_STR, retryPolicy);
        return client;

    }

    /**
     * 使用工厂类CuratorFrameworkFactory的静态builder构造者方法。
     * @throws Exception
     */
    public static CuratorFramework clientConnect2() {
        // 重试策略, 失败重连3次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 3);
        //创建客户端实例
        CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString(CONNECT_STR)
            .sessionTimeoutMs(3000) // 会话超时时间
            .connectionTimeoutMs(50000) // 连接超时时间
            .retryPolicy(retryPolicy)
            .namespace("Curator_Workspace") // 指定隔离名称,表示所有节点的操作都会在该工作空间下进行。不指定时,使用自定义的节点path
            .build();
       return client;
    }
}

部分参数说明:

  • connectionString: 服务器地址列表,集群用逗号分隔, 如 host1:port1,host2:port2,host3:port3 。
  • retryPolicy: 重试策略,当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 ZooKeeper 服务端。
    Curator 提供了 一次重试、多次重试等不同种类的实现方式。在 Curator 内部,可以通过判断服务器返回的keeperException 的状态代码来判断是否进行重试处理,如果返回的是 OK 表示一切操作都没有问题,而 SYSTEMERROR 表示系统或服务端错误。
  • 超时时间: Curator 客户端创建过程中,有两个超时时间的设置。
    • 一个是sessionTimeoutMs 会话超时时间,用来设置该条会话在 ZooKeeper 服务端的失效时间。
    • 一个是 connectionTimeoutMs 客户端创建会话的超时时间,用来限制客户端发起一个会话连接到接收 ZooKeeper 服务端应答的时间。
      sessionTimeoutMs 作用在服务端,而 connectionTimeoutMs 作用在客户端。

2、CRUD实例

1)创建节点

在 Curator 中,客户端可以使用 create 方法 创建数据节点,并通过 withMode 方法 指定节点类型(持久化节点,临时节点,顺序节点,临时顺序节点,持久化顺序节点等),默认是持久化节点,之后调用 forPath 方法 来指定节点的路径和数据信息。 也支持级联创建节点需要指定 creatingParentsIfNeeded方法

2)获取数据

客户端使用 getData()方法 获取节点数据。

3)更新节点
客户端使用 setData() 方法更新 ZooKeeper 服务上的数据节点,在 setData 方法 的后边,通过 forPath 函数来指定更新的数据节点路径以及要更新的数据。

4)删除节点

客户端使用 delete方法 删除节点。

  • guaranteed方法:主要作用是保障删除成功。
    其底层工作方式是:只要该客户端的会话有效,就会在后台持续发起删除请求,直到该数据节点在 ZooKeeper 服务端被删除。
  • deletingChildrenIfNeeded方法:指定了该函数后,系统在删除该数据节点的时候会以递归的方式直接删除其子孙节点。

5)异步接口

Curator 引入了 BackgroundCallback 接口 ,用来处理服务器端返回来的信息,这个处理过程是在异步线程中调用,默认在 EventThread 中调用,也可以自定义线程池。

客户端可以通过 inBackground方法添加 BackgroundCallback 接口来异步处理操作。异步处理回调我们可以在 processResult方法操作。实例中异步创建节点使用了默认的线程。

CRUD实例如下:

public class CRUDService {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorClientConnectUtils.getInstance();
        // 启动客户端
        client.start();

        String path1 = "/curator_znode/syncCreateNode.node1/node1"; //允许级联创建
        String path2 = "/curator_znode/asyncCreateNode.node1";
        String path3 = "/curator_znode2";
        //createOneNode(client, path3, "value path");
        syncCreateNode(client, path1, "VALUE11");
        //asyncCreateNode(client, path2, "VALUE22");
        //asyncCreateNode2(client, path2, "VALUE22");

        //String nodeData = getNodeData(client, path1);
        //System.out.println("getNodeData, path1=:" + nodeData);
        updateNodeData(client, path1, "value33333");

        //deleteNode(client, path3);
        String path = "/curator_znode";
        //deleteChildrenIfNeeded(client, path);

    }

    /**
     * 创建单节点
     *
     * @param client
     * @param path - 如果是级联节点,父节点不存在时,会报错:KeeperErrorCode = NoNode for /curator_znode/asyncCreateNode.node1
     * @param data
     * @throws Exception
     */
    public static void createOneNode(CuratorFramework client, String path, String data) throws Exception {
        String path1 = client.create()
                .withMode(CreateMode.PERSISTENT)
                .forPath(path, data.getBytes(StandardCharsets.UTF_8));
        System.out.println("createOneNode:path1=" + path1);
    }

    /**
     * 同步级联创建节点(原生API不支持/后台客户端也不支持,但是Curator支持)
     *  父节点不存在,则会创建
     *  最后的子节点如果存在,则会报错:KeeperErrorCode = NodeExists for /curator_znode/syncCreateNode.node1/node1
     * @param client
     * @param path
     * @param data
     */
    public static void syncCreateNode(CuratorFramework client, String path, String data) {
        try {
            String path1 = client.create()
                    .creatingParentsIfNeeded() //级联创建,父节点不存在,则会创建
                    .withMode(CreateMode.PERSISTENT) //持久节点
                    .forPath(path, data.getBytes(StandardCharsets.UTF_8));
            System.out.println("syncCreateNode:path1=" + path1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 异步级联创建节点
     * @param client
     * @param path
     * @param data
     */
    public static void asyncCreateNode(CuratorFramework client, String path, String data) throws Exception {
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .inBackground(new BackgroundCallback() {
                    // 创建成功的回调
                    @Override
                    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                        System.out.println("asyncCreateNode=========" + event.getName() + ":" + event.getPath());
                    }
                })
                .forPath(path, data.getBytes(StandardCharsets.UTF_8));
        TimeUnit.MILLISECONDS.sleep(100);
    }
    /**
     * 异步级联创建节点
     * @param client
     * @param path
     * @param data
     */
	public static void asyncCreateNode2(CuratorFramework client, String path, String data) throws Exception {
		ExecutorService executorService = Executors.newSingleThreadExecutor();

		client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .inBackground(new BackgroundCallback() {
                        // 创建成功的回调
                        @Override
                        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                            System.out.println("asyncCreateNode====使用自定义线程池=====" + event.getName() + ":" + event.getPath());
                        }
                    }, executorService)
                .forPath(path, data.getBytes(StandardCharsets.UTF_8));
		TimeUnit.MILLISECONDS.sleep(100);
	}

    /**
     * 获取节点数据
     * @param client
     * @param path
     */
    public static String getNodeData(CuratorFramework client, String path) throws Exception {
        byte[] data = client.getData().storingStatIn(new Stat()).forPath(path);
        return new String(data, StandardCharsets.UTF_8);
    }

    /**
     * 修改节点数据
     * @param client
     * @param path
     * @param data
     */
    public static void updateNodeData(CuratorFramework client, String path, String data) throws Exception {
        Stat stat = client.setData()
                //.withVersion(-1) //可以根据需要使用
                .forPath(path, data.getBytes(StandardCharsets.UTF_8));
        System.out.println("updateNodeData path, stat:" + stat);
    }

    /**
     * 删除该节点,该节点必须为空节点
     * 如果该节点有子节点会报错:KeeperErrorCode = Directory not empty for /curator_znode
     * @param client
     * @param path
     */
    public static void deleteNode(CuratorFramework client, String path) throws Exception {
        client.delete()
                //.withVersion(1) // 版本号不匹配时,无法删除,会报错:KeeperErrorCode = BadVersion for /curator_znode
                .forPath(path); //删除该节点
    }

    /**
     * 级联删除该节点以及子孙节点
     * @param client
     * @param path
     */
    public static void deleteChildrenIfNeeded(CuratorFramework client, String path) throws Exception {
        client.delete()
                .guaranteed() //保证删除:如果删除失败,那么在后端还是继续会删除,直到成功
                .deletingChildrenIfNeeded()
                //.withVersion(1)
                .forPath(path); //级联删除
    }

    /**
     * 判断node节点是否存在
     * @param client
     * @param nodePath
     * @return true - 表示节点存在
     * @throws Exception
     */
    public static boolean checkNodeExists(CuratorFramework client,String nodePath) throws Exception {
        Stat stat = client.checkExists().forPath(nodePath);
        System.out.println(null==stat ? "节点不存在" : "节点存在");
        return null != stat;
    }

}

2.1 报错:KeeperErrorCode = ConnectionLoss

14:49:35.317 [Curator-Framework-0] ERROR org.apache.curator.framework.imps.CuratorFrameworkImpl - Background operation retry gave up
org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
...
14:49:35.319 [Curator-Framework-0] ERROR org.apache.curator.framework.imps.CuratorFrameworkImpl - Background retry gave up
org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
	at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:981)
...
14:49:36.313 [Curator-ConnectionStateManager-0] WARN org.apache.curator.framework.state.ConnectionStateManager - Session timeout has elapsed while SUSPENDED. Injecting a session expiration. Elapsed ms: 1002. Adjusted session timeout ms: 1000

报错信息如上,查看了一下:
在这里插入图片描述
具体是什么时间超时判断,暂时不清楚。我尝试把会话时间调到3000之后。问题就解决了。

3、Cache缓存监控节点变化

cache是一种缓存机制,可以借助 cache实现监听。

简单来说,cache在客户端缓存了 znode的各种状态,当感知到 zk集群的 znode状态变化,会触发 event事件,注册的监听器会处理这些事件。

curator支持的 cache种类有4种:

  • Path Cache
    Path Cache用来观察ZNode的子节点并缓存状态,但是不会对二级子节点进行监听。如果ZNode的子节点被创建,更新或者删除,那么Path Cache会更新缓存,并且触发事件给注册的监听器。
  • Node Cache
    Node Cache用来观察ZNode自身,如果ZNode节点本身被创建,更新或者删除,那么Node Cache会更新缓存,并触发事件给注册的监听器。
  • Tree Cache
    Tree Cache是上面两种的合体,Tree Cache观察的是自身+所有子节点的所有数据,并缓存所有节点数据。
    TreeCache 使用一个内部类TreeNode来维护这个一个树结构。并将这个树结构与ZK节点 进行了映射。所以TreeCache 可以监听当前节点下所有节点的事件。
  • Curator Cache
    Curator Cache,是在 zk3.6新版本添加的特性,之前版本上面3种监听可以单独使用,在 zk3.6+版本之后,上面3种监听被标记为过时,并使用 CuratorCache取代了上面3种监听

Curator Cache类型是通过 CuratorCache类来实现的,监听器对应的接口为 CuratorCacheListener。

Curator Cache使用实例如下:

public class CuratorCacheTest {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorClientConnectUtils.getInstance();
        // 启动客户端
        client.start();

        String path1 = "/curatorCache1";
        String path2 = "/curatorCache2";
        curatorCache1(client, path1);
        curatorCache2(client, path2);

        if(CRUDService.checkNodeExists(client, path1)){
            CRUDService.deleteChildrenIfNeeded(client, path1);
            //CRUDService.deleteNode(client, path1);
        }
        if(CRUDService.checkNodeExists(client, path2)){
            CRUDService.deleteChildrenIfNeeded(client, path2);
            //CRUDService.deleteNode(client, path2);
        }

        CRUDService.syncCreateNode(client, path1, "VALUE22");
        CRUDService.asyncCreateNode(client, path2, "VALUE22");
        CRUDService.updateNodeData(client, path2, "value444");
        String path22 = path2 + "/node1";
        CRUDService.syncCreateNode(client, path22, "VALUE22");

        //让线程休眠(为了方便测试)
        TimeUnit.MINUTES.sleep(3);
    }

    public static void curatorCache1(CuratorFramework zkClient, String path) {
        CuratorCache curatorCache = CuratorCache.build(zkClient, path);
        curatorCache.listenable().addListener(new CuratorCacheListener() {
            @Override
            public void event(Type type, ChildData oldData, ChildData newdata) {
                switch (type) {
                    //各种判断
                    case NODE_CREATED:
                        System.out.println("==该节点被创建,type=" + type);
                        break;
                    default:
                        System.out.println("==该节点操作,type=" + type);
                        break;
                }
            }
        });
        //开启缓存。 这将导致缓存根节点的完整刷新,并为所有找到的节点生成事件,等等。
        curatorCache.start();
    }


    public static void curatorCache2(CuratorFramework zkClient, String path) throws InterruptedException {
        CuratorCache curatorCache = CuratorCache.builder(zkClient,path).build();

        //构建监听器:
        //1.node cache:CuratorCacheListener.builder().forNodeCache(new NodeCacheListener(){} );
        //2.path cache:CuratorCacheListener.builder().forPathChildrenCache();
        //3.tree cache:CuratorCacheListener.builder().forTreeCache.forTreeCache();
        CuratorCacheListener listener = CuratorCacheListener.builder()
                .forNodeCache(new NodeCacheListener() {
                    @Override
                    public void nodeChanged() throws Exception {
                        System.out.println("节点改变了...");
                    }
                })
                .build();

        //添加监听
        curatorCache.listenable().addListener(listener);
        //开启缓存
        curatorCache.start();
    }
}

在zk服务端,使用命令操作了节点,监听被触发。
在这里插入图片描述

– 求知若饥,虚心若愚。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐