Zookeeper实现配置中心

一、环境准备

1 . 准备集群

搭建Zookeeper集群,我准备了四服务,分别是192.168.91.129,192.168.91.130, 192.168.91.131, 192.168.91.132,我本地用虚拟机开的四台机器,家里有条件的可以直接搞几台阿里云服务器,其实一台机器也能搞,推荐用三台服务器

2 . 导入依赖(客户端版本和Server端版本保持一致)

  <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.3</version>
  </dependency>

二、实现思路

1 . 创建Zookeeper对象(利用门栓保证zk可用)
2 . 判断配置中心的节点是否存在(使用回调的方式,需要传递 watch和statWatch),然后阻塞线程,直到获取配置完成
3 . 几种情况

  1. 第一个分支【没有配置文件节点】:如果没有配置文件节点,那需要在exists的Watch中对event时间Type中NodeCreated进行处理(一旦配置文件创建了就会回调这个地方),处理无非就是去get配置节点
  2. 第二个分支【有配置文件】:这个是正常流程用getData把节点数据读取回来(需要传递Watch监控后续更新)
  3. 第三个分支【配置文件更新】:getData的时候传递Watch对后续一些操作进行了监听,如果配置文件更新需要重新getData
  4. 第四个分支【配置节点删除】:这个地方有处理的方式比较宽泛,如果要求配置文件节点被删除了,配置一定也要同步删除,强一致要求配置和节点同步,那就需要清空配置文件,并且重新阻塞,重新getData,如果没有这种需求那这里就看着处理就行了(毕竟谁没事会删除配置文件呢)

三、代码实现

1 . Test测试

package org.example.configdemo2;

import org.apache.zookeeper.ZooKeeper;
import org.example.congfig.MyConf;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @author chao
 */
public class TestMain {

    /**
     * zk
     */
    private ZooKeeper zk;

    /**
     * 门栓
     */
    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    @Before
    public void connection() {
        try {
            zk = new ZooKeeper("192.168.91.129,192.168.91.130,192.168.91.131,192.168.91.132/configTestdemo", 1000, event -> {
                switch (event.getState()) {
                    case Unknown:
                        break;
                    case Disconnected:
                        break;
                    case NoSyncConnected:
                        break;
                    case SyncConnected:
                        System.out.println("zk connection success !!");
                        countDownLatch.countDown();
                        break;
                    case AuthFailed:
                        break;
                    case ConnectedReadOnly:
                        break;
                    case SaslAuthenticated:
                        break;
                    case Expired:
                        break;
                    case Closed:
                        break;
                    default:
                }
            });
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @After
    public void close() {
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void show() {
        WatchAndCallback watchAndCal = new WatchAndCallback(zk);
        MyConf conf = watchAndCal.getConf();
        while (true){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            String config = conf.getConfig();
            if(config == null || "".equals(config)){
                conf = watchAndCal.getConf();
            }
            System.out.println(config);
        }
    }
}

2 . WatchAndCallback类(一个工具类)

package org.example.configdemo2;

import lombok.Data;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.example.congfig.MyConf;

import java.util.concurrent.CountDownLatch;

/**
 * @author chao
 */
@Data
public class WatchAndCallback implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {

    /**
     * zk
     */
    private ZooKeeper zk;

    /**
     * myConf
     */
    private MyConf myConf = new MyConf();

    /**
     * pathNode
     */
    public static final String PATH_NODE = "/cnf";

    /**
     * 门栓
     */
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    /**
     * 构造
     *
     * @param zk zk
     */
    public WatchAndCallback(ZooKeeper zk) {
        this.zk = zk;
    }

    @Override
    public void process(WatchedEvent event) {
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                zk.getData(PATH_NODE, this, this, "get");
                break;
            case NodeDeleted:
                myConf.setConfig("");
                countDownLatch = new CountDownLatch(1);
                System.out.println("配置删除,阻塞等待... ...");
                zk.getData(PATH_NODE, this, this, "get");
                break;
            case NodeDataChanged:
                zk.getData(PATH_NODE, this, this, "get");
                break;
            case NodeChildrenChanged:
                break;
            case DataWatchRemoved:
                break;
            case ChildWatchRemoved:
                break;
            case PersistentWatchRemoved:
                break;
            default:
        }
    }

    public MyConf getConf() {
        zk.exists(PATH_NODE, this, this, "exists");
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return myConf;
    }

    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        // stat callback
        if (stat != null) {
            zk.getData(PATH_NODE, this, this, "get");
        }
    }

    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        if (stat != null) {
            myConf.setConfig(new String(data));
            countDownLatch.countDown();
        }
    }
}

四、测试结果

在这里插入图片描述

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐