AQS与重入锁ReetrantLock原理

摘要:
AQS内部通过一个volatileint类型的成员变量state控制同步状态,通过内部类Node构成FIFO的同步队列实现等待获取锁的线程排队工作,通过内部类ConditionObject构建条件等待队列,来完成等待条件线程的排队工作。如ReentrantLock通过内部类Sync及其子类继承AQS实现tryAcuire()和tryRelease()方法来实现独占锁,而SemaPhore则通过内部类继承AQS实现tryAcquireShared()方法和tryReleaseShared()方法实现共享模式锁。

一、AQS原理

AQS(AbstractQueuedSynchronizer)队列同步器是用来构建锁、同步组件的基础框架。

AQS内部通过一个volatile int类型的成员变量state控制同步状态【0代表锁未被占用,1表示已占用】,通过内部类Node构成FIFO的同步队列实现等待获取锁的线程排队工作,通过内部类ConditionObject构建条件等待队列,来完成等待条件线程的排队工作。当线程调用Condition对象的wait方法后会被加入等待队列中,当有线程调用Condition的signal方法后,线程将从等待队列移动到同步队列进行锁竞争。AQS内部会有一个同步队列和可能多个等待队列,前者存放等待获取锁的线程,后者分别存放等待不同条件的线程。

public abstract classAbstractQueuedSynchronizer
    extendsAbstractOwnableSynchronizer{
//指向同步队列队头
private transient volatileNode head;
//指向同步的队尾
private transient volatileNode tail;
//同步状态,0代表锁未被占用,1代表锁已被占用
private volatile intstate;
static final classNode {
    //共享模式
    static final Node SHARED = newNode();
    //独占模式
    static final Node EXCLUSIVE = null;
    //标识线程已处于结束状态
    static final int CANCELLED =  1;
    //等待被唤醒状态
    static final int SIGNAL    = -1;
    //条件状态,
    static final int CONDITION = -2;
    //在共享模式中使用表示获得的同步状态会被传播
    static final int PROPAGATE = -3;
    //等待状态,存在CANCELLED、SIGNAL、CONDITION、PROPAGATE 4种
    volatile intwaitStatus;
    //同步队列中前驱结点
    volatileNode prev;
    //同步队列中后继结点
    volatileNode next;
    //请求锁的线程
    volatileThread thread;
    //等待队列中的后继结点,这个与Condition有关,稍后会分析
Node nextWaiter;
    //判断是否为共享模式
    final booleanisShared() {
        return nextWaiter ==SHARED;
    }
    //获取前驱结点
    final Node predecessor() throwsNullPointerException {
        Node p =prev;
        if (p == null)
            throw newNullPointerException();
        else
            returnp;
    }
    //.....
}
//上面Node head、tail对象构建同步队列,这里用ConditionObject类的对象创建条件等待队列
class ConditionObject implementsCondition, java.io.Serializable { //等待队列第一个等待结点 private transientNode firstWaiter; //等待队列最后一个等待结点 private transientNode lastWaiter; //省略其他代码....... } //AQS中的模板方法,由其子类实现 //独占模式下获取锁的方法 protected boolean tryAcquire(intarg) { throw newUnsupportedOperationException(); } //独占模式下解锁的方法 protected boolean tryRelease(intarg) { throw newUnsupportedOperationException(); } //共享模式下获取锁的方法 protected int tryAcquireShared(intarg) { throw newUnsupportedOperationException(); } //共享模式下解锁的方法 protected boolean tryReleaseShared(intarg) { throw newUnsupportedOperationException(); } //判断是否为持有独占锁 protected booleanisHeldExclusively() { throw newUnsupportedOperationException(); } }

二、AQS的应用

AQS通过state状态管理、同步队列、等待队列实现了多线程同步锁获取与释放,多线程并发排队、条件等待等复杂功能。

作为基础组件,它对锁的两种模式【独占模式和共享模式】都提供支持。设计上AQS采用模板方法模式构建,其内部提供了并发操作的核心方法、而将一些实现不同模式下实现可能有差异的操作定义为模板方法,让其子类实现。如ReentrantLock通过内部类Sync及其子类继承AQS实现tryAcuire()和tryRelease()方法来实现独占锁,而SemaPhore则通过内部类继承AQS实现tryAcquireShared()方法和tryReleaseShared()方法实现共享模式锁。AQS的继承关系图如下:

AQS与重入锁ReetrantLock原理第1张

三、ReetrantLock非公平锁实现分析AQS的用法

ReetrantLock中非公平锁
//加锁操作
public voidlock() {
     sync.lock();
}
/**
 * 非公平锁实现sync.lock()
 */
static final class NonfairSync extendsSync {
    //加锁
    final voidlock() {
        //执行CAS操作,获取同步状态
        if (compareAndSetState(0, 1))
       //成功则将独占锁线程设置为当前线程  
setExclusiveOwnerThread(Thread.currentThread());
        else
            //否则再次请求同步状态
            acquire(1);
    }
}
//acquire为AQS本身实现的方法,其实现如下:
public final void acquire(intarg) {
    //再次尝试获取同步状态
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
//AQS子类Sync的子类NonfairSync中实现的tryAcquire方法
    protected final boolean tryAcquire(intacquires) {
         returnnonfairTryAcquire(acquires);
     }
  //nonfairTryAcquire方法
  final boolean nonfairTryAcquire(intacquires) {
      final Thread current =Thread.currentThread();
      int c =getState();
      //判断同步状态是否为0,并尝试再次获取同步状态
      if (c == 0) {
          //执行CAS操作
          if (compareAndSetState(0, acquires)) {
              setExclusiveOwnerThread(current);
              return true;
          }
      }
      //如果当前线程已获取锁,属于重入锁,再次获取锁后将status值加1
      else if (current ==getExclusiveOwnerThread()) {
          int nextc = c +acquires;
          if (nextc < 0) //overflow
              throw new Error("Maximum lock count exceeded");
          //设置当前同步状态,当前只有一个线程持有锁,因为不会发生线程安全问题,可以直接执行 setState(nextc);
setState(nextc);
          return true;
      }
      return false;
  }
//AQS本身实现的方法,将当前获取锁失败的线程构造成node结点加入的同步队列尾部,若队列为空或者并发入队失败,则调用enq方法重试。
privateNode addWaiter(Node mode) {
    //将请求同步状态失败的线程封装成结点
    Node node = newNode(Thread.currentThread(), mode);
    Node pred =tail;
    //如果是第一个结点加入肯定为空,跳过。
    //如果非第一个结点则直接执行CAS入队操作,尝试在尾部快速添加
    if (pred != null) {
        node.prev =pred;
        //使用CAS执行尾部结点替换,尝试在尾部快速添加
        if(compareAndSetTail(pred, node)) {
            pred.next =node;
            returnnode;
        }
    }
    //如果第一次加入或者CAS操作没有成功执行enq入队操作
enq(node);
    returnnode;
}
//AQS本身实现方法,通过循环CAS操作将当前线程构造的node结点入队,解决上面队列为空或者是并发入队失败的情况;
private Node enq(finalNode node) {
    //死循环
    for(;;) {
         Node t =tail;
         //如果队列为null,即没有头结点
         if (t == null) { //Must initialize
             //创建并使用CAS设置头结点
             if (compareAndSetHead(newNode()))
                 tail =head;
         } else {//队尾添加新结点
             node.prev =t;
             if(compareAndSetTail(t, node)) {
                 t.next =node;
                 returnt;
             }
         }
     }
    }
//AQS本身方法,队列中结点循环观察,当自己的前驱是head结点执行的结点时尝试获取锁,若成功将其设置为头结点,否则循环尝试;若当前结点前驱结点不是头结点,则在设置其前驱结点状态后将自己挂起
final boolean acquireQueued(final Node node, intarg) {
    boolean failed = true;
    try{
        boolean interrupted = false;
        //自旋,死循环
        for(;;) {
            //获取前驱结点
            final Node p =node.predecessor();
            当且仅当p为头结点才尝试获取同步状态
            if (p == head &&tryAcquire(arg)) {
                //将node设置为头结点
setHead(node);
                //清空原来头结点的引用便于GC
                p.next = null; //help GC
                failed = false;
                returninterrupted;
            }
            //如果前驱结点不是head,判断是否挂起线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally{
        if(failed)
            //最终都没能获取同步状态,结束该线程的请求
cancelAcquire(node);
    }
}
//设置为头结点
private voidsetHead(Node node) {
        head =node;
        //清空结点数据
        node.thread = null;
        node.prev = null;
}
//如果前驱结点不是head,判断是否挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
      interrupted = true;
}
//AQS本身的方法,若前驱结点状态为Node.SIGNAL则返回true,表示可以挂起当前结点,否则找到非结束状态的前驱结点,并设置其状态后,返回false
private static booleanshouldParkAfterFailedAcquire(Node pred, Node node) {
        //获取当前结点的等待状态
        int ws =pred.waitStatus;
        //如果为等待唤醒(SIGNAL)状态则返回true
        if (ws ==Node.SIGNAL)
            return true;
        //如果ws>0 则说明是结束状态,
        //遍历前驱结点直到找到没有结束状态的结点
        if (ws > 0) {
            do{
                node.prev = pred =pred.prev;
            } while (pred.waitStatus > 0);
            pred.next =node;
        } else{
            //如果ws小于0又不是SIGNAL状态,
            //则将其设置为SIGNAL状态,代表该结点的线程正在等待唤醒。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
//AQS本身方法,挂起当前线程并检查其中断状态
private final booleanparkAndCheckInterrupt() {
        //将当前线程挂起
        LockSupport.park(this);
        //获取线程中断状态,interrupted()是判断当前中断状态,
        //并非中断线程,因此可能true也可能false,并返回
        returnThread.interrupted();
}
//ReentrantLock类的unlock
public voidunlock() {
    sync.release(1);
}
//AQS类的release()方法
public final boolean release(intarg) {
    //尝试释放锁
    if(tryRelease(arg)) {
        Node h =head;
        if (h != null && h.waitStatus != 0)
            //唤醒后继结点的线程
unparkSuccessor(h);
        return true;
    }
    return false;
}
//ReentrantLock类中的内部类Sync实现的tryRelease(int releases) 
protected final boolean tryRelease(intreleases) {
      int c = getState() -releases;
      if (Thread.currentThread() !=getExclusiveOwnerThread())
          throw newIllegalMonitorStateException();
      boolean free = false;
      //判断状态是否为0,如果是则说明已释放同步状态
      if (c == 0) {
          free = true;
          //设置Owner为null
          setExclusiveOwnerThread(null);
      }
      //设置更新同步状态
setState(c);
      returnfree;
  }
//AQS本身方法,唤醒后续挂起的结点
private voidunparkSuccessor(Node node) {
    //这里,node一般为当前线程所在的结点。
    int ws =node.waitStatus;
    if (ws < 0)//置零当前线程所在的结点状态,允许失败。
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;//找到下一个需要唤醒的结点s
    if (s == null || s.waitStatus > 0) {//如果为空或已取消
        s = null;
        for (Node t = tail; t != null && t != node; t =t.prev)
            if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。
                s =t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//唤醒
}

剖析基于并发AQS的重入锁(ReetrantLock)及其Condition实现原理

免责声明:文章转载自《AQS与重入锁ReetrantLock原理》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇oracle--clob[转载]用 grub2 启动 clover.iso 来启动 OS X下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

C#多线程和异步(一)——基本概念和使用方法

阅读目录 一、多线程相关的基本概念 二、C#中的线程使用 2.1  基本使用 2.2  常用的属性和方法 2.3  线程同步 2.4  跨线程访问 回到顶部 一、多线程相关的基本概念 进程(Process):是系统中的一个基本概念。 一个正在运行的应用程序在操作系统中被视为一个进程,包含着一个运行程序所需要的资源,进程可以包括一个或多...

c++ windows下计时

多核时代不宜再用 x86 的 RDTSC 指令测试指令周期和时间 陈硕Blog.csdn.net/Solstice 自从 Intel Pentium 加入 RDTSC 指令以来,这条指令是 micro-benchmarking 的利器,可以以极小的代价获得高精度的 CPU 时钟周期数(Time Stamp Counter),不少介绍优化的文章[1]和书籍用...

Linux 线程占用CPU过高定位分析

今天朋友问我一个Linux程序CPU占用涨停了,该如何分析, CPU占用过高,模拟CPU占用过高的情况 先上一段代码: 1 #include <iostream> 2 #include <thread> 3 #include <vector> 4 5 6 int main(int argc, char *...

操作系统复习知识

一、进程和线程 进程和线程的区别: 进程是一个正在执行中的程序,包括程序计数器、寄存器和变量的当前值;一个进程包含一个或多个线程。 进程是操作系统分配资源的最小单位;而线程是作为独立运行和CPU调度的基本单位。 进程间的资源是独立的,而同一进程的各线程间资源是共享的;进程有自己的独立地址空间,每启动一个进程,系统就会为它分配地址空间、建立数据表来维护...

读写锁

(1) 读写锁是几把锁   一把锁   pthread_rwlock_t lock; (2) 读写锁的类型   读锁: 对内存做读操作   写锁: 对内存做写操作 (3) 读写锁的特性:   线程A加读锁成功, 又来了三个线程, 做读操作, 可以加锁成功     读共享, 并行处理   线程A加写锁成功, 又来了三个线程, 做读操作, 三个线程阻塞    ...

堆栈信息分析

一、堆栈信息解读 2013-01-13 11:02:31 Full thread dump Java HotSpot(TM) Client VM (23.1-b03 mixed mode, sharing): "[ThreadPool Manager] - Idle Thread" daemon prio=6 tid=0x069a3400 nid=0x84...