并发编程(十二)—— Java 线程池 实现原理与源码深度解析 之 submit 方法 (二)

摘要:
在上一篇《并发编程(十一)——Java线程池实现原理与源码深度解析(一)》中提到了线程池ThreadPoolExecutor的原理以及它的execute方法。在线程池同样execute提供一个不需要返回结果的任务执行,而对于需要结果返回的则可调用其submit方法。AbstractExecutorService我们把上一篇文章的代码贴过来1publicabstractclassAbstractExecutorServiceimplementsExecutorService{23//RunnableFuture是用于获取执行结果的,我们常用它的子类FutureTask4//下面两个newTaskFor方法用于将我们的任务包装成FutureTask提交到线程池中执行5protectedRunnableFuturenewTaskFor{6returnnewFutureTask;7}89protectedRunnableFuturenewTaskFor{10returnnewFutureTask;11}1213//提交任务14publicFuture˂?

在上一篇《并发编程(十一)—— Java 线程池 实现原理与源码深度解析(一)》中提到了线程池ThreadPoolExecutor的原理以及它的execute方法。这篇文章是接着上一篇文章写的,如果你没有阅读上一篇文章,建议你去读读。本文解析ThreadPoolExecutor#submit。

对于一个任务的执行有时我们不需要它返回结果,但是有我们需要它的返回执行结果。对于线程来讲,如果不需要它返回结果则实现Runnable,而如果需要执行结果的话则可以实现Callable。在线程池同样execute提供一个不需要返回结果的任务执行,而对于需要结果返回的则可调用其submit方法。

AbstractExecutorService

我们把上一篇文章的代码贴过来

1 public abstract class AbstractExecutorService implementsExecutorService {
2 
3     //RunnableFuture 是用于获取执行结果的,我们常用它的子类 FutureTask
4     //下面两个 newTaskFor 方法用于将我们的任务包装成 FutureTask 提交到线程池中执行
5     protected <T> RunnableFuture<T>newTaskFor(Runnable runnable, T value) {
6         return new FutureTask<T>(runnable, value);
7 }
8 
9     protected <T> RunnableFuture<T> newTaskFor(Callable<T>callable) {
10         return new FutureTask<T>(callable);
11 }
12 
13     //提交任务
14     public Future<?>submit(Runnable task) {
15         if (task == null) throw newNullPointerException();
16         //1. 将任务包装成 FutureTask
17         RunnableFuture<Void> ftask = newTaskFor(task, null);
18         //2. 交给执行器执行,execute 方法由具体的子类来实现
19         //前面也说了,FutureTask 间接实现了Runnable 接口。
20 execute(ftask);
21         returnftask;
22 }
23 
24     public <T> Future<T>submit(Runnable task, T result) {
25         if (task == null) throw newNullPointerException();
26         //1. 将任务包装成 FutureTask
27         RunnableFuture<T> ftask =newTaskFor(task, result);
28         //2. 交给执行器执行
29 execute(ftask);
30         returnftask;
31 }
32 
33     public <T> Future<T> submit(Callable<T>task) {
34         if (task == null) throw newNullPointerException();
35         //1. 将任务包装成 FutureTask
36         RunnableFuture<T> ftask =newTaskFor(task);
37         //2. 交给执行器执行
38 execute(ftask);
39         returnftask;
40 }
41 }

尽管submit方法能提供线程执行的返回值,但只有实现了Callable才会有返回值,而实现Runnable的线程则是没有返回值的,也就是说在上面的3个方法中,submit(Callable<T>task)能获取到它的返回值,submit(Runnabletask,Tresult)能通过传入的载体result间接获得线程的返回值或者准确来说交给线程处理一下,而最后一个方法submit(Runnabletask)则是没有返回值的,就算获取它的返回值也是null。

使用示例

submit(Callable<T>task)

1 /**
2 * @author: ChenHao
3 * @Date: Created in 14:54 2019/1/11
4  */
5 public classTest {
6     public static void main(String[] args) throwsExecutionException, InterruptedException {
7         Callable<String> callable = new Callable<String>() {
8             public String call() throwsException {
9                 System.out.println("This is ThreadPoolExetor#submit(Callable<T> task) method.");
10                 return "result";
11 }
12 };
13 
14         ExecutorService executor =Executors.newSingleThreadExecutor();
15         Future<String> future =executor.submit(callable);
16 executor.shutdown();
17 System.out.println(future.get());
18 }
19 }

运行结果:

并发编程(十二)—— Java 线程池 实现原理与源码深度解析 之 submit 方法 (二)第1张

submit(Runnabletask,Tresult)

1 /**
2 * @author: ChenHao
3 * @Date: Created in 14:54 2019/1/11
4  */
5 public classTest {
6     public static void main(String[] args) throwsExecutionException, InterruptedException {
7 
8         ExecutorService executor =Executors.newSingleThreadExecutor();
9         Data data = newData();
10         Future<Data> future = executor.submit(newTask(data), data);
11 executor.shutdown();
12 System.out.println(future.get().getName());
13 }
14 }
15 classData {
16 String name;
17     publicString getName() {
18         returnname;
19 }
20     public voidsetName(String name) {
21         this.name =name;
22 }
23 }
24 
25 class Task implementsRunnable {
26 Data data;
27     publicTask(Data data) {
28         this.data =data;
29 }
30 @Override
31     public voidrun() {
32         System.out.println("This is ThreadPoolExetor#submit(Runnable task, T result) method.");
33         data.setName("陈浩");
34 }
35 }

运行结果:

并发编程(十二)—— Java 线程池 实现原理与源码深度解析 之 submit 方法 (二)第2张

submit(Runnabletask)

1 /**
2 * @author: ChenHao
3 * @Date: Created in 14:54 2019/1/11
4  */
5 public classTest {
6     public static void main(String[] args) throwsExecutionException, InterruptedException {
7         Runnable runnable = newRunnable() {
8 @Override
9             public voidrun() {
10                 System.out.println("This is ThreadPoolExetor#submit(Runnable runnable) method.");
11 }
12 };
13 
14         ExecutorService executor =Executors.newSingleThreadExecutor();
15         Future future =executor.submit(runnable);
16 executor.shutdown();
17 System.out.println(future.get());
18 }
19 }

运行结果:

并发编程(十二)—— Java 线程池 实现原理与源码深度解析 之 submit 方法 (二)第3张

从上面的源码可以看到,这三者方法几乎是一样的,关键就在于:

1 RunnableFuture<T> ftask =newTaskFor(task);
2 execute(ftask);

是如何将一个任务作为参数传递给了newTaskFor,然后调用execute方法,最后进而返回ftask的呢?

1 protected <T> RunnableFuture<T>newTaskFor(Runnable runnable, T value) {
2     return new FutureTask<T>(runnable, value);
3 }
4 
5 protected <T> RunnableFuture<T> newTaskFor(Callable<T>callable) {
6     return new FutureTask<T>(callable);
7 }

源码分析

这里我建议大家去看看我之前的一篇文章《Java 多线程(五)—— 线程池基础 之 FutureTask源码解析

submit(Callable<T> task)

我们看上面的源码中知道实际上是调用了如下代码

1 protected <T> RunnableFuture<T> newTaskFor(Callable<T>callable) {
2     return new FutureTask<T>(callable);
3 }

我们看看FutureTask 的结构

1 public class FutureTask<V> implements RunnableFuture<V>{ 
2     private volatile intstate; 
3     private static final int NEW = 0; //初始状态 
4     private static final int COMPLETING = 1; //结果计算完成或响应中断到赋值给返回值之间的状态。 
5     private static final int NORMAL = 2; //任务正常完成,结果被set 
6     private static final int EXCEPTIONAL = 3; //任务抛出异常 
7     private static final int CANCELLED = 4; //任务已被取消 
8     private static final int INTERRUPTING = 5; //线程中断状态被设置ture,但线程未响应中断 
9     private static final int INTERRUPTED = 6; //线程已被中断 
10 
11     //将要执行的任务 
12     private Callable<V> callable; //用于get()返回的结果,也可能是用于get()方法抛出的异常 
13     private Object outcome; //non-volatile, protected by state reads/writes //执行callable的线程,调用FutureTask.run()方法通过CAS设置 
14     private volatile Thread runner; //栈结构的等待队列,该节点是栈中的最顶层节点。 
15     private volatileWaitNode waiters; 
16 
17     public FutureTask(Callable<V>callable) {
18         if (callable == null)
19             throw newNullPointerException();
20         this.callable =callable;
21         this.state = NEW;       //ensure visibility of callable
22 }
23 ....
24 }
1 public interface RunnableFuture<V> extends Runnable, Future<V>{
2     /**
3 * Sets this Future to the result of its computation
4 * unless it has been cancelled.
5      */
6     voidrun();
7 }

我们知道FutureTask 继承了Runnable,这里将Callable<T> callable 的实例封装成FutureTask 传给execute(ftask);我们再来看看上一篇文章中线程运行的代码

1 //此方法由 worker 线程启动后调用,这里用一个 while 循环来不断地从等待队列中获取任务并执行
2 //前面说了,worker 在初始化的时候,可以指定 firstTask,那么第一个任务也就可以不需要从队列中获取
3 final voidrunWorker(Worker w) {
4     Thread wt =Thread.currentThread();
5     //该线程的第一个任务(如果有的话)
6     Runnable task =w.firstTask;
7     w.firstTask = null;
8     w.unlock(); //allow interrupts
9     boolean completedAbruptly = true;
10     try{
11         //循环调用 getTask 获取任务
12         while (task != null || (task = getTask()) != null) {
13 w.lock();          
14             //如果线程池状态大于等于 STOP,那么意味着该线程也要中断
15             if ((runStateAtLeast(ctl.get(), STOP) ||
16                  (Thread.interrupted() &&
17                   runStateAtLeast(ctl.get(), STOP))) &&
18                 !wt.isInterrupted())
19 wt.interrupt();
20             try{
21 beforeExecute(wt, task);
22                 Throwable thrown = null;
23                 try{
24                     //到这里终于可以执行任务了,这里是最重要的,task是什么?是Worker 中的firstTask属性
25                     
26 task.run();
27                 } catch(RuntimeException x) {
28                     thrown = x; throwx;
29                 } catch(Error x) {
30                     thrown = x; throwx;
31                 } catch(Throwable x) {
32                     thrown = x; throw newError(x);
33                 } finally{
34 afterExecute(task, thrown);
35 }
36             } finally{
37                 //一个任务执行完了,这个线程还可以复用,接着去队列中拉取任务执行
38                 //置空 task,准备 getTask 获取下一个任务
39                 task = null;
40                 //累加完成的任务数
41                 w.completedTasks++;
42                 //释放掉 worker 的独占锁
43 w.unlock();
44 }
45 }
46         completedAbruptly = false;
47     } finally{
48         //如果到这里,需要执行线程关闭:
49         //说明 getTask 返回 null,也就是超过corePoolSize的线程过了超时时间还没有获取到任务,也就是说,这个 worker 的使命结束了,执行关闭
50 processWorkerExit(w, completedAbruptly);
51 }
52 }

由上面第6行代码task 就是execute(ftask)传入的任务,第26行task.run(); 实际上就是new FutureTask<T>(callable).run(),我们看看FutureTask中的run()方法

1 public voidrun() {
2     //保证callable任务只被运行一次
3     if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
4         return;
5     try{
6         Callable < V > c =callable;
7         if (c != null && state ==NEW) {
8 V result;
9             booleanran;
10             try{ 
11                 //执行任务,上面的例子我们可以看出,call()里面可能是一个耗时的操作,不过这里是同步的
12                 result =c.call();
13                 //上面的call()是同步的,只有上面的result有了结果才会继续执行
14                 ran = true;
15             } catch(Throwable ex) {
16                 result = null;
17                 ran = false;
18 setException(ex);
19 }
20             if(ran)
21                 //执行完了,设置result
22 set(result);
23 }
24 }
25     finally{
26         runner = null;
27         int s =state;
28         //判断该任务是否正在响应中断,如果中断没有完成,则等待中断操作完成
29         if (s >=INTERRUPTING)
30 handlePossibleCancellationInterrupt(s);
31 }
32 }

在FutureTask的构造方法中this.callable = callable; ,因此我们可以知道上面run()方法中第6行 c 就是 代码示例中的 new Callable<String>(),c.call()就是调用代码示例中new Callable 的call方法,并且这里可以取到返回结果,第22行处设置FutureTask 中 outcome 的值,这样线程就可以取到返回值了。

1 protected voidset(V v) {
2     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
3         outcome =v;
4         UNSAFE.putOrderedInt(this, stateOffset, NORMAL); //final state
5 finishCompletion();
6 }
7 }

取值我就不分析了,我之前的文章里面有详细分析。

submit(Runnabletask,Tresult)

1 public <T> Future<T>submit(Runnable task, T result) {
2     if (task == null) throw newNullPointerException();
3     RunnableFuture<T> ftask =newTaskFor(task, result);
4 execute(ftask);
5     returnftask;
6 }
7 
8 protected <T> RunnableFuture<T>newTaskFor(Runnable runnable, T value) {
9     return new FutureTask<T>(runnable, value);
10 }

我们来看看FutureTask的另外一个构造方法

1 publicFutureTask(Runnable runnable, V result) {
2     this.callable =Executors.callable(runnable, result);
3     this.state = NEW;       //ensure visibility of callable
4 }
1 public static <T> Callable<T>callable(Runnable task, T result) {
2     if (task == null)
3         throw newNullPointerException();
4     return new RunnableAdapter<T>(task, result);
5 }
6 
7 static final class RunnableAdapter<T> implements Callable<T>{
8     finalRunnable task;
9     finalT result;
10 RunnableAdapter(Runnable task, T result) {
11         this.task =task;
12         this.result =result;
13 }
14     publicT call() {
15 task.run();
16         returnresult;
17 }
18 }

上面将runnable, result 封装成了RunnableAdapter 作为FutureTask的callable属性,这和上面的submit(Callable<T> task) 是不同的,submit(Callable<T> task)是直接将 Callable<T>task作为FutureTask的callable属性。我们看看FutureTask中的run()方法中第6行 c 就是FutureTask 构造方法中的new RunnableAdapter<T>(task, result),c.call()就是调用RunnableAdapter<T>(task, result) 的call方法,call()中的task.run()就是上面代码示例中new Task(data) 中的 run(),run()方法中业务大代码改变了data对象的属性,callable(Runnable task, T result)中也是传的相同的对象data, 所以,result = c.call(); 就是把更改后的data返回,并且将data设置为设置FutureTask 中 outcome 的值,后面的逻辑就是一样的了。

这里可以看成将同一个data传入线程进行处理,同时这个data也传入FutureTask中,并且在RunnableAdapter通过属性进行保存data,等线程将data处理完了,由于是同一个对象,RunnableAdapter中的result也就是data指向的是同一个对象,然后把此result返回到FutureTask保存在属性outcome中,就可以通过FutureTask.get()取到运行结果了。

如果new FutureTask<T>(runnable, null),则result = c.call(); 返回的值也是null,最后从线程池中get的值也是null。

免责声明:文章转载自《并发编程(十二)—— Java 线程池 实现原理与源码深度解析 之 submit 方法 (二)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇scan chain的原理和实现——8.AT SPEED Test &amp;amp; OCCVUE自学之路7-vue模版语法(双向数据绑定)下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

java 调用apache.commons.codec的包简单实现MD5加密

转自:https://blog.csdn.net/mmd1234520/article/details/70210002/ 1 importjava.security.MessageDigest; 2 importjava.security.NoSuchAlgorithmException; 3 4 import org.apache....

poi解析office文档内容的工具类

第一步引入依赖 <!--xls--> <dependency> <groupId>org.apache.poi</groupId> <artifactId>poi</artifactId>...

C# 通过ServiceStack 操作Redis——List类型的使用及示例

Redis list的实现为一个双向链表,即可以支持反向查找和遍历,更方便操作,不过带来了部分额外的内存开销, /// <summary> /// Redis list的实现为一个双向链表,即可以支持反向查找和遍历,更方便操作,不过带来了部分额外的内存开销, /// Redis内部的很多实现,包括发送缓冲队列等也都是用的这...

javascript base64 编码,兼容ie6789

用Javascript进行base64编码,在高版本的IE浏览器(IE9以上版本),和firefox,chrome浏览器里是非常方便的。这些浏览器的window对象中内置了base64的编码和解码方法。 var base64String = window.btoa(string) ;//编码 var string = window.atob(base64s...

11-C#反射机制

C#反射机制 转自:http://blog.csdn.net/educast/article/details/2894892 反射的用途:    (1)使用Assembly定义和加载程序集,加载在程序集清单中列出模块,以及从此程序集中查找类型并创建该类型的实例。     (2)使用Module了解包含模块的程序集以及模块中的类等,还可以获取在模块上定义的所...

springboot 实现注解获取操作日志

1.第一步加入pom依赖 <dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-aop</artifactId> </dependency> 2.第二步加...