快捷搜索:  汽车  科技

线程池中的阻塞队列放多少合适(Future掌控未来之Callable跨线程返回和抛出异常原理)

线程池中的阻塞队列放多少合适(Future掌控未来之Callable跨线程返回和抛出异常原理)  如图,main线程把任务FutureTask传给Thread 并启动Thread.start() 然后在run()方法中调用Callable.call()方法,得到返回值后通过Future.get()方法返回给主线程。  因为线程是异步的,要想获取结果,我们是不是需要阻塞主线程,然后等待线程结束后把结果回调到主线程上,自 Java 1.5 Java爸爸给我们提供了一个接口Future,简单地说,Future类代表异步线程的未来结果。这个结果最终会在处理完成后出现在Future中。Future.get()方法就实现了阻塞的功能,具体可以看一下这个图  在介绍之前我们先来想一下,如果让你实现线程返回结果的功能你要怎么办?如下图所示:  如图main线程中异步启动一个线程 Thread.start() 事实上等线程执行完后,main线程早就结束了。所以说如果让我们来实现线程返回结果

我们知道线程的发起Thread.start() 实质上是,start()调用native方法 start0() 然后唤起系统线程,在系统线程中回调 Runnable中的run()方法。而且整个过程是异步的,导致在Runnable的两个致命缺陷,第一个是不能返回结果,第二个是不能抛出异常。所以Java爸爸后来引入了Callable这个接口,这个接口旨在解决这两个缺陷。

  至于怎么用Callable 相信都已经轻车熟路了,作为一个高级程序员只知道怎么用是不够的,我们还需要知道为什么?那么我们先思考下面几个问题:

  1. call()方法是否也是和run()方法一样通过系统线程直接调用的?
  2. Callable是怎么把结果返回给主线程?
  3. Callable是怎么把异常抛出给主线程的?

相信在接下来的几分钟里,你会对Callable有全新的认识。

  其实实现这些功能单单靠Callable一个接口是办不到的,还需要借助Future、FutureTask类来完成这个功能。

三个臭皮匠Future、FutureTask、Callable介绍

  在介绍之前我们先来想一下,如果让你实现线程返回结果的功能你要怎么办?

如下图所示:

线程池中的阻塞队列放多少合适(Future掌控未来之Callable跨线程返回和抛出异常原理)(1)

  如图main线程中异步启动一个线程 Thread.start() 事实上等线程执行完后,main线程早就结束了。所以说如果让我们来实现线程返回结果的功能我们得需要通过曲线救国的方式来实现,什么意思呢?

  因为线程是异步的,要想获取结果,我们是不是需要阻塞主线程,然后等待线程结束后把结果回调到主线程上,自 Java 1.5 Java爸爸给我们提供了一个接口Future,简单地说,Future类代表异步线程的未来结果。这个结果最终会在处理完成后出现在Future中。Future.get()方法就实现了阻塞的功能,具体可以看一下这个图

线程池中的阻塞队列放多少合适(Future掌控未来之Callable跨线程返回和抛出异常原理)(2)

  如图,main线程把任务FutureTask传给Thread 并启动Thread.start() 然后在run()方法中调用Callable.call()方法,得到返回值后通过Future.get()方法返回给主线程。

看一下整体关系图:

线程池中的阻塞队列放多少合适(Future掌控未来之Callable跨线程返回和抛出异常原理)(3)

  由类图可以看到,FutureTask是Future和Runnable的实现类,同时持有Thread和Callable实例,FutureTask实现了Future的功能,也就是说FutureTask不仅管理着线程的阻塞获取结果(get()),线程取消中断(cancel())等功能。

注意FutureTask中定义了个Object outcome变量来存放结果。

定义了个链表waiters来存放所有等待获取结果的线程

事实上我们可以把FutureTask看成一个任务管理中心,他融合了Thread Callable的功能(这里算是一个适配器模式了)。那么我们结合着代码来分析一下具体的逻辑。

实现代码分析

带着上面的关系图和流程图还有问题看一下代码

首先我们来看一下简单的使用流程,这里不多做赘述

  1. 创建FutureTask实例。
  2. 把实例传递给Thread并启动
  3. 阻塞获取结果

public class FutureTest { public static void main(String[] args) { //1. 创建FutureTask并传入callable实例 Future<String> stringFutureTask = new FutureTask<>(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(3000); return "FutureTask"; } }); //2. 把实例传递给Thread并启动 new Thread(stringFutureTask).start(); String s = ""; try { //3. 阻塞获取结果 s = stringFutureTask.get(); } catch (Exception e) { e.printStackTrace(); } System.out.printf(s); } }

接下来我们结合源码分析一下这三步。

1. 创建FutureTask并传入callable实例

public FutureTask(Callable<V> callable) { ... this.callable = callable; this.state = NEW; }

把callable传给给FutureTask.callable变量。

2. 把实例传递给Thread并启动并调用run()方法

  执行new Thread(stringFutureTask).start(),这里启动后实际上调用的是Runnable.run()方法,具体为啥是调用run() 可参照线程的实现方式 我们看一下FutureTask.run() 源码

@Override public void run() { //把当前线程赋值给FutureTask.runner 实例 runner = Thread.currentThread(); try { // 赋值变量 Callable<V> c = callable; //启动状态为NEW if (c != null && state == NEW) { // 定义结果变量 V result; boolean ran; try { //1. 这里调用 callable.call() 方法,也就是第一步传入的callable. 并返回结果 result = c.call(); ran = true; } catch (Throwable ex) { //如果抛出异常, result = null; ran = false; //改变线程状态为 COMPLETING if (STATE.compareAndSet(this NEW COMPLETING)) { //2. 把异常赋给 outcome 变量 outcome = ex; // 改变线程状态为 EXCEPTIONAL STATE.setRelease(this EXCEPTIONAL); } } if (ran) { //设置结果并通知所有等待的线程 set(result); } } } finally { ... } } protected void set(V v) { // 改变线程状态为 EXCEPTIONAL if (STATE.compareAndSet(this NEW COMPLETING)) { //3. 把结果赋给 outcome 变量 outcome = v; // 改变线程状态为 NORMAL STATE.setRelease(this NORMAL); // 4. 遍历阻塞等待的获取锁的线程,通知他们锁已释放 for (WaitNode q; (q = waiters) != null;) { if (WAITERS.weakCompareAndSet(this q null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; // 通知锁已释放 LockSupport.unpark(t); } FutureTask.WaitNode next = q.next; if (next == null) { break; } q.next = null; q = next; } break; } } callable = null; } }

上面的就是线程运行的源码,核心点有4个,

  1. 就是在这里调用 call()方法。
  2. 如果抛出异常把异常存到 Object outcome变量里面
  3. 如果正常返回结果,把结果存到 Object outcome中。至此线程运行完毕。
  4. 遍历阻塞等待的获取锁的线程,通知他们锁已释放

其实就是线程运行完后 把正常结果或者异常结果存到 Object outcome 对象中,释放锁并通知所有等待的线程。

到这里就可以回答开篇的第一个问题 1. call()方法是否也是和run()方法一样通过系统线程直接来调用的? 调用流程是:

Thread.start() --> native start0() --> run() -> call()

可以看出,call() 方法是通过 run() 来调用的,当然这也是在线程中。

3. 阻塞获取结果

这一步是调用 Future.get()方法阻塞线程,等待结果,我们看一下源码:

public V get() throws Exception { int s = state; if (s <= COMPLETING) { // 1. 如果线程还在执行,就就到waiters 链表里面阻塞等待结果。 s = awaitDone(); } // 获取结果 if (s == NORMAL) { // 2. 如果正常就返回正常的结果 outcome return (V)outcome; } // 3. 如果异常就直接抛出 outcome throw new Exception((Throwable)outcome); } // 这个方法的意思就是,如果线程还在执行,就到waiters 链表里面等待, // 一直到被 LockSupport.unpark() 唤醒 private int awaitDone() { WaitNode q = null; boolean queued = false; for (;;) { int s = state; if (s > COMPLETING) { return s; } else if (s == COMPLETING) { Thread.yield(); } else if (q == null) { q = new WaitNode(); } if (!queued) { queued = WAITERS.weakCompareAndSet(this q.next = waiters q); } else { LockSupport.park(this); } } }

上面的就是阻塞获取结果的源码,核心点有3个,

  1. 如果线程还在执行,就就到waiters 链表里面阻塞等待结果。
  2. 如果线程执行完并正常,就返回正常的结果 outcome
  3. 如果异常就直接抛出 outcome。

看到这里,我们再来回顾一下开篇的几个问题,你是不是有了答案了。

最后

  到这里,Callable Future 相关的都分析完了,源码解析都比较枯燥,写这么多也不容易,感谢大家看到这里,有什么意见或者建议可以留言一起讨论!

原文链接:https://juejin.cn/post/7117031279178842148#comment



猜您喜欢: