我正在使用akka-http实现web套接字服务器。使用akka-http实现web socker服务器时,堆内存正在增长
以下是请求处理程序代码吧:
def decodeService: Flow[Message, Message, _] = {
Flow[Message].map {
case BinaryMessage.Strict(encodeMsg) =>
try {
WebServer.getWorkerActor ! QueueWork(protoMsg(this, encodeMsg))
} catch {
case e: Exception => {
println("[ERROR] failed to send BinaryMessage.Strict: " + e)
TextMessage("[ERROR] failed receiving BinaryMessage.Strict")
}
}
TextMessage("[INFO] BinaryMessage.Strict")
case BinaryMessage.Streamed(streamedMsg) => {
implicit val system = ActorSystem("DecoderSystem")
implicit val materializer = ActorMaterializer()
val streamedMsgFuture: Future[Seq[ByteString]] = streamedMsg.runWith(Sink.seq)
streamedMsgFuture.onComplete { completedStream =>
var completeBytestring = new ByteStringBuilder()
//I'm sure there's a better way to do this.. but hey, it's O(n)
completedStream.foreach { x =>
x.foreach { y =>
completeBytestring ++= y
}
}
try {
WebServer.getWorkerActor ! QueueWork(protoMsg(this, completeBytestring.result()))
} catch {
case e: Exception => {
println("[ERROR] failed to send BinaryMessage.Streamed: " + e)
TextMessage("[ERROR] failed receiving BinaryMessage.Streamed")
}
} finally {
completeBytestring.clear()
}
}
TextMessage("[INFO] BinaryMessage.Streamed")
}
case TextMessage.Strict(txt) => TextMessage("Succesfully receive text message")
case _ => TextMessage("Message type unsupported")
}
}
我的Web服务器获取数据流频繁每1分钟。我看到记忆在增长。如果我不处理流式消息,它能够保留。客户端和服务器之间的连接也是持久的。
我错用了Flows/Sink/Source吗?如何冲洗流?
感谢
我收到以下错误: DescriptionResourcePathLocation类型 找不到参数materializer隐含值:akka.stream .Materializer – anirudhas
我按照你的第二个建议。由于我不处理文本消息 – anirudhas
“无法找到参数实现者的隐式值”:这就是我所说的,你必须使演员系统和实现者可用于'decodeService'方法。如果你使用akka-http,你必须*拥有它们,因为你必须创建一个实现器才能使用'Http()。bind *'方法。因此,您已经在某处创建了它们;将它们以适当的方式传递给'decodeService'方法,并且您将会很好。 –