2013-05-26 23 views
1

我正在使用akka数据流,我想知道是否有办法导致特定的代码块等待未来的完成,而不明确使用该值未来。akka数据流和副作用

实际使用案例是我有一个文件,并且我希望文件在特定的未来完成时被删除,但不是之前。这是一个粗略的例子。首先想象我有这样的服务:

trait ASync { 
    def pull: Future[File] 
    def process(input : File): Future[File] 
    def push(input : File): Future[URI] 
} 

而且我有一个工作流程我想在非阻塞的方式运行:

val uriFuture = flow { 
    val pulledFile = async.pull(uri) 
    val processedile = async.process(pulledFile()) 
    val storedUri = async.push(processedFile()) 

    // I'd like the following line executed only after storedUri is completed, 
    // not as soon as pulled file is ready. 
    pulledFile().delete() 

    storedUri() 
} 

回答

1

你可以尝试这样的事:

val uriFuture = flow { 
    val pulledFile = async.pull(uri) 
    val processedile = async.process(pulledFile()) 
    val storedUri = for(uri <- async.push(processedFile())) yield { 
    pulledFile().delete() 
    uri 
    } 
    storedUri() 
} 

在此示例中,只有在pushFuture成功时才会调用pulledFile.delete。如果失败,将不会调用deletestoredUri未来的结果仍然是致电push的结果。

或者另一种方式是:

val uriFuture = flow { 
    val pulledFile = async.pull(uri) 
    val processedile = async.process(pulledFile()) 
    val storedUri = async.push(processedFile()) andThen{ 
    case whatever => pulledFile().delete() 
    } 
    storedUri() 
} 

这里的区别是,delete将不管push成功或失败的调用。 storedUri的结果仍然是致电push的结果。

+0

问题是,如果我尝试将pullFile()放入传递给从async.push返回的Future上的函数的任何块中,则会发生大量编译错误。但是,我发现如果将pullFile()分配给块外的val,然后在块内使用它,我确实可以编译它。我会检查是否正确运行,然后更新答案。 –

0

可以使用为非阻塞工作流回调:

future onSuccess { 
    case _ => file.delete() //Deal with cases obviously... 

} 

来源:http://doc.akka.io/docs/akka/snapshot/scala/futures.html

或者,可以用Await.result方框:

val result = Await.result(future, timeout.duration).asInstanceOf[String] 

后者一般使用当你需要阻止时 - 例如在测试用例中 - 当非阻塞性能更高时,因为你不停放一个线程来启动另一个线程来恢复其他线程再次线程 - 由于资源管理开销,这比异步活动慢。

类型安全的工作人员称它为“反应性”。这是一个流行词。如果你在工作场所使用它,我会笑。