springboot整合zookeeper

zookeeper基础

本文只做springboot整合和api使用

curator简介

curatorNetflix公司开源的一个 zookeeper客户端,后捐献给 apachecurator框架在zookeeper原生API接口上进行了包装,解决了很多zooKeeper客户端非常底层的细节开发。提供zooKeeper各种应用场景(比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等的抽象封装,实现了Fluent风格的APl接口,是最好用,最流行的zookeeper的客户端

原生zookeeperAPI的不足

  • 连接对象异步创建,需要开发人员自行编码等待
  • 连接没有自动重连超时机制
  • watcher一次注册生效一次
  • 不支持递归创建树形节点

curator特点

  • 解决session会话超时重连
  • watcher反复注册
  • 简化开发api
  • 遵循Fluent风格API

curator整合

依赖

<!-- zookeeper -->
		<dependency>
			<groupId>org.apache.zookeeper</groupId>
			<artifactId>zookeeper</artifactId>
			<version>3.6.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-framework</artifactId>
			<version>4.0.1</version>
		</dependency>
		<dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-recipes</artifactId>
			<version>4.0.1</version>
		</dependency>

配置类

@Configuration
@ConfigurationProperties(prefix = "zookeeper.curator")
@Data
public class ZookeeperConfig {

	/**
	 * 集群地址
	 */
	private String ip;

	/**
	 * 连接超时时间
	 */
	private Integer connectionTimeoutMs;
	/**
	 * 会话超时时间
	 */
	private Integer sessionTimeOut;

	/**
	 * 重试机制时间参数
	 */
	private Integer sleepMsBetweenRetry;

	/**
	 * 重试机制重试次数
	 */
	private Integer maxRetries;

	/**
	 * 命名空间(父节点名称)
	 */
	private String namespace;

	/**
	 * - `session`重连策略
  - `RetryPolicy retry Policy = new RetryOneTime(3000);`
    - 说明:三秒后重连一次,只重连一次
  - `RetryPolicy retryPolicy = new RetryNTimes(3,3000);`
    - 说明:每三秒重连一次,重连三次
  - `RetryPolicy retryPolicy = new RetryUntilElapsed(1000,3000);`
    - 说明:每三秒重连一次,总等待时间超过个`10`秒后停止重连
  - `RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3)`
    - 说明:这个策略的重试间隔会越来越长
      - 公式:`baseSleepTImeMs * Math.max(1,random.nextInt(1 << (retryCount + 1)))`
        - `baseSleepTimeMs` = `1000` 例子中的值
        - `maxRetries` = `3` 例子中的值
	 * @return
	 * @throws Exception 
	 */
	@Bean("curatorClient")
	public CuratorFramework curatorClient() throws Exception {
		CuratorFramework client = CuratorFrameworkFactory.builder()
				//连接地址  集群用,隔开
				.connectString(ip)
				.connectionTimeoutMs(connectionTimeoutMs)
				//会话超时时间
				.sessionTimeoutMs(sessionTimeOut)
				//设置重试机制
				.retryPolicy(new ExponentialBackoffRetry(sleepMsBetweenRetry,maxRetries))
				//设置命名空间 在操作节点的时候,会以这个为父节点
				.namespace(namespace)
				.build();
		client.start();
		
		//注册监听器
		ZookeeperWatches watches = new ZookeeperWatches(client);
		watches.znodeWatcher();
		watches.znodeChildrenWatcher();
		return client;
	}

yml配置

zookeeper:
  curator:
    ip: 192.168.3.102:2181,192.168.3.103:2181,192.168.3.104:2181
    #ip: 192.168.3.103:2181
    sessionTimeOut: 50000
    sleepMsBetweenRetry: 1000
    maxRetries: 3
    namespace: demo
    connectionTimeoutMs: 50000

注册监听机制watches

public class ZookeeperWatches {
	
	private CuratorFramework client;
	
	public ZookeeperWatches(CuratorFramework client) {
		this.client = client;
	}
	
	public void znodeWatcher() throws Exception {
		NodeCache nodeCache = new NodeCache(client, "/node");
		nodeCache.start();
		nodeCache.getListenable().addListener(new NodeCacheListener() {
			
			@Override
			public void nodeChanged() throws Exception {
				System.out.println("=======节点改变===========");
				String path = nodeCache.getPath();
				String currentDataPath = nodeCache.getCurrentData().getPath();
				String currentData = new String(nodeCache.getCurrentData().getData());
				Stat stat = nodeCache.getCurrentData().getStat();
				System.out.println("path:"+path);
				System.out.println("currentDataPath:"+currentDataPath);
				System.out.println("currentData:"+currentData);
			}
		});
		
		System.out.println("节点监听注册完成");
	}
	
	public void znodeChildrenWatcher() throws Exception {
		PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/node",true);
		pathChildrenCache.start();
		pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
			
			@Override
			public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
				System.out.println("=======节点子节点改变===========");
				Type type = event.getType();
				String childrenData = new String(event.getData().getData());
				String childrenPath = event.getData().getPath();
				Stat childrenStat = event.getData().getStat();
				
				System.out.println("子节点监听类型:"+type);
				System.out.println("子节点路径:"+childrenPath);
				System.out.println("子节点数据:"+childrenData);
				System.out.println("子节点元数据:"+childrenStat);
				
			}
		});
		
		System.out.println("子节点监听注册完成");
	}
}

所有测试Controller

@RestController
@RequestMapping(value = "/zookeeper",method = RequestMethod.POST)
@Api(tags = {"zookeeper的测试类"})
public class ZookeeperController {

	@Resource(name = "curatorClient")
	private CuratorFramework client;
	@Value("${zookeeper.curator.namespace}")
	String namespace;

	@RequestMapping("/createZnode")
	@ApiOperation("zookeeper测试---递归创建节点")
	@ApiOperationSupport(author = "lsx",order = 1)
	@ApiImplicitParams({   
		@ApiImplicitParam(name = "path", value = "路径", required = true, paramType = "query"),           
		@ApiImplicitParam(name = "data", value = "值", required = false, paramType = "query"),           
	})
	public String createZnode(String path,@RequestParam(defaultValue = "")String data){
		path = "/"+path;
		List<ACL> aclList = new ArrayList<>();
		Id id = new Id("world", "anyone");
		aclList.add(new ACL(ZooDefs.Perms.ALL, id));
		try {
			client.create()
			.creatingParentsIfNeeded()  //没有父节点时 创建父节点
			.withMode(CreateMode.PERSISTENT)  //节点类型
			.withACL(aclList)   //配置权限
			.forPath(path, data.getBytes());
		} catch (Exception e) {
			e.printStackTrace();
			return "节点创建失败"+e.getMessage();
		}
		return "节点创建成功";
	}

	@RequestMapping("/createAsyncZnode")
	@ApiOperation("zookeeper测试---异步递归创建节点")
	@ApiOperationSupport(author = "lsx",order = 2)
	@ApiImplicitParams({   
		@ApiImplicitParam(name = "path", value = "路径", required = true, paramType = "query"),           
		@ApiImplicitParam(name = "data", value = "值", required = false, paramType = "query"),           
	})
	public String createAsyncZnode(String path,@RequestParam(defaultValue = "")String data){
		String paths = "/"+path;
		try {
			client.create()
			.creatingParentsIfNeeded()
			.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
			//异步回调   增删改都有异步方法
			.inBackground(new BackgroundCallback() {

				@Override
				public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
					System.out.println("异步回调--获取权限:"+client.getACL().forPath(paths));
					System.out.println("异步回调--获取数据:"+new String(client.getData().forPath(paths)));
					System.out.println("异步回调--获取事件名称:"+event.getName());
					System.out.println("异步回调--获取事件类型:"+event.getType());
				}
			})
			.forPath(paths, data.getBytes());
		} catch (Exception e) {
			e.printStackTrace();
			return "节点创建失败"+e.getMessage();
		}
		return "节点创建成功";
	}

	@RequestMapping("/selectZnode")
	@ApiOperation("zookeeper测试---查看节点和元数据")
	@ApiOperationSupport(author = "lsx",order = 3)
	@ApiImplicitParams({   
		@ApiImplicitParam(name = "path", value = "路径", required = true, paramType = "query"),           
	})
	public JSONObject selectZnode(String path){
		JSONObject jsonObject = new JSONObject();
		String namespace = "/"+this.namespace;
		Stat stat;
		try {
			stat = client.checkExists().forPath(path);
			if (stat == null) {
				jsonObject.put("error", "不存在该节点");
			}
			String dataString = new String(client.getData().forPath(path));
			jsonObject.put(namespace+path, dataString);
			jsonObject.put("stat", stat);
		} catch (Exception e) {
			e.printStackTrace();
		}


		return jsonObject;
	}
	@RequestMapping("/selectChildrenZnode")
	@ApiOperation("zookeeper测试---查看子节点和数据")
	@ApiOperationSupport(author = "lsx",order = 4)
	@ApiImplicitParams({   
		@ApiImplicitParam(name = "path", value = "路径", required = true, paramType = "query"),           
	})
	public Map<String,String> selectChildrenZnode(String path){
		Map<String, String> map = new HashMap<>();
		String namespace = "/"+this.namespace;
		try {
			List<String> list = client.getChildren().forPath(path);
			for (String s : list) {
				String dataString = new String(client.getData().forPath(path+"/"+s));
				map.put(namespace+path+"/"+s, dataString);
			}
		} catch (Exception e) {
			e.printStackTrace();
		}

		return map;
	}

	@RequestMapping("/setData")
	@ApiOperation("zookeeper测试---设置数据")
	@ApiOperationSupport(author = "lsx",order = 5)
	@ApiImplicitParams({   
		@ApiImplicitParam(name = "path", value = "路径", required = true, paramType = "query"),           
		@ApiImplicitParam(name = "data", value = "数据", required = true, paramType = "query"),   
		@ApiImplicitParam(name = "version", value = "版本号(-1时  版本号不起作用)", required = true, paramType = "query"),   
	})
	public JSONObject setData(String path,String data,Integer version) {
		JSONObject jsonObject = new JSONObject();
		try {
			Stat stat = client.setData().withVersion(version).forPath(path, data.getBytes());
			jsonObject.put("success", "修改成功");
			jsonObject.put("version", stat.getVersion());
		} catch (Exception e) {
			e.printStackTrace();
			jsonObject.put("error", "修改失败:"+e.getMessage());
			return jsonObject;
		}
		return jsonObject;
	}

	@RequestMapping("/delete")
	@ApiOperation("zookeeper测试---删除节点")
	@ApiOperationSupport(author = "lsx",order = 6)
	@ApiImplicitParams({   
		@ApiImplicitParam(name = "path", value = "路径", required = true, paramType = "query"),           
		@ApiImplicitParam(name = "version", value = "版本号(-1时  版本号不起作用)", required = true, paramType = "query"),   
		@ApiImplicitParam(name = "isRecursive", value = "是否递归删除 1是 0否  默认为0", required = false, paramType = "query"),   
	})
	public JSONObject delete(String path,Integer version,@RequestParam(defaultValue = "0")Integer isRecursive) {
		JSONObject jsonObject = new JSONObject();
		try {
			if (isRecursive == 1) {
				client.delete().deletingChildrenIfNeeded().withVersion(version).forPath(path);
			}else {
				client.delete().withVersion(version).forPath(path);
			}
			jsonObject.put("success", "删除成功");
		} catch (Exception e) {
			e.printStackTrace();
			jsonObject.put("error", "删除失败:"+e.getMessage());
			return jsonObject;
		}
		return jsonObject;
	}

	@SuppressWarnings("finally")
	@RequestMapping("/transactionDisabled")
	@ApiOperation("zookeeper测试---测试事务(不开启事务)")
	@ApiOperationSupport(author = "lsx",order = 7)
	@ApiImplicitParams({   
		@ApiImplicitParam(name = "createPath", value = "创建的路径", required = true, paramType = "query"),
		@ApiImplicitParam(name = "createData", value = "创建的数据", required = true, paramType = "query"),   
		@ApiImplicitParam(name = "setPath", value = "修改数据的路径", required = true, paramType = "query"),   
		@ApiImplicitParam(name = "setData", value = "修改的数据", required = true, paramType = "query"),   
	})
	public String transactionDisabled(String createPath,String createData,String setPath,String setData) {

		try {
			//创建一个新的路径
			client.create().withMode(CreateMode.PERSISTENT).forPath(createPath,createData.getBytes());
			//修改一个没有的数据  让其报错   
			client.setData().forPath(setPath, setData.getBytes());

		} catch (Exception e) {
			e.printStackTrace();
		}finally {
			return "执行完成";
		}

	}

	@SuppressWarnings({ "deprecation", "finally" })
	@RequestMapping("/transactionEnabled")
	@ApiOperation("zookeeper测试---测试事务(开启事务)")
	@ApiOperationSupport(author = "lsx",order = 8)
	@ApiImplicitParams({   
		@ApiImplicitParam(name = "createPath", value = "创建的路径", required = true, paramType = "query"),
		@ApiImplicitParam(name = "createData", value = "创建的数据", required = true, paramType = "query"),   
		@ApiImplicitParam(name = "setPath", value = "修改数据的路径", required = true, paramType = "query"),   
		@ApiImplicitParam(name = "setData", value = "修改的数据", required = true, paramType = "query"),   
	})
	public String transactionEnabled(String createPath,String createData,String setPath,String setData) {
		try {
		/**
			 * 这里有个坑点 使用 CuratorFramework 进行事务处理时,如果使用org.apache.zookeeper 的依赖版本是 3.6.x时  				
			 * 会出现找不到 MultiTransactionRecord 类的异常
			 * 在 3.6.x 版本 没有 MultiTransactionRecord 但是在3.4.10版本有这个类  不知道什么删除了
			 * 而 curator-framework 的 事务处理用到 CuratorMultiTransactionRecord 这个  类 
			 * 但是 CuratorMultiTransactionRecord 继承了 MultiTransactionRecord 这个类 就出现了类找不到的异常
			 * 
			 * 解决办法 :要么降低zookeeper 的版本  为3.4.10  要么使用zookeeper原生事务代码 
			 *我这里降低了zookeeper的版本
			 */
					//该方法后续版本建议删除
//			client.inTransaction()
//			.create().withMode(CreateMode.PERSISTENT).forPath(createPath,createData.getBytes())
//			.and()
//			.setData().forPath(setPath, setData.getBytes())
//			.and().commit();
			//上述代码 替换成 以下代码
			CuratorOp create = client.transactionOp().create().withMode(CreateMode.PERSISTENT).forPath(createPath,createData.getBytes());
			CuratorOp setOp = client.transactionOp().setData().forPath(setPath, setData.getBytes());
			//该方法有返回值 可以打印结果查看  一般不需要
			client.transaction().forOperations(Arrays.asList(create,setOp));
		} catch (Exception e) {
			e.printStackTrace();
		}finally {
			return "执行完成";
		}

	}

	@RequestMapping("/InterProcessMutexUse")
	@ApiOperation("zookeeper测试---测试可重入排它锁")
	@ApiOperationSupport(author = "lsx",order = 8)
	public String InterProcessMutexUse() throws Exception{
		System.out.println("排它锁测试");
		InterProcessMutex lock = new InterProcessMutex(client, "/lock");
		System.out.println("占有锁中");
		lock.acquire(20L,TimeUnit.SECONDS);
		System.out.println("执行操作中");
		for (int i = 0; i < 20; i++) {
			TimeUnit.SECONDS.sleep(1);
			System.out.println(i);
		}
		lock.release();
		return "锁已释放";
	}

	@RequestMapping("/interProcessReadWriteLockUseWrite")
	@ApiOperation("zookeeper测试---测试读写锁--写锁")
	@ApiOperationSupport(author = "lsx",order = 9)
	public String interProcessReadWriteLockUseWrite() throws Exception {
		System.out.println("写锁");
		// 分布式读写锁
		InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
		// 开启两个进程测试,观察到写写互斥,特性同排它锁
		System.out.println("获取锁中");
		lock.writeLock().acquire();
		System.out.println("操作中");
		for (int i = 0; i < 10; i++) {
			TimeUnit.SECONDS.sleep(1);
			System.out.println(i);
		}
		lock.writeLock().release();
		return "释放写锁";
	}

	@RequestMapping("/interProcessReadWriteLockUseRead")
	@ApiOperation("zookeeper测试---测试读写锁--读锁")
	@ApiOperationSupport(author = "lsx",order = 10)
	public String interProcessReadWriteLockUseRead() throws Exception {
		System.out.println("读锁");
		// 分布式读写锁
		InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
		// 开启两个进程测试,观察得到读读共享,两个进程并发进行,注意并发和并行是两个概念,(并发是线程启动时间段不一定一致,并行是时间轴一致的)
		// 再测试两个进程,一个读,一个写,也会出现互斥现象
		System.out.println("获取锁中");
		lock.readLock().acquire();
		System.out.println("操作中");
		for (int i = 0; i < 10; i++) {
			TimeUnit.SECONDS.sleep(1);
			System.out.println(i);
		}
		lock.readLock().release();
		return "释放读锁";
	}
}

结果展示

controller对应swagger2页面
在这里插入图片描述

新增/查看(具体代码在controller层)

zookeeper初始节点
在这里插入图片描述
新增:
在这里插入图片描述
查看:在这里插入图片描述
服务器zookeeper节点:
在这里插入图片描述
ps:为什么会有demo节点
yml配置文件中配置了namespace,对应配置类中的.namespace(namespace)

@Bean("curatorClient")
	public CuratorFramework curatorClient() throws Exception {
		CuratorFramework client = CuratorFrameworkFactory.builder()
				//连接地址  集群用,隔开
				.connectString(ip)
				.connectionTimeoutMs(connectionTimeoutMs)
				//会话超时时间
				.sessionTimeoutMs(sessionTimeOut)
				//设置重试机制
				.retryPolicy(new ExponentialBackoffRetry(sleepMsBetweenRetry,maxRetries))
				//设置命名空间 在操作节点的时候,会以这个为父节点
				.namespace(namespace)
				.build();
		client.start();

修改/删除(具体代码在controller层)

在这里插入图片描述
查看zookeeper上的数据 原来是data 现在变成了test
在这里插入图片描述
在这里插入图片描述
查看zookeeper上的节点 没有了demo和其子节点
在这里插入图片描述

事务(具体代码在controller层)

zookeeper初始状态 /demo 下 只有一个node节点
在这里插入图片描述
执行不开启事务的方法:
在这里插入图片描述
系统报错:
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /demo/dddddddd
再次查看zookeeper: 有transaction节点
在这里插入图片描述
将transaction节点删掉 测试开启事务
在这里插入图片描述
开启事务方法:
在这里插入图片描述
系统报错:
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /demo/dddddddd
再次查看zookeeper: 没有transaction节点
在这里插入图片描述

监听使用

监听代码在 ZookeeperWatches
在配置类开启监听具体代码查看ZookeeperConfig 配置类
项目启动中会出现如下信息 如果zookeeper没有node节点 会自动生成node节点并触发监听回调代码
在这里插入图片描述
对node节点创建子节点、修改子节点、删除子节点 会触发子节点的监听回调
create /demo/node/test aa
在这里插入图片描述

分布式锁(具体代码在controller层)

不展示结果简单介绍下原理:
zookeeper中如何实现排他锁

  1. 每个客户端往/Locks下创建临时有序节点/Locks/Lock_,创建成功后/Locks下面会有每个客户端对应的节点,如/Locks/Lock_000000001
  2. 客户端取得/Locks下子节点,并进行排序,判断排在前面的是否为自己,如果自己的锁节点在第一位,代表获取锁成功
  3. 如果自己的锁节点不在第一位,则监听自己前一位的锁节点。例如,自己锁节点Lock_000000002,那么则监听Lock_000000001
  4. 当前一位锁节点(Lock_000000001)对应的客户端执行完成,释放了锁,将会触发监听客户端(Lock_000000002)的逻辑
  5. 监听客户端重新执行第2步逻辑,判断自己是否获得了锁

配置中心案例(没有代码)

工作中有这样的一个场景:数据库用户名和密码信息放在一个配置文件中,应用读取该配置文件,配置文件信息放入缓存

若数据库的用户名和密码改变时候,还需要重新加载媛存,比较麻烦,通过 Zookeeper可以轻松完成,当数据库发生变化时自动完成缓存同步

使用事件监听机制可以做出一个简单的配置中心

设计思路

  1. 连接zookeeper 服务器
  2. 读取zookeeper中的配置信息,注册watcher监听器,存入本地变量
  3. zookeeper中的配置信息发生变化时,通过watcher的回调方法捕获数据变化事件
  4. 重新获取配置信息

分布式唯一id

在过去的单库单表型系统中,通常第可以使用数据库字段自带的auto_ increment属性来自动为每条记录生成个唯一的ID。但是分库分表后,就无法在依靠数据库的auto_ increment属性来唯一标识一条记录了。此时我们就可以用zookeeper在分布式环境下生成全局唯一ID,通过zookeeper创建持久顺序节点,返回该节点序号,即为新id,然后将比自己小的节点删除。

@RequestMapping("/getZookeeperId")
	@ApiOperation("zookeeper测试---分布式唯一id")
	@ApiOperationSupport(author = "lsx",order = 12)
	public String getZookeeperId(){
		TreeSet<String> sortNode = new TreeSet<>();
		//唯一id
		String maxId = "";
		try {
			client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/seq/id-");
			List<String> forPath = client.getChildren().forPath("/seq");
			forPath.forEach(s->{
				String id = s.split("-")[1];
				sortNode.add(id);
			});
			String minId = sortNode.first();
			client.delete().forPath("/seq/id-"+minId);
			maxId= sortNode.last();
		} catch (Exception e) {
			e.printStackTrace();
			return "分布式id获取失败"+e.getMessage();
		}
		return maxId;
	}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐