ReentrantLock源码解析(一) – – 公平锁、非公平锁
ReentrantLock叫做可重入锁,什么是可重入呢?
可重入就是说如果当前线程持有锁,当它还没释放锁,它再次进行获取锁是可以继续获取到的,而不用等待释放锁再获取到。可重入锁分为公平锁和非公平锁,默认是非公平锁。下面先来看下它的基础结构图。
ReentrantLock实现了Lock接口,Lock接口提供了应该实现的几个方法
-
lock():获取锁
-
lockInterruptibly(): 获取锁(可中断)
-
tryLock():尝试获取锁,获取不到就返回false
-
tryLock():尝试获取锁,获取不到就等一段时间,时间到了还没获取到就返回false
-
unlock():释放锁
-
newCondition():获取条件锁
先来看下都依赖了哪些内部类
ReentrantLock主要就是通过Sync来完成获取锁和释放锁等操作,通过ReentrantLock构造方法来指定具体要使用公平锁还是非公平锁,通过上图可以看到,Sync是继承了AQS,AQS提供了核心的方法来完成真正意义上释放锁和阻塞锁等逻辑。
AQS在j.u.c包中为很多组件提供了底层的支持,例如CownDownLatch,CyclicBarrier,Semaphore,这些组件其中许多核心方法都依赖于AQS,通过先讲解ReentrantLock,往后再讲解其他组件,就容易理解的多,因为许多方法都是通用的。
通过构造方法可以看到默认是实现的非公平锁,如果要实现公平锁,参数传true即可。为什么默认是实现的非公平锁,而不是公平锁呢?带着疑问往下看,看完源码就知道为啥要这样做了。
这里我们先讲解公平锁的lock方法是如何实现的
public void lock() {
sync.lock();
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
// AQS.acquire(int arg)
public final void acquire(int arg) {
// tryAcquire:尝试获取锁:成功返回true,失败返回false
// addWaiter():将未获取到锁的线程放入队列
// acquireQueued: 挂起当前线程
//
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 再次设置中断标记位 true
selfInterrupt();
}
RenntrantLock.的lock方法会调用FairSync的lock方法,而真正实现获取锁的方法是AQS的acquire方法,这个方法主要做了三件事:
-
调用FairSync的tryAcquire方法来尝试获取锁。
-
如果获取不到就调用AQS的addWaiter将线程加入阻塞队列。
-
然后调用acquireQueued方法去不断自旋获取锁,获取不到就挂起,从这可以看到线程的阻塞挂起就是通过这个方法来实现。
下面来一一讲解这几个方法
/**
* 尝试获取锁(包含重入过程)
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// ==0表示当前AQS处于无锁状态
if (c == 0) {
// 公平锁所以要检查当前线程之前是否有等待的线程
// !hasQueuedPredecessors():表示无等待者
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
//设置当前线程未独占者线程
setExclusiveOwnerThread(current);
return true;
}
}
// 执行到这表示 c!=0
// 检查当前线程是否是独占锁的线程,因为ReentrantLock是可重入的
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// cas失败或者当前线程前面有等待的线程
return false;
}
-
首先获取当前state状态,这个state状态是啥,对于ReentrantLock来说,如果state==0,就代表当前是无锁状态,不等于就是锁已经被占用
-
如果state==0,此时还没有线程获取锁,就可以尝试获取锁。
-
首先通过hasQueuedPredecessors方法判断当前线程之前是否有等待的锁,没有才能去获取锁资源,有的话就不能获取,这个方法也就是和非公平锁获取锁的主要区别。
-
如果当前线程前面没有等待的锁就通过cas操作更新state值,更新成功就代表获取到锁,将当前线程设置为持有锁的线程。
-
如果当前state不等于0,代表有线程获取锁,那么判断当前线程是否是获取锁的线程,如果是,也可以获取锁,将state值加1,这个逻辑就是实现可重入的逻辑。
调用这个方法的前提就是没有获取到锁,没有获取到锁就要把当前线程加入到阻塞队列。
private Node addWaiter(Node mode) {
// 构建node
Node node = new Node(Thread.currentThread(), mode);
// 获取队尾节点
Node pred = tail;
// 不为空表示队列中已经有Node了
if (pred != null) {
// 当前节点的prev指向pred->tail
node.prev = pred;
// cas成功,Node就入队成功
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//当前队列为空或者cas失败会调用enq方法
// 自旋直到入队成功
enq(node);
return node;
}
private Node enq(final Node node) {
// 自旋入队
for (; ; ) {
Node t = tail;
// 当前队列是空队列
if (t == null) {
// 作为当前持锁线程的第一个后继线程,需要做啥呢
//1.因为当前持锁的线程,它获取锁时,直接tryAcquire成功了,没有向 阻塞队列 中添加任何node,所以作为后继需要为它擦屁股..
//2.为自己追加node
//CAS成功,说明当前线程 成为head.next节点。
//线程需要为当前持锁的线程 创建head。
if (compareAndSetHead(new Node()))
tail = head;
}
// 进入到这里表示当前队列不为空,则将当前node入队
// 入不成功会接着循环直到入队成功
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
-
首先创建一个Node,如果当前队列不为空,表示队列已经有node了,这时候就将创建的Node加入到队尾
-
如果加入队尾不成功或者当前队列为空就调用enq方法来完成加入队列的操作。
-
enq方法是个自旋操作,加入队列成功才返回。
-
首先判断当前队列是否为空,什么时候会为空呢,那就是第一个持有锁的线程,它是直接获取锁,没有经过加入队里的操作,这时候下一个线程没有获取到锁,它首先就会为当前持有锁的线程在队列中创建一个head。
-
如果当前队列不为空了,那就会通过cas将当前node加入队列尾部,加不成功就再次循环,直到加入成功。
线程加入阻塞队列成功后,就要调用调用acquireQueued方法去不断自旋获取锁,获取不到就挂起。
final boolean acquireQueued(final Node node, int arg) {
// true: 当前线程抢占锁失败,需要执行出队操作
// false: 表示抢占锁成功
boolean failed = true;
try {
// 当前线程是否被中断标记
boolean interrupted = false;
//自旋,直到获取锁成功才会跳出循环
for (; ; ) {
// 获取当前节点的前一个结点
final Node p = node.predecessor();
// 如果是头节点就尝试获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// shouldParkAfterFailedAcquire,判断当前线程是否需要挂起
//parkAndCheckInterrupt park当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 总结:
* 1. 当前节点的前置节点是取消状态,第一次来到这个方法时,会越过取消状态的节点,第二次会返回true,然后park当前线程
* 2. 当前节点的前置节点状态是0,当前线程会设置前置节点的状态为 -1,第二次自旋来到这个方法时,会返回true然后park调当前线程
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// pred:当前线程前置节点
// node:当前线程节点
//waitStatus: 0:默认状态 -1:signal状态:表示当前节点释放锁之后会唤醒它的第一个后继结点
// >0: 当前节点是 cancel状态。就是取消状态
// 获取前置节点的状态
int ws = pred.waitStatus;
// 表示前置节点是可以唤醒当前节点的节点,所以返回true
if (ws == Node.SIGNAL)
return true;
// >0 表示前置节点是 cancel状态
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// CANCELED状态的节点会被出队
pred.next = node;
} else {
// 当前node的前置节点就是0这一种状态
// 将当前线程node的前置node,状态强制设置为SIGNAL
// 表示前置节点释放锁之后需要唤醒我
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
这里有一个waitstatus状态,这个状态是用来标识节点的一个状态,不同的值代表不同的含义:
-
0:默认状态
-
1:CANCELED状态,就是被取消状态
-
-1:SIGNAL状态,代表前置节点释放锁后需要唤醒当前节点
-
-2:CONDITION状态。这个状态会在条件锁用到。
-
首先就是一个自旋操作,不断尝试获取锁,如果当前节点的前置节点是头结点就尝试获取锁,获取不到锁就会首先调用shouldParkAfterFailedAcquire()方法,
-
这个方法主要做了那些事呢,它会判断当前节点的前置节点状态是否大于0,大于0代表这个前置节点是CANCELED状态,然后通过do while循环不断往前遍历将CANCELED状态的结点出队,直到遇到waitStatus状态不是大于0的结点,这个节点就会做为当前节点的前置节点,再下次循环进入该方法就会走到else这个里面
-
走到else代表当前节点的前置节点不是大于0,就把当前节点前置节点状态设置为signal状态,signal状态在文章开头描述过,代表前置节点释放锁后需要唤醒当前节点。
-
设置为signal状态成功返回true,返回true就会接着走到parkAndCheckInterrupt()方法,这个方法就是通过调用LockSupport的park方法阻塞当前线程,直到被唤醒。
代码很长,调用的方法很多,整个获取锁的流程我们再重新梳理总结一下:
-
尝试获取锁,如果获取到了就直接返回了;
-
尝试获取锁失败,再调用addWaiter()构建新节点并把新节点入队;
-
然后调用acquireQueued()再次尝试获取锁,如果成功了,直接返回;
-
如果再次失败,再调用shouldParkAfterFailedAcquire()将节点的等待状态置为等待唤醒(SIGNAL);
-
调用parkAndCheckInterrupt()阻塞当前线程;
-
如果被唤醒了,会继续在acquireQueued()的for()循环再次尝试获取锁,如果成功了就返回;
-
如果不成功,再次阻塞,重复3,4,5直到成功获取到锁。
// UnfairSync.lock()
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// AQS.acquire
public final void acquire(int arg) {
// tryAcquire:尝试获取锁:成功返回true,失败返回false
// addWaiter():将未获取到锁的线程放入队列
// acquireQueued: 挂起当前线程
//
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 再次设置中断标记位 true
selfInterrupt();
}
//UnfairSync.tryAcquire()
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// Sync.nonfairTryAcquire()
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
非公平锁也是通过调用AQS.aquire方法来实现获取锁及阻塞锁的操作,和公平锁的区别在于它首先会先尝试获取锁,如果获取不到就进入acquire方法然后调用tryAcuqire再次尝试获取锁,从tryAcquire方法可以看到,它调用了Sync的nonFairTryAcquire方法尝试获取锁,这个方法和公平锁的tryAcquire方法唯一的区别就是没有先去判断当前线程前面是否有等待的线程,因为少了这个机制,所以它就是非公平的,不是按照线程先后顺序获取锁,接下来其他的操作和公平锁一样,就不再赘述。
public void unlock() {
sync.release(1);
}
// AQS.release
public final boolean release(int arg) {
// tryRelease 尝试释放锁
if (tryRelease(arg)) {
Node h = head;
// 头结点不为空,并且waisStatus不是默认状态
if (h != null && h.waitStatus != 0)
// 唤醒下一个节点线程
unparkSuccessor(h);
return true;
}
return false;
}
释放锁是不区分公平不公平的,通过调用Sync.release->AQS.release方法来实现释放锁的逻辑,
在AQS.release的方法中主要就做了两件事,一件事就是调用Sync的tryRelelease方法来尝试释放锁,如果释放成功,就调用AQS.unparkSuccessor()来唤醒下一个等待获取锁的线程。
下面就来详细看看这两个方法的实现逻辑。
protected final boolean tryRelease(int releases) {
// 计算 state值
int c = getState() - releases;
// 判断当前线程是不是当前独占锁的线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// ==0 释放锁
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
-
首先计算最新的state值
-
判断当前线程是否是获取锁的线程,不是就抛出异常
-
判断state值是否是0,是0就可以释放锁,将持有锁的线程置为空
-
更新state值
/**
* 唤醒该节点的下一个节点,如果下一个节点状态为Cancel,则从队尾向前遍历找到不是CANCEL的节点
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
// 小于0说明当前节点是signal状态,可以唤醒它的下一个结点
if (ws < 0)
// cas 将 waitstatus更新为0
compareAndSetWaitStatus(node, ws, 0);
// 获取当前节点的下一个节点
Node s = node.next;
//s 什么时候等于null?
//1.当前节点就是tail节点时 s == null。
//2.当新节点入队未完成时(1.设置新节点的prev 指向pred 2.cas设置新节点为tail 3.(未完成)pred.next -> 新节点 )
//需要找到可以被唤醒的节点..
// 如果当前节点的下一个节点等于空或者状态为cancel状态
if (s == null || s.waitStatus > 0) {
s = null;
// for循环遍历找到离当前节点最近的一个结点并且不是非取消状态
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒当前节点线程
LockSupport.unpark(s.thread);
}
如果释放锁成功就会调用这个方法去唤醒下一个节点线程。
-
首先判断当前节点waitstatus状态是否是signal状态,只有是这个状态才有资格释放它的next结点线程
-
判断它的next结点是否为空或者是否是cancel状态,如果满足其一则代表这个next节点不符合唤醒条件,就会从后往前循环遍历找到满足唤醒条件的结点
-
调用unpark方法唤醒节点。
这里说下,唤醒之后,这个被唤醒的结点会做啥呢,还记得我们讲解的acquireQueued方法吗,它里面有个自旋的逻辑,被park的线程就会在被唤醒后继续执行这个方法尝试获取锁。
最后再通过一个思维导图来梳理一下整个ReentrantLock,帮助大家消化下和回顾这些知识点。
原文始发于微信公众号(卧龙小蛋):ReentrantLock源码解析(一) - - 公平锁、非公平锁