Java ReentrantLock原理

ReentrantLock Java JUC(java.util.concurrent) 包下的一个锁工具,它实现了 Lock 接口,与 synchronized 锁不同, ReentrantLock 除了用来做线程间互斥之外,还提供了很多高级的特性,例如公平锁 & 非公平锁以及可中断。  

本文将从 JDK17 源码角度介绍一下 ReentrantLock 的底层实现原理,这部分是 Java 面试的常考知识点。(目前看到的博客都是基于 JDK8 源码分析的,而鲜有基于 JDK17 源码进行分析的)  

2024美团暑期实习后端开发一面:公平锁与非公平锁是如何实现的?看完这篇文章你就明白了!

AQS

AQS 的全称为 AbstractQueueSynchronizer ,即抽象队列同步器,通俗来讲,AQS 的作用就是来定义线程如何获得锁、线程未获得锁如何等待以及线程如何释放锁。 ReentrantLock 的底层实现是高度依赖 AQS 的,这一点从源码中就可以看得出来:

public class ReentrantLock implements Lock, java.io.Serializable {

    private static final long serialVersionUID = 7373984872572414699L;

    /** Synchronizer providing all implementation mechanics */

    private final Sync sync;

    /**

     * Base of synchronization control for this lock. Subclassed

     * into fair and nonfair versions below. Uses AQS state to

     * represent the number of holds on the lock.

     */

    abstract static class Sync extends AbstractQueuedSynchronizer {

        ···

    }

    ···

}

ReentrantLock 内部有一个 Sync 类型的对象,Sync 这个类则是继承自 AbstractQueuedSynchronizer,也就是抽象队列同步器,这个锁的同步控制基础就是这个 Sync 提供的,可以注意到这里的 Sync 仍然是抽象类,其实它还有两个子类,分别是 FairSyncNonFairSync ,分别对应于公平锁的实现和非公平锁的实现。  

AQS 底层是一个 CLH 队列,是一个双向队列,由 Node 节点连接而成,这部分代码位于 AbstractQueuedSynchronizer 抽象类中,

/** CLH Nodes */

abstract static class Node {

    volatile Node prev;       // initially attached via casTail

    volatile Node next;       // visibly nonnull when signallable

    Thread waiter;            // visibly nonnull when enqueued

    volatile int status;      // written by owner, atomic bit ops by others

    ···

}

可以看到,一个 Node 包含指向前后 Node 的指针(prev 和 next),一个在等待的线程对象(waiter)以及状态(status)。这里的状态指的是这个线程对象的状态,在 JDK17 中,这个状态只有三种,但在 JDK8 中这里的状态数量更多。  

// Node status bits, also used as argument and return values

static final int WAITING   = 1;          // must be 1

static final int CANCELLED = 0x80000000; // must be negative

static final int COND      = 2;          // in a condition wait

其中 WAITING 表示线程在等待,CANCELLED 表示线程已取消获取锁(用于实现锁的可中断),COND 表示线程处于一个条件等待的状态。  

除了 CLH 队列外,AQS 中还有一个状态字段,即:

/**

 * The synchronization state.

*/

private volatile int state;

用于标识锁目前是否被占用,等于 0 则锁空闲,大于 0 则锁被占用。  

一个获取锁失败的线程会被封装成为一个 Node 对象放入到队列中排队等待获取锁。  

知道了 AQS 的概念,现在我们可以通过 ReentrantLock 加锁的全流程来窥探到 ReentrantLock 的底层原理了。  

ReentrantLock 加锁流程

首先是实例化一个 ReentrantLock 锁对象,这里构造函数可带参数,如果参数为 True ,则会实例化一个公平锁,默认为非公平锁,其实区别就在于是实例化了一个 FairSync 对象还是一个 NonFairSync 对象。

// 实例化一个锁

Lock lock = new ReentrantLock();

// 构造函数的实现

// 非公平锁

public ReentrantLock() {

    sync = new NonfairSync();

}

// 公平锁

public ReentrantLock(boolean fair) {

    sync = fair ? new FairSync() : new NonfairSync();

}

之后调用 lock() 方法加锁,其实底层是调用了 sync 对象的 lock() 方法来加锁,看到这里,你可能更能理解为什么说 ReentrantLock 的底层实现是高度依赖 AQS 的。

public void lock() {

    sync.lock();

}

sync.lock() 的实现如下:

final void lock() {

    if (!initialTryLock())

        acquire(1);

}

这里 initialTryLock() 方法是一个抽象方法,实际上调用的是 Sync 这个类的两个子类 FairSyncNonFairSync 的重写的 initialTryLock() 方法。  

我们先看非公平锁 FairSync 的实现:

/**

* Sync object for non-fair locks

*/

static final class NonfairSync extends Sync {

    private static final long serialVersionUID = 7316153563782823691L;

    final boolean initialTryLock() {

        Thread current = Thread.currentThread();

        if (compareAndSetState(0, 1)) { // first attempt is unguarded

            setExclusiveOwnerThread(current);

            return true;

        } else if (getExclusiveOwnerThread() == current) {

            int c = getState() + 1;

            if (c < 0) // overflow

                throw new Error("Maximum lock count exceeded");

            setState(c);

            return true;

        } else

            return false;

    }

    /**

     * Acquire for non-reentrant cases after initialTryLock prescreen

     */

    protected final boolean tryAcquire(int acquires) {

        if (getState() == 0 && compareAndSetState(0, acquires)) {

            setExclusiveOwnerThread(Thread.currentThread());

            return true;

        }

        return false;

    }

}

非公平锁的 initialTryLock() 方法通过 CAS 尝试加锁(即 compareAndSetState(0, 1)),如果 AQSstate 字段为 0 ,则把这个字段变为 1 ,并使用 setExclusiveOwnerThread(current); 将独占锁的线程设置为当前线程即自己,否则加锁失败,失败之后会判断当前独占锁的线程是不是当前线程。如果是的话将 state 加一,同样加锁成功,这里就体现了可重入锁的实现,即同一个线程可以多次获取锁而不阻塞,state 字段其实就是这个线程获取锁的次数。如果这两种情况都没有加锁成功,则认为锁被另一个线程独占,加锁失败,返回 false 。  

在第一次 CAS 加锁未成功时,会调用 acquire(1) 这个方法,这个方法是 AbstractQueuedSynchronizer 抽象类提供给我们的,接下来我们看看这个方法的实现。  

public final void acquire(int arg) {

    if (!tryAcquire(arg))

        acquire(null, arg, false, false, false, 0L);

}

首先调用 tryAcquire(arg) 方法,这里会去调用具体实现类的 tryAcquire() 方法,同样,我们先看非公平锁的实现,实际上上面已经给出,这里只摘出 tryAcquire() 的实现:

protected final boolean tryAcquire(int acquires) {

    if (getState() == 0 && compareAndSetState(0, acquires)) {

        setExclusiveOwnerThread(Thread.currentThread());

        return true;

    }

    return false;

}

这里是尝试了第二次 CAS 加锁,与 initialTryLock() 方法中的逻辑差不多,同样也是做 CAS 尝试,如果成功,将独占这个锁的线程设置为自己,如果不成功则返回 false 。  

如果第二次 CAS 加锁不成功,则调用 acquire(null, arg, false, false, false, 0L) 方法,这个方法极其重要,它的内部定义了线程如何排队来获取锁的逻辑。  

下面是这个方法的完整实现,我添加了较为详细的注释便于理解:

final int acquire(Node node, int arg, boolean shared,

                  boolean interruptible, boolean timed, long time) {

    // node 是将要加入 DLH 队列的节点

    // spins 是队头节点在竞争锁时可以自旋的次数

    // interrupted 当前线程是否中断获取锁

    // first 当前线程是否位于队列第一个节点中

    // pred 当前节点的前一个节点

    Thread current = Thread.currentThread();

    byte spins = 0, postSpins = 0;   // retries upon unpark of first thread

    boolean interrupted = false, first = false;

    Node pred = null;                // predecessor of node when enqueued

    /*

     * Repeatedly:

     *  Check if node now first

     *    if so, ensure head stable, else ensure valid predecessor

     *  if node is first or not yet enqueued, try acquiring

     *  else if node not yet created, create it

     *  else if not yet enqueued, try once to enqueue

     *  else if woken from park, retry (up to postSpins times)

     *  else if WAITING status not set, set and retry

     *  else park and clear WAITING status, and check cancellation

     */

    // 一个大循环,来控制线程对锁的竞争

    for (;;) {

        // 如果当前节点不为第一个节点、前一个节点也不为空、头结点也不等于前一个节点

        if (!first && (pred = (node == null) ? null : node.prev) != null &&

            !(first = (head == pred))) {

            // 如果前一个节点状态小于 0,说明前驱是一个取消了获取锁请求的线程

            // 则清理队列,去除掉取消获取锁的节点

            if (pred.status < 0) {

                cleanQueue();           // predecessor cancelled

                continue;

            // 如果前驱的前驱为空,

            } else if (pred.prev == null) {

                // 这是一个自旋等待提示,告诉CPU当前线程正在自旋等待,

                // 处理器可以进行一些针对自旋的优化

                Thread.onSpinWait();    // ensure serialization

                continue;

            }

        }

        // 如果当前节点是第一个节点或者前驱为空,则竞争锁

        if (first || pred == null) {

            boolean acquired;

            try {

                if (shared)

                    acquired = (tryAcquireShared(arg) >= 0);

                else

                    // CAS 获取锁

                    acquired = tryAcquire(arg);

            } catch (Throwable ex) {

                cancelAcquire(node, interrupted, false);

                throw ex;

            }

            // 如果成功获取到了锁

            if (acquired) {

                // 如果还是第一个,把这个节点从队列中拿下来,赋值给 head

                if (first) {

                    node.prev = null;

                    head = node;

                    pred.next = null;

                    node.waiter = null;

                    if (shared)

                        signalNextIfShared(node);

                    if (interrupted)

                        current.interrupt();

                }

                return 1;

            }

        }

        // 如果当前节点为空

        if (node == null) {                 // allocate; retry before enqueue

            if (shared)

                node = new SharedNode();

            else

                // 创建节点

                node = new ExclusiveNode();

        // 如果前驱为空,入队

        } else if (pred == null) {          // try to enqueue

            node.waiter = current;

            Node t = tail;

            node.setPrevRelaxed(t);         // avoid unnecessary fence

            if (t == null)

                tryInitializeHead();

            else if (!casTail(t, node))

                node.setPrevRelaxed(null);  // back out

            else

                t.next = node;

        // 如果是第一个节点且自旋次数不为 0 自旋次数减,且调用 onSpinWait()

        } else if (first && spins != 0) {

            --spins;                        // reduce unfairness on rewaits

            Thread.onSpinWait();

        // 如果当前节点状态为 0 ,说明是新节点,初始化状态为 WAITTING

        } else if (node.status == 0) {

            node.status = WAITING;          // enable signal and recheck

        // 重新计算自旋次数,并且由于当前线程自旋次数用完,被 park 住

        } else {

            long nanos;

            spins = postSpins = (byte)((postSpins << 1) | 1);

            if (!timed)

                LockSupport.park(this);

            else if ((nanos = time - System.nanoTime()) > 0L)

                LockSupport.parkNanos(this, nanos);

            else

                break;

            node.clearStatus();

            if ((interrupted |= Thread.interrupted()) && interruptible)

                break;

        }

    }

    return cancelAcquire(node, interrupted, interruptible);

}

这段核心代码的作用在于控制未获得锁的线程入队,同时让队列头部的线程参与争夺锁。需要注意的是,这里的 head 指向抢到锁的线程对应的 node。  

首先第一个 if 中,

  1. 如果发现有取消获取锁的线程,会执行队列清理,
  2. 如果自己是第二个节点,那么会一直自旋来判断前一个线程是否放弃了获取锁,防止队头线程已经放弃了获取锁,而在队列之后的线程还不知道,造成饥饿。  

第二个 if 中,

  1. 如果当前节点是队列中的第一个节点或者前驱为空,则参与争夺锁,调用了 tryAcquire() 方法,也就是说争夺锁的线程只有队列中的第一个 Node 中的线程、刚释放了锁的那个线程以及还尚未进入这个函数的线程(在进入这个函数之前会尝试两次 CAS 来获取锁)。  

第三个 if 中,

1. 如果当前节点还为空,则实例化一个 node

2. 如果前驱为空,则入队,

3. 如果当前节点为队列中第一个节点,且自旋次数(spins)未用尽,则 -1 ,之后接着做下一次自旋

4. 如果当前节点状态为 0 ,说明是新节点,将状态设置为 WAITTING

5. 最后 else 中会先计算当前线程的自旋次数 spins,并将当前线程 park 住  

按照源码中 spins 的计算方法,每当 spins 用尽,下一次的自旋次数为上一次的自旋次数的 2 倍 +1 。  

可以看到,队列中第一个 node 中的线程等待的时间越长,其可以自旋的次数就会越多,这样可以有效地避免队列中的线程饥饿,因为自旋次数越多,获取锁的概率越大,所以随着等待时间的增加,获取锁的概率其实也是增加的。JDK17中是这样的,而在 JDK8 中却有所不同,JDK8 中线程每次被唤醒只会自旋一次就迅速被 park 住,这样就比较容易造成饥饿。  

概括来讲整个加锁过程,一个线程首先会自旋两次尝试获取锁,如果这两次都没有获取到锁,则进入到 acquire() 方法的大循环中,队列中的所有在等待的线程都会被 park 住, 直到第一个线程被唤醒,然后开始自旋,直到自旋次数用尽,被再次 park 住……  

你可能会问,那么谁来唤醒队列中第一个等待的线程呢?当然是刚刚释放锁的那个线程啦~  

解锁也是调用了 sync 对象的 release() 方法,下面是解锁部分的源码:  

public void unlock() {

    sync.release(1);

}

public final boolean release(int arg) {

    if (tryRelease(arg)) {

        signalNext(head);

        return true;

    }

    return false;

}

protected final boolean tryRelease(int releases) {

    int c = getState() - releases;

    if (getExclusiveOwnerThread() != Thread.currentThread())

        throw new IllegalMonitorStateException();

    boolean free = (c == 0);

    if (free)

        setExclusiveOwnerThread(null);

    setState(c);

    return free;

}

private static void signalNext(Node h) {

    Node s;

    if (h != null && (s = h.next) != null && s.status != 0) {

        s.getAndUnsetStatus(WAITING);

        LockSupport.unpark(s.waiter);

    }

}

tryRelease() 方法尝试解锁,如果成功解锁,则调用 signalNext() 方法,唤醒队列中的第一个线程,于是队列中的第一个线程就加入到了获取锁的行列中来。  

这就是 ReentrantLock 加锁过程及其底层原理,你可能会疑惑为什么前面都在讲非公平锁,那公平锁呢?  

公平锁  

其实公平锁的实现与非公平锁及其类似,只有一处不同,具体可以看公平锁的源码:

/**

* Sync object for fair locks

*/

static final class FairSync extends Sync {

    private static final long serialVersionUID = -3000897897090466540L;

    /**

     * Acquires only if reentrant or queue is empty.

     */

    final boolean initialTryLock() {

        Thread current = Thread.currentThread();

        int c = getState();

        if (c == 0) {

            if (!hasQueuedThreads() && compareAndSetState(0, 1)) {

                setExclusiveOwnerThread(current);

                return true;

            }

        } else if (getExclusiveOwnerThread() == current) {

            if (++c < 0) // overflow

                throw new Error("Maximum lock count exceeded");

            setState(c);

            return true;

        }

        return false;

    }

    /**

     * Acquires only if thread is first waiter or empty

     */

    protected final boolean tryAcquire(int acquires) {

        if (getState() == 0 && !hasQueuedPredecessors() &&

            compareAndSetState(0, acquires)) {

            setExclusiveOwnerThread(Thread.currentThread());

            return true;

        }

        return false;

    }

}

可以看到,与非公平锁不同的是,在公平锁在决定是否要去抢锁之前会有一个额外的判断,也就是会调用 hasQueuedPredecessors() 方法,这个方法的作用在于判断队列中是否有等待处理的线程,如果没有,当前线程才会去抢锁,如果队列中有线程则无法抢锁,这样就保证了,线程可以按照自己在队列中的顺序公平地获得锁。  

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇