2016-09-27 24 views
1

我有具有3个HTTP监听器像这样的一个应用程序:阿卡HTTP流监听一段时间后,停止处理数据字节

val futureResponse1: Future[HttpResponse] = 
    Http().singleRequest(HttpRequest(uri = someUrl)) 

各3的正在侦听到非止动流(每个为不同的一种)。并用同组开始的简单流程,然后比较快的处理(非阻塞)处理它:

futureResponse1.flatMap {response => 
    response.status match { 
    case StatusCodes.OK => 
     val source: Source[ByteString, Any] = response.entity.dataBytes 
     source. 
     grouped(100).     
     map(doSomethingFast).        
     runWith(Sink.ignore)       

    case notOK => system.log.info("failed opening, status: " + notOK.toString()) 
    } 

...

我没有得到任何的异常或警告。但过了一段时间(可能是15-25分钟),听众突然停下来。一个接一个(不在一起)。

也许它的分组阶段是那里的问题?或者,也许连接/流只停止?或者由他们共享的调度员正在饿死/没有得到释放的东西。

任何想法,为什么这可能会发生?

====更新====

@RamonĴRomero的ý岗哨 我改变运行仅具有1流代替3-,我除去分组的阶段。几分钟后仍然发生。我怀疑这个流是基于超时关闭的。我所做的只是大块并消耗它们。

====更新====

找到原因,见下文。

+0

可以提供约“各3是听更多细节一个不停的流“? “未来”价值观与流媒体之间的互动可能是您问题的根源...... –

回答

1

这是原因:

EntityStreamSizeException:实际大小的实体(无)含量超标长度的限制(8388608个字节)!您可以在实现dataBytes流之前通过设置akka.http。[server | client] .parsing.max-content-length或调用HttpEntity.withSizeLimit来进行配置。

对于任何寻求在连续响应流的情况下的解决方案,你可以得到这样的来源,使用withoutSizeLimit:

val source: Source[ByteString, Any] = response.entity.withoutSizeLimit().dataBytes