我有具有3个HTTP监听器像这样的一个应用程序:阿卡HTTP流监听一段时间后,停止处理数据字节
val futureResponse1: Future[HttpResponse] =
Http().singleRequest(HttpRequest(uri = someUrl))
各3的正在侦听到非止动流(每个为不同的一种)。并用同组开始的简单流程,然后比较快的处理(非阻塞)处理它:
futureResponse1.flatMap {response =>
response.status match {
case StatusCodes.OK =>
val source: Source[ByteString, Any] = response.entity.dataBytes
source.
grouped(100).
map(doSomethingFast).
runWith(Sink.ignore)
case notOK => system.log.info("failed opening, status: " + notOK.toString())
}
...
我没有得到任何的异常或警告。但过了一段时间(可能是15-25分钟),听众突然停下来。一个接一个(不在一起)。
也许它的分组阶段是那里的问题?或者,也许连接/流只停止?或者由他们共享的调度员正在饿死/没有得到释放的东西。
任何想法,为什么这可能会发生?
====更新====
@RamonĴRomero的ý岗哨 我改变运行仅具有1流代替3-,我除去分组的阶段。几分钟后仍然发生。我怀疑这个流是基于超时关闭的。我所做的只是大块并消耗它们。
====更新====
找到原因,见下文。
可以提供约“各3是听更多细节一个不停的流“? “未来”价值观与流媒体之间的互动可能是您问题的根源...... –