2017-05-11 93 views
0

我正在构建一个方法,它需要x大小的方法序列并返回第一个方法的结果完成。斯卡拉 - 多线程,当任何子线程完成时完成主线程

def invokeAny(work: Seq[() => Int]): Int = ??? 

如何通过使用线程来完成此操作? (不允许期货)

这是我已经能够提出的最好的,但似乎不适用于所有情况。

def invokeAny(work: Seq[() => Int]): Int = { 
    @volatile var result = 0 // set to return value of any work function 
    val main = Thread.currentThread() 

    val threads: Seq[Thread] = work.map(work => new Thread(new Runnable { 
     def run { result = work(); main.interrupt(); }})) 

    threads.foreach(_.start()) 
    for(thread <- threads) { 
     try { 
     thread.join() 
     } catch { 
     // We've been interrupted: finish 
     case e: InterruptedException => return result 
    } 
    } 
    return result 
    } 
+0

我会建议,而不是使用线程 –

+0

啊,我知道期货,但我想知道如何使用线程 – Leero11

+0

使用值为1的'java.util.concurrent.CountDownLatch'。你的子线程会调用'latch.countDown()',主线程将使用'latch.await()' –

回答

0

不是pretiest答案,但似乎工作:

def invokeAny(work: Seq[() => Int]): Int = { 
    @volatile var result = 0 // set to return value of any work function 
    val main = Thread.currentThread() 

    var threads: Seq[Thread] = Seq() 

    //Interrupts all threads after one is interrupted 
    def interruptAll = { 
     main.interrupt() 
     for(thread <- threads) { 
     thread.interrupt() 

     } 
    } 

    threads = work.map(work => new Thread( 
     new Runnable { 
      def run { 
      result = try { 
       work() } catch { 
       case e:InterruptedException => return 
      } 
      interruptAll; 

      } 
      })) 

    threads.foreach(_.start()) 
    for(thread <- threads) { 
     try { 
     thread.join() 
     } catch { 
     // We've been interrupted: finish 
     case e: InterruptedException => return result 
    } 
    } 
    return result 
    } 
0

使用的BlockingQueue,没有共享的可变状态,工作线程写入队列,主线程等待,直到他们完成和读取那么队列做的结果类似总和

def invokeAny1(work: Seq[() => Int]): Int = { 
    val queue = new ArrayBlockingQueue[Int](work.size) 

    val threads: Seq[Thread] = work.map(w => new Thread(new Runnable { 
     def run { 
     val result= w() 
     queue.put(result) }})) 

    threads.foreach(_.start()) 
    threads.foreach(_.join()) 

    var sum:Int=0 

    while(!queue.isEmpty) { 
     sum +=queue.take() 
    } 
     sum 
} 

使用的CountDownLatch。 工作线程增加一个原子变量。 当所有线程都完成的锁被释放,主线程可以从原子变量读取数据

def invokeAny2(work: Seq[() => Int]): Int = { 

    val total=new AtomicInteger 
    val latch= new CountDownLatch(work.size) 

    val threads: Seq[Thread] = work.map(w => new Thread(new Runnable { 
     def run { 

     val result= w() 
     total.getAndAdd(result) 
     latch.countDown  
     }})) 

    threads.foreach(_.start()) 

    latch.await //wait till the latch is released 
    total.get 
    } 

} 
+0

等待它我测试了你的代码,但它似乎给出了错误的答案,并且比我发布的代码长四倍:// – Leero11

+1

我的代码与你发布的代码有点不同,它会等到所有代码都完成。如果你只想要第一个线程的结果完成,那么在第二个代码段中使val latch = new CountDownLatch(1) –

+0

好酷;)thx参与 – Leero11