ZooKeeper入门教程三分布式锁实现及完整运行源码

目录
  • 1.0版本
  • 2.0版本
  • LockSample类
    • 构造方法
    • 获取锁实现
      • createLock()
      • attemptLock()
    • 释放锁实现
    •  TicketSeller类
      • sell()
        • sellTicketWithLock()
          • 测试入口
            • 测试方法
            • 代码清单如下:
              • 1、LockSample
                • 2、TicketSeller

                ZooKeeper入门教程一简介与核心概念

                ZooKeeper入门教程二在单机和集群环境下的安装搭建及使用

                1.0版本

                首先我们先介绍一个简单的zookeeper实现分布式锁的思路:

                用zookeeper中一个临时节点代表锁,比如在/exlusive_lock下创建临时子节点/exlusive_lock/lock。

                • 所有客户端争相创建此节点,但只有一个客户端创建成功。
                • 创建成功代表获取锁成功,此客户端执行业务逻辑
                • 未创建成功的客户端,监听/exlusive_lock变更
                • 获取锁的客户端执行完成后,删除/exlusive_lock/lock,表示锁被释放
                • 锁被释放后,其他监听/exlusive_lock变更的客户端得到通知,再次争相创建临时子节点/exlusive_lock/lock。此时相当于回到了第2步。

                我们的程序按照上述逻辑直至抢占到锁,执行完业务逻辑。

                上述是较为简单的分布式锁实现方式。能够应付一般使用场景,但存在着如下两个问题:

                1、锁的获取顺序和最初客户端争抢顺序不一致,这不是一个公平锁。每次锁获取都是当次最先抢到锁的客户端。

                2、羊群效应,所有没有抢到锁的客户端都会监听/exlusive_lock变更。当并发客户端很多的情况下,所有的客户端都会接到通知去争抢锁,此时就出现了羊群效应。

                为了解决上面的问题,我们重新设计。

                2.0版本

                我们在2.0版本中,让每个客户端在/exlusive_lock下创建的临时节点为有序节点,这样每个客户端都在/exlusive_lock下有自己对应的锁节点,而序号排在最前面的节点,代表对应的客户端获取锁成功。排在后面的客户端监听自己前面一个节点,那么在他前序客户端执行完成后,他将得到通知,获得锁成功。逻辑修改如下:

                • 每个客户端往/exlusive_lock下创建有序临时节点/exlusive_lock/lock_。创建成功后/exlusive_lock下面会有每个客户端对应的节点,如/exlusive_lock/lock_000000001
                • 客户端取得/exlusive_lock下子节点,并进行排序,判断排在最前面的是否为自己。如果自己的锁节点在第一位,代表获取锁成功,此客户端执行业务逻辑
                • 如果自己的锁节点不在第一位,则监听自己前一位的锁节点。例如,自己锁节点lock_000000002,那么则监听lock_000000001.
                • 当前一位锁节点(lock_000000001)对应的客户端执行完成,释放了锁,将会触发监听客户端(lock_000000002)的逻辑。
                • 监听客户端重新执行第2步逻辑,判断自己是否获得了锁。

                如此修改后,每个客户端只关心自己前序锁是否释放,所以每次只会有一个客户端得到通知。而且,所有客户端的执行顺序和最初锁创建的顺序是一致的。解决了1.0版本的两个问题。

                接下来我们看看代码如何实现。

                LockSample类

                此类是分布式锁类,实现了2个分布式锁的相关方法:

                1、获取锁

                2、释放锁

                主要程序逻辑围绕着这两个方法的实现,特别是获取锁的逻辑。我们先看一下该类的成员变量:

                private ZooKeeper zkClient;
                private static final String LOCK_ROOT_PATH = "/Locks";
                private static final String LOCK_NODE_NAME = "Lock_";
                private String lockPath;

                定义了zkClient,用来操作zookeeper。

                锁的根路径,及自增节点的前缀。此处生产环境应该由客户端传入。

                当前锁的路径。

                构造方法

                public LockSample() throws IOException {
                    zkClient= new ZooKeeper("localhost:2181", 10000, new Watcher() {
                        @Override
                        public void process(WatchedEvent event) {
                            if(event.getState()== Event.KeeperState.Disconnected){
                                System.out.println("失去连接");
                 
                            }
                        }
                    });
                }

                创建zkClient,同时创建了状态监听。此监听可以去掉,这里只是打印出失去连接状态。

                获取锁实现

                暴露出来的获取锁的方法为acquireLock(),逻辑很简单:

                public  void acquireLock() throws InterruptedException, KeeperException {
                    //创建锁节点
                    createLock();
                    //尝试获取锁
                    attemptLock();
                }

                首先创建锁节点,然后尝试去取锁。真正的逻辑都在这两个方法中。

                createLock()

                先判断锁的根节点/Locks是否存在,不存在的话创建。然后在/Locks下创建有序临时节点,并设置当前的锁路径变量lockPath。

                代码如下:

                private void createLock() throws KeeperException, InterruptedException {
                    //如果根节点不存在,则创建根节点
                    Stat stat = zkClient.exists(LOCK_ROOT_PATH, false);
                    if (stat == null) {
                        zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    }
                 
                    // 创建EPHEMERAL_SEQUENTIAL类型节点
                    String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME,
                            Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                            CreateMode.EPHEMERAL_SEQUENTIAL);
                    System.out.println(Thread.currentThread().getName() + " 锁创建: " + lockPath);
                    this.lockPath=lockPath;
                }

                attemptLock()

                这是最核心的方法,客户端尝试去获取锁,是对2.0版本逻辑的实现,这里就不再重复逻辑,直接看代码:

                private void attemptLock() throws KeeperException, InterruptedException {
                    // 获取Lock所有子节点,按照节点序号排序
                    List<String> lockPaths = null;
                    lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false);
                    Collections.sort(lockPaths);
                    int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
                    // 如果lockPath是序号最小的节点,则获取锁
                    if (index == 0) {
                        System.out.println(Thread.currentThread().getName() + " 锁获得, lockPath: " + lockPath);
                        return ;
                    } else {
                        // lockPath不是序号最小的节点,监听前一个节点
                        String preLockPath = lockPaths.get(index - 1);
                 
                        Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);
                 
                        // 假如前一个节点不存在了,比如说执行完毕,或者执行节点掉线,重新获取锁
                        if (stat == null) {
                            attemptLock();
                        } else { // 阻塞当前进程,直到preLockPath释放锁,被watcher观察到,notifyAll后,重新acquireLock
                            System.out.println(" 等待前锁释放,prelocakPath:"+preLockPath);
                            synchronized (watcher) {
                                watcher.wait();
                            }
                            attemptLock();
                        }
                    }
                }

                注意这一行代码

                Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);

                我们在获取前一个节点的时候,同时设置了监听watcher。如果前锁存在,则阻塞主线程。

                watcher定义代码如下:

                private Watcher watcher = new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        System.out.println(event.getPath() + " 前锁释放");
                        synchronized (this) {
                            notifyAll();
                        }
                    }
                };

                watcher只是notifyAll,让主线程继续执行,以便再次调用attemptLock(),去尝试获取lock。如果没有异常情况的话,此时当前客户端应该能够成功获取锁。

                释放锁实现

                释放锁原语实现很简单,参照releaseLock()方法。代码如下:

                public void releaseLock() throws KeeperException, InterruptedException {
                    zkClient.delete(lockPath, -1);
                    zkClient.close();
                    System.out.println(" 锁释放:" + lockPath);
                }

                关于分布式锁的代码到此就讲解完了,我们再看下客户端如何使用它。

                我们创建一个TicketSeller类,作为客户端来使用分布式锁。

                 TicketSeller类

                sell()

                不带锁的业务逻辑方法,代码如下:

                private void sell(){
                    System.out.println("售票开始");
                    // 线程随机休眠数毫秒,模拟现实中的费时操作
                    int sleepMillis = (int) (Math.random() * 2000);
                    try {
                        //代表复杂逻辑执行了一段时间
                        Thread.sleep(sleepMillis);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("售票结束");
                }

                仅是为了演示,sleep了一段时间。

                sellTicketWithLock()

                此方法中,加锁后执行业务逻辑,代码如下:

                public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException {
                    LockSample lock = new LockSample();
                    lock.acquireLock();
                    sell();
                    lock.releaseLock();
                }

                测试入口

                接下来我们写一个main函数做测试:

                public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
                    TicketSeller ticketSeller = new TicketSeller();
                    for(int i=0;i<1000;i++){
                        ticketSeller.sellTicketWithLock();
                    }
                }

                main函数中我们循环调用ticketSeller.sellTicketWithLock(),执行加锁后的卖票逻辑。

                测试方法

                1、先启动一个java程序运行,可以看到日志输出如下:

                main 锁创建: /Locks/Lock_0000000391
                main 锁获得, lockPath: /Locks/Lock_0000000391
                售票开始
                售票结束
                 锁释放:/Locks/Lock_0000000391
                main 锁创建: /Locks/Lock_0000000392
                main 锁获得, lockPath: /Locks/Lock_0000000392
                售票开始
                售票结束
                 锁释放:/Locks/Lock_0000000392
                main 锁创建: /Locks/Lock_0000000393
                main 锁获得, lockPath: /Locks/Lock_0000000393
                售票开始
                售票结束
                 锁释放:/Locks/Lock_0000000393

                可见每次执行都是按照锁的顺序执行,而且由于只有一个进程,并没有锁的争抢发生。

                2、我们再启动一个同样的程序,锁的争抢此时发生了,可以看到双方的日志输出如下:

                程序1:

                main 锁获得, lockPath: /Locks/Lock_0000000471
                售票开始
                售票结束
                 锁释放:/Locks/Lock_0000000471
                main 锁创建: /Locks/Lock_0000000473
                 等待前锁释放,prelocakPath:Lock_0000000472
                /Locks/Lock_0000000472 前锁释放
                main 锁获得, lockPath: /Locks/Lock_0000000473
                售票开始
                售票结束
                 锁释放:/Locks/Lock_0000000473

                可以看到Lock_0000000471执行完成后,该进程获取的锁为Lock_0000000473,这说明Lock_0000000472被另外一个进程创建了。此时Lock_0000000473在等待前锁释放。Lock_0000000472释放后,Lock_0000000473才获得锁,然后才执行业务逻辑。

                我们再看程序2的日志:

                main 锁获得, lockPath: /Locks/Lock_0000000472
                售票开始
                售票结束
                 锁释放:/Locks/Lock_0000000472
                main 锁创建: /Locks/Lock_0000000474
                 等待前锁释放,prelocakPath:Lock_0000000473
                /Locks/Lock_0000000473 前锁释放
                main 锁获得, lockPath: /Locks/Lock_0000000474
                售票开始
                售票结束
                 锁释放:/Locks/Lock_0000000474

                可以看到,确实是进程2获取了Lock_0000000472。

                zookeeper实现分布式锁就先讲到这。注意代码只做演示用,并不适合生产环境使用。

                代码清单如下:

                1、LockSample

                import org.apache.zookeeper.*;
                import org.apache.zookeeper.data.Stat;
                 
                import java.io.IOException;
                import java.util.Collections;
                import java.util.List;
                 
                public class LockSample {
                 
                    //ZooKeeper配置信息
                    private ZooKeeper zkClient;
                    private static final String LOCK_ROOT_PATH = "/Locks";
                    private static final String LOCK_NODE_NAME = "Lock_";
                    private String lockPath;
                 
                    // 监控lockPath的前一个节点的watcher
                    private Watcher watcher = new Watcher() {
                        @Override
                        public void process(WatchedEvent event) {
                            System.out.println(event.getPath() + " 前锁释放");
                            synchronized (this) {
                                notifyAll();
                            }
                 
                        }
                    };
                 
                    public LockSample() throws IOException {
                        zkClient= new ZooKeeper("localhost:2181", 10000, new Watcher() {
                            @Override
                            public void process(WatchedEvent event) {
                                if(event.getState()== Event.KeeperState.Disconnected){
                                    System.out.println("失去连接");
                 
                                }
                            }
                        });
                    }
                 
                    //获取锁的原语实现.
                    public  void acquireLock() throws InterruptedException, KeeperException {
                        //创建锁节点
                        createLock();
                        //尝试获取锁
                        attemptLock();
                    }
                 
                    //创建锁的原语实现。在lock节点下创建该线程的锁节点
                    private void createLock() throws KeeperException, InterruptedException {
                        //如果根节点不存在,则创建根节点
                        Stat stat = zkClient.exists(LOCK_ROOT_PATH, false);
                        if (stat == null) {
                            zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                        }
                 
                        // 创建EPHEMERAL_SEQUENTIAL类型节点
                        String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME,
                                Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                                CreateMode.EPHEMERAL_SEQUENTIAL);
                        System.out.println(Thread.currentThread().getName() + " 锁创建: " + lockPath);
                        this.lockPath=lockPath;
                    }
                 
                    private void attemptLock() throws KeeperException, InterruptedException {
                        // 获取Lock所有子节点,按照节点序号排序
                        List<String> lockPaths = null;
                 
                        lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false);
                 
                        Collections.sort(lockPaths);
                 
                        int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
                 
                        // 如果lockPath是序号最小的节点,则获取锁
                        if (index == 0) {
                            System.out.println(Thread.currentThread().getName() + " 锁获得, lockPath: " + lockPath);
                            return ;
                        } else {
                            // lockPath不是序号最小的节点,监控前一个节点
                            String preLockPath = lockPaths.get(index - 1);
                 
                            Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);
                 
                            // 假如前一个节点不存在了,比如说执行完毕,或者执行节点掉线,重新获取锁
                            if (stat == null) {
                                attemptLock();
                            } else { // 阻塞当前进程,直到preLockPath释放锁,被watcher观察到,notifyAll后,重新acquireLock
                                System.out.println(" 等待前锁释放,prelocakPath:"+preLockPath);
                                synchronized (watcher) {
                                    watcher.wait();
                                }
                                attemptLock();
                            }
                        }
                    }
                 
                    //释放锁的原语实现
                    public void releaseLock() throws KeeperException, InterruptedException {
                        zkClient.delete(lockPath, -1);
                        zkClient.close();
                        System.out.println(" 锁释放:" + lockPath);
                    }
                 
                 
                }

                2、TicketSeller

                import org.apache.zookeeper.KeeperException;
                import java.io.IOException;
                public class TicketSeller {
                    private void sell(){
                        System.out.println("售票开始");
                        // 线程随机休眠数毫秒,模拟现实中的费时操作
                        int sleepMillis = (int) (Math.random() * 2000);
                        try {
                            //代表复杂逻辑执行了一段时间
                            Thread.sleep(sleepMillis);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("售票结束");
                    }
                 
                    public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException {
                        LockSample lock = new LockSample();
                        lock.acquireLock();
                        sell();
                        lock.releaseLock();
                    }
                 
                    public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
                        TicketSeller ticketSeller = new TicketSeller();
                        for(int i=0;i<1000;i++){
                            ticketSeller.sellTicketWithLock();
                 
                        }
                    }
                }

                以上就是ZooKeeper入门教程三分布式锁实现及完整运行源码的详细内容,更多关于ZooKeeper分布式锁实现源码的资料请关注其它相关文章!

                本文转自网络,如有侵权请联系客服删除。