2017-01-15 19 views
2

这个问题是基于我做的一个宠物项目和这个SO线程。在Akka HTTP路由定义中,我开始了一个长期运行的过程,当然我想这样做而不会阻止用户。我可以用下面的代码片段来实现这一目标:如何在带有超时的单独调度程序上运行Akka Streams图?

blocking-io-dispatcher { 
    type = Dispatcher 
    executor = "thread-pool-executor" 
    thread-pool-executor { 
    fixed-pool-size = 16 
    } 
    throughput = 1 
} 

complete { 
    Try(new URL(url)) match { 
    case scala.util.Success(u) => { 
     val src = Source.fromIterator(() => parseMovies(u).iterator) 

     src 
     .via(findMovieByTitleAndYear) 
     .via(persistMovies) 
     .toMat(Sink.fold(Future(0))((acc, elem) => Applicative[Future].map2(acc, elem)(_ + _)))(Keep.right) 
     // run the whole graph on a separate dispatcher 
     .withAttributes(ActorAttributes.dispatcher("blocking-io-dispatcher")) 
     .run.flatten 
     .onComplete { 
      _ match { 
      case scala.util.Success(n) => logger.info(s"Created $n movies") 
      case Failure(t) => logger.error(t, "Failed to process movies") 
      } 
     } 

     Accepted 
    } 
    case Failure(t) => logger.error(t, "Bad URL"); BadRequest -> "Bad URL" 
    } 
} 

什么那么,如果我已经解决了它的问题呢?问题是我不知道如何设置超时。图表的执行会创建一个Future,直到专用的blocking-io-dispatcher完成为止。如果我添加一个Await调用,代码块。有没有办法暂停?

回答

2

completionTimeout阶段应该在这里帮助。示例如下:

src 
    .completionTimeout(5.seconds) 
    ... 
    .run.flatten 
    .onComplete { 
     case scala.util.Success(n) => logger.info(s"Created $n movies") 
     case Failure(t: TimeoutException) => logger.error(t, "Timed out") 
     case Failure(t) => logger.error(t, "Failed to process movies") 
    } 

Docs参考here

+2

谢谢你;但是你的代码并不像所示的那样编译。在尝试实现图形('toMat')之前,必须应用'completionTimeout'。我已经接受你的答案,几乎是正确的。 –

相关问题