2017-05-30 35 views
1

我再次尝试更新一些预播放2.5代码(基于此vid)。例如,以下以前是如何流式未来Stream Play in Play 2.5

Ok.chunked(Enumerator.generateM(Promise.timeout(Some("hello"), 500))) 

我已经创建了下面的方法用于使用阿卡的变通为Promise.timeout(废弃):

private def keepResponding(data: String, delay: FiniteDuration, interval: FiniteDuration): Future[Result] = { 
    val promise: Promise[Result] = Promise[Result]() 
    actorSystem.scheduler.schedule(delay, interval) { promise.success(Ok(data)) } 
    promise.future 
    } 

根据Play Framework Migration Guide; Enumerators改写为源Source.unfoldAsync明显的Enumerator.generateM相当于所以我希望这会工作(其中strFuture[String]):

def inf = Action { request => 
    val str = keepResponding("stream me", 1.second, 2.second) 

    Ok.chunked(Source.unfoldAsync(str)) 
    } 

当然,我得到一个类型不匹配错误,望着unfoldAsync的情况下,类签名时:

final class UnfoldAsync[S, E](s: S, f: S ⇒ Future[Option[(S, E)]]) 

我可以看到,参数不正确的,但我不完全underst以及我应该怎样通过这个过程。

回答

1

unfoldAsync甚至比玩更通用的!自己generateM,因为它可以让你通过一个状态(S)值。这可以使发射的值取决于先前发射的值。

下面的例子,直到加载失败会受到越来越ID载荷值,:

val source: Source[String, NotUsed] = Source.unfoldAsync(0){ id ⇒ 
    loadFromId(id) 
    .map(s ⇒ Some((id + 1, s))) 
    .recover{case _ ⇒ None} 
} 

def loadFromId(id: Int): Future[String] = ??? 

在你的情况是不是真的需要一个内部状态,因此在需要时可以只通过虚拟值,例如

val source: Source[Result, NotUsed] = Source.unfoldAsync(NotUsed) { _ ⇒ 
    schedule("stream me", 2.seconds).map(x ⇒ Some(NotUsed → x)) 
} 

def schedule(data: String, delay: FiniteDuration): Future[Result] = { 
    akka.pattern.after(delay, system.scheduler){Future.successful(Ok(data))} 
} 

请注意,原来实行的keepResponding不正确,因为你不能完成Promise不止一次。阿卡after模式提供了一种更简单的方式来实现你所需要的。

但是请注意,在特定情况下,阿卡流提供带Source.tick一个更地道的解决方案:如果

val source: Source[String, Cancellable] = Source.tick(1.second, 2.seconds, NotUsed).mapAsync(1){ _ ⇒ 
    loadSomeFuture() 
} 

def loadSomeFuture(): Future[String] = ??? 

或者更简单,你实际上并不需要异步计算在你的榜样

val source: Source[String, Cancellable] = Source.tick(1.second, 2.seconds, "stream me") 
+0

谢谢。你的第三和第四个例子的工作,但这两个都需要至少几分钟输出第一个结果,感觉应该不会发生。有什么办法可以加速你想到的吗?也出于某种原因,我无法得到第一个例子的工作 - 一个***前瞻性参考的错误扩展了价值源***的定义。谢谢 –