一、Zookeeper实现分布式锁思路

1.1.分布式锁应具备哪些条件

分布式锁应该具备以下条件:

  1. 在分布式环境下,在一个时间点,最多被一个请求方持有
  2. 高可用、高性能的获取和释放
  3. 具备可重入性
  4. 具备失效机制,防止死锁
  5. 具备非阻塞特性,即没有获取到锁将直接返回失败

1.2.ZooKeeper实现分布式锁所具备的特性

1.2.1.高可用、高性能

  1. ZooKeeper的数据是内存读取的,本身就具备高性能特性
  2. ZooKeeper具备很好的故障恢复能力
    1. 在集群模式下,当Leader节点宕机后,当前集群会选出新的Leader处理请求
    2. ZooKeeper的ZAB协议保证了数据一致性问题

1.2.2.失效机制

ZooKeeper的有序临时节点,在客户端宕机或者断开后,会自动删除.这保证了不会因为环境问题来导致节点不会删除.

1.3.用ZooKeeper如何实现分布式锁

分布式锁本质,就是多个资源竞争者对一份资源的排他占有.

那么在ZooKeeper中我们可以使用临时有序节点来完成这项工作:

假设我们定义当前分布式锁的名称为lock,

我们在ZooKeeper中定义了一个根目录假设为: /dlock/locks

那么我们就可以在/dlock/locks/lock下创建临时有序节点,当多个竞争者都想获取这把锁时,就会出现以下情况:

[zk: 127.0.0.1:2181(CONNECTED) 85] ls /dlock/locks/lock
[lock0000000000, lock0000000001, lock0000000002, lock0000000003, lock0000000004, lock0000000005, lock0000000006, lock0000000007, lock0000000008, lock0000000009, lock0000000010, lock0000000011, lock0000000012, lock0000000013, lock0000000014, lock0000000015, lock0000000016, lock0000000017, lock0000000018, lock0000000019, lock0000000020, lock0000000021, lock0000000022, lock0000000023, lock0000000024, lock0000000025, lock0000000026, lock0000000027, lock0000000028, lock0000000029, lock0000000030, lock0000000031, lock0000000032, lock0000000033, lock0000000034, lock0000000035, lock0000000036, lock0000000037, lock0000000038, lock0000000039, lock0000000040, lock0000000041, lock0000000042, lock0000000043, lock0000000044, lock0000000045, lock0000000046, lock0000000047, lock0000000048, lock0000000049, lock0000000050, lock0000000051, lock0000000052, lock0000000053, lock0000000054, lock0000000055, lock0000000056, lock0000000057, lock0000000058, lock0000000059, lock0000000060, lock0000000061, lock0000000062, lock0000000063, lock0000000064, lock0000000065, lock0000000066, lock0000000067, lock0000000068, lock0000000069, lock0000000070, lock0000000071, lock0000000072, lock0000000073, lock0000000074, lock0000000075, lock0000000076, lock0000000077, lock0000000078, lock0000000079, lock0000000080, lock0000000081, lock0000000082, lock0000000083, lock0000000084, lock0000000085, lock0000000086, lock0000000087, lock0000000088, lock0000000089, lock0000000090, lock0000000091, lock0000000092, lock0000000093, lock0000000094, lock0000000095, lock0000000096, lock0000000097, lock0000000098, lock0000000099]

上面的例子我是用了100个线程并发竞争,则在/dlock/locks/lock下创建了100个临时有序节点.

怎么定义谁持有这把锁呢?

因为这些创建的节点是有序的,我们可以定义最小的那个节点就是锁的持有者.

怎么释放锁呢?

你可以主动删除你创建的临时节点

假如现在你的线程创建了节点:lock0000000000,那么你调用ZooKeeper.delete后删除该节点,可以认为是释放锁

释放后,别的竞争者怎么获取到当前锁呢?

我们可以利用ZooKeeper的watcher机制,但是我们不能监控通过getChildren监控/dlock/locks/lock节点变化,因为当lock0000000000被删除后,它会通知所有监控/dlock/locks/lock子节点变化的客户端,进而引发羊群效应,实际上这也是没有太大益处的,因为下一个持有锁的一定是最小的节点,而该节点一定是离lock0000000000节点最近的节点,因为/dlock/locks/lock下的节点是有序的.

在本例子中,我们只需要在lock0000000001节点上添加lock0000000000的watcher,可以通过exists的方式添加.

锁的重入,我们怎么判断?

我们可以在锁的内部维护一个int 变量入int state=1,当重入时,我们去对该变量加1,当释放锁时,判断state==1,如果true,删除当前节点,否则只对state减1

下面我们来实现一个demo级的ZooKeeper分布式锁.

二、ZooKeeper分布式锁代码

接口定义:

package com.xu.dlock.core;

public interface DLock {
    /**
     * 阻塞获取锁
     * @return
     */
    boolean lock();


    /**
     * 释放锁
     * @return
     */
    boolean unlock();

}



package com.xu.dlock.core;

public interface DLockFactory {

    /**
     * 拿到对象
     * @param key
     * @return
     */
    DLock getLock(String key);
}

获取锁的工厂实现:

@Setter
@Getter
public class ZkDLockFactory implements DLockFactory {


    private String DLOCK_ROOT_PATH="/dlock/locks";
    private ZooKeeper zooKeeper;
    
    public DLock getLock(String key) {
        String path = getPath(key);
        try {
            zooKeeper.create(path,"".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (Exception e) {

        }finally {
            try {
                Stat stat= zooKeeper.exists(path,false);
                if(stat == null){
                    return null;
                }
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        }
        DLock lock = new ZkLock(path,key,zooKeeper);
        return lock;
    }

    private String getPath(String key) {
        return DLOCK_ROOT_PATH+"/"+key;
    }

}

锁的实现:

package com.xu.dlock.core.lock;

import com.xu.dlock.core.DLock;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class ZkLock implements DLock {
    
    private String path;

    private String name;

    private String lockPath ;

    private ZooKeeper zk;

    private int state ;

    public ZkLock(String path, String name, ZooKeeper zk) {
        this.path = path;
        this.name = name;
        this.zk = zk;
        this.state=0;
    }




    public boolean lock() {
       boolean flag= lockInternal();
       if(flag){
           state++;
       }
       return flag;
    }

    private  boolean lockInternal(){
        try {
            String result = zk.create(getPath(), "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            this.lockPath = result;
            List<String> waits = zk.getChildren(path, false);
            Collections.sort(waits);
            String[] paths=result.split("/");
            String curNodeName =  paths[paths.length-1];
            if (waits.get(0).equalsIgnoreCase(curNodeName)) {
                return true;
            }
            CountDownLatch latch = new CountDownLatch(1);
            for (int i = 0; i < waits.size(); i++) {
                String cur = waits.get(i);
                if (!cur.equalsIgnoreCase(curNodeName)) {
                    continue;
                }
                String prePath = path+"/"+waits.get(i - 1);
                zk.exists(prePath, new Watcher() {
                    public void process(WatchedEvent event) {
                        latch.countDown();
                    }
                });
                break;
            }
            latch.await();
            return true;
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
        return false;
    }

    private String getPath() {
        return path+"/"+name;
    }

    public boolean unlock() {
        if(state>1){
            state--;
            return true;
        }
        try {
            Stat stat=zk.exists(lockPath,false);
            int version= stat.getVersion();
            zk.delete(lockPath,version);
            state--;
            return true;
        } catch (Exception e) {
            System.out.println("unlock:"+lockPath+" ,exception,");
        }
        return false;
    }



}

测试代码:

package com.xu.dlock.test;

import com.xu.dlock.core.DLock;
import com.xu.dlock.core.factory.ZkDLockFactory;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class DLockTest {

    public final static Random random  = new Random();

    public static void main(String[] args) {
        ZooKeeper zk  = getZkClient();

        CountDownLatch latch = new CountDownLatch(100);
        ZkDLockFactory factory = new ZkDLockFactory();
        factory.setZooKeeper(zk);
        for(int i = 0;i<100;i++){
            int finalI = i;
            Thread t = new Thread(()->{
                exec(factory);
                System.out.println("Thread_"+ finalI +"释放锁完成");
                latch.countDown();
            },"Thread_"+i);
            t.start();
        }
        try {
            latch.await();
            TimeUnit.SECONDS.sleep(5);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("测试完成");
    }


    public static void exec(ZkDLockFactory factory){
        DLock lock=factory.getLock("lock");
        System.out.println("Thread:"+Thread.currentThread().getName()+",尝试获取锁");
        boolean flag=lock.lock();
        System.out.println("Thread:"+Thread.currentThread().getName()+",尝试获取锁,结果:"+flag);

        try {
            TimeUnit.MILLISECONDS.sleep(random.nextInt(30));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        lock.unlock();
        System.out.println("Thread:"+Thread.currentThread().getName()+",释放锁锁");


    }




    public static ZooKeeper getZkClient(){
        try {
            ZooKeeper zooKeeper = new ZooKeeper("192.168.56.101:2181", 200000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if(event.getState() == Event.KeeperState.SyncConnected){
                        System.out.println("连接成功");
                    }
                }
            });
            return zooKeeper;
        } catch (IOException e) {
            e.printStackTrace();
        }

        return null;
    }


}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐