FutureTask详解

摘要:
{Vcall()threwsException;同时实现Future和Runnable接口。*NEW-->INTERRUPTED1.3 FutureTask的执行过程创建一个FutureTask对象task,将任务提交给调度器执行器进行调度,或在另一个线程中执行任务进行调度…如果currentThread调用执行结果task.get(),

1 基本概念

1.1 Callable与Future

Runnable封装一个异步运行的任务,可以把它想象成为一个没有参数和返回值的异步方法。Callable与Runnable类似,但是有返回值。Callable接口是一个参数化的类型,只有一个方法call。

public interface Callable<V> {
	V call() throws Exception;
}

类型参数是返回值的类型。例如,

Callable<Integer>表示一个最终返回Integer对象的异步计算。

Future保存异步计算的结果。可以启动一个计算,将Future对象交给某个线程,然后忘掉它。Future对象的所有者在结果计算好之后就可以获得它。
Future接口具有下面的方法:

public interface Future<V> {
	boolean cancel(boolean mayInterruptIfRunning);
	boolean isCancelled();
	boolean isDone();
	V get() throws InterruptedException, ExecutionException;
	V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

第一个get方法的调用被阻塞,知道计算完成。如果在计算完成之前,第二个get方法的调用超时,抛出一个TimeoutException异常。如果运行该计算的线程被中断,两个方法都将抛出InterruptedException。如果计算已经完成,那么get方法立即返回。

如果计算还在进行,isDone方法返回false;如果完成了,则返回true。

可以用cancel方法取消该计算。如果计算还没有开始,它被取消且不再开始。如果计算处于运行之中,那么如果mayInterrupt参数为true,它就被中断。

1.2 FutureTask

FutureTask包装器是一种非常便利的机制,同时实现了Future和Runnable接口。

类图如下:

FutureTask详解第1张

FutureTask的状态转换过程:

 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED

1.3 FutureTask的执行过程

创建一个futureTask对象task
提交task到调度器executor等待调度或者在另外一个线程中执行task

等待调度中...

如果此时currentThread调取执行结果task.get(),会有几种情况
if task 还没有被executor调度或正在执行中
    阻塞当前线程,并加入到一个阻塞链表中waitNode
else if task被其它Thread取消,并取消成功 或task处于中断状态
    throw exception
else if task执行完毕,返回执行结果,或执行存在异常,返回异常信息
    
        
如果此时有另外一个线程调用task.get()
    
执行过程同上

2 应用场景

1. Future用于异步获取执行结果或者取消任务。
2. 在高并发场景下确保任务只执行一次。

3 基本例子

Callable<Integer> myComputation = ...;
FutureTask<Integer> task = new FutureTask<Integer>(myComputation);
Thread t = new Thread(task);
t.start();
...
Integer result = task.get(); //获取结果

4 FutureTask源码分析

4.1 核心状态

 /**
 * The run state of this task, initially NEW.  The run state
 * transitions to a terminal state only in methods set,
 * setException, and cancel.  During completion, state may take on
 * transient values of COMPLETING (while outcome is being set) or
 * INTERRUPTING (only while interrupting the runner to satisfy a
 * cancel(true)). Transitions from these intermediate to final
 * states use cheaper ordered/lazy writes because values are unique
 * and cannot be further modified.
 * 
 * Possible state transitions:
 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED
 */
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

4.2 创建FutureTask

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
    
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

4.3 获取执行结果

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

4.4 执行方法

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

4.5 设置状态

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

5 高级示例

public class Memoizer<A, V> implements Computable<A, V> {
	private final ConcurrentMap<A, Future<V>> cache = new ConcurrentMap<A, Future>>();
	private final Computable<A, V> c;
	
	public Memoizer(Computable<A, V> c) {
		this.c = c;
	}
	
	public C computer(final A arg) throws InterruptedException {
		while(true) {
			Future<V> f = cache.get(arg);
			if(f == null) {
				Callable<V> eval = new Callable<V>() {
					public V call() throws InterruptedException {
						return c.compute(arg);
					}
				};
				FutureTask<V> ft = new FutureTask<V>(eval);
				f = cache.putIfAbsent(arg, ft);
				if(f == null) {
					f = ft;
					ft.run();
				}
			}
			
			try {
				return f.get();
			} catch(CancellationException e) {
				cache.remove(arg, f);
			} catch(ExecutionException e) {
				throw launderThrowable(e.getCause());
			}
		}
	}
}

免责声明:文章转载自《FutureTask详解》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Google Map 根据坐标 获取地址信息Jquery中用offset().top和offsetTop的比较下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Java笔记(十五) 并发包

并发包Java中还有一套并发工具包,位于包java.util.concurrent下,里面包括很多易用 且很多高性能的并发开发工具。 一、原子变量和CAS 为什么需要原子变量,因为对于例如count++这种操作,使用 synchronized成本太高了。Java并发包的基本原子变量有: AtomicBoolean、AtomicInteger、AtomitL...

Asp.Net Core 缓存的使用(译)

原文:http://www.binaryintellect.net/articles/a7d9edfd-1f86-45f8-a668-64cc86d8e248.aspx环境:Visual Studio 2017, Asp.Net Core 1.1. 缓存机制主要是为了提高性能。在ASP.NET Web Forms 及 ASP.NET MVC中可以直接使用缓...

VTK 图形基本操作进阶_点云配准技术(迭代最近点ICP算法)

1.Iterative Closest Points算法 点云数据配准最经典的方法是迭代最近点算法(Iterative Closest Points,ICP)。ICP算法是一个迭代的过程,每次迭代中对于源数据点P找到目标点集Q中的最近点,然后给予最小二乘原理求解当前的变换矩阵T。通过不断迭代迭代直至收敛,即完成了点集的配准。 1.1 基本原理 ICP算法...

SmartSql漫谈

最近在看smartSql源码,兄弟写的。写的很不错取取经。 记录下一些学习的东西,刚开始我先不系统的写了,随意一点哈,我看的差不多再给大家一个模块一个模块系统的写。 public T ExecuteScalar<T>(RequestContext context) { T result =...

jstat命令使用

jstat命令使用 jstat是JDK自带的一个轻量级小工具,全称“Java Virtual Machine statistics monitoring tool”,它位于java的bin目录下,主要利用JVM内建的指令对Java应用程序的资源和性能进行实时的命令行的监控,包括了对Heap size和垃圾回收状况的监控。 命令格式 ➜ ~ jstat -...

Jsp标签字典开发_基于Spring+Hibernate

目录 1. Jsp标签字典开发_基于Spring+Hibernate   1.1. 简述   1.2. 定义DictItem实体   1.3. 定义字典的@interface   1.4. 定义字典缓存类   1.5. 定义tld标签   1.6. 持久层实体使用注解   1.7. 页面调用jsp标签 2. 补充点   2.1. Hibernate设置属性...