2016-04-29 28 views
0

嗨我正在尝试处理文件中的数据。斯卡拉等待未来的列表执行

这是我在下面使用的代码。

我有一份期货清单,并试图从这些期货中获得产出。

一切都很好,但返回的最后一行在OnSuccess之前执行。

如何在没有阻止操作的情况下更改该行为。

def processRow(rowNumber: Int, row: String, delimiter: String, rules: List[Rule]): RowMessage = { 
var cells = row.split(delimiter) 
var passedRules = new ListBuffer[RuleResult]() 
val failedRules = new ListBuffer[RuleResult]() 
val rulesFuture = rules.map { 
    i => Future { 
    val cells = row.split(delimiter); 
    //some processing.... 
    } 
} 
val f1 = Future.sequence(rulesFuture) 
f1 onComplete { 
    case Success(results) => for (result <- results) (result.map(x => { 
    if (x.isPassFailed) { 
     passedRules += x 
    } 
    else { 
     failedRules += x 
    } 
    })) 
    case Failure(t) => println("An error has occured: " + t.getMessage) 
} 
return new RowMessage(passedRules.toList, failedRules.toList) 
} 
+0

它与阿卡流有什么关系? –

回答

1

你不能避免阻塞并返回一个普通的RowMessage。您还需要返回Future

def processRow(rowNumber: Int, row: String, delimiter: String, rules: List[Rule]): Future[RowMessage] = { 
    val cells = row.split(delimiter) 
    Future.traverse(rules) { i => 
    Future { 
     //some processing.... 
    } 
    } map { results => 
    val (passed, failed) = results.partition(_.isPassFailed) 
    new RowMessage(passed, failed) 
    } 
} 

也想想你的算法,以避免可变状态,尤其是当你从不同的期货更改它。

Future.traverse相当于您的map + Future.sequence。然后,而不是onComplete,只需映射Future修改列表。您可以使用partition轻松拆分它,而不是您一直在做的事情。

你不需要使用return,实际上你不应该除非你知道你在做什么。

Btw isPassFailed对我来说听起来不像一个合理的方法名称,特别是考虑到如果它是真的,您将它添加到传递的规则。