2017-09-26 133 views
4

TLDR:是更好兑现每个请求一个流(即使用短寿命流)或使用跨请求单个流物化,当我有传出http请求作为一部分流的?阿卡流+阿卡-HTTP生命周期

详细信息:我有一个典型的服务,需要一个HTTP请求时,它驱散几个第三方服务的下游(不是由我控制),然后送回前汇总的结果。我正在使用akka-http进行客户端实施并喷洒服务器(遗留下来,随着时间的推移将会转移到akka-http)。示意性地:

request -> map -1-*-> map -> 3rd party http -> map -*-1> aggregation -> response

这可以通过物化每个请求的数据流或物化(部分)流一次,跨请求共享来实现任一。

实现每个请求会导致实现开销并且不清楚如何利用连接池。这个问题被描述为here(很多实现可能会耗尽池)。我可以将一个池包装在一个像here这样长时间运行的http流中,并将其包装在一个mapAsync“上游”中,但错误处理策略对我来说并不明确。当单个请求失败并且流被终止时,它是否也会取下池?更多的是,我似乎需要协调请求和响应,因为它们没有按顺序返回。跨请求

// example of stream per request 

val connectionFlow = Http().cachedHostConnectionPool[UUID](host, port) 
val httpFlow: Flow[HttpRequest, Try[HttpResponse], NotUsed] = 
    Flow[HttpRequest] 
     .map(req => req -> UUID.randomUUID()) // I don't care about id because it's a single request per stream. 
     .via(connectionFlow) 
     .map { case (response, _) => response } 

val result = Range(1 to 5).foreach{ i => { 
    Source.single(i) 
    .map(HttpRequest(...)) 
    .via(httpFlow) 
    .mapAsync(1) { 
     // response handling logic 
    } 
    .runWith(Sink.last) 
}) 


// example of stream per request with long running http stream 

// as defined in http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#using-the-host-level-api-with-a-queue 
def queueRequest(request: HttpRequest): Future[HttpResponse] 

val result = Range(1 to 5).foreach{ i => { 
    Source.single(i) 
    .map(HttpRequest(...)) 
    .mapAsync(1)(queueRequest) 
    .mapAsync(1) { 
     // somehow reconcile request with response? 
     // response handling logic 
    } 
    .runWith(Sink.last) 
}) 

共享流具有错误处理类似的问题 - 似乎有一些可以降低该流在飞行中的所有请求的故障模式。该代码将类似于host level API,但队列面向整个流。

哪种方式在这种情况下更好?

我曾尝试实现这两个解决方案,但也有许多设计选择在实施的每一个阶段,所以它似乎很容易搞砸了即使是在“正确”的道路。

尽管我认为它可以忽略不计,并且它与akka-http服务器的运行方式相同。

回答

1

一般来说,是更好使用一个连接Flow和调度所有的请求通过单流。主要原因是由于新实现可能实际上每次都会形成新的Connection(取决于连接池设置)。

你是正确的,这会导致一些并发症:

订购:通过提供随机UUID作为元组 要传递到连接第二流量值,你可以消灭你的能力将请求与响应相关联。在数组中的额外T值可以被用作“相关ID”知道哪些HttpResponse你是从流量获取。在您的特定例如,你可以使用来自Range您创建的初始Int

val responseSource : Source[(Try[HttpResponse], Int), _] = 
    Source 
    .fromIterator(() => Iterator range (0,5)) 
    .map(i => HttpRequest(...) -> i) 
    .via(connectionFlow) 

现在每个响应带有你可以用它来处理响应原来的int值。

错误处理:您在声明“单个请求失败并且流已终止”时不正确。单个请求失败不一定会导致流失败。相反,您只需从连接流中获取(Failure(exception), Int)值。您现在知道哪个Int导致了失败,并且您有流程中的异常。

+0

谢谢拉蒙!我的问题是我没有一个'Source',每个输入流都是http服务的请求,所以最多我有'Source.single'。在这种情况下,我将如何获得单一流量? – Tim

+0

此外,我将如何处理单个流中的请求超时?我能够找到的唯一超时是'completionTimeout'阶段,但它失败了一个流,不会向下游传播错误。 – Tim