2012-07-25 26 views
2

我有一个任务队列和一个线程,在几秒钟内窥探一次队列,并且它有一个任务执行它。等待未知数量的异步任务

我有另一个代码段(当然是另一个线程),它在循环中创建任务(我无法预先从循环外部知道任务数量)并将它们插入到队列中。任务包含一些“结果”对象,外部线程(创建这些任务)需要等待所有任务完成并最终从每个任务中获取结果。 问题是我无法将java Semaphore \ CountDownLatch等传递给结果对象,因为我不知道提前显示器的数量。 我也不能使用使用invokeAll的Executor,或者等待Future对象,因为任务是不同步的(外部线程只是将任务加入到队列中,而另一个线程在他有空时执行任务)。

我唯一想到的解决方案是创建一些“反转信号量”类,它包含一组结果和一个监视器计数器。该功能的getResult将检查计数器== 0,如果答案是肯定就会通知一些锁定的对象,以及功能的getResult会等待这个锁:

public class InvertedSemaphore<T> { 
    Set<T> resultSet; 
    int usages; 
    final Object c; 

    public InvertedSemaphore() { 
     resultSet = Collections.synchronizedSet(new HashSet<T>()); 
     usages = 0; 
     c = new Object(); 
    } 

    public void addResult(T result) { 
     resultSet.add(result); 
    } 

    public void addResults(Set<T> result) { 
     resultSet.addAll(result); 
    } 


    public void acquire() { 
     usages++; 
    } 

    public void release() { 
     synchronized (c) { 
      if (--usages == 0) { 
       c.notify(); 
      } 
     } 
    } 

    public Set<T> getResults() { 
     synchronized (c) { 
      try { 
       while (usages > 0) { 
        c.wait(); 
       } 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
     return resultSet; 
    } 

} 

每个addTask方法调用semaphore.acquire,和每个(未同步)任务将在任务结束时调用semaphore.release。

这听起来很复杂,我很确定在java并发库中有更好的解决方案。

任何想法将appriciated :)

+0

听起来像你需要'CountUpLatch'。:) – corsiKa 2012-07-25 17:17:53

回答

3

如果任务不需要为了进行处理,使用ExecutorCompletionService

更一般地说,没有必要为了上ExecutorService使用invokeAll得到Future的结果。 ExecutorService#submit可以用于此目的,或者可选地,创建的任务本身可以实现Future,从而允许任务的创建者在稍后的时间点询问结果。

一些代码:

class MyTask { 
    AtomicReference<?> result = new AtomicReference<?>(); 

    void run() { 
     //do stuff here 
     result.set(/* the result of the calculation */); 
    } 

    boolean resultReady() { 
     return result.get()!=null; 
    } 

    ? get() { 
     return result.get(); 
    } 
} 

...别处代码

void createTasks() { 
    Collection<MyTask> c = new ...; 

    while(indeterminable condition) { 
     MyTask task = new MyTask(); 
     c.add(task); 
     mysteryQueue.add(task); 
    } 

    while(haven't received all results) { 
     MyTask task = c.get(...); //or iterate or whatever 
     ? result = task.get(); 
     if (result!=null) { 
      //do stuff, probably remove the task from the collection c would be smart 
     } 
    } 
} 
+0

但是执行程序的问题在于执行程序将执行的'任务'只是将实际任务放入某个队列中 - 并且这些任务将以不同的方式以不同的方式执行。我需要等待'真正'的任务结果,所以如果结果已经存在,我确实可以检查一次,但我甚至不知道是否会有结果。我必须知道何时完成所有异步任务,然后才能获得结果。 – axelrod 2012-07-25 17:27:39

+0

那真是愚蠢。但是它并不妨碍你自己实现'Future'或者通过“真实任务”的界面提供某种方式来获得结果 – 2012-07-25 17:38:42

1

一个想法是使用一个单独的队列结果。
所以你将有一个阻塞队列,线程A为线程B放置任务,从而具有生产者 - 消费者的方法,并且当每个任务完成时,结果可以被放置在的第二个结果队列中,将消费者 - 生产者角色因为现在线程A最初创建的任务将消耗第二个队列的结果。

+0

是的,但是我需要对结果进行一些操作,我不会知道当结果队列将被视为“完整”时。 – axelrod 2012-07-25 17:40:52

+0

不确定你的意思。线程'A'知道它已经在线程'B'的队列中插入了多少个任务。现在只要在结果队列中有一个任务消耗它,就会线程'A'。如果没有任务并且还没有消耗完所有结果(它知道它放在第一个队列上多少,所以'A'知道应该消耗多少),它要么等待结果要么进行其他处理 – Cratylus 2012-07-25 17:57:38

+0

@axelrod,是的,你可以,就像你在反向信号灯的想法中检查柜台一样。有些东西会在那里放置一个初始值并且东西会递减 - 这意味着您知道如何确定您可以根据完成一定数量的任务继续执行其他一些执行。 – Vitaliy 2012-07-25 17:58:47

1

您可以执行以下操作: 每个生产者都将拥有自己的队列。生产者将通过一种手段将此队列报告给任务本身。当任务完成运行时,它将把结果排队到这个队列中。它是由一些代码描述的兽:

class Result{} 

interface IResultCallback{ 
    void resultReady(Result r); // this is an abstraction of the queue 
} 

class Producer implements IResultCallback{ 
    // the producer needs to pass itself to the constructor of the task, 
    // the task will only see its "resultReady" facade and will be able to report to it. 
    // the producer can aggragte the results at it will and execute its own computation as 
     // as soon it is ready 

    Queue<Result> results; // = init queue 

    @Override 
    public void resultReady(Result r) { 
     results.add(r); 

     if(results.size() == 9){ 
      operate(); 
     } 
     results.clear(); 
    } 

    public void operate(){ 
     // bla bla 
    } 
} 

public class Task { 
    IResultCallback callback; 

    public Task(IResultCallback callback){ 
     this.callback = callback; 
    } 
    public void execute(){ 
     // bla bla 

     Result r = null; // init result; 
     callback.resultReady(r); 
    } 
} 
+0

我会试试这个方法,thx :) – axelrod 2012-07-25 18:41:09

+0

@axelrod祝你好运!别忘了成为StackOverflow的优秀代表,并投票/接受答案:-) – Vitaliy 2012-07-25 18:49:08

+0

@axelrod欢迎登陆!它*是一个很好的问题! – Vitaliy 2012-07-25 18:55:49