2014-04-29 63 views
1

我想做大量的外部服务调用,每个跟进了异常处理和有条件的进一步处理。我认为使用.onComplete内部扩展这个很好的示例(Asynchronous IO in Scala with futures)很容易,但似乎我并不了解有关范围和/或期货的内容。任何人都可以指出我正确的方向吗?异步处理使用列表的斯卡拉期货与onComplete异常处理

#!/bin/bash 
scala -feature $0 [email protected] 
exit 
!# 

import scala.concurrent.{future, blocking, Future, Await} 
import scala.concurrent.ExecutionContext.Implicits.global 
import scala.concurrent.duration._ 
import scala.util.{Success, Failure} 
import scala.language.postfixOps 

val keylist = List("key1", "key2") 

val myFuts: List[Future[String]] = keylist.map { 
    myid => future { 
    // this line simulates an external call which returns a future (retrieval from S3) 
    val myfut = future { Thread.sleep(1); "START " + myid} 

    var mystr = "This should have been overwritten" 
    myfut.onComplete { 
     case Failure(ex) => { 
     println (s"failed with error: $ex") 
     mystr = "FAILED" 
     } 
     case Success(myval) => { 
     mystr = s"SUCCESS $myid: $myval" 
     println (mystr) 
     } 
    } 
    mystr 
    } 
} 

val futset: Future[List[String]] = Future.sequence(myFuts) 
println (Await.result(futset, 10 seconds)) 
我的电脑(斯卡拉2.10.4)上

,这个打印:

SUCCESS key2: START key2 
SUCCESS key1: START key1 
List(This should have been overwritten, This should have been overwritten) 

我想(顺序不重要):

SUCCESS key2: START key2 
SUCCESS key1: START key1 
List(SUCCESS key2: START key2, SUCCESS key1: START key1) 

回答

3

我会避免使用onComplete,并试图用它做一个可变的变量副作用的逻辑。相反,我会映射未来并处理失败案例作为返回不同的值。这里有一个稍微修改的代码版本,在Future上使用map(通过理解),然后使用recover来处理故障情况。希望这是你正在寻找的:

val keylist = List("key1", "key2") 

val myFuts: List[Future[String]] = keylist.map {myid => 

    // this line simulates an external call which returns a future (retrieval from S3) 
    val myfut = future { Thread.sleep(1); "START " + myid} 
    val result = for (myval <- myfut) yield { 
    val res = s"SUCCESS $myid: $myval" 
    println(res) 
    res 
    } 
    result.recover{ 
    case ex => 
     println (s"failed with error: $ex") 
     "FAILED"   
    } 

} 

val futset: Future[List[String]] = Future.sequence(myFuts) 
println (Await.result(futset, 10 seconds)) 
+0

工程很好。 .recover方法是该模式的关键。我没有意识到可以做到这一点。非常感谢! –

+0

对于那些感兴趣的人,这个答案和更有效的替代方法在http://stackoverflow.com/a/15776974/29771中给出。 – Glenn

2

在完全不返回一个新的未来,它只是让你在未来完成时做些事情。因此,在执行onComplete之前,您的第一个Future块会返回,因此您将返回字符串的原始值。

我们所能做的就是用诺言回报未来,而未来是通过第一个未来的结果完成的。

val keylist = List("key1", "key2") 

    val myFuts: List[Future[String]] = keylist.map { 
    myid => { 
     // this line simulates an external call which returns a future (retrieval from S3) 
     val myfut = Future { 
     Thread.sleep(1); "START " + myid 
     } 
     var mystr = "This should have been overwritten" 
     val p = Promise[String]() 
     myfut.onComplete { 
     case Failure(ex) => 
      println(s"failed with error: $ex") 
      mystr = "FAILED" 
      p failure ex 
     case Success(myval) => 
      mystr = s"SUCCESS $myid: $myval" 
      println(mystr) 
      p success myval 
     } 
     p.future 
    } 
    } 

    val futset: Future[List[String]] = Future.sequence(myFuts) 
    println(Await.result(futset, 10 seconds)) 

会超灵便这将是一个mapAll方法,我问了一下这里: Map a Future for both Success and Failure

+0

它超时出现错误,当我运行它,但有趣的解决方案。我不明白'p'Promise是如何完成的,并且有与任务相关的价值。感谢您的帮助 –

+0

哎呀是的,补充说。 – monkjack

+0

现在工作很好。很高兴看到使用Promise的解决方案!我一直在想,除了竞争性完成以外,他们在哪里有用。 –