菜单 学习猿地 - LMONKEY

VIP

开通学习猿地VIP

尊享10项VIP特权 持续新增

知识通关挑战

打卡带练!告别无效练习

接私单赚外块

VIP优先接,累计金额超百万

学习猿地私房课免费学

大厂实战课仅对VIP开放

你的一对一导师

每月可免费咨询大牛30次

领取更多软件工程师实用特权

入驻
204
0

06.Zookeeper实现分布式锁

原创
05/13 14:22
阅读数 96521

分布式锁

分布式锁的思路是每个客户端都在某个目录下注册一个临时有序节点,每次最小的节点会获取锁,当前节点会去监听上一个较小节点,如果较小节点失效之后,就会去获取锁。

image-20210320111311093

java原生zookeeper客户端

  1. 引入jar包

            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.4.7</version>
            </dependency>
    
  2. 创建ZK客户端连接单例

    package com.king.zookeeper;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    public class ZookeeperClient {
        //zk集群地址
        public static final String ZOOKEEPER_CONNECT="192.168.232.128:12181,192.168.232.128:12182,192.168.232.128:12183,192.168.232.128:12184";
        //计数器,用于等待连接成功
        //CountDownLatch是因为连接时会耗时较长,所以需要添加一个计数器进行阻塞,否则会在connecting阶段就被释放了
        public static CountDownLatch countDownLatch = new CountDownLatch(1);
        //连接超时时间
        public static final int SESSION_TIMEOUT = 5000;
        //用volatile修饰单例,防止赋值时发生指令重排
        private volatile static ZooKeeper instance;
        //用Double check获取单例
        public static ZooKeeper getInstance() throws IOException, InterruptedException {
            if (instance == null ){
                synchronized (ZookeeperClient.class) {
                    if (instance == null) {
                        //连接时注册一个监听,监听连接状态变化
                        instance = new ZooKeeper(ZOOKEEPER_CONNECT, SESSION_TIMEOUT, new Watcher() {
                            //监听回调方法
                            @Override
                            public void process(WatchedEvent watchedEvent) {
                                //当连接状态变成connected,就说明连接成功
                                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                                    countDownLatch.countDown();
                                }
                            }
                        });
                        //等待连接成功
                        countDownLatch.await();
                    }
                }
            }
            return instance;
        }
    
        public static int getSessionTimeout() {
            return SESSION_TIMEOUT;
        }
    }
    
  3. 创建分布式锁客户端

package com.king.zookeeper;

import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class DistibutedLock {
    //根目录,客户端都会去此目录下创建临时有序子节点
    //需要在zookeeper集群中有此目录
    private final String ROOT_PATH = "/apilock";
    //客户端
    private ZooKeeper zookeeper;
    //session超时时间
    private  int SESSION_TIMEOUT;
    //当前客户端创建有序节点的名称
    private String lockId;

    private CountDownLatch countDownLatch = new CountDownLatch(1);

    public DistibutedLock() throws IOException, InterruptedException {
        this.zookeeper =ZookeeperClient.getInstance();
        this.SESSION_TIMEOUT = ZookeeperClient.getSessionTimeout();
    }

    public boolean lock(){

        try {
            //创建临时有序子节点
            lockId = zookeeper.create(ROOT_PATH+"/","123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            System.out.println(Thread.currentThread().getName()+"创建节点"+lockId+",开始竞争锁");
            //获取/lock目录下所有子节点
            List<String> children = zookeeper.getChildren(ROOT_PATH, true);
            //用SortedSet对子节点从小到大进行排序
            SortedSet<String> sortedSet = new TreeSet<String>();
            for (String child : children) {
                sortedSet.add(ROOT_PATH+"/"+child);
            }
            //获取最小节点名称
            String first = sortedSet.first();
            //如果当前创建节点就是最小节点,则获取锁
            if (first.equals(lockId)) {
                System.out.println(Thread.currentThread().getName()+"获取锁"+lockId);
                return true;
            }
            //获取比当前id小的节点集合
            SortedSet<String> frontSet = sortedSet.headSet(lockId);
            if (!frontSet.isEmpty()) {
                //取集合中最后一个元素,也就是临近最小节点
                String last = frontSet.last();
                System.out.println(lockId+"监听"+last);
                //当前节点去监听上一个节点,当上一个节点被删除的时候
                //当前节点就可以获取锁
                zookeeper.exists(last, new Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                        if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
                            countDownLatch.countDown();
                        }
                    }
                });
                countDownLatch.await(SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
                System.out.println(Thread.currentThread().getName() + "获取锁" + lockId);
            }
            return true;

        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return false;
    }

    //释放锁
    public boolean unLock(){
        try {
            System.out.println(Thread.currentThread().getName() + "开始删除锁" + lockId);
            //删除当前节点
            zookeeper.delete(lockId, -1);
            return true;
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
        return false;
    }
}
  1. 测试代码

    package com.king.zookeeper;
    
    import java.io.IOException;
    import java.util.Random;
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.TimeUnit;
    
    public class TestJavaApi {
        public static void main(String[] args) {
            //等待器,当所有线程都执行到某个步骤才停止阻塞
            CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
            //模拟十个线程去获取锁
            for (int i = 0; i < 10; i++) {
                new Thread(()-> {
                    DistibutedLock lock = null;
                    try {
                        lock = new DistibutedLock();
                        cyclicBarrier.await();
                        lock.lock();
                        TimeUnit.MILLISECONDS.sleep(new Random().nextInt(500));
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    } finally {
                        if(lock!=null){
                            lock.unLock();
                        }
                    }
                }).start();
            }
        }
    }
    
    

ZKClient实现分布式锁

  1. 导入jar

            <dependency>
                <groupId>com.101tec</groupId>
                <artifactId>zkclient</artifactId>
                <version>0.11</version>
            </dependency>
    
  2. 创建ZK客户端连接单例

    public class ZKClientInstance {
    
        public static final String ZOOKEEPER_CONNECT="192.168.232.128:12181,192.168.232.128:12182,192.168.232.128:12183,192.168.232.128:12184";
    
        private volatile static ZkClient instance;
    
        public static ZkClient getInstance(){
            if (instance == null) {
                synchronized (ZKClientInstance.class) {
                    if (instance == null) {
                        instance = new ZkClient(ZOOKEEPER_CONNECT,5000,
                                5000,new SerializableSerializer());
                    }
                }
            }
            return instance;
        }
    
    }
    
    
  3. 创建分布式锁客户端

    package com.king.zookeeper;
    
    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;
    
    import java.util.List;
    import java.util.SortedSet;
    import java.util.TreeSet;
    import java.util.concurrent.CountDownLatch;
    
    public class ZKClientDisLock {
    
        private static final String ROOT_PATH = "/apilock";
    
        private ZkClient zkClient;
    
        private CountDownLatch countDownLatch = new CountDownLatch(1);
    
        private String lockId;
    
        public ZKClientDisLock(ZkClient zkClient) {
            this.zkClient = zkClient;
        }
    
        public boolean lock(){
    
    
            lockId = zkClient.createEphemeralSequential(ROOT_PATH + "/", "123");
    
            List<String> children = zkClient.getChildren(ROOT_PATH);
            SortedSet<String> sortedSet = new TreeSet<String>();
    
            for (String child : children) {
                sortedSet.add(ROOT_PATH+"/"+child);
            }
            String first = sortedSet.first();
            if (first.equals(lockId)) {
                System.out.println(Thread.currentThread().getName() + "获取锁" + lockId);
                return true;
            }
    
            SortedSet<String> frontSet = sortedSet.headSet(lockId);
            if (null != frontSet && frontSet.size() > 0) {
                String last = frontSet.last();
                IZkDataListener iZkDataListener = null;
                try {
                    System.out.println(lockId + "监听" + last + "节点变化");
                    iZkDataListener = new IZkDataListener() {
                        @Override
                        public void handleDataChange(String s, Object o) throws Exception {
                        }
    
                        @Override
                        public void handleDataDeleted(String s) throws Exception {
                            countDownLatch.countDown();
                        }
                    };
                    zkClient.subscribeDataChanges(last, iZkDataListener);
                    countDownLatch.await();
                    System.out.println(Thread.currentThread().getName() + "获取锁" + lockId);
                } catch (Exception e) {
    
                }finally {
                    zkClient.unsubscribeDataChanges(last,iZkDataListener);
                }
                return true;
    
            }
    
            return false;
        }
    
        public void unLock(){
            System.out.println(Thread.currentThread().getName()+ "释放锁"+ lockId + "-----");
            zkClient.delete(lockId);
        }
    }
    
    
  4. 测试

    package com.king.zookeeper;
    
    import java.io.IOException;
    import java.util.Random;
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.TimeUnit;
    
    public class TestZkClient {
        public static void main(String[] args) {
            CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
            for (int i = 0; i < 10; i++) {
                int finalI = i;
                new Thread(()-> {
                    ZKClientDisLock lock = null;
                    try {
                        lock = new ZKClientDisLock(ZKClientInstance.getInstance());
                        cyclicBarrier.await();
                        lock.lock();
                        TimeUnit.MILLISECONDS.sleep(new Random().nextInt(500));
                    }  catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    } finally {
                        if(lock!=null){
                            lock.unLock();
                        }
                    }
                }).start();
            }
    }}
    
    

发表评论

0/200
204 点赞
0 评论
收藏