TLDR:是更好兑现每个请求一个流(即使用短寿命流)或使用跨请求单个流物化,当我有传出http请求作为一部分流的?阿卡流+阿卡-HTTP生命周期
详细信息:我有一个典型的服务,需要一个HTTP请求时,它驱散几个第三方服务的下游(不是由我控制),然后送回前汇总的结果。我正在使用akka-http进行客户端实施并喷洒服务器(遗留下来,随着时间的推移将会转移到akka-http)。示意性地:
request -> map -1-*-> map -> 3rd party http -> map -*-1> aggregation -> response
这可以通过物化每个请求的数据流或物化(部分)流一次,跨请求共享来实现任一。
实现每个请求会导致实现开销并且不清楚如何利用连接池。这个问题被描述为here(很多实现可能会耗尽池)。我可以将一个池包装在一个像here这样长时间运行的http流中,并将其包装在一个mapAsync
“上游”中,但错误处理策略对我来说并不明确。当单个请求失败并且流被终止时,它是否也会取下池?更多的是,我似乎需要协调请求和响应,因为它们没有按顺序返回。跨请求
// example of stream per request
val connectionFlow = Http().cachedHostConnectionPool[UUID](host, port)
val httpFlow: Flow[HttpRequest, Try[HttpResponse], NotUsed] =
Flow[HttpRequest]
.map(req => req -> UUID.randomUUID()) // I don't care about id because it's a single request per stream.
.via(connectionFlow)
.map { case (response, _) => response }
val result = Range(1 to 5).foreach{ i => {
Source.single(i)
.map(HttpRequest(...))
.via(httpFlow)
.mapAsync(1) {
// response handling logic
}
.runWith(Sink.last)
})
// example of stream per request with long running http stream
// as defined in http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#using-the-host-level-api-with-a-queue
def queueRequest(request: HttpRequest): Future[HttpResponse]
val result = Range(1 to 5).foreach{ i => {
Source.single(i)
.map(HttpRequest(...))
.mapAsync(1)(queueRequest)
.mapAsync(1) {
// somehow reconcile request with response?
// response handling logic
}
.runWith(Sink.last)
})
共享流具有错误处理类似的问题 - 似乎有一些可以降低该流在飞行中的所有请求的故障模式。该代码将类似于host level API,但队列面向整个流。
哪种方式在这种情况下更好?
我曾尝试实现这两个解决方案,但也有许多设计选择在实施的每一个阶段,所以它似乎很容易搞砸了即使是在“正确”的道路。
尽管我认为它可以忽略不计,并且它与akka-http服务器的运行方式相同。
谢谢拉蒙!我的问题是我没有一个'Source',每个输入流都是http服务的请求,所以最多我有'Source.single'。在这种情况下,我将如何获得单一流量? – Tim
此外,我将如何处理单个流中的请求超时?我能够找到的唯一超时是'completionTimeout'阶段,但它失败了一个流,不会向下游传播错误。 – Tim