2016-12-30 35 views
0

关于如何处理期货列表,StackOverflow有一些建议,但我想尝试自己的方法。但我无法编译下面的代码无法编译未来列表的代码

我有期货的列表。 我想统计他们有多少人通过或失败。我应该得到(2,1) 我将它存储在一个元组中 我想要采取的方法是遍历列表中的每个元素。列表的元素是Future [Int]。对于每个元素,我调用flatMap,它调用递归的下一个周期(我假设如果flatMap被调用,那么特定的未来会成功,所以我增加了传递计数)。同样,我想调用下一个递归循环恢复和增量失败计数,但我收到编译错误。

import scala.concurrent._ 
import scala.concurrent.ExecutionContext.Implicits.global 
import scala.util.{Failure, Success, Try} 
import scala.concurrent.duration._ 
import scala.language.postfixOps 

object ConcurrencyExample extends App { 

    type pass = Int 
    type fail = Int 

    val time = System.currentTimeMillis() 

//use recursion to process each Future in the list 
    def segregate(l:List[Future[Int]]):Future[Tuple2[pass,fail]] = { 
    def go(l:List[Future[Int]],t:Tuple2[pass,fail]):Future[Tuple2[pass,fail]] = { 
     l match { 
      case Nil => Future{t} 
      //l is List of Future[Int]. flatMap each successful Future 
      //recover each failed Future 
      case l::ls => { 
      l flatMap (x => go(ls, (t._1 + 1, t._2))) 
       **l.recover({ case e => go(ls, (t._1 + 1, t._2))})**//I get error here 
      } 
     } 
    } 
    go(l,(0,0)) 
    } 

//hardcoded future 
    val futures2: List[Future[Int]] = List(Future { 
    1 
    }, Future { 
    2 
    }, Future { 
    throw new Exception("error") 
    }) 


    val result = segregate(futures2) 
    result onComplete { 
    case Success(v) => println("pp:" + v) 
    case Failure(v) => println("fp:" + v) 
    } 

    Await.result(result,1000 millis) 
} 

回答

1

如果你看一下docsrecover的签名是:

def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] 

要调用recoverl这是一个Future[Int]所以recover期待一个U >: Int

但是您再次致电go,其返回类型为Future[(pass, fail)]而非>: Int

2

@ evan058对恢复的签名是正确的。但是,您可以通过将恢复为恢复为来修复程序。

recoverWith恢复flatMap地图

下面是完整的解决方案(含有少量风格的改进):

import scala.concurrent._ 
import scala.concurrent.ExecutionContext.Implicits.global 
import scala.util.{Failure, Success, Try} 
import scala.concurrent.duration._ 
import scala.language.postfixOps 

object ConcurrencyExample extends App { 

    type pass = Int 
    type fail = Int 

    val time = System.currentTimeMillis() 

    //use recursion to process each Future in the list 
    def segregate[T](fs:List[Future[T]]):Future[(pass,fail)] = { 
    def go(fs:List[Future[T]],r:Future[(pass,fail)]):Future[(pass,fail)] = fs match { 
     case Nil => r 
     case l::ls => 
     val fx = l.transform({_ => (1, 0)}, identity).recoverWith[(pass,fail)]({case _: Exception => Future(0, 1) }) 
     for (x <- fx; t <- r; g <- go(ls, Future(t._1+x._1,t._2+x._2))) yield g 
    } 
    go(fs,Future((0,0))) 
    } 

    //hardcoded future 
    val futures2 = List(Future(1), Future(2), Future(throw new Exception("error")))  

    val result = segregate(futures2) 
    result onComplete { 
    case Success(v) => println(s"successes: ${v._1}, failures: ${v._2}") 
    case Failure(v) => v.printStackTrace() 
    } 

    Await.result(result,1000 millis) 
} 
+0

难道OP仍然有'(及格,不及格)',而不是一些'U'其中'U>:Int'? –

+0

啊,我明白你的意思了。看起来你需要一个'asInstanceOf [Future [(pass,fail)]]'否则它会抱怨获得'Future [Any]',但是之后它会为我运行。 –

+0

男士 - 谢谢。你的意思是这样吗?代码编译但我得到一个运行时错误java.lang.ClassCastException:java.lang.Integer不能转换为scala.Tuple2'我现在写的代码是'flatMap(x => go(ls ,(t._1 + 1,t._2))) l.asInstanceOf [Future [(pass,fail)]]。recoverWith({case e => go(ls,(t._1 + 1,t._2 ))})' –