zookeeper(五) curator 锁机制

摘要:
分布式锁的应用分布式锁服务宕机,ZooKeeper一般是以集群部署,如果出现ZooKeeper宕机,那么只要当前正常的服务器超过集群的半数,依然可以正常提供服务持有锁资源服务器宕机,假如一台服务器获取锁之后就宕机了,那么就会导致其他服务器无法再获取该锁.就会造成死锁问题,在Curator中,锁的信息都是保存在临时节点上,如果持有锁资源的服务器宕机,那么ZooKeeper就会移除它的信息,这时其他服务器就能进行获取锁操作。

分布式锁的应用

分布式锁服务宕机, ZooKeeper 一般是以集群部署, 如果出现 ZooKeeper 宕机, 那么只要当前正常的服务器超过集群的半数, 依然可以正常提供服务
持有锁资源服务器宕机, 假如一台服务器获取锁之后就宕机了, 那么就会导致其他服务器无法再获取该锁. 就会造成 死锁 问题, 在 Curator 中, 锁的信息都是保存在临时节点上, 如果持有锁资源的服务器宕机, 那么 ZooKeeper 就会移除它的信息, 这时其他服务器就能进行获取锁操作。

当然了分布式锁还可以基于redis实现,其string类型的 setnx key value命令 结合expire命令。

前面的准备工作:

复制代码
package Lock;

import java.util.concurrent.TimeUnit;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class DistributedLockDemo {
    // ZooKeeper 锁节点路径, 分布式锁的相关操作都是在这个节点上进行
    private final String lockPath = "/distributed-lock";

    // ZooKeeper 服务地址, 单机格式为:(127.0.0.1:2181),
    // 集群格式为:(127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183)
    private String connectString;
    // Curator 客户端重试策略
    private RetryPolicy retry;
    // Curator 客户端对象
    private CuratorFramework client;
    // client2 用户模拟其他客户端
    private CuratorFramework client2;

    // 初始化资源
    @Before
    public void init() throws Exception {
        // 设置 ZooKeeper 服务地址为本机的 2181 端口
        connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
        // 重试策略
        // 初始休眠时间为 1000ms, 最大重试次数为 3
        retry = new ExponentialBackoffRetry(1000, 3);
        // 创建一个客户端, 60000(ms)为 session 超时时间, 15000(ms)为链接超时时间
        client = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry);
        client2 = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry);
        // 创建会话
        client.start();
        client2.start();
    }

    // 释放资源
    @After
    public void close() {
        CloseableUtils.closeQuietly(client);
    }
}
复制代码

zookeper的实现主要有下面四类类:

InterProcessMutex:分布式可重入排它锁
InterProcessSemaphoreMutex:分布式排它锁
InterProcessReadWriteLock:分布式读写锁
InterProcessMultiLock:将多个锁作为单个实体管理的容器

1.共享锁,不可重入--- InterProcessSemaphoreMutex

InterProcessSemaphoreMutex是一种不可重入的互斥锁,也就意味着即使是同一个线程也无法在持有锁的情况下再次获得锁,所以需要注意,不可重入的锁很容易在一些情况导致死锁。

复制代码
    @Test
    public void sharedLock() throws Exception {
        // 创建共享锁
        final InterProcessLock lock = new InterProcessSemaphoreMutex(client, lockPath);
        // lock2 用于模拟其他客户端
        final InterProcessLock lock2 = new InterProcessSemaphoreMutex(client2, lockPath);

        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    lock.acquire();
                    System.out.println("1获取锁===============");
                    // 测试锁重入
                    Thread.sleep(5 * 1000);
                    lock.release();
                    System.out.println("1释放锁===============");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    lock2.acquire();
                    System.out.println("2获取锁===============");
                    Thread.sleep(5 * 1000);
                    lock2.release();
                    System.out.println("2释放锁===============");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        Thread.sleep(20 * 1000);
    }
复制代码

2.共享可重入锁--- InterProcessMutex

此锁可以重入,但是重入几次需要释放几次。

复制代码
    @Test
    public void sharedReentrantLock() throws Exception {
        // 创建共享锁
        final InterProcessLock lock = new InterProcessMutex(client, lockPath);
        // lock2 用于模拟其他客户端
        final InterProcessLock lock2 = new InterProcessMutex(client2, lockPath);

        final CountDownLatch countDownLatch = new CountDownLatch(2);

        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    lock.acquire();
                    System.out.println("1获取锁===============");
                    // 测试锁重入
                    lock.acquire();
                    System.out.println("1再次获取锁===============");
                    Thread.sleep(5 * 1000);
                    lock.release();
                    System.out.println("1释放锁===============");
                    lock.release();
                    System.out.println("1再次释放锁===============");

                    countDownLatch.countDown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    lock2.acquire();
                    System.out.println("2获取锁===============");
                    // 测试锁重入
                    lock2.acquire();
                    System.out.println("2再次获取锁===============");
                    Thread.sleep(5 * 1000);
                    lock2.release();
                    System.out.println("2释放锁===============");
                    lock2.release();
                    System.out.println("2再次释放锁===============");

                    countDownLatch.countDown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        countDownLatch.await();
    }
复制代码

结果:

1获取锁===============
1再次获取锁===============
1释放锁===============
1再次释放锁===============
2获取锁===============
2再次获取锁===============
2释放锁===============
2再次释放锁===============

原理:

InterProcessMutex通过在zookeeper的某路径节点下创建临时序列节点来实现分布式锁,即每个线程(跨进程的线程)获取同一把锁前,都需要在同样的路径下创建一个节点,节点名字由uuid + 递增序列组成。而通过对比自身的序列数是否在所有子节点的第一位,来判断是否成功获取到了锁。当获取锁失败时,它会添加watcher来监听前一个节点的变动情况,然后进行等待状态。直到watcher的事件生效将自己唤醒,或者超时时间异常返回。

3.共享可重入读写锁

读锁和读锁不互斥,只要有写锁就互斥。

复制代码
    @Test
    public void sharedReentrantReadWriteLock() throws Exception {
        // 创建共享可重入读写锁
        final InterProcessReadWriteLock locl1 = new InterProcessReadWriteLock(client, lockPath);
        // lock2 用于模拟其他客户端
        final InterProcessReadWriteLock lock2 = new InterProcessReadWriteLock(client2, lockPath);

        // 获取读写锁(使用 InterProcessMutex 实现, 所以是可以重入的)
        final InterProcessLock readLock = locl1.readLock();
        final InterProcessLock readLockw = lock2.readLock();

        final CountDownLatch countDownLatch = new CountDownLatch(2);

        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    readLock.acquire();
                    System.out.println("1获取读锁===============");
                    // 测试锁重入
                    readLock.acquire();
                    System.out.println("1再次获取读锁===============");
                    Thread.sleep(5 * 1000);
                    readLock.release();
                    System.out.println("1释放读锁===============");
                    readLock.release();
                    System.out.println("1再次释放读锁===============");

                    countDownLatch.countDown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    Thread.sleep(500);
                    readLockw.acquire();
                    System.out.println("2获取读锁===============");
                    // 测试锁重入
                    readLockw.acquire();
                    System.out.println("2再次获取读锁==============");
                    Thread.sleep(5 * 1000);
                    readLockw.release();
                    System.out.println("2释放读锁===============");
                    readLockw.release();
                    System.out.println("2再次释放读锁===============");

                    countDownLatch.countDown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        countDownLatch.await();
    }
复制代码

结果:

1获取读锁===============
1再次获取读锁===============
2获取读锁===============
2再次获取读锁==============
1释放读锁===============
2释放读锁===============
1再次释放读锁===============
2再次释放读锁===============

4. 共享信号量

复制代码
    @Test
    public void semaphore() throws Exception {
        // 创建一个信号量, Curator 以公平锁的方式进行实现
        final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, lockPath, 1);

        final CountDownLatch countDownLatch = new CountDownLatch(2);

        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    // 获取一个许可
                    Lease lease = semaphore.acquire();
                    logger.info("1获取读信号量===============");
                    Thread.sleep(5 * 1000);
                    semaphore.returnLease(lease);
                    logger.info("1释放读信号量===============");

                    countDownLatch.countDown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    // 获取一个许可
                    Lease lease = semaphore.acquire();
                    logger.info("2获取读信号量===============");
                    Thread.sleep(5 * 1000);
                    semaphore.returnLease(lease);
                    logger.info("2释放读信号量===============");

                    countDownLatch.countDown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        countDownLatch.await();
    }
复制代码

结果:

09:39:26 [Lock.DistributedLockDemo]-[INFO] 2获取读信号量===============
09:39:32 [Lock.DistributedLockDemo]-[INFO] 2释放读信号量===============
09:39:32 [Lock.DistributedLockDemo]-[INFO] 1获取读信号量===============
09:39:37 [Lock.DistributedLockDemo]-[INFO] 1释放读信号量===============

当然可以一次获取多个信号量:

复制代码
    @Test
    public void semaphore() throws Exception {
        // 创建一个信号量, Curator 以公平锁的方式进行实现
        final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, lockPath, 3);

        final CountDownLatch countDownLatch = new CountDownLatch(2);

        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    // 获取2个许可
                    Collection<Lease> acquire = semaphore.acquire(2);
                    logger.info("1获取读信号量===============");
                    Thread.sleep(5 * 1000);
                    semaphore.returnAll(acquire);
                    logger.info("1释放读信号量===============");

                    countDownLatch.countDown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    // 获取1个许可
                    Collection<Lease> acquire = semaphore.acquire(1);
                    logger.info("2获取读信号量===============");
                    Thread.sleep(5 * 1000);
                    semaphore.returnAll(acquire);
                    logger.info("2释放读信号量===============");

                    countDownLatch.countDown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        countDownLatch.await();
    }
复制代码

结果:

09:46:53 [Lock.DistributedLockDemo]-[INFO] 2获取读信号量===============
09:46:53 [Lock.DistributedLockDemo]-[INFO] 1获取读信号量===============
09:46:58 [Lock.DistributedLockDemo]-[INFO] 2释放读信号量===============
09:46:58 [Lock.DistributedLockDemo]-[INFO] 1释放读信号量===============

5.多重共享锁

复制代码
    @Test
    public void multiLock() throws Exception {
        // 可重入锁
        final InterProcessLock interProcessLock1 = new InterProcessMutex(client, lockPath);
        // 不可重入锁
        final InterProcessLock interProcessLock2 = new InterProcessSemaphoreMutex(client2, lockPath);
        // 创建多重锁对象
        final InterProcessLock lock = new InterProcessMultiLock(Arrays.asList(interProcessLock1, interProcessLock2));

        final CountDownLatch countDownLatch = new CountDownLatch(1);

        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    // 获取参数集合中的所有锁
                    lock.acquire();
                    // 因为存在一个不可重入锁, 所以整个 InterProcessMultiLock 不可重入
                    System.out.println(lock.acquire(2, TimeUnit.SECONDS));
                    // interProcessLock1 是可重入锁, 所以可以继续获取锁
                    System.out.println(interProcessLock1.acquire(2, TimeUnit.SECONDS));
                    // interProcessLock2 是不可重入锁, 所以获取锁失败
                    System.out.println(interProcessLock2.acquire(2, TimeUnit.SECONDS));

                    countDownLatch.countDown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        countDownLatch.await();
    }
复制代码

结果:

false

true

false

免责声明:文章转载自《zookeeper(五) curator 锁机制》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Apdex(应用性能指标)BTrace使用总结下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

app微信支付-java服务端接口 支付-查询-退款

个人不怎么看得懂微信的文档,看了很多前辈的写法,终于调通了,在这里做一下记录。 首先来定义各种处理类(微信支付不需要特殊jar包,很多处理需要自己封装,当然也可以自己写完打个jar包) 参数要用jdom解析   自行导入jar包    或者在maven pom.xml中导入  <dependency> <groupId>org...

【java开发系列】—— java输入输出流

前言   任何语言输入输出流都是很重要的部分,比如从一个文件读入内容,进行分析,或者输出到另一个文件等等,都需要文件流的操作。这里简单介绍下reader,wirter,inputstream,outputstream的使用方法。其实Apache commons里面有个方法IOUtils可是实现方便快捷的流拷贝,感兴趣的可以参考官方文档。   JAVA的...

Java 基础 AutoCloseable &amp;amp; Closeable

Overview Closeable和AutoCloseable都是接口,且都只定义了一个close()方法。Closeable: 定义于 java.io包中,于JDK5添加;AutoCloseable: 定义于java.lang包中, 于JDK7添加; AutoCloseable.java public interface AutoCloseable{...

让程序在崩溃时体面的退出转

转自 http://blog.csdn.net/starlee/article/details/6630816 让程序在崩溃时体面的退出之SEH  SEH的全称是Structured Exception Handling,是Windows操作系统提供的一种异常处理方式。SEH是属于操作系统的特性,不为特定语言设计,从它的名字就能看出它是一种结构化的异常处理...

文件的上传&amp;amp;预览&amp;amp;下载学习(一)

注:主要是说明后端逻辑和数据库表设计 1.当前主流的几种文件上传&预览&下载方式 把文件直接存储在服务器 分布式存储OSS,比如阿里OSS、Minio 2.数据库表设计 由于文件都是跟业务关联的,比如评论里面掺杂评论图片,常规的设计就是在'评论表'添加上传'图片名称'字段和'图片相对路径',在上传成功后返回给前端 1.1 如果是加入多个...

Linux进程间通信-信号量

        当多个进程表同一时候訪问系统上的某个资源的时候,比方同一时候写一个数据库的某条记录,或者同一时候改动某个文件,就须要考虑进城的同步问题,以确保任一时刻仅仅有一个进程能够拥有对资源的独占式訪问。 通常。程序对共享资源的訪问的代码仅仅是非常短的一段。你就是这一段代码引发了进程之间的竞态条件。我们称这段代码为关键代码段,或者临界区。    ...