快捷搜索:  汽车  科技

jdk18使用教程(JDK源码详解ReentrantLock)

jdk18使用教程(JDK源码详解ReentrantLock)private final Sync sync; // 构造一个 ReentrantLock 实例(非公平锁) public ReentrantLock() { sync = new NonfairSync(); } // 构造一个 ReentrantLock 实例(指定是否公平) public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }可以看到,两个构造器都是初始化一个 Sync 类型的成员变量。而且,当 boolean 值 fair 为 true 时,初始化的 sync 为 FairSync,为 false 时初始化为 NonFairSync,二者分别表示「公平锁」和「非公平锁」。可以看到无参构造默认是非公平锁。ReentrantLock 有两

ReentrantLock

在 JDK 1.5 以前,锁的实现只能用 synchronized 关键字;1.5 开始提供了 ReentrantLock,它是 API 层面的锁。先看下 ReentrantLock 的类签名以及如何使用:

public class ReentrantLock implements Lock java.io.Serializable {}

典型用法:

public void m() { Lock.lock(); // block until condition holds try { // ... method body } finally { lock.unlock() } }

该用法和使用 synchronized 关键字效果是一样的。既然有了 synchronized,为什么又会有 Lock 呢?相比于 synchronized,其实 ReentrantLock 的出现并不重复,它增加了不少功能,下面先简单介绍几个概念。

公平锁&非公平锁:所谓锁是否公平,简单理解就是一系列线程获取到锁的顺序是否遵循「先来后到」。即,如果先申请锁的线程先获取到锁,就是公平锁;否则就是非公平锁。ReentrantLock 的默认实现和 synchronized 都是非公平锁。

可重入锁:锁是否可重入,就是一个线程是否可以多次获取同一个锁,若是,就是可重入锁。ReentrantLock 和 synchronized 都是可重入锁。

代码分析

构造器

ReentrantLock 有两个构造器,分别如下:

private final Sync sync; // 构造一个 ReentrantLock 实例(非公平锁) public ReentrantLock() { sync = new NonfairSync(); } // 构造一个 ReentrantLock 实例(指定是否公平) public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }

可以看到,两个构造器都是初始化一个 Sync 类型的成员变量。而且,当 boolean 值 fair 为 true 时,初始化的 sync 为 FairSync,为 false 时初始化为 NonFairSync,二者分别表示「公平锁」和「非公平锁」。可以看到无参构造默认是非公平锁。

常用方法

ReentrantLock 常用的方法就是 Lock 接口定义的几个方法,如下:

// 获取锁(阻塞式) public void lock() { sync.lock(); } // 获取锁(响应中断) public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } // 尝试获取锁 public boolean tryLock() { return sync.nonfairTryAcquire(1); } // 尝试获取锁(有超时等待) public boolean tryLock(long timeout TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1 unit.toNanos(timeout)); } // 释放锁 public void unlock() { sync.release(1); }

可以看到,这几个方法内部都是通过调用 Sync 类(或其子类)的方法来实现,因此先从 Sync 类入手分析,代码如下(部分省略):

// 抽象类,继承了 AQS abstract static class Sync extends AbstractQueuedSynchronizer { // 获取锁的方法,由子类实现 abstract void lock(); // 非公平锁的 tryLock 方法实现 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 获取 AQS 的 state 变量 int c = getState(); // 若为 0,表示当前没有被其他线程占用 if (c == 0) { // CAS 修改 state,若修改成功,表示成功获取资源 if (compareAndSetState(0 acquires)) { // 将当前线程设置为 owner,到这里表示当前线程成功获取资源 setExclusiveOwnerThread(current); return true; } } // state 不为 0,且 owner 为当前线程 // 表示当前线程已经获取到了资源,这里表示“重入” else if (current == getExclusiveOwnerThread()) { int nextc = c acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 修改 state 值(因为当前线程已经获取资源,不存在竞争,因此无需 CAS 操作) setState(nextc); return true; } return false; } // 释放锁操作(对 state 做减法) protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; // 成功释放后将 owner 设为空 setExclusiveOwnerThread(null); } // 修改 state 的值 // PS: 因为可能存在“重入”,因此一次释放操作后当前线程仍有可能占用资源, // 所以不会直接把 state 设为 0 setState(c); return free; } // 其他方法... final boolean isLocked() { return getState() != 0; } }

Sync 类继承自 AQS,其中 nonfairTryAcquire 方法是非公平锁 tryAcquire 方法的实现。

从上面代码可以看出,锁的获取和释放是通过修改 AQS 的 state 变量来实现的。lock 方法可以看做对 state 执行“加法”操作,而 unlock 可以看做对 state 执行“减法”操作,当 state 为 0 时,表示当前没有线程占用资源。

公平锁&非公平锁

(1)非公平锁 NonFairSync:

static final class NonfairSync extends Sync { final void lock() { // CAS 尝试将 state 值修改为 1 if (compareAndSetState(0 1)) // 若修改成功,则将当前线程设为 owner,表示成功获取锁 setExclusiveOwnerThread(Thread.currentThread()); // 若获取失败,则执行 AQS 的 acquire 方法(独占模式获取资源) else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }

可以看到,非公平锁的 lock 操作为:先尝试以 CAS 方式修改 state 的值,若修改成功,则表示成功获取到锁,将 owner 设为当前线程;否则就执行 AQS 中的 acquire 方法,具体可参考前文「万字长文详解!JDK源码-AbstractQueuedSynchronizer(2)」,这里不再赘述。

(2)公平锁 FairSync:

static final class FairSync extends Sync { final void lock() { acquire(1); } // 公平锁的 tryAcquire 实现 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // state 为 0,表示资源未被占用 if (c == 0) { // 若队列中有其他线程在排队等待,则返回 false,表示获取失败; // 否则,再尝试去修改 state 的值 // PS: 这里是公平锁与非公平锁的区别所在 if (!hasQueuedPredecessors() && compareAndSetState(0 acquires)) { setExclusiveOwnerThread(current); return true; } } // 若当前线程已占用了锁,则“重入” else if (current == getExclusiveOwnerThread()) { int nextc = c acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }

可以看到,与非公平锁相比,公平锁的不同之处在于增加了判断条件 hasQueuedPredecessors,即首先判断主队列中是否有其他线程在等待,当没有其他线程在排队时再去获取,否则获取失败。

hasQueuedPredecessors 在 AQS 中实现如下:

/** * Queries whether any Threads have been waiting to acquire longer * than the current thread. */ public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }

小结

synchronized 与 ReentrantLock 比较:

相同点:二者都是互斥锁,可重入,默认都是非公平锁。

不同点:synchronized 是语法层面实现,自动获取锁和释放锁;ReentrantLock 是 API 层面实现,手动获取锁和释放锁。

ReentrantLock 相比 synchronized 的优势:

1. 可响应中断;

2. 获取锁可设置超时;

3. 可实现公平锁;

4. 可绑定多个条件(Condition)。

JDK 1.6 以后,synchronized 与 ReentrantLock 性能基本持平,JVM 未来的性能优化也会更偏向于原生的 synchronized。因此,如何选择还要根据实际需求,性能不再是不选择 synchronized 的原因了。

CountDownLatch

CountDownLatch 是并发包中的一个工具类,它的典型应用场景为:一个线程等待几个线程执行,待这几个线程结束后,该线程再继续执行。

简单起见,可以把它理解为一个倒数的计数器:初始值为线程数,每个线程结束时执行减 1 操作,当计数器减到 0 时等待的线程再继续执行。

代码分析

CountDownLatch 的类签名和主要方法如下:

public class CountDownLatch {}

jdk18使用教程(JDK源码详解ReentrantLock)(1)

常用方法为:await()、await(long TimeUnit) 和 countDown。其中两个 await 都是让当前线程进入等待状态(获取资源失败);而 countDown 方法是将计数器减去 1,当计数器为 0 的时候,那些处于等待状态的线程会继续执行(获取资源成功)。

构造器代码如下:

private final Sync sync; public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }

构造器(该构造器是唯一的)传入一个正整数,且初始化了 sync 变量,Sync 是内部的一个嵌套类,继承自 AQS。

await / await(long TimeUnit):

public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1 unit.toNanos(timeout)); }

countDown:

public void countDown() { sync.releaseShared(1); }

其中,acquireSharedInterruptibly、tryAcquireSharedNanos 和 releaseShared 都是 AQS 中「共享模式」的方法,具体代码可参考前文「JDK源码分析-AbstractQueuedSynchronizer(3)」的分析。

嵌套类 Sync 代码如下:

private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; // 构造器,初始化 AQS 的 state 变量 Sync(int count) { setState(count); } int getCount() { return getState(); } // 尝试获取资源的操作 // 只有当 state 变量为 0 的时候才能获取成功(返回 1) protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // 尝试释放资源的操作 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; // 该操作就是尝试把 state 变量减去 1 int nextc = c-1; if (compareAndSetState(c nextc)) return nextc == 0; } } }

Sync 继承了 AQS 抽象类,根据 AQS 可知,acquireSharedInterruptibly 和 tryAcquireSharedNanos 方法的实现都调用了 tryAcquireShared。

流程说明:通常先把 CountDownLatch 的计数器(state)初始化为 N,执行 wait 操作就是尝试以共享模式获取资源,而每次 countDown 操作就是将 N 减去 1,只有当 N 减到 0 的时候,才能获取成功(tryAcquireShared 方法),然后继续执行。

场景举例

为便于理解该类的用法,举两个简单的例子来说明它的使用场景。

场景 1:一个线程等待多个线程执行完之后再继续执行

public void test() throws InterruptedException { int count = 5; // CountDownLatch 的初始化计数器为 5 // 注意线程数和计数器保持一致 CountDownLatch countDownLatch = new CountDownLatch(count); for (int i = 0; i < count; i ) { int finalI = i; new Thread(() -> { try { TimeUnit.SECONDS.sleep(finalI); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() " is working .."); // 每个线程执行结束时执行 countDown countDownLatch.countDown(); }).start(); } // 主线程进入等待状态(尝试获取资源,成功后才能继续执行) countDownLatch.await(); System.out.println(Thread.currentThread().getName() " go on .."); } /* 输出结果: Thread-0 is working .. Thread-1 is working .. Thread-2 is working .. Thread-3 is working .. Thread-4 is working .. main go on .. */

场景 2:一个线程到达指定条件后,通知另一个线程

private static volatile List<Integer> list = new ArrayList<>(); private static void test() { CountDownLatch countDownLatch = new CountDownLatch(1); new Thread(() -> { if (list.size() != 5) { try { // list 的大小为 5 时再继续执行,否则等待 // 等待 state 减到 0 countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() " start.."); }).start(); new Thread(() -> { for (int i = 0; i < 10; i ) { list.add(i); System.out.println(Thread.currentThread().getName() " add " i); if (list.size() == 5) { // 满足条件时将 state 减 1 countDownLatch.countDown(); } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } /* 输出结果: Thread-1 add 0 Thread-1 add 1 Thread-1 add 2 Thread-1 add 3 Thread-1 add 4 Thread-0 start.. Thread-1 add 5 Thread-1 add 6 Thread-1 add 7 Thread-1 add 8 Thread-1 add 9 */

小结

CountDownLatch 可以理解为一个倒数的计数器,它的典型应用场景就是一个线程等待几个线程执行结束后再继续执行。其内部是基于 AQS 的共享模式实现的。

CyclicBarrier

CyclicBarrier 是并发包中的一个工具类,它的典型应用场景为:几个线程执行完任务后,执行另一个线程(回调函数,可选),然后继续下一轮,如此往复。

打个通俗的比方,可以把 CyclicBarrier 的执行流程比作:几个人(类比线程)围着操场跑圈,所有人都到达终点后(终点可理解为“屏障(barrier)”,到达次序可能有先后,对应线程执行任务有快慢),执行某个操作(回调函数),然后再继续跑下一圈(下一次循环),如此往复。

该类与 CountDownLatch 相比,可以把后者理解为“一次性(one-shot)”操作,而前者是“可循环”的操作,下面分析其代码实现。

代码分析

CyclicBarrier 的主要方法如下:

jdk18使用教程(JDK源码详解ReentrantLock)(2)

其中常用的是两个 await 方法,作用是让当前线程进入等待状态。

成员变量及嵌套类:

// 内部嵌套类 private static class Generation { boolean broken = false; } /** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); /** The number of parties */ private final int parties; /* The command to run when tripped */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation(); /** * Number of parties still waiting. Counts down from parties to 0 * on each generation. It is reset to parties on each new * generation or when broken. */ private int count;

内部嵌套类 Generation 表示代数,每次屏障(barrier)破坏之前属于同一代,之后进入下一代。

构造器:

// 无回调函数 public CyclicBarrier(int parties) { this(parties null); } // 有回调函数 public CyclicBarrier(int parties Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }

CyclicBarrier 有两个构造器,其中后者可以传入一个回调函数(barrierAction),parties 表示调用 await 的线程数。

await 方法:

// 阻塞式等待 public int await() throws InterruptedException BrokenBarrierException { try { return dowait(false 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } // 有超时的等待 public int await(long timeout TimeUnit unit) throws InterruptedException BrokenBarrierException TimeoutException { return dowait(true unit.toNanos(timeout)); }

可以看到两个 await 方法都是调用 dowait 方法来实现的(该方法也是 CyclicBarrier 的核心方法),如下:

private int dowait(boolean timed long nanos) throws InterruptedException BrokenBarrierException TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { // 获取当前代 final Generation g = generation; // 若屏障破坏,则抛出异常 if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // count 减 1 int index = --count; if (index == 0) { // tripped // count 减到 0 时触发的操作 boolean ranAction = false; try { // 传入的回调函数 final Runnable command = barrierCommand; if (command != null) // 若传了回调函数,则执行回调函数 // PS: 由此可知,回调函数由最后一个执行结束的线程执行 command.run(); ranAction = true; // 进入下一代(下一轮操作) nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped broken interrupted or timed out for (;;) { try { // count 不为 0 时,当前线程进入等待状态 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }

nextGeneration 和 breakBarrier:

// 进入下一轮 private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); } // 破坏屏障 private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }

执行流程:初始化时 parties 和 count 的值相同(由构造器 parties 参数传入),之后每有一个线程调用 await 方法 count 值就减 1,直至 count 为 0 时(若不为 0 则等待),执行传入的回调函数 barrierCommand(若不为空),然后唤醒所有线程,并将 count 重置为 parties,开始下一轮操作。

场景举例

为了便于理解 CyclicBarrier 的用法,下面简单举例演示(仅供参考):

public class CyclicBarrierTest { private static final int COUNT = 3; public static void main(String[] args) throws InterruptedException { // 初始化 CyclicBarrier 对象及回调函数 CyclicBarrier cyclicBarrier = new CyclicBarrier(COUNT () -> { // 模拟回调函数的操作(模拟写操作) System.out.println(Thread.currentThread().getName() " start writing.."); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("---------"); }); while (true) { // 创建几个线程执行任务 for (int i = 0; i < COUNT; i ) { new Thread(() -> { // 模拟读操作 System.out.println(Thread.currentThread().getName() " is reading.."); try { TimeUnit.SECONDS.sleep(3); // 等待 cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }).start(); } // 睡眠 10 秒,然后进入下一轮 TimeUnit.SECONDS.sleep(10); } } } /* 执行结果(仅供参考): Thread-0 is reading.. Thread-1 is reading.. Thread-2 is reading.. Thread-1 start writing.. --------- Thread-3 is reading.. Thread-4 is reading.. Thread-5 is reading.. Thread-5 start writing.. --------- */

PS: 此处模拟多个线程执行读操作,都读完后再执行写操作;之后再读、再写……可以理解为简单的对账系统。

此处代码仅供参考,只为便于理解该类的用法。实际上每次创建线程是不合理的(可以使用线程池,由于未分析,这里暂不使用)。

小结

CyclicBarrier 也可以理解为倒数的计数器,它与 CountDownLatch 有些类似。后者是“一次性”的,而前者是“可循环使用”的

猜您喜欢: