2011-11-03 56 views
1

让我们在应用程序中使用一个经典的Executor。许多应用程序使用此执行程序进行一些计算,每次计算都可以取消,因此我可以在执行程序上调用shutdown()shutdownNow()Java执行程序部分关闭

但我只想关闭执行程序中的部分任务。可悲的是,我不能访问Future对象,他们计算的实现

我想是这样的Executor包装,这是我能传递给计算和应支持(实际上是计算由演员框架jetlang支持)的私处由真正的执行者。类似这样的:

// main application executor 
Executor applicationExecutor = Executors.newCachedThreadPool(); 

// starting computation 
Executor computationExecutor = new ExecutorWrapper(applicationExecutor); 
Computation computation = new Computation(computationExecutor); 
computation.start(); 

// cancelling computation 
computation.cancel(); 
// shutting down only computation tasks 
computationExecutor.shutdown(); 

// applicationExecutor remains running and happy 

还是其他的想法?

回答

2

对于那些,谁想要良好的两端:有最终的解决方案,部分基于伊万Sopov的回答。幸运的是,jetlang仅用于运行其任务Executor接口(而不是ExecutorService),所以我创建了包装类,它支持仅由此包装器创建的停止任务。

static class StoppableExecutor implements Executor { 
    final ExecutorService executor; 
    final List<Future<?>> futures = Lists.newArrayList(); 
    boolean stopped; 

    public StoppableExecutor(ExecutorService executor) { 
     this.executor = executor; 
    } 

    void stop() { 
     this.stopped = true; 
     synchronized (futures) { 
      for (Iterator<Future<?>> iterator = futures.iterator(); iterator.hasNext();) { 
       Future<?> future = iterator.next(); 
       if (!future.isDone() && !future.isCancelled()) { 
        System.out.println(future.cancel(true)); 
       } 
      } 
      futures.clear(); 
     } 
    } 

    @Override 
    public void execute(Runnable command) { 
     if (!stopped) { 
      synchronized (futures) { 
       Future<?> newFuture = executor.submit(command); 
       for (Iterator<Future<?>> iterator = futures.iterator(); iterator.hasNext();) { 
        Future<?> future = iterator.next(); 
        if (future.isDone() || future.isCancelled()) 
         iterator.remove(); 
       } 
       futures.add(newFuture); 
      } 
     } 
    } 
} 

使用,这是非常简单的:

ExecutorService service = Executors.newFixedThreadPool(5); 
StoppableExecutor executor = new StoppableExecutor(service); 

// doing some actor stuff with executor instance 
PoolFiberFactory factory = new PoolFiberFactory(executor); 

// stopping tasks only created on executor instance 
// executor service is happily running other tasks 
executor.stop(); 

这就是全部。工作很好。

0

如果将Computation设置为Runnable(并使用提供的Executor运行),直到设置布尔标志为止?沿线的东西:

public class Computation 
{ 
    boolean volatile stopped; 

    public void run(){ 
    while(!stopped){ 
    //do magic 
    } 

    public void cancel)(){stopped=true;} 
} 

你在做什么基本上是停止线程。但是,它不会被垃圾回收,而是被重用,因为它由Executor管理。查找“停止线程的正确方法是什么?”。

编辑:请注意上面的代码是相当原始的,因为它假定while循环的主体需要很短的时间。如果没有,检查将很少执行,您会注意到取消任务和实际停止之间的延迟。

+0

谢谢。一个计算由许多任务组成。这些任务是框架的私有部分,我只能编写从这些任务调用的回调函数。回调是相对较长的时间过程。我没有Runnable对象,它被提交给Executor。是的,我想停止属于当前计算的线程。 – mschayna

+0

正确。要立即停止它们(几乎),你需要中断。正确地做到这一点有点棘手,尤其是当任务在一部分池中运行时的线程。我无法比Brian Goetz在实践中的Java并发性更好地解释它,第7章。 – mbatchkarov

+0

我知道你的解决方案是停止线程的标准。我想要的就像“银弹”:-),当我无法注入计算代码时。 – mschayna

0

这样的事情? 你可能会做部分关闭:

for (Future<?> future : %ExecutorServiceWrapperInstance%.getFutures()) { 
    if (%CONDITION%) { 
     future.cancel(true); 
    } 
} 

下面是代码:

package com.sopovs.moradanen; 

import java.util.ArrayList; 
import java.util.Collection; 
import java.util.Collections; 
import java.util.List; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Future; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.TimeoutException; 

public class ExecutorServiceWrapper implements ExecutorService { 

private final ExecutorService realService; 
private List<Future<?>> futures = new ArrayList<Future<?>>(); 

public ExecutorServiceWrapper(ExecutorService realService) { 
    this.realService = realService; 
} 

@Override 
public void execute(Runnable command) { 
    realService.execute(command); 
} 

@Override 
public void shutdown() { 
    realService.shutdown(); 

} 

@Override 
public List<Runnable> shutdownNow() { 
    return realService.shutdownNow(); 
} 

@Override 
public boolean isShutdown() { 
    return realService.isShutdown(); 
} 

@Override 
public boolean isTerminated() { 
    return realService.isTerminated(); 
} 

@Override 
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { 
    return realService.awaitTermination(timeout, unit); 
} 

@Override 
public <T> Future<T> submit(Callable<T> task) { 
    Future<T> future = realService.submit(task); 
    synchronized (this) { 
     futures.add(future); 
    } 
    return future; 
} 

public synchronized List<Future<?>> getFutures() { 
    return Collections.unmodifiableList(futures); 
} 

@Override 
public <T> Future<T> submit(Runnable task, T result) { 
    Future<T> future = realService.submit(task, result); 
    synchronized (this) { 
     futures.add(future); 
    } 
    return future; 
} 

@Override 
public Future<?> submit(Runnable task) { 
    Future<?> future = realService.submit(task); 
    synchronized (this) { 
     futures.add(future); 
    } 
    return future; 
} 

@Override 
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { 
    List<Future<T>> future = realService.invokeAll(tasks); 
    synchronized (this) { 
     futures.addAll(future); 
    } 
    return future; 
} 

@Override 
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 
     throws InterruptedException { 
    List<Future<T>> future = realService.invokeAll(tasks, timeout, unit); 
    synchronized (this) { 
     futures.addAll(future); 
    } 
    return future; 
} 

@Override 
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { 
    //don't know what to do here. Maybe this method is not needed by the framework 
    //than just throw new NotImplementedException(); 
    return realService.invokeAny(tasks); 
} 

@Override 
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 
     throws InterruptedException, ExecutionException, TimeoutException { 
    //don't know what to do here. Maybe this method is not needed by the framework 
    //than just throw new NotImplementedException(); 
    return realService.invokeAny(tasks, timeout, unit); 
} 
} 
+1

谢谢,最后我有我自己的解决方案,这很容易从你的答案中得到启发。 – mschayna