快捷搜索:  汽车  科技

jdk全栈开发教程(JDK源码详解DelayQueue)

jdk全栈开发教程(JDK源码详解DelayQueue)public interface Comparable<T> { public int compareTo(T o); }这里不再详述。Comparable 接口也只有一个 compareTo 方法:相关接口DelayQueue 中的元素要实现 Delayed 接口,该接口定义如下:public interface Delayed extends Comparable<Delayed> { /** * 以给定的时间单位,返回该对象的剩余延迟 * 若为零或者负数表示延时已经过去 */ long getDelay(TimeUnit unit); }Delayed 接口继承自 Comparable 接口,而它本身只定义了一个 getDelay 方法,该方法的作用是获取对象的剩余延迟时间。

DelayQueue

DelayQueue 也是一种队列,它内部的元素有“延迟”,也就是当从队列中获取元素时,如果它的延迟时间未到,则无法取出。

DelayQueue 的类签名和继承结构如下:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {}

jdk全栈开发教程(JDK源码详解DelayQueue)(1)

下面分析其代码实现。

代码分析

相关接口

DelayQueue 中的元素要实现 Delayed 接口,该接口定义如下:

public interface Delayed extends Comparable<Delayed> { /** * 以给定的时间单位,返回该对象的剩余延迟 * 若为零或者负数表示延时已经过去 */ long getDelay(TimeUnit unit); }

Delayed 接口继承自 Comparable 接口,而它本身只定义了一个 getDelay 方法,该方法的作用是获取对象的剩余延迟时间。

Comparable 接口也只有一个 compareTo 方法:

public interface Comparable<T> { public int compareTo(T o); }

这里不再详述。

构造器

DelayQueue 有两个构造器,如下:

// 无参构造器 public DelayQueue() {} // 指定集合的构造器 public DelayQueue(Collection<? extends E> c) { // 该方法最后是通过 add 方法实现的,后文进行分析 this.addAll(c); }

成员变量

// 锁,用于保证线程安全 private final transient ReentrantLock lock = new ReentrantLock(); // 优先队列,实际存储元素的地方 private final PriorityQueue<E> q = new PriorityQueue<E>(); // 线程等待的标识 private Thread leader = null; // 触发条件,表示是否可以从队列中读取元素 private final Condition available = lock.newCondition();

关于优先队列可参考前文「JDK源码分析-PriorityQueue」的分析。

入队方法

DelayQueue 也是一个队列,它的入队方法有:add(E) offer(E) put(E) 等,它们的定义如下:

public boolean add(E e) { return offer(e); } public void put(E e) { offer(e); } public boolean offer(E e long timeout TimeUnit unit) { return offer(e); }

这几个方法都是通过 offer(E) 方法实现的,它的代码如下:

public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { // 入队 q.offer(e); // 若该元素为队列头部元素,唤醒等待的线程 // (表示可以从队列中读取数据了) if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }

出队方法

有入队自然也有出队,主要方法有:poll() take() poll(timeout unit) 如下:

public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { // 获取队列头部元素 E first = q.peek(); // 头部元素为空,或者延时未到,则返回空 if (first == null || first.getDelay(NANOSECONDS) > 0) return null; // 否则返回头部元素 else return q.poll(); } finally { lock.unlock(); } }

poll 方法是非阻塞的,即调用之后无论元素是否存在都会立即返回。下面看下阻塞的 take 方法:

public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 以可中断方式获取锁 lock.lockInterruptibly(); try { // 无限循环 for (;;) { // 获取队列头部元素 E first = q.peek(); // 若为空,则等待 if (first == null) available.await(); // 若不为空 else { // 获取延迟的纳秒数,若小于等于零(即过期),则获取并删除头部元素 long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); // 执行到这里,表示 delay>0,也就是延时未过期 first = null; // don't retain ref while waiting // leader 不为空表示有其他线程在读取数据,当前线程等待 if (leader != null) available.await(); else { // 将当前线程设置为 leader thread thisThread = Thread.currentThread(); leader = thisThread; try { // 等待延迟时间过期 available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { // 唤醒该条件下的其他线程 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }

该方法看起来稍复杂,主要逻辑如下:

1. 获取队列头部元素;

1.1 若该元素为空(队列为空),则当前线程等待;

1.2 若该元素不为空,且已经过期,则取出该元素(并移除);

1.2.1 若未过期,且有其他线程在操作(leader 不为空),当前线程等待;

1.2.2 若未过期,且没有其他线程操作,则占有“操作权”(将 leader 设置为当前线程),并等待延迟过期。

以上操作循环执行。

take 方法是阻塞操作,当条件不满足时会一直等待。另一个 poll(timeout unit) 方法和它有些类似,只不过带有延时,如下:

public E poll(long timeout TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; // 以可中断方式获取锁 lock.lockInterruptibly(); try { // 无限循环 for (;;) { // 获取队列的头部元素 E first = q.peek(); // 若头部元素为空(即队列为空),当超时时间大于零则等待相应的时间; // 否则(即超时时间小于等于零)返回空 if (first == null) { if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); } else { // 执行到这里表示队列头部元素不为空 // 获取剩余延时 long delay = first.getDelay(NANOSECONDS); // 延时已过期,返回队列头部元素 if (delay <= 0) return q.poll(); // 延时未过期且等待超时,返回空 if (nanos <= 0) return null; first = null; // don't retain ref while waiting // 延时未过期且等待未超时,且等待超时<延迟时间 // 表示有其他线程在取数据,则当前线程进入等待 if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { // 没有其他线程等待,将当前线程设置为 leader,类似于“独占”操作 Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); // 计算剩余延迟时间 nanos -= delay - timeLeft; } finally { // 该线程操作完毕,把 leader 置空 if (leader == thisThread) leader = null; } } } } } finally { // 唤醒 available 条件下的一个其他线程 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }

take 和 poll 方法还有一个区别:当延迟未过期时,take 方法会一直等待,而 poll 方法则会返回空。

此外还有一个 peek 方法,该方法虽然也能获取队列头部的元素,但与以上出队方法不同的是,peek 方法只是读取队列头部元素,并不会将其删除:

public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { // 返回队列的头部元素(不删除) return q.peek(); } finally { lock.unlock(); } }

以上就是 DelayQueue 的主要方法的代码分析,为便于理解,下面简要举例分析。

用法举例

示例代码:

自定义一个实现了 Delayed 接口的 Task 类,并将它的几个对象添加到一个延迟队列中,代码如下:

public class TestDelayedQueue { public static void main(String[] args) throws Exception { BlockingQueue<Task> delayQueue = new DelayQueue<>(); long now = System.currentTimeMillis(); delayQueue.put(new Task("c" now 6000)); delayQueue.put(new Task("d" now 10000)); delayQueue.put(new Task("a" now 3000)); delayQueue.put(new Task("b" now 4000)); while (true) { System.out.println(delayQueue.take()); TimeUnit.SECONDS.sleep(1); } } private static class Task implements Delayed { private String taskName; private long endTime; public Task(String taskName long endTime) { this.taskName = taskName; this.endTime = endTime; } @Override public long getDelay(TimeUnit unit) { return unit.convert(endTime - System.currentTimeMillis() TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { return Long.compare(getDelay(TimeUnit.MILLISECONDS) o.getDelay(TimeUnit.MILLISECONDS)); } @Override public String toString() { return "taskName-->" taskName; } } }

结果会以延迟时间的顺序取出各个元素。

小结

1. DelayQueue 是一种队列,同时实现了 BlockingQueue 接口;

2. 它内部的元素有延迟时间的概念,出队时,若延时未到,则无法读取到队列头部的元素;

3. 它是线程安全的。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 也是一个线程池类,是线程池类 ThreadPoolExecutor 的子类。除了 ThreadPoolExecutor 相关的方法之外,它还增加了执行定时任务和周期性任务的方法。它的类签名和继承结构如下:

public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {}

jdk全栈开发教程(JDK源码详解DelayQueue)(2)

可以看到,它继承了 ThreadPoolExecutor 类(参考 「JDK源码分析-ThreadPoolExecutor」),并且实现了 ScheduledExecutorService 接口(参考 「JDK源码分析-ScheduledExecutorService」),因此具有二者的特性。下面分析其代码实现。

代码分析

内部嵌套类 DelayedWorkQueue

先看它的一个内部嵌套类 DelayedWorkQueue,该类是一个延迟队列,它的类签名和继承结构如下:

static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {}

jdk全栈开发教程(JDK源码详解DelayQueue)(3)

DelayedWorkQueue 类与前文分析的 DelayQueue 「JDK源码分析-DelayQueue」实现原理类似,这里就不再赘述。

构造器

ScheduledThreadPoolExecutor 有如下四个构造器:

public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize Integer.MAX_VALUE 0 NANOSECONDS new DelayedWorkQueue()); } public ScheduledThreadPoolExecutor(int corePoolSize ThreadFactory threadFactory) { super(corePoolSize Integer.MAX_VALUE 0 NANOSECONDS new DelayedWorkQueue() threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize RejectedExecutionHandler handler) { super(corePoolSize Integer.MAX_VALUE 0 NANOSECONDS new DelayedWorkQueue() handler); } public ScheduledThreadPoolExecutor(int corePoolSize ThreadFactory threadFactory RejectedExecutionHandler handler) { super(corePoolSize Integer.MAX_VALUE 0 NANOSECONDS new DelayedWorkQueue() threadFactory handler); }

这几个构造器都是直接调用父类 ThreadPoolExecutor 的构造器,只是传入了不同的参数。而其中的参数 workQueue 都传入了上面的延迟队列 DelayedWorkQueue。

内部类 ScheduledFutureTask

ScheduledThreadPoolExecutor 还有一个内部类 ScheduledFutureTask,它的继承结构如下:

jdk全栈开发教程(JDK源码详解DelayQueue)(4)

它继承了 FutureTask 类(可参考前文「JDK源码分析-FutureTask」的分析),且实现了 RunnableScheduledFuture 接口,该接口定义如下:

public interface RunnableScheduledFuture<V> extends RunnableFuture<V> ScheduledFuture<V> { // 一个任务是否周期性执行的,若是则可以重复执行;否则只能运行一次 boolean isPeriodic(); }

RunnableScheduledFuture 只定义了一个方法 isPeriodic,该方法用于判断一个任务是否是周期性执行的。它继承的 RunnableFuture 接口在前文 FutureTask 类中已进行分析,而 ScheduledFuture 接口如下:

public interface ScheduledFuture<V> extends Delayed Future<V> { }

它的内部并未定义方法,只是整合了 Delayed 接口和 Future 接口,Delayed 接口前文也已分析,下面分析该类的主要代码。

先看它的主要成员变量:

// 定时任务执行的时间(单位:纳秒) private long time; /** * 重复执行的任务的时间间隔(单位:纳秒) * 正数表示固定频率(fixed-rate)执行 * 负数表示固定延迟(fixed-delay)执行 * 零表示非重复执行的任务 */ private final long period; // reExecutePeriodic 方法中重新排队的任务 RunnableScheduledFuture<V> outerTask = this; // 延迟队列中的索引位置,便于快速取消 int heapIndex;

构造器:

/** * 构造器一:用给定的触发时间(纳秒),创建一个一次性任务 */ ScheduledFutureTask(Runnable r V result long ns) { super(r result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } /** * 构造器二:用给定的触发时间和间隔(纳秒),创建一个周期性任务 */ ScheduledFutureTask(Runnable r V result long ns long period) { super(r result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); } /** * 构造器三:用给定的触发时间(纳秒),创建一个一次性任务 */ ScheduledFutureTask(Callable<V> callable long ns) { super(callable); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); }

ScheduledFutureTask 有三个构造器,可分为两类:分别是创建一次性任务(一和三)和周期性任务(二)。其中一和三还是 Runnable 和 Callable 的区别。

该类是一个任务类,即 Runnable 接口的实现类,因此它最核心的就是 run 方法,如下:

public void run() { // 是否为周期性任务 boolean periodic = isPeriodic(); // 若任务不能执行,则取消 if (!canRunInCurrentRunState(periodic)) cancel(false); // 若为非周期性任务 else if (!periodic) // 若为周期性任务,调用 ScheduledFutureTask 的父类(即 FutureTask)的 run 方法执行 ScheduledFutureTask.super.run(); // 若为周期性任务,调用 ScheduledFutureTask 的父类(即 FutureTask)的 runAndReset 方法执行 else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); // 设置下一次执行时间 reExecutePeriodic(outerTask); // 周期性执行 } }

reExecutePeriodic 方法如下:

/** * 该方法主要是将周期性任务重新排队 * 它的实现与 delayedExecute 方法(后面分析)逻辑有些类似 */ void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { super.getQueue().add(task); if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); } }

schedule & scheduleAtFixedRate & scheduleWithFixedDelay

这几个就是执行定时任务和周期性任务的方法,它们是对前文 「JDK源码分析-ScheduledExecutorService」接口所定义的方法实现,可参考前文的分析。

schedule 方法 1:其作用是延迟指定的时间后执行任务(即执行定时任务),只会执行一次。

public ScheduledFuture<?> schedule(Runnable command long delay TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); // 把用户提交的 Runnable 对象包装为 RunnableScheduledFuture 对象 // decorateTask 方法默认返回第二个参数 // decorateTask 方法的修饰符是 protected,可根据需求自行扩展 RunnableScheduledFuture<?> t = decorateTask(command new ScheduledFutureTask<Void>(command null triggerTime(delay unit))); // 执行给定的任务 delayedExecute(t); return t; }

delayExecute 方法:

/* * 延迟或周期性任务的主要执行方法。 * 若线程池已关闭,则拒绝该任务(执行拒绝策略); * 否则将任务添加到工作队列,若有需要启动一个线程去执行。 * 若在添加任务时关闭了线程池,则将其从队列移除并取消该任务 */ private void delayedExecute(RunnableScheduledFuture<?> task) { // 若线程池已关闭,则执行拒绝策略 if (isShutdown()) reject(task); else { // 将该任务添加到任务队列(即前面的延迟队列) super.getQueue().add(task); // 若当前任务无法执行,则将其从队列移除并且取消执行(类似事务的回滚操作) if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); // 任务可以执行,若有需要新增线程以执行该任务 else ensurePrestart(); } }

schedule 方法 2:

public <V> ScheduledFuture<V> schedule(Callable<V> callable long delay TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<V> t = decorateTask(callable new ScheduledFutureTask<V>(callable triggerTime(delay unit))); delayedExecute(t); return t; }

该方法与前者类似,差别在于这里的参数类型是 Callable,前者是 Runnable 类型,其他操作一样。

scheduleAtFixedRate 方法:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command long initialDelay long period TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); // 将 Runnable 对象包装为 ScheduledFutureTask 对象 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command null triggerTime(initialDelay unit) unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command sft); sft.outerTask = t; delayedExecute(t); return t; }

该方法与前面的 schedule 方法类似,区别仅在于使用了不同的 ScheduledFutureTask 对象,其他的执行流程几乎一样。

scheduleWithFixedDelay 方法:

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command long initialDelay long delay TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command null triggerTime(initialDelay unit) unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command sft); sft.outerTask = t; delayedExecute(t); return t; }

该方法与 scheduleAtFixedRate 方法基本一样,区别仅在于构建 ScheduledFutureTask 对象时参数 period 不同(一正一负,用以区分类型)。

execute & submit 方法

这两个方法是 Executor 接口和 ExecutorService 接口所定义的方法,代码实现如下:

public void execute(Runnable command) { schedule(command 0 NANOSECONDS); } public Future<?> submit(Runnable task) { return schedule(task 0 NANOSECONDS); }

它们内部直接调用了 schedule(Runnable) 方法。另外两个 submit 方法:

public <T> Future<T> submit(Runnable task T result) { return schedule(Executors.callable(task result) 0 NANOSECONDS); } public <T> Future<T> submit(Callable<T> task) { return schedule(task 0 NANOSECONDS); }

它们内部直接调用了 schedule(Callable) 方法。

小结

1. ScheduledThreadPoolExecutor 是线程池的实现类之一;

2. 它继承自 ThreadPoolExecutor,并实现了 ScheduledExecutorService 接口;

3. 提供了异步提交任务的 execute 方法和 submit 方法;

4. 提供了执行定时任务的 schedule 方法和周期性任务的 scheduleAtFixedRate/scheduleWithFixedDelay 方法(使用延迟队列实现)。

猜您喜欢: