我想取消从ThreadPoolExecutor获得的FutureTask,但我想确保线程池上的Callable已停止它的工作。等待FutureTask上的取消()
如果我调用FutureTask#cancel(false)然后get()(阻塞直到完成),我得到一个CancelledException。这个异常是立即抛出还是在任务停止执行之后抛出?
我想取消从ThreadPoolExecutor获得的FutureTask,但我想确保线程池上的Callable已停止它的工作。等待FutureTask上的取消()
如果我调用FutureTask#cancel(false)然后get()(阻塞直到完成),我得到一个CancelledException。这个异常是立即抛出还是在任务停止执行之后抛出?
这个答案通过检查修复了阿列克谢的竞争条件和FooJBar代码该任务已被取消。 (在FutureTask.run检查状态并运行可调用期间,在取消和getWithJoin期间都可以成功完成调用,但仍然可以调用)但是,可调用仍将运行。)
我也决定不覆盖原始的取消,因为新的取消需要申报InterruptedException
。新的取消摆脱其无用的返回值(因为true
可能意味着“任务尚未开始”,“任务已经开始并且已经完成其大部分损害”,“任务已经开始并最终完成”)中的任何一个。 super.cancel
的返回值的检查也已经过去了,所以如果新的cancel被多次从不同的线程调用,它们都将等待任务完成。
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Based on: http://stackoverflow.com/questions/6040962/wait-for-cancel-on-futuretask
*
* @author Aleksandr Dubinsky
*/
public class FixedFutureTask<T> extends FutureTask<T> {
/**
* Creates a {@code FutureTask} that will, upon running, execute the given {@code Runnable},
* and arrange that {@code get} will return the given result on successful completion.
*
* @param runnable the runnable task
* @param result the result to return on successful completion.
* If you don't need a particular result, consider using constructions of the form:
* {@code Future<?> f = new FutureTask<Void>(runnable, null)}
* @throws NullPointerException if the runnable is null
*/
public
FixedFutureTask (Runnable runnable, T result) {
this (Executors.callable (runnable, result));
}
/**
* Creates a {@code FutureTask} that will, upon running, execute the given {@code Callable}.
*
* @param callable the callable task
* @throws NullPointerException if the callable is null
*/
public
FixedFutureTask (Callable<T> callable) {
this (new MyCallable (callable));
}
/** Some ugly code to work around the compiler's limitations on constructors */
private
FixedFutureTask (MyCallable<T> myCallable) {
super (myCallable);
myCallable.task = this;
}
private final Semaphore semaphore = new Semaphore(1);
private static class MyCallable<T> implements Callable<T>
{
MyCallable (Callable<T> callable) {
this.callable = callable;
}
final Callable<T> callable;
FixedFutureTask<T> task;
@Override public T
call() throws Exception {
task.semaphore.acquire();
try
{
if (task.isCancelled())
return null;
return callable.call();
}
finally
{
task.semaphore.release();
}
}
}
/**
* Waits if necessary for the computation to complete or finish cancelling, and then retrieves its result, if available.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an exception
* @throws InterruptedException if the current thread was interrupted while waiting
*/
@Override public T
get() throws InterruptedException, ExecutionException, CancellationException {
try
{
return super.get();
}
catch (CancellationException e)
{
semaphore.acquire();
semaphore.release();
throw e;
}
}
/**
* Waits if necessary for at most the given time for the computation to complete or finish cancelling, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an exception
* @throws InterruptedException if the current thread was interrupted while waiting
* @throws CancellationException
* @throws TimeoutException if the wait timed out
*/
@Override public T
get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, CancellationException, TimeoutException {
try
{
return super.get (timeout, unit);
}
catch (CancellationException e)
{
semaphore.acquire();
semaphore.release();
throw e;
}
}
/**
* Attempts to cancel execution of this task and waits for the task to complete if it has been started.
* If the task has not started when {@code cancelWithJoin} is called, this task should never run.
* If the task has already started, then the {@code mayInterruptIfRunning} parameter determines
* whether the thread executing this task should be interrupted in an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return {@code true}. Subsequent calls to {@link #isCancelled}
* will always return {@code true} if this method returned {@code true}.
*
* @param mayInterruptIfRunning {@code true} if the thread executing this task should be interrupted;
* otherwise, in-progress tasks are allowed to complete
* @throws InterruptedException if the thread is interrupted
*/
public void
cancelAndWait (boolean mayInterruptIfRunning) throws InterruptedException {
super.cancel (mayInterruptIfRunning);
semaphore.acquire();
semaphore.release();
}
}
当然,'get(long timeout,TimeUnit unit)'不再服从它的合约,因为'semaphore.acquire()'没有超时。应该使用不同的线程同步原语。但是,我现在不会改变它。 – 2015-05-09 01:07:02
它一被取消就抛出。
有没有简单的方法知道它已经开始并完成。你可以创建一个可运行的包装来检查它的状态。
final AtomicInteger state = new AtomicInteger();
// in the runnable
state.incrementAndGet();
try {
// do work
} finally {
state.decrementAdnGet();
}
“有没有简单的方法知道它已经开始并完成” - 在这种情况下,您将从cancel()方法中获取false。我认为cancel()方法的结果和get()方法的结果的组合应该会给你提供完整的信息,但我可能是错的。 – 2011-05-18 07:08:14
如果取消正在运行的任务,则取消(true);应该返回true,get()会抛出CancellationException但是取消(true)正在运行的任务,它仍然可以运行。 – 2011-05-18 15:03:01
一个简单的标志并不回答这个问题,“它是否已经启动并完成,还是尚未启动?”它也不允许你等到任务完成(或者已经被取消而没有被启动的机会)。 – 2015-05-06 14:06:05
是的,CancellationException
立即抛出。您可以扩展FutureTask以添加get()
方法的版本,等待Callable
的线程完成。
public class ThreadWaitingFutureTask<T> extends FutureTask<T> {
private final Semaphore semaphore;
public ThreadWaitingFutureTask(Callable<T> callable) {
this(callable, new Semaphore(1));
}
public T getWithJoin() throws InterruptedException, ExecutionException {
try {
return super.get();
}
catch (CancellationException e) {
semaphore.acquire();
semaphore.release();
throw e;
}
}
private ThreadWaitingFutureTask(final Callable<T> callable,
final Semaphore semaphore) {
super(new Callable<T>() {
public T call() throws Exception {
semaphore.acquire();
try {
return callable.call();
}
finally {
semaphore.release();
}
}
});
this.semaphore = semaphore;
}
}
该答案与@ FooJBar的竞争条件相同。 “FutureTask”之间有一个窗口。运行'检查'状态'并运行可调用,在此期间'cancel'和'getWithJoin'都可以成功完成。但是,可调用仍将运行。可调用应该自己检查任务是否没有被取消,但这很难做到。 – 2015-05-06 15:17:10
阿列克谢的例子效果很好。我写了一个变型与构造以一个Runnable(将返回null),并展示了如何直接阻断(加入)的取消():
public class FutureTaskCancelWaits<T> extends FutureTask<T> {
private final Semaphore semaphore;
public FutureTaskCancelWaits(Runnable runnable) {
this(Executors.callable(runnable, (T) null));
}
public FutureTaskCancelWaits(Callable<T> callable) {
this(callable, new Semaphore(1));
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
// If the task was successfully cancelled, block here until call() returns
if (super.cancel(mayInterruptIfRunning)) {
try {
semaphore.acquire();
// All is well
return true;
} catch (InterruptedException e) {
// Interrupted while waiting...
} finally {
semaphore.release();
}
}
return false;
}
private FutureTaskCancelWaits(final Callable<T> callable, final Semaphore semaphore) {
super(new Callable<T>() {
public T call() throws Exception {
semaphore.acquire();
try {
return callable.call();
} finally {
semaphore.release();
}
}
});
this.semaphore = semaphore;
}
}
不幸的是,这个答案有一个致命的缺陷。正如你在FutureTask.run:262(Oracle JDK 8u40)中看到的那样,在状态被检查之后和发生'c.call()'之前,可能会发生对'cancel'的调用。由于信号尚未输入,“取消”将立即结束。但是,无论如何,'c.call()'都会运行。解决办法可能是在新的可调用函数中添加一个“isCancelled()”的检查。但是,编译器抱怨“在调用超类型构造函数之前无法引用它”。 – 2015-05-06 15:08:04
OTOH,如果线程已被“cancel”中断,但只有当'mayInterruptIfRunning'设置为true时,'semaphore.acquire'才会抛出'InterruptedException'。 – 2015-05-06 16:01:30
什么是用例?似乎取消在这里给你带来了很多好处 - 如果你不希望工作被打断,但你仍然想等待它完成,取消给你的东西是什么? – 2011-05-18 23:16:24
该任务正在处理共享状态,我想确保它在我开始一个工作在相同共享状态的新工作之前已经停止工作。 – 2011-05-20 04:23:36
但是,取消的值是多少?未来是否被许多客户共享,你想与他们沟通? – 2011-05-20 22:30:46