ReentrantLock 是 Java JUC(java.util.concurrent) 包下的一个锁工具,它实现了 Lock 接口,与 synchronized 锁不同, ReentrantLock 除了用来做线程间互斥之外,还提供了很多高级的特性,例如公平锁 & 非公平锁 以及可中断 。
本文将从 JDK17 源码角度介绍一下 ReentrantLock 的底层实现原理,这部分是 Java 面试的常考知识点。(目前看到的博客都是基于 JDK8 源码分析的,而鲜有基于 JDK17 源码进行分析的)
2024美团暑期实习后端开发一面:公平锁与非公平锁是如何实现的? 看完这篇文章你就明白了!
AQS AQS 的全称为 AbstractQueueSynchronizer ,即抽象队列同步器,通俗来讲,AQS 的作用就是来定义线程如何获得锁、线程未获得锁如何等待以及线程如何释放锁。 ReentrantLock 的底层实现是高度依赖 AQS 的,这一点从源码中就可以看得出来:
public class ReentrantLock implements Lock , java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L ; private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { ··· } ··· }
ReentrantLock 内部有一个 Sync 类型的对象,Sync 这个类则是继承自 _AbstractQueuedSynchronizer_,也就是抽象队列同步器,这个锁的同步控制基础就是这个 Sync 提供的,可以注意到这里的 Sync 仍然是抽象类,其实它还有两个子类,分别是 FairSync 和 NonFairSync ,分别对应于公平锁的实现和非公平锁的实现。
AQS 底层是一个 CLH 队列,是一个双向队列,由 Node 节点连接而成,这部分代码位于 AbstractQueuedSynchronizer 抽象类中,
abstract static class Node { volatile Node prev; volatile Node next; Thread waiter; volatile int status; ··· }
可以看到,一个 Node 包含指向前后 Node 的指针(prev 和 next),一个在等待的线程对象(waiter)以及状态(status)。这里的状态指的是这个线程对象的状态,在 JDK17 中,这个状态只有三种,但在 JDK8 中这里的状态数量更多。
static final int WAITING = 1 ; static final int CANCELLED = 0x80000000 ; static final int COND = 2 ;
其中 WAITING 表示线程在等待,CANCELLED 表示线程已取消获取锁(用于实现锁的可中断),COND 表示线程处于一个条件等待的状态。
除了 CLH 队列外,AQS 中还有一个状态字段,即:
private volatile int state;
用于标识锁目前是否被占用,等于 0 则锁空闲,大于 0 则锁被占用。
一个获取锁失败的线程会被封装成为一个 Node 对象放入到队列中排队等待获取锁。
知道了 AQS 的概念,现在我们可以通过 ReentrantLock 加锁的全流程来窥探到 ReentrantLock 的底层原理了。
ReentrantLock 加锁流程 首先是实例化一个 ReentrantLock 锁对象,这里构造函数可带参数,如果参数为 True ,则会实例化一个公平锁,默认为非公平锁,其实区别就在于是实例化了一个 FairSync 对象还是一个 NonFairSync 对象。
Lock lock = new ReentrantLock ();public ReentrantLock () { sync = new NonfairSync (); } public ReentrantLock (boolean fair) { sync = fair ? new FairSync () : new NonfairSync (); }
之后调用 lock()
方法加锁,其实底层是调用了 sync 对象的 lock()
方法来加锁,看到这里,你可能更能理解为什么说 ReentrantLock 的底层实现是高度依赖 AQS 的。
public void lock () { sync.lock(); }
sync.lock()
的实现如下:
final void lock () { if (!initialTryLock()) acquire(1 ); }
这里 initialTryLock()
方法是一个抽象方法,实际上调用的是 Sync 这个类的两个子类 FairSync 和 NonFairSync 的重写的 initialTryLock()
方法。
我们先看非公平锁 FairSync 的实现:
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L ; final boolean initialTryLock () { Thread current = Thread.currentThread(); if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(current); return true ; } else if (getExclusiveOwnerThread() == current) { int c = getState() + 1 ; if (c < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(c); return true ; } else return false ; } protected final boolean tryAcquire (int acquires) { if (getState() == 0 && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } }
非公平锁的 initialTryLock()
方法通过 CAS 尝试加锁(即 compareAndSetState(0, 1)
),如果 AQS 的 state 字段为 0 ,则把这个字段变为 1 ,并使用 setExclusiveOwnerThread(current);
将独占锁的线程设置为当前线程即自己,否则加锁失败,失败之后会判断当前独占锁的线程是不是当前线程。如果是的话将 state 加一,同样加锁成功,这里就体现了可重入锁的实现,即同一个线程可以多次获取锁而不阻塞,state 字段其实就是这个线程获取锁的次数。如果这两种情况都没有加锁成功,则认为锁被另一个线程独占,加锁失败,返回 false 。
在第一次 CAS 加锁未成功时,会调用 acquire(1)
这个方法,这个方法是 AbstractQueuedSynchronizer 抽象类提供给我们的,接下来我们看看这个方法的实现。
public final void acquire (int arg) { if (!tryAcquire(arg)) acquire(null , arg, false , false , false , 0L ); }
首先调用 tryAcquire(arg)
方法,这里会去调用具体实现类的 tryAcquire()
方法,同样,我们先看非公平锁的实现,实际上上面已经给出,这里只摘出 tryAcquire()
的实现:
protected final boolean tryAcquire (int acquires) { if (getState() == 0 && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; }
这里是尝试了第二次 CAS 加锁,与 initialTryLock()
方法中的逻辑差不多,同样也是做 CAS 尝试,如果成功,将独占这个锁的线程设置为自己,如果不成功则返回 false 。
如果第二次 CAS 加锁不成功,则调用 acquire(null, arg, false, false, false, 0L)
方法,这个方法极其重要,它的内部定义了线程如何排队来获取锁的逻辑。
下面是这个方法的完整实现,我添加了较为详细的注释便于理解:
final int acquire (Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) { Thread current = Thread.currentThread(); byte spins = 0 , postSpins = 0 ; boolean interrupted = false , first = false ; Node pred = null ; for (;;) { if (!first && (pred = (node == null ) ? null : node.prev) != null && !(first = (head == pred))) { if (pred.status < 0 ) { cleanQueue(); continue ; } else if (pred.prev == null ) { Thread.onSpinWait(); continue ; } } if (first || pred == null ) { boolean acquired; try { if (shared) acquired = (tryAcquireShared(arg) >= 0 ); else acquired = tryAcquire(arg); } catch (Throwable ex) { cancelAcquire(node, interrupted, false ); throw ex; } if (acquired) { if (first) { node.prev = null ; head = node; pred.next = null ; node.waiter = null ; if (shared) signalNextIfShared(node); if (interrupted) current.interrupt(); } return 1 ; } } if (node == null ) { if (shared) node = new SharedNode (); else node = new ExclusiveNode (); } else if (pred == null ) { node.waiter = current; Node t = tail; node.setPrevRelaxed(t); if (t == null ) tryInitializeHead(); else if (!casTail(t, node)) node.setPrevRelaxed(null ); else t.next = node; } else if (first && spins != 0 ) { --spins; Thread.onSpinWait(); } else if (node.status == 0 ) { node.status = WAITING; } else { long nanos; spins = postSpins = (byte )((postSpins << 1 ) | 1 ); if (!timed) LockSupport.park(this ); else if ((nanos = time - System.nanoTime()) > 0L ) LockSupport.parkNanos(this , nanos); else break ; node.clearStatus(); if ((interrupted |= Thread.interrupted()) && interruptible) break ; } } return cancelAcquire(node, interrupted, interruptible); }
这段核心代码的作用在于控制未获得锁的线程入队,同时让队列头部的线程参与争夺锁。需要注意的是,这里的 head 指向抢到锁的线程对应的 _node_。
首先第一个 if 中,
如果发现有取消获取锁的线程,会执行队列清理,
如果自己是第二个节点,那么会一直自旋来判断前一个线程是否放弃了获取锁,防止队头线程已经放弃了获取锁,而在队列之后的线程还不知道,造成饥饿。
第二个 if 中,
如果当前节点是队列中的第一个节点或者前驱为空,则参与争夺锁,调用了 tryAcquire()
方法,也就是说争夺锁的线程只有队列中的第一个 Node 中的线程、刚释放了锁的那个线程以及还尚未进入这个函数的线程(在进入这个函数之前会尝试两次 CAS 来获取锁)。
第三个 if 中,
如果当前节点还为空,则实例化一个 _node_,
如果前驱为空,则入队,
如果当前节点为队列中第一个节点,且自旋次数(spins)未用尽,则 -1 ,之后接着做下一次自旋
如果当前节点状态为 0 ,说明是新节点,将状态设置为 WAITTING
最后 else 中会先计算当前线程的自旋次数 _spins_,并将当前线程 park 住
按照源码中 spins 的计算方法,每当 spins 用尽,下一次的自旋次数为上一次的自旋次数的 2 倍 +1 。
可以看到,队列中第一个 node 中的线程等待的时间越长,其可以自旋的次数就会越多,这样可以有效地避免队列中的线程饥饿,因为自旋次数越多,获取锁的概率越大,所以随着等待时间的增加,获取锁的概率其实也是增加的。JDK17 中是这样的,而在 JDK8 中却有所不同,JDK8 中线程每次被唤醒只会自旋一次就迅速被 park 住,这样就比较容易造成饥饿。
概括来讲整个加锁过程,一个线程首先会自旋两次尝试获取锁,如果这两次都没有获取到锁,则进入到 acquire()
方法的大循环中,队列中的所有在等待的线程都会被 park 住, 直到第一个线程被唤醒,然后开始自旋,直到自旋次数用尽,被再次 park 住……
你可能会问,那么谁来唤醒队列中第一个等待的线程呢?当然是刚刚释放锁的那个线程啦~
解锁也是调用了 sync 对象的 release()
方法,下面是解锁部分的源码:
public void unlock () { sync.release(1 ); } public final boolean release (int arg) { if (tryRelease(arg)) { signalNext(head); return true ; } return false ; } protected final boolean tryRelease (int releases) { int c = getState() - releases; if (getExclusiveOwnerThread() != Thread.currentThread()) throw new IllegalMonitorStateException (); boolean free = (c == 0 ); if (free) setExclusiveOwnerThread(null ); setState(c); return free; } private static void signalNext (Node h) { Node s; if (h != null && (s = h.next) != null && s.status != 0 ) { s.getAndUnsetStatus(WAITING); LockSupport.unpark(s.waiter); } }
tryRelease()
方法尝试解锁,如果成功解锁,则调用 signalNext()
方法,唤醒队列中的第一个线程,于是队列中的第一个线程就加入到了获取锁的行列中来。
这就是 ReentrantLock 加锁过程及其底层原理,你可能会疑惑为什么前面都在讲非公平锁,那公平锁呢?
公平锁 其实公平锁的实现与非公平锁及其类似,只有一处不同,具体可以看公平锁的源码:
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L ; final boolean initialTryLock () { Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedThreads() && compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(current); return true ; } } else if (getExclusiveOwnerThread() == current) { if (++c < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(c); return true ; } return false ; } protected final boolean tryAcquire (int acquires) { if (getState() == 0 && !hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } }
可以看到,与非公平锁不同的是,在公平锁在决定是否要去抢锁之前会有一个额外的判断,也就是会调用 hasQueuedPredecessors()
方法,这个方法的作用在于判断队列中是否有等待处理的线程,如果没有,当前线程才会去抢锁,如果队列中有线程则无法抢锁,这样就保证了,线程可以按照自己在队列中的顺序公平地获得锁。