多线程中如何取消任务

摘要:
尽管线程取消和中断之间没有必然的联系,但在实践中发现,中断是实现取消的最合理的方式。收到中断通知后,线程通常会在下一个适当的时间中断自己。最合理的中断策略是某种形式的线程级取消或服务级取消:尽快退出,必要时进行清理,并通知所有者线程线程已退出。中断请求可以有一个或多个收件人。例如,中断线程池中的工作线程意味着取消当前任务并关闭工作线程。

大多数情况下,任务运行完后会自动结束。然而,有时我们希望提前结束任务或线程,可能是因为用户取消了操作,或者应用程序需要被快速关闭。但是,Java并没有提供任务机制来安全地终止线程。虽然如此,但Java提供了线程中断,中断是一种协作机制,能使一个线程终止另一个线程的当前工作。

我们很少希望某个任务、线程或服务立即停止,因为这种立即停止会使共享数据处于不一致的状态,而使用协作机制的方式:当需要停止时,它们首先会清除当前正在执行的工作,然后再结束。这提供了更好的灵活性,因为任务本身的代码比发出取消请求的代码更清楚如何执行清除工作。

一、任务取消

1.使用volatile状态变量来控制

操作被取消的原因有很多,比如超时,异常,请求被取消等等。

一个可取消的任务要求必须设置取消策略,即如何取消,何时检查取消命令,以及接收到取消命令之后如何处理。

最简单的取消办法就是利用取消标志位,如下所示。这段代码用于生成素数,并在任务运行一秒钟之后终止。其取消策略为:通过改变取消标志位取消任务,任务在每次生成下一随机素数之前检查任务是否被取消,被取消后任务将退出。

@ThreadSafe
public class PrimeGenerator implements Runnable {
    private static ExecutorService exec = Executors.newCachedThreadPool();

    @GuardedBy("this") private final List<BigInteger> primes = new ArrayList<BigInteger>();
    //必须是volatile
    private volatile boolean cancelled;

    public void run() {
        BigInteger p = BigInteger.ONE;
        //检查状态,从而取消任务
        while (!cancelled) {
            p = p.nextProbablePrime();
            synchronized (this) {
                primes.add(p);
            }
        }
    }

    public void cancel() {
        cancelled = true;
    }

    public synchronized List<BigInteger> get() {
        return new ArrayList<BigInteger>(primes);
    }

    static List<BigInteger> aSecondOfPrimes() throws InterruptedException {
        PrimeGenerator generator = new PrimeGenerator();
        exec.execute(generator);
        try {
            SECONDS.sleep(1);
        } finally {
            generator.cancel();
        }
        return generator.get();
    }
}

然而,该机制最大的问题就是无法应用于阻塞方法,例如BlockingQueue.put()。可能会产生一个很严重的问题——任务可能因阻塞而永远无法检查取消标识,导致任务永远不会结束。

class BrokenPrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;
    private volatile boolean cancelled = false;

    BrokenPrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            while (!cancelled)
                //这里可能产生阻塞,从而无法取消任务。
                queue.put(p = p.nextProbablePrime());
        } catch (InterruptedException consumed) {
        }
    }

    public void cancel() {
        cancelled = true;
    }
}

2.中断 

线程中断是一种协作机制,线程可以通过这种机制来通知另一个线程,告诉它在合适的或者可能情况下停止当前工作。

虽然线程的取消和中断没有必然联系,但是在实践中发现:中断是实现取消的最合理方式。

对中断操作的正确理解是:它并不会真正的中断线程,而是给线程发出中断通知,告知目标线程有人希望你退出。目标线程收到通知后如何处理完全由目标线程自行决定,这是非常重要的。线程收到中断通知后,通常会在下一个合适的时刻(被称为取消点)中断自己。有些方法,如wait、sleep和join等将严格地处理这种请求,当它们收到中断请求或者在开始执行时发现某个已被设置好的中断状态时,将抛出一个异常。

对于前面BrokenPrimeProducer的问题很容易解决(和简化),使用中断而不是boolean标识来请求取消。

public class PrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;

    PrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            //检查中断
            while (!Thread.currentThread().isInterrupted())
                queue.put(p = p.nextProbablePrime());
        } catch (InterruptedException consumed) {
            /* 允许线程退出 */
        }
    }

    public void cancel() {
        interrupt();
    }
}

代码中有两次检查中断请求:

①第一次是在循环开始前,显示检查中断请求;

②第二次是在put方法,该方法为阻塞的,会隐式的检测当前线程是否被中断;

2.1中断策略

正如任务中应该包含取消策略一样,线程同样也需要有中断策略:发现中断请求时应该做什么(如果需要),以多快速度来响应中断(立即响应还是推迟响应)。

最合理的中断策略是某种形式的线程级别的取消操作或服务级别的取消操作:尽快退出,在必要时进行清理,通知某个所有者线程该线程已经退出。

此外,还可以建立其它的中断策略,例如暂停服务或重新开始服务,但对于哪些包含非标准中断策略的线程或者线程池,这些中断策略只能用于知道这些策略的任务中。

区分任务和线程对中断的反应是很重要的。一个中断请求可以有一个或多个接收者,例如,中断线程池中的某个工作者线程, 则意味着取消了当前任务,同时也意味着关闭了工作者线程。

任务不会在其自己拥有的线程中执行,而是在某个服务(如线程池)拥有的线程中执行。对于非线程所有者的代码来说(例如,对于线程池而言,任何在线程池实现以外的代码),应该小心地保存中断状态,这样拥有线程的代码才能对中断做出响应,即使非所有者代码也可以做出响应。(当为一户人家打扫房屋时,即使主人不在,也不应该把这段时间内收到的邮件扔掉,而是应该收起来等主人回来后交给他们处理,尽管你可以阅读它们的杂志)。

这就是为什么大多数可阻塞的库方法都是抛出中断异常(InterruptedException)作为中断响应。它们永远不会在某个由自己拥有的线程中运行,因此它们为任务或库代码实现了最合理的取消策略:尽快退出执行流程,并把中断信息传递给调用者,从而使调用栈中的上层代码可以采取进一步的操作。

当检测到中断请求时,任务并不需要立刻放弃所有的操作,它可以推迟处理中断请求到某个更适合的时刻。既然要推迟处理,就需要记住中断请求,并在完成当前任务后抛出中断异常,或者表示已收到中断请求。

无论任务把中断视为取消,还是其它某个中断响应操作,都应该小心地保存执行线程的中断状态。如果除了将中断异常传递给调用者外,还需要进行其它操作,则因该在捕获中断异常之后回复中断状态。

Thread.currentThread().interrupt();

切记,只有实现了线程中断策略的代码才能屏蔽中断请求,在常规的任务和库代码中都不应该屏蔽中断请求。

虽然有人质疑Java没有提供抢占式的中断机制,但是开发人员通过处理中断异常的方法,可以定制更为灵活的中断策略,从而在响应性和健壮性之间做出合理的平衡。

2.2响应中断

当调用可中断的阻塞函数时,例如Thread.sleep或者BlockingQueue.put等,有两种实用策略可用于响应中断

  • 传递异常(可能在执行某个特定于任务的清除操作之后),从而使你的方法也成为可中断的阻塞方法。
  • 恢复中断状态,从而使调用栈中的上层代码能够对其进行处理

①传递异常

将InterrupedException传递给调用者:

    BlockingQueue<Task> queue;
    ……
    //抛出InterruptedException,从而将中断传递给调用者
    public Task getNextTask() throws InterruptedException{
        return queue.take();
    }

②恢复中断状态

如果不想或无法传递InterruptedException(或者通过Runnable来定义任务),那么需要寻找另外的方式来保存中断请求。一种标准的方法就是通过再次调用interrupt来恢复中断状态。你不能屏蔽InterruptedException,例如在catch块中捕获到异常却不做任务处理,除非在你的代码中实现了线程的中断策略。虽然PrimeProducer屏蔽了中断,但这是因为它已经知道线程将要结束,因此在调用栈中已经没有上层代码需要知道中断信息。由于大多数代码并不知道它们将在哪个线程中运行,因此应该保存中断状态。

对于一些不支持取消但仍可以调用可中断阻塞方法的操作,它们必须在循环中调用这些方法,并在发现中断后重新尝试。在这种情况下,它们应该在本地保存中断状态,并在返回前恢复状态而不是在捕获InterruptedException时恢复状态。如果过早地设置中断状态,就可能引起无限循环,因为大多数可中断的阻塞方法都会在入口处检查中断状态,并且当发现该状态已被设置时会立即抛出InterruptedException。(通常,可中断的方法会在阻塞或进行重要的工作前首先检查中断,从而尽快地相应中断)

不可取消的任务在退出前恢复中断:

public TaskgetNextTask(BlockingQueue<Task>queue){
   booleaninterrupted=false;
   try{
      while(true){
          try{
              return queue.take();
          }
          catch (InterruptedException e){
             interrupted=true;
             // 重新尝试
          }
      }
   }
   finaly{
      if (interrupted) 
          Thread.currentThread().interrupt();
   }
}

如果代码不会调用可中断的阻塞方法,那么仍然可以通过在任务代码中轮询当前线程的中断状态来响应中断。要选择合适的轮询频率,就需要在效率和响应性之间进行权衡。如果响应性要求较高,那么不应该调用那些执行时间较长并且不响应中断的方法,从而对可调用的库代码进行一些限制。

3.通过Future来实现取消

在使用Future之前,我们还是先来看个计时运行的示例,来一步步引出Future。

3.1在外部线程中安排中断(不要这么做)

在下面的程序中,给出了在指定时间内运行一个任意的Runnable 的示例。它在调用线程中运行任务,并安排了一个取消任务,在运行指定的时间间隔后中断它。这解决了从任务中抛出未检查异常的问题,因为该异常会被 timedRun 的调用者捕获。

/**
 * 在外部线程中安排中断(不要这么做)
 */
public class TimedRun1 {
    private static final ScheduledExecutorService cancelExec = Executors.newScheduledThreadPool(1);

    public static void timedRun(Runnable r, long timeout, TimeUnit unit) {
        final Thread taskThread = Thread.currentThread();
        cancelExec.schedule(new Runnable() {//①安排一个取消任务,希望在运行指定的时间间隔后中断②处的任务。
            public void run() {
                taskThread.interrupt();
            }
        }, timeout, unit);
        r.run();//②运行任务
    }
}

这是一种非常简单的方法,但却破坏了以下规则:在中断线程之前,应该了解它的中断策略。

由于timedRun可以从任意一个线程中调用,因此它无法知道这个调用线程的中断策略。如果任务在超时之前完成,那么中断timedRun所在线程的取消任务将在 timedRun 返回到调用者之后启动。我们不知道在这种情况下将运行什么代码,但结果一定是不好的。(可以使用schedule返回的ScheduleFuture来取消这个取消任务以避免这种风险,这种做法虽然可行,但是非常复杂。)

另外,如果任务不响应中断,那么timedRun会在任务结束时才返回,此时可能已经超过了指定的时限(或者还没有超过时限),而限时运行的服务没有在指定时间内返回,对调用者来说可能会带来负面硬性。 

3.2在专门的线程中中断任务(可解决问题,但也有缺陷)

在下面的程序中解决了最开始的异常处理问题以及上面解决方案中的问题。

执行任务的线程拥有自己的执行策略,即使任务不响应中断,限时运行的方法仍能返回到它的调用者。在启动任务线程之后,timedRun 将执行一个限时的 join 方法。在join返回后,它将检查任务中是否有异常抛出,如果有的话,则会在调用timedRun 的线程中再次抛出该异常。

/**
 * 在专门的线程中中断任务
 */
public class TimedRun2 {
    private static final ScheduledExecutorService cancelExec = newScheduledThreadPool(1);

    public static void timedRun(final Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
        //方法内部类
        class RethrowableTask implements Runnable {
            //由于Throwable将在两个线程之间共享,因此声明为volatile类型,从而确保安全地将其从任务线程发布到timedRun线程。
            private volatile Throwable t;

            public void run() {
                try {
                    r.run();
                } catch (Throwable t) {
                    this.t = t;
                }
            }

            void rethrow() {
                if (t != null)
                    throw launderThrowable(t);
            }
        }

        RethrowableTask task = new RethrowableTask();
        final Thread taskThread = new Thread(task);
        taskThread.start();//①启动任务
        cancelExec.schedule(new Runnable() {//
            public void run() {
                taskThread.interrupt();
            }
        }, timeout, unit);
        taskThread.join(unit.toMillis(timeout));//
        task.rethrow();//④join返回后,它将检查任务中是否有异常抛出,如果有,则会在调用timedRun的线程中再次抛出该异常。
    }
}

这里解决了前面示例中的问题,但由于它依赖一个限时的 join,因此存在着join的不足:无法知道执行控制是因为线程正常退出而返回还是因为join超时而返回。

3.3 通过Future来实现取消

Future是JDK库中的类,可用来管理任务的生命周期,处理异常,也可以实现取消。通常,使用现有库中的类比自行实现更好,所以我们可以直接使用Future来实现任务的取消。

看下面的实现:将任务提交给一个ExecutorSevice,并通过一个定时的Future.get来获取结果。如果get返回时抛出了TimeoutException,那么任务将通过它的Future来取消。(为了简化,这里在finally中直接调用Future.cancel,因为取消一个已完成的任务不会带来任何影响)。如果任务在被取消前就被抛出一个异常,那么该异常将被重新抛出以便由调用者来处理。

/**
 * 通过Future来取消任务
 */
public class TimedRun {
    private static final ExecutorService taskExec = Executors.newCachedThreadPool();

    public static void timedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
        Future<?> task = taskExec.submit(r);
        try {
            task.get(timeout, unit);
        } catch (TimeoutException e) {
            // 因超时而取消任务
        } catch (ExecutionException e) {
            // 任务异常,重新抛出异常信息
            throw launderThrowable(e.getCause());
        } finally {
            // 如果该任务已经完成,将没有影响
            // 如果任务正在运行,将因为中断而被取消
            task.cancel(true); // interrupt if running
        }
    }
}

这里给出了一种良好的编程习惯:取消哪些不再需要结果的任务。

当Future.get抛出InterruptedException或TimeoutException时,如果不再需要结果,那么就可以使用Future.cancel来取消任务。这是一种良好的编程习惯。

4.处理不可中断的阻塞

参考: https://www.jianshu.com/p/613286f4245e

在java库中,许多可阻塞的方法都是通过提前返回或者抛出InterruptedException来响应中断请求的,从而使开发人员更容易构建出能响应取消请求的任务。然而并非所有的可阻塞方法或者阻塞机制都能响应中断;如果一个线程由于执行同步的Socket I/O或者等待获得内置锁而阻塞,那么中断请求只能设置线程的中断状态,除此之外没有其他任何作用。对于那些由于执行不可中断操作而被阻塞的线程,可以使用类似于中断的手段来停止这些线程,但这要求我们必须知道线程阻塞的原因。以下是不可中断阻塞的情况:

       1. java.io包中的同步Socket I/O。如套接字中进行读写操作read, write方法。

       2. java.io包中的同步I/O。如当中断或关闭正在InterruptibleChannel上等待的线程时,会对应抛出ClosedByInterruptException或AsynchronousCloseException。

       3. Selector的异步I/O。如果一个线程在调用Selector.select时阻塞了,那么调用close, wakeup会使线程抛出ClosedSelectorException。

       4. 获取某个锁。当一个线程等待某个锁而阻塞时,不会响应中断。但Lock类的lockInterruptibly允许在等待锁时响应中断。

在下面的示例中,ReaderThread给出了如何封装非标准的取消操作。ReaderThread 管理了一个套接字连接, 它采用同步方式从该套接字中读取数据, 并将接收到的数据传递给processBuffer。为了结束某个用户的连接或者关闭服务器, ReaderThread改写了interrupt方法,使其既能处理标准的中断, 也能关闭底层的套接字。因此, 无论ReaderThread线程是在read方法中阻塞还是在某个可中断的阻塞方法中阻塞, 都可以被中断并停止执行当前的工作。
/**
 * 通过改写Interrupt方法将非标准的取消操作封装在Thread中。 
 */
public class ReaderThread extends Thread {
    private static final int BUFSZ = 512;
    private final Socket socket;
    private final InputStream in;

    public ReaderThread(Socket socket) throws IOException {
        this.socket = socket;
        this.in = socket.getInputStream();
    }

    /**
     * ReaderThread改写了interrupt方法,使其既能处理标准的中断,也能关闭底层的socket
     */
    public void interrupt() {
        try {
            // 关闭socket。此时in.read会抛出异常
            socket.close();
        } catch (IOException ignored) {
        } finally {
            // 正常的中断
            super.interrupt();
        }
    }

    public void run() {
        try {
            byte[] buf = new byte[BUFSZ];
            while (true) {
                int count = in.read(buf);
                if (count < 0)
                    break;
                else if (count > 0)
                    processBuffer(buf, count);
            }
        } catch (IOException e) { 
            // 如果socket关闭,in.read方法将会抛出异常。借此机会,响应中断,线程退出
        }
    }

    public void processBuffer(byte[] buf, int count) {
    }
}

5.采用newTaskFor来封装非标准的取消

我们可以通过newTaskFor方法来进一步优化ReaderThread中封装非标准取消的技术, 这是Java 6 在ThreadPoolExecutor 中的新增功能。当把一个Callable 提交给ExecutorService 时,submit 方法会返回一个Future, 我们可以通过这个Future 来取消任务。newTaskFor是一个方法, 它将创建Future 来代表任务。newTaskFor 还能返回一个RunnableFuture 接口, 该接口扩展了Future 和Runnable (并由FutureTask 实现)。

       通过定制表示任务的Future 可以改变Future.cancel 的行为。例如, 定制的取消代码可以实现日志记录或者收集取消操作的统计信息, 以及取消一些不响应中断的操作。通过改写interrupt 方法, ReaderThread 可以取消基于套接字的线程。同样, 通过改写任务的Future.cancel 方法也可以实现类似的功能。

在下面的示例中,CancellableTask中定义了一个CancellableTask 接口, 该接口扩展了Callable,并增加了一个cancel 方法和一个newTask 工厂方法来构造RunnableFuture 。CancellingExecutor扩展了ThreadPoolExecutor, 并通过改写newTaskFor 使得CancellableTask 可以创建自己的Future 。
/**
 *  通过newTaskFor将非标准的取消操作封装在一个任务中
 */

/**
 * 扩展了Callable,并增加了一个cancel 方法和一个newTask工厂方法来构造RunnableFuture
 */
interface CancellableTask <T> extends Callable<T> {
    void cancel();

    RunnableFuture<T> newTask();
}

/**
 * 扩展了ThreadPoolExecutor,并通过改写newTaskFor使得CancellableTask可以创建自己的Future
 */
@ThreadSafe
class CancellingExecutor extends ThreadPoolExecutor {

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        if (callable instanceof CancellableTask)
            return ((CancellableTask<T>) callable).newTask();
        else
            return super.newTaskFor(callable);
    }
}

/**
 * 实现了CancellableTask,并定义了Future.cancel来关闭socket和调用super.cancel。如果SocketUsingTask通过其自己的Future来取消,那么底层的socket将被关闭时确保响应取消操作,而且还能调用可阻塞的socket I/O方法
 */
public abstract class SocketUsingTask <T> implements CancellableTask<T> {
    @GuardedBy("this") private Socket socket;

    protected synchronized void setSocket(Socket s) {
        socket = s;
    }

    public synchronized void cancel() {
        try {
            if (socket != null)
                socket.close();
        } catch (IOException ignored) {
        }
    }

    public RunnableFuture<T> newTask() {
        return new FutureTask<T>(this) {
            public boolean cancel(boolean mayInterruptIfRunning) {
                try {
                    SocketUsingTask.this.cancel();
                } finally {
                    return super.cancel(mayInterruptIfRunning);
                }
            }
        };
    }
} 

二、停止基于线程的服务

与其它封装对象一样,线程的所有权是不可传递的:应用程序可以拥有服务,服务也可以拥有工作者线程,但应用程序并不能拥有工作者线程。用通俗易懂的话讲就是应用程序管理服务,服务管理工作者线程,而应用程序不能直接管理工作者线程,因此应用程序不能直接停止工作者线程,而是应该由服务提供生命周期方法来关闭它自己以及它所拥有的线程。这样,当应用程序关闭该服务时,服务就可以关闭所有的线程了。比如,在ExecutorService中就提供了shutdown和shutdownNow等方法。同样,在其它拥有线程的服务中也应该提供类似的关闭机制。

1.关闭ExecutorService

ExecutorService提供了两种关闭方法:

  • 使用shutdown正常关闭。
  • 使用shutdownNow强行关闭。

在进行强行关闭时, shutdownNow 首先关闭当前正在执行的任务,然后返回所有尚未启动的任务清单。

这两种关闭方式的差别在于各自的安全性和响应性:

强行关闭的速度更快,但风险也更大,因为任务很可能在执行到一半时被结束;而正常关闭虽然速度慢,但却更安全,因为 ExecutorService会一直等到队列中的所有任务都执行完成后才关闭。在其他拥有线程的服务中也应该考虑提供类似的关闭方式以供选择。

简单的程序可以直接在main 函数中启动和关闭全局的 ExecutorService。而在复杂程序中,通常会将ExecutorService封装在某个更高级别的服务中,并且该服务能提供自己的生命周期方法。例如下面程序清单中,它将管理线程的工作委托给一个ExecutorService,而不是由其自行管理。通过封装 ExecutorService,可以将所有权链从应用程序扩展到服务以及线程,所有权链上的各个成员都将管理它所拥有的服务或线程的生命周期。

/**
 * 封装ExecutorService实现日志服务
 */
public class LogService {
    private final ExecutorService exec = Executors.newSingleThreadExecutor();
    private final PrintWriter writer;

    public LogService(PrintWriter writer) {
        this.writer = writer;
    }

    public void start(){

    }

    public void log(String msg) {
        try {
            exec.execute(new WriteTask(msg));
        } catch (RejectedExecutionException ignored) {
        }
    }

    public void stop(long timeout, TimeUnit unit) throws InterruptedException {
        try {
            exec.shutdown();
            // 关闭服务后, 阻塞到所有任务被执行完毕或者超时发生,或当前线程被中断
            exec.awaitTermination(timeout, unit);
        } finally {
            writer.close();
        }
    }
}

2."毒丸"对象

“毒丸”是指一个放在队列上的对象,其含义是:“当得到这个对象时,立即停止。”在FIFO 队列中,“毒丸”对象将确保消费者在关闭之前首先完成队列中的所有工作,在提交“毒丸”对象之前提交的所有工作都会被处理,而生产者在提交了“毒丸”对象后,将不会在提交任何工作。在下面的程序清单中给出了一个单生产者——单消费者的桌面搜索示例,使用了“毒丸”对象来关闭服务。

/**
 *  通过“毒丸”对象来关闭服务
 */
public class IndexingService {
    private static final int CAPACITY = 1000;
    //毒丸
    private static final File POISON = new File("");
    private final IndexerThread consumer = new IndexerThread();
    private final CrawlerThread producer = new CrawlerThread();
    private final BlockingQueue<File> queue;
    //private final FileFilter fileFilter;
    private final File root;

    public IndexingService(File root) {
        this.root = root;
        this.queue = new LinkedBlockingQueue<File>(CAPACITY);
        
    }

    private boolean alreadyIndexed(File f) {
        return false;
    }

    // IndexingService的生产者线程
    class CrawlerThread extends Thread {
        public void run() {
            try {
                crawl(root);
            } catch (InterruptedException e) { /* 发生异常 */
            } finally {
                while (true) {
                    try {
                        System.out.println("放入“毒丸”");
                        queue.put(POISON);//放入毒丸
                        break;
                    } catch (InterruptedException e1) { /* 重试 */
                    }
                }
            }
        }

        private void crawl(File root) throws InterruptedException {
            File[] entries = root.listFiles();
            if (entries != null) {
                for (File entry : entries) {
                    if (entry.isDirectory())
                        crawl(entry);
                    else if (!alreadyIndexed(entry)){
                        System.out.println("放入生产者队列文件:"+entry.getName()+" 来自线程:"+Thread.currentThread().getName());
                        queue.put(entry);
                    }
                }
            }
        }
    }

    // IndexingService的消费者线程
    class IndexerThread extends Thread {
        public void run() {
            try {
                while (true) {
                    File file = queue.take();
                    // 遇到毒丸,终止
                    if (file == POISON){
                        System.out.println("遇到“毒丸”,终止");
                        break;
                    }   
                    else
                        indexFile(file);
                }
            } catch (InterruptedException consumed) {
            }
        }

        public void indexFile(File file) {
            System.out.println("消费者取出文件:"+file.getName()+" 来自线程:"+Thread.currentThread().getName());
            /* ... */
        };
    }

    public void start() {
        producer.start();
        consumer.start();
    }

    public void stop() {
        producer.interrupt();
    }

    public void awaitTermination() throws InterruptedException {
        consumer.join();
    }

}

只有在生产者和消费者的数量都已知的情况下,才可以使用”毒丸“对象。

这种解决方案可以扩展到多个生产者的情况只需每个生产者都想队列放入一个”毒丸“对象,并且消费者仅当在接收到N个生产者的”毒丸“对象时才停止。

这种解决方案也可以扩展到多个消费者的情况:只需生产者将N个”毒丸“对象放入队列。

然而,当生产者和消费者的数量较大时,这种方法将变的难以使用。只有在无界队列中,”毒丸“对象才能可靠地工作。

三、处理非正常的线程终止

导致线程提前死亡的最主要原因就是RuntimeException。由于这些异常表示出现了某种编程错误或者其它不可修复的错误,因此它们通常不会被捕获。它们不会再调用栈中逐层传递,而是默认在控制台中输出栈追踪信息,并终止线程。

线程非正常退出的后果是否严重要取决于线程的作用。比如,线程池中丢失一个线程可能会对性能带来一定影响,但如果程序能在包含了50个线程的线程池上运行良好,那么在包含了49个线程的线程池上通常也能运行良好。然而,对于GUI程序,如果丢失了事件分配线程,那么造成的影响会非常显著——应用程序将停止处理事件并且GUI会因此失去响应。

如果run()方法中的代码抛出NullPointerException而失败,可以将代码放在try-catch(或try-finally)代码块中,这样就能捕获未检查的异常了。

未捕获异常的处理

上面介绍了是一种主动方法来解决未检查异常。在Thread中同样也提供了UncaughtExceptionHandler,它能检测出某个由于未捕获的异常而终结的情况。这两种方法是互补的,通过将二者结合在一起,就能有效地防止线程泄露问题。

当一个线程由于未捕获异常而退出时,JVM会把这个事件报告给UncaughtExceptionHandler异常处理器,如果没有提供异常处理器,那么默认的行为是将追踪信息输出到Sytem.err,即控制台。

异常处理器该如何处理未捕获的异常呢?最常见的方式是将错误信息及相应的栈追踪信息写入日志中,如下所示。异常处理器还可以采取更直接的响应,例如尝试重启线程,关闭应用程序,或执行其它修复或诊断等操作。

/**
 * 将异常写入日志的UncaughtExceptionHandler
 */
public class UEHLogger implements Thread.UncaughtExceptionHandler {

    public void uncaughtException(Thread t, Throwable e) {
        Logger logger = Logger.getAnonymousLogger();
        logger.log(Level.SEVERE, "Thread terminated with exception: " + t.getName(), e);
    }

}

则在使用时只需要为线程设置一个UncaughtExceptionHandler即可。

Thread thread=new Thread();
thread.setUncaughtExceptionHandler(new UEHLogger()); 
thread.start();

在运行时间较长的应用程序中,通常会为所有线程的未捕获异常指定同一个异常处理器,并且该处理器异常至少会将异常信息记录到日志中。

要为线程池中的所有线程设置一个UncaughtExceptionHandler,只需要为ThreadPoolExecutor的构造函数提供一个ThreadFactory。这部分可以参考另一篇博客:多线程异常处理

面试题:

1.取消任务的几种方式?

①使用volatile状态变量来控制

然而,该方式最大的问题就是无法应用于阻塞方法,例如BlockingQueue.put()。可能会产生一个很严重的问题——任务可能因阻塞而永远无法检查取消标识,导致任务永远不会结束。

②中断

中断是一种两阶段终止模式,

……

免责声明:文章转载自《多线程中如何取消任务》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇[转载]注册机破解法的原理以及应对方法web自动化测试第11步:切换窗口、frame、alert的新方法:switch_to包详解下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

MQTT协议的简单介绍和服务器的安装

转:http://blog.csdn.net/djun100/article/details/25752491   最近公司做的项目中有用到消息推送,经过多方面的筛选之后确定了使用MQTT协议,相对于XMPP,MQTT更加轻量级,并且占用用户很少的带宽。 MQTT是IBM推出的一种针对移动终端设备的基于TCP/IP的发布/预订协议,可以连接大量的远程传感器...

多线程中,ResultSet为空,报错空指针

最近在数据库查询数据时,由于数据量太大,使用了多线程,通过线程池建了好几个线程,然后调用了一个封装好的jdbc查询语句。 结果在多线程中,ResultSet报错空指针。 仔细查阅后,才发现多个线程访问了同一个connection,事务混乱,导致了空指针。 解决方法: 使用数据库连接池,这样一个线程各自使用一个connection,就不会有冲突了。...

log4cplus使用(三)-日志重定向

本文讲述的是log4cplus日志输出到qt widget,封装了serverSocket。     log4cplus支持用户自定义输出设备,只需要继承自Appender,或者Appender子类,并实现append成员方法,然后在 log4cplus初始化成功之后,把自定义输出设备添加到logger中,当用户向logger中输出信息时,logger会遍...

Spring Cloud 生产环境性能优化

先思考几个问题: 什么是百万并发连接? 什么是吞吐量? 操作系统能否支持百万连接? 操作系统维持百万连接需要多少内存? 应用程序维持百万连接需要多少内存? 百万连接的吞吐量是否超过了网络限制? 百万的并发连接挑战意味着什么: 100 万的并发连接数 10 万个连接/秒——(如果每个连接以这个速率持续约10秒) 1 GB/秒的连接——快速连接到互联网。...

HTML5触屏版多线程渲染模板技术分享

  前言: 了解js编译原理的屌丝们都知道,js是单线程的,想当年各路神仙为了实现js的多线程,为了解决innerHTML输出大段HTML卡页面的顽疾,纷纷设计了诸如假冒的“多线程“实现,我自己也在写开源框架KitJs时候,写过类似的组件http://www.cnblogs.com/xueduanyang/archive/2012/05/30/252642...

Java多线程基础:进程和线程之由来

  在前面,已经介绍了Java的基础知识,现在我们来讨论一点稍微难一点的问题:Java并发编程。当然,Java并发编程涉及到很多方面的内容,不是一朝一夕就能够融会贯通使用的,需要在实践中不断积累。由于并发肯定涉及到多线程,因此在进入并发编程主题之前,我们先来了解一下进程和线程的由来,这对后面对并发编程的理解将会有很大的帮助。   下面是本文的目录大纲:  ...