2016-11-24 48 views
4

我们有一个API在裸骨Scala Akka HTTP中实现 - 两个路由面向大量计算(CPU和内存密集型)。没有集群 - 全部运行在一台结实的机器上。计算是适当的 - 可以花费超过60秒来完成一个孤立的请求。我们并不关心速度。没有阻塞IO,只是很多CPU处理。Akka HTTP和长时间运行的请求

当我开始进行性能测试时,一个有趣的模式显示:请求A1,A2,...,A10通过。他们使用的资源相当繁重,事实证明,Akka会针对超出的请求A5-A10返回HTTP 503。问题在于,即使没有人拿起结果,计算仍在运行。

并从那里我们看到级联性能崩溃:请求A11-A20到达服务器仍然在请求A5-A10。很显然,这些新的请求也有可能超时 - 即使服务器更忙,甚至更高。因此,当Akka触发超时时,它们中的一些会运行,使得服务器更加繁忙和更慢,然后新的一批请求就会通过......所以在系统运行一段时间后,您几乎看到所有请求超时开始失败。在停止加载后,您会看到日志中仍有一些请求正在处理。我试过在单独的ExecutionContext以及系统调度程序中运行计算,尝试使它完全异步(通过未来组合),但结果仍然相同。滞留作业使得服务器如此繁忙,以至于几乎每个请求都会失败。

https://github.com/zcox/spray-blocking-test中描述了一个类似的情况,但重点转移到那里 - /ping对于处理长时间运行的请求的端点上的或多或少的稳定责任并不重要。

问题:如何设计我的应用程序以更好地中断挂起的请求?我可以忍受重负载下一小部分失败的请求,但几秒钟后让整个系统停止工作是不可接受的。

+0

非常宽泛的问题。简而言之,您应该在重负载情况下立即拒绝某些请求(只需说抱歉或追踪运行过多计算的用户或维护队列)。你也应该为你的计算,数据库,喷雾使用单独的上下文......所以你的http前端层总会有响应。 – ipoteka

回答

1

Akka HTTP不会自动终止处理已超时的请求。通常需要额外的簿记才能达到这个目标,所以它不会默认开启。我认为这是一个疏忽,TBH,我自己也和Akka HTTP有类似的问题。

我认为您需要手动中止处理请求超时,否则服务器在超载时不会恢复,如您所见。

没有一个标准的机制可以实现它(请参阅“How to cancel Future in Scala?”)。如果线程在没有I/O的情况下执行CPU工作,那么Thread.interrupt()将无用。相反,你应该建立一个DeadlinePromise或类似的显示,如果请求仍处于打开状态,并通过周围,并定期您的计算过程中检查超时:

// in the HTTP server class: 
val responseTimeout: Duration = 30.seconds 

val routes = 
    path("slowComputation") { 
    complete { 
     val responseTimeoutDeadline: Deadline = responseTimeout.fromNow 
     computeSlowResult(responseTimeoutDeadline) 
    } 
    } 

// in the processing code: 
def computeSlowResult(responseDeadline: Deadline): Future[HttpResponse] = Future { 
    val gatherInputs: List[_] = ??? 
    gatherInputs.fold(0) { (acc, next) => 

    // check if the response has timed out 
    if (responseDeadline.isOverdue()) 
     throw new TimeoutException() 

    acc + next // proceed with the calculation a little 
    } 
} 

(如果Promise已经完成将是一个检查比检查一个Deadline是否已经过期要便宜很多,我已经把上面的代码放在后面,因为它更容易编写。)

+0

给了这个去 - 似乎有所改善的东西了一点,至少服务器并没有完全崩溃。幸运的是,在这个端点上,我们有一个或多或少的顺序过程,所以我将期限测试注入到传递给'Future#flatMap'的函数中。想知道当一个更一般的情况下,当没有可以遍历的输入/步骤序列时,解决方案可能是什么? – Anton

+0

“更普遍的情况下,当没有可以遍历的输入/步骤序列”时 - 我认为这是通用解决方案。操作和正在等待它完成的HTTP请求之间总会有一个链接,否则输出将如何到达客户端? – Rich

0

spray-blocking-test使用我认为不存在于Akka HTTP中的库。我倒是有类似的问题,我解决它,如下所示:

application.conf

blocking-io-dispatcher { 
    type = Dispatcher 
    executor = "thread-pool-executor" 
    thread-pool-executor { 
    fixed-pool-size = 16 
    } 
    throughput = 1 
} 

路线

complete { 
    Try(new URL(url)) match { 
    case scala.util.Success(u) => { 
     val src = Source.fromIterator(() => parseMovies(u).iterator) 

     src 
     .via(findMovieByTitleAndYear) 
     .via(persistMovies) 
     .completionTimeout(5.seconds) 
     .toMat(Sink.fold(Future(0))((acc, elem) => Applicative[Future].map2(acc, elem)(_ + _)))(Keep.right) 
     // run the whole graph on a separate dispatcher 
     .withAttributes(ActorAttributes.dispatcher("blocking-io-dispatcher")) 
     .run.flatten 
     .onComplete { 
      _ match { 
       case scala.util.Success(n) => logger.info(s"Created $n movies") 
       case Failure(t) => logger.error(t, "Failed to process movies") 
      } 
     } 

     Accepted 
    } 
    case Failure(t) => logger.error(t, "Bad URL"); BadRequest -> "Bad URL" 
    } 
} 

而处理保持响应立即返回发生在后台。

补充阅读:

http://doc.akka.io/docs/akka/current/scala/dispatchers.html http://blog.akka.io/streams/2016/07/06/threading-and-concurrency-in-akka-streams-explained

+0

如果我理解正确,在您的用例中,您不需要在HTTP响应中返回计算结果? – Anton

+0

@Anton正确。如果结果需要,我会被迫等待。 –

+0

我认为我们的问题是正交的 - 然后在后台执行任务对于单独的调度程序来说相当简单,问题在于当他们不再需要时停止它们 - 如果它们没有效果并且只将值返回给请求者。我的问题是请求者放弃超时后服务器仍在运行 – Anton