2017-03-15 34 views
1

我正在使用akka-http实现web套接字服务器。使用akka-http实现web socker服务器时,堆内存正在增长

以下是请求处理程序代码吧:

def decodeService: Flow[Message, Message, _] = { 
    Flow[Message].map { 
     case BinaryMessage.Strict(encodeMsg) => 
     try { 
      WebServer.getWorkerActor ! QueueWork(protoMsg(this, encodeMsg)) 
     } catch { 
      case e: Exception => { 
      println("[ERROR] failed to send BinaryMessage.Strict: " + e) 
      TextMessage("[ERROR] failed receiving BinaryMessage.Strict") 
      } 
     } 
     TextMessage("[INFO] BinaryMessage.Strict") 

     case BinaryMessage.Streamed(streamedMsg) => { 
     implicit val system = ActorSystem("DecoderSystem") 
     implicit val materializer = ActorMaterializer() 
     val streamedMsgFuture: Future[Seq[ByteString]] = streamedMsg.runWith(Sink.seq) 
     streamedMsgFuture.onComplete { completedStream => 
      var completeBytestring = new ByteStringBuilder() 
      //I'm sure there's a better way to do this.. but hey, it's O(n) 
      completedStream.foreach { x => 
      x.foreach { y => 
       completeBytestring ++= y 
      } 
      } 
      try { 
       WebServer.getWorkerActor ! QueueWork(protoMsg(this, completeBytestring.result())) 
      } catch { 
      case e: Exception => { 
       println("[ERROR] failed to send BinaryMessage.Streamed: " + e) 
       TextMessage("[ERROR] failed receiving BinaryMessage.Streamed") 
      } 
      } finally { 
      completeBytestring.clear() 
      } 
     } 
     TextMessage("[INFO] BinaryMessage.Streamed") 
     } 

     case TextMessage.Strict(txt) => TextMessage("Succesfully receive text message") 
     case _ => TextMessage("Message type unsupported") 
    } 
    } 

我的Web服务器获取数据流频繁每1分钟。我看到记忆在增长。如果我不处理流式消息,它能够保留。客户端和服务器之间的连接也是持久的。

我错用了Flows/Sink/Source吗?如何冲洗流?

感谢

回答

3

好了,最突出的问题是,你对创建一个全新的ActorSystem收到的每条流的消息。 ActorSystem就像是演员的线程池;您希望尽可能少地创建它们,理想情况下只有一个用于整个应用程序。不仅你为每条消息创建它们,你也不会关闭它们 - 所有配置在ActorSystem中的调度程序,它所拥有的所有资源都将永久挂起。当然,如果您收到大量流式消息,您的内存使用量将会增长。

由于您使用akka-http,您必然会有一个ActorSystem您调用Http().bind*。您需要在decodeService方法中使其可访问。另外,计算组合字节流的方式对我来说似乎过于复杂。考虑这样写的:

def decodeService: Flow[Message, Message, _] = Flow[Message].mapAsync(4) { 
    case m: BinaryMessage.Strict => 
    Future.successful(m) 
    case BinaryMessage.Streamed(streamMsg) => 
    streamMsg.runReduce(_ ++ _).map(BinaryMessage.Strict) 
    case m => 
    Future.successful(m) 
}.map { 
    case BinaryMessage.Strict(encodeMsg) => 
    try { 
     WebServer.getWorkerActor ! QueueWork(protoMsg(this, encodeMsg)) 
     TextMessage("[INFO] BinaryMessage.Strict") 
    } catch { 
     case NonFatal(e) => 
     println("[ERROR] failed to send BinaryMessage.Strict: " + e) 
     TextMessage("[ERROR] failed receiving BinaryMessage.Strict") 
    } 
    case TextMessage.Strict(txt) => TextMessage("Succesfully receive text message") 
    case _ => TextMessage("Message type unsupported") 
} 

在这里,首先我们把所有的二进制消息BinaryMessage.Strict,然后我们就像在你原来的代码处理它们。请注意,您必须在try块内写入确认消息,否则即使您有异常,也会返回成功的消息。另外,如果你决定不是在所有处理文本信息,该代码会变得更加简单:

def decodeService: Flow[Message, Message, _] = Flow[Message] 
    .filterNot(_.isText) 
    .mapAsync(4) { 
    case BinaryMessage.Strict(binary) => 
     Future.successful(binary) 
    case BinaryMessage.Stream(binaryStream) => 
     binaryStream.runReduce(_ ++ _) 
    .map { encodeMsg => 
    try { 
     WebServer.getWorkerActor ! QueueWork(protoMsg(this, encodeMsg)) 
     TextMessage("[INFO] BinaryMessage.Strict") 
    } catch { 
     case NonFatal(e) => 
     println("[ERROR] failed to send BinaryMessage.Strict: " + e) 
     TextMessage("[ERROR] failed receiving BinaryMessage.Strict") 
    } 
    } 
+0

我收到以下错误: DescriptionResourcePathLocation类型 找不到参数materializer隐含值:akka.stream .Materializer – anirudhas

+0

我按照你的第二个建议。由于我不处理文本消息 – anirudhas

+0

“无法找到参数实现者的隐式值”:这就是我所说的,你必须使演员系统和实现者可用于'decodeService'方法。如果你使用akka-http,你必须*拥有它们,因为你必须创建一个实现器才能使用'Http()。bind *'方法。因此,您已经在某处创建了它们;将它们以适当的方式传递给'decodeService'方法,并且您将会很好。 –