ReentrantLock源码解析(一) – – 公平锁、非公平锁

>>强大,10k+点赞的 SpringBoot 后台管理系统竟然出了详细教程!

ReentrantLock叫做可重入锁,什么是可重入呢?

可重入就是说如果当前线程持有锁,当它还没释放锁,它再次进行获取锁是可以继续获取到的,而不用等待释放锁再获取到。可重入锁分为公平锁和非公平锁,默认是非公平锁。下面先来看下它的基础结构图。


 继承结构





ReentrantLock源码解析(一)  - - 公平锁、非公平锁


ReentrantLock实现了Lock接口,Lock接口提供了应该实现的几个方法

  1. lock():获取锁

  2. lockInterruptibly(): 获取锁(可中断)

  3. tryLock():尝试获取锁,获取不到就返回false

  4. tryLock():尝试获取锁,获取不到就等一段时间,时间到了还没获取到就返回false

  5. unlock():释放锁

  6. newCondition():获取条件锁




主要内部类




先来看下都依赖了哪些内部类

ReentrantLock源码解析(一)  - - 公平锁、非公平锁


ReentrantLock主要就是通过Sync来完成获取锁和释放锁等操作,通过ReentrantLock构造方法来指定具体要使用公平锁还是非公平锁,通过上图可以看到,Sync是继承了AQS,AQS提供了核心的方法来完成真正意义上释放锁和阻塞锁等逻辑。

AQS在j.u.c包中为很多组件提供了底层的支持,例如CownDownLatch,CyclicBarrier,Semaphore,这些组件其中许多核心方法都依赖于AQS,通过先讲解ReentrantLock,往后再讲解其他组件,就容易理解的多,因为许多方法都是通用的。


构造方法



ReentrantLock源码解析(一)  - - 公平锁、非公平锁

通过构造方法可以看到默认是实现的非公平锁,如果要实现公平锁,参数传true即可。为什么默认是实现的非公平锁,而不是公平锁呢?带着疑问往下看,看完源码就知道为啥要这样做了。


公平锁lock方法



这里我们先讲解公平锁的lock方法是如何实现的

  public void lock() {
        sync.lock();
    }

  static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }
 // AQS.acquire(int arg)
    public final void acquire(int arg) {
        // tryAcquire:尝试获取锁:成功返回true,失败返回false
        // addWaiter():将未获取到锁的线程放入队列
        // acquireQueued: 挂起当前线程
        //
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 再次设置中断标记位 true
            selfInterrupt();
    }


RenntrantLock.的lock方法会调用FairSync的lock方法,而真正实现获取锁的方法是AQS的acquire方法,这个方法主要做了三件事:

  1. 调用FairSync的tryAcquire方法来尝试获取锁。

  2. 如果获取不到就调用AQS的addWaiter将线程加入阻塞队列。

  3. 然后调用acquireQueued方法去不断自旋获取锁,获取不到就挂起,从这可以看到线程的阻塞挂起就是通过这个方法来实现。



下面来一一讲解这几个方法


FairSync的tryAcquire()方法



   /**
         * 尝试获取锁(包含重入过程)
         */

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            // ==0表示当前AQS处于无锁状态
            if (c == 0) {
                // 公平锁所以要检查当前线程之前是否有等待的线程
                // !hasQueuedPredecessors():表示无等待者
                if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                    //设置当前线程未独占者线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 执行到这表示 c!=0
            // 检查当前线程是否是独占锁的线程,因为ReentrantLock是可重入的
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            // cas失败或者当前线程前面有等待的线程
            return false;
        }


  1. 首先获取当前state状态,这个state状态是啥,对于ReentrantLock来说,如果state==0,就代表当前是无锁状态,不等于就是锁已经被占用

  2. 如果state==0,此时还没有线程获取锁,就可以尝试获取锁。

  3. 首先通过hasQueuedPredecessors方法判断当前线程之前是否有等待的锁,没有才能去获取锁资源,有的话就不能获取,这个方法也就是和非公平锁获取锁的主要区别。

  4. 如果当前线程前面没有等待的锁就通过cas操作更新state值,更新成功就代表获取到锁,将当前线程设置为持有锁的线程。

  5. 如果当前state不等于0,代表有线程获取锁,那么判断当前线程是否是获取锁的线程,如果是,也可以获取锁,将state值加1,这个逻辑就是实现可重入的逻辑。


AQS的addWaiter()方法



调用这个方法的前提就是没有获取到锁,没有获取到锁就要把当前线程加入到阻塞队列。


  private Node addWaiter(Node mode) {
        // 构建node
        Node node = new Node(Thread.currentThread(), mode);
        // 获取队尾节点
        Node pred = tail;
        // 不为空表示队列中已经有Node了
        if (pred != null) {
            // 当前节点的prev指向pred->tail
            node.prev = pred;
            // cas成功,Node就入队成功
            if (compareAndSetTail(pred, node)) {

                pred.next = node;
                return node;
            }
        }
        //当前队列为空或者cas失败会调用enq方法

        // 自旋直到入队成功
        enq(node);
        return node;
    }

private Node enq(final Node node) {
        // 自旋入队
        for (; ; ) {
            Node t = tail;
            // 当前队列是空队列
            if (t == null) {
                // 作为当前持锁线程的第一个后继线程,需要做啥呢

                //1.因为当前持锁的线程,它获取锁时,直接tryAcquire成功了,没有向 阻塞队列 中添加任何node,所以作为后继需要为它擦屁股..
                //2.为自己追加node
                //CAS成功,说明当前线程 成为head.next节点。
                //线程需要为当前持锁的线程 创建head。
                if (compareAndSetHead(new Node()))
                    tail = head;
            }
            // 进入到这里表示当前队列不为空,则将当前node入队
            // 入不成功会接着循环直到入队成功
            else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }


  1. 首先创建一个Node,如果当前队列不为空,表示队列已经有node了,这时候就将创建的Node加入到队尾

  2. 如果加入队尾不成功或者当前队列为空就调用enq方法来完成加入队列的操作。

  3. enq方法是个自旋操作,加入队列成功才返回。

  4. 首先判断当前队列是否为空,什么时候会为空呢,那就是第一个持有锁的线程,它是直接获取锁,没有经过加入队里的操作,这时候下一个线程没有获取到锁,它首先就会为当前持有锁的线程在队列中创建一个head。

  5. 如果当前队列不为空了,那就会通过cas将当前node加入队列尾部,加不成功就再次循环,直到加入成功。




线程加入阻塞队列成功后,就要调用调用acquireQueued方法去不断自旋获取锁,获取不到就挂起。


AQS的acquireQueued()方法



    final boolean acquireQueued(final Node node, int arg) {
        // true: 当前线程抢占锁失败,需要执行出队操作
        // false: 表示抢占锁成功
        boolean failed = true;
        try {
            // 当前线程是否被中断标记
            boolean interrupted = false;
            //自旋,直到获取锁成功才会跳出循环
            for (; ; ) {
                // 获取当前节点的前一个结点
                final Node p = node.predecessor();
                // 如果是头节点就尝试获取锁
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null// help GC
                    failed = false;
                    return interrupted;
                }

                // shouldParkAfterFailedAcquire,判断当前线程是否需要挂起
                //parkAndCheckInterrupt park当前线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

  /**
     * 总结:
     * 1. 当前节点的前置节点是取消状态,第一次来到这个方法时,会越过取消状态的节点,第二次会返回true,然后park当前线程
     * 2. 当前节点的前置节点状态是0,当前线程会设置前置节点的状态为 -1,第二次自旋来到这个方法时,会返回true然后park调当前线程
     */

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // pred:当前线程前置节点
        // node:当前线程节点

        //waitStatus: 0:默认状态 -1:signal状态:表示当前节点释放锁之后会唤醒它的第一个后继结点
        // >0: 当前节点是 cancel状态。就是取消状态
        // 获取前置节点的状态
        int ws = pred.waitStatus;

        // 表示前置节点是可以唤醒当前节点的节点,所以返回true
        if (ws == Node.SIGNAL)

            return true;
        // >0 表示前置节点是 cancel状态
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */

            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            // CANCELED状态的节点会被出队
            pred.next = node;
        } else {

            // 当前node的前置节点就是0这一种状态

            // 将当前线程node的前置node,状态强制设置为SIGNAL
            // 表示前置节点释放锁之后需要唤醒我
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }


这里有一个waitstatus状态,这个状态是用来标识节点的一个状态,不同的值代表不同的含义:

  • 0:默认状态 

  • 1:CANCELED状态,就是被取消状态

  • -1:SIGNAL状态,代表前置节点释放锁后需要唤醒当前节点

  • -2:CONDITION状态。这个状态会在条件锁用到。


  1. 首先就是一个自旋操作,不断尝试获取锁,如果当前节点的前置节点是头结点就尝试获取锁,获取不到锁就会首先调用shouldParkAfterFailedAcquire()方法,

  2. 这个方法主要做了那些事呢,它会判断当前节点的前置节点状态是否大于0,大于0代表这个前置节点是CANCELED状态,然后通过do while循环不断往前遍历将CANCELED状态的结点出队,直到遇到waitStatus状态不是大于0的结点,这个节点就会做为当前节点的前置节点,再下次循环进入该方法就会走到else这个里面

  3. 走到else代表当前节点的前置节点不是大于0,就把当前节点前置节点状态设置为signal状态,signal状态在文章开头描述过,代表前置节点释放锁后需要唤醒当前节点。

  4. 设置为signal状态成功返回true,返回true就会接着走到parkAndCheckInterrupt()方法,这个方法就是通过调用LockSupport的park方法阻塞当前线程,直到被唤醒。




代码很长,调用的方法很多,整个获取锁的流程我们再重新梳理总结一下:

ReentrantLock源码解析(一)  - - 公平锁、非公平锁

  1. 尝试获取锁,如果获取到了就直接返回了;

  2. 尝试获取锁失败,再调用addWaiter()构建新节点并把新节点入队;

  3. 然后调用acquireQueued()再次尝试获取锁,如果成功了,直接返回;

  4. 如果再次失败,再调用shouldParkAfterFailedAcquire()将节点的等待状态置为等待唤醒(SIGNAL);

  5. 调用parkAndCheckInterrupt()阻塞当前线程;

  6. 如果被唤醒了,会继续在acquireQueued()的for()循环再次尝试获取锁,如果成功了就返回;

  7. 如果不成功,再次阻塞,重复3,4,5直到成功获取到锁。



非公平锁的lock方法



        // UnfairSync.lock()
      final void lock() {
            if (compareAndSetState(01))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

  // AQS.acquire
  public final void acquire(int arg) {
        // tryAcquire:尝试获取锁:成功返回true,失败返回false
        // addWaiter():将未获取到锁的线程放入队列
        // acquireQueued: 挂起当前线程
        //
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 再次设置中断标记位 true
            selfInterrupt();
    }

   //UnfairSync.tryAcquire()
  protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

 // Sync.nonfairTryAcquire()
   final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0// overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }


非公平锁也是通过调用AQS.aquire方法来实现获取锁及阻塞锁的操作,和公平锁的区别在于它首先会先尝试获取锁,如果获取不到就进入acquire方法然后调用tryAcuqire再次尝试获取锁,从tryAcquire方法可以看到,它调用了Sync的nonFairTryAcquire方法尝试获取锁,这个方法和公平锁的tryAcquire方法唯一的区别就是没有先去判断当前线程前面是否有等待的线程,因为少了这个机制,所以它就是非公平的,不是按照线程先后顺序获取锁,接下来其他的操作和公平锁一样,就不再赘述。


释放锁 unlock方法



    public void unlock() {
        sync.release(1);
    }
     // AQS.release
    public final boolean release(int arg) {
        // tryRelease 尝试释放锁
        if (tryRelease(arg)) {
            Node h = head;
            // 头结点不为空,并且waisStatus不是默认状态
            if (h != null && h.waitStatus != 0)
                // 唤醒下一个节点线程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }


释放锁是不区分公平不公平的,通过调用Sync.release->AQS.release方法来实现释放锁的逻辑,

在AQS.release的方法中主要就做了两件事,一件事就是调用Sync的tryRelelease方法来尝试释放锁,如果释放成功,就调用AQS.unparkSuccessor()来唤醒下一个等待获取锁的线程。

下面就来详细看看这两个方法的实现逻辑。



Sync.tryRelease()方法



 protected final boolean tryRelease(int releases) {
            // 计算 state值
            int c = getState() - releases;
            // 判断当前线程是不是当前独占锁的线程
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            // ==0 释放锁
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

  1. 首先计算最新的state值

  2. 判断当前线程是否是获取锁的线程,不是就抛出异常

  3. 判断state值是否是0,是0就可以释放锁,将持有锁的线程置为空

  4. 更新state值


AQS.unparkSuccessor方法



 /**
     * 唤醒该节点的下一个节点,如果下一个节点状态为Cancel,则从队尾向前遍历找到不是CANCEL的节点
     */

    private void unparkSuccessor(Node node{
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */

        int ws = node.waitStatus;
        // 小于0说明当前节点是signal状态,可以唤醒它的下一个结点
        if (ws < 0)
            // cas 将 waitstatus更新为0
            compareAndSetWaitStatus(node, ws, 0);

        // 获取当前节点的下一个节点
        Node s = node.next;
        //s 什么时候等于null?
        //1.当前节点就是tail节点时  s == null。
        //2.当新节点入队未完成时(1.设置新节点的prev 指向pred  2.cas设置新节点为tail   3.(未完成)pred.next -> 新节点 )
        //需要找到可以被唤醒的节点..

        // 如果当前节点的下一个节点等于空或者状态为cancel状态
        if (s == null || s.waitStatus > 0) {
            s = null;
            // for循环遍历找到离当前节点最近的一个结点并且不是非取消状态
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // 唤醒当前节点线程
            LockSupport.unpark(s.thread);
    }


如果释放锁成功就会调用这个方法去唤醒下一个节点线程。

  1. 首先判断当前节点waitstatus状态是否是signal状态,只有是这个状态才有资格释放它的next结点线程

  2. 判断它的next结点是否为空或者是否是cancel状态,如果满足其一则代表这个next节点不符合唤醒条件,就会从后往前循环遍历找到满足唤醒条件的结点


  3. 调用unpark方法唤醒节点。

这里说下,唤醒之后,这个被唤醒的结点会做啥呢,还记得我们讲解的acquireQueued方法吗,它里面有个自旋的逻辑,被park的线程就会在被唤醒后继续执行这个方法尝试获取锁。

最后再通过一个思维导图来梳理一下整个ReentrantLock,帮助大家消化下和回顾这些知识点。

ReentrantLock源码解析(一)  - - 公平锁、非公平锁




原文始发于微信公众号(卧龙小蛋):ReentrantLock源码解析(一) - - 公平锁、非公平锁