2013-11-28 28 views
0

首先我很抱歉,如果这个问题已经在其他地方处理过了,我只是没有找到解决我的特定问题的东西。RabbitMQ java.lang.OutOfMemoryError

我有一个网关服务器,它从外部接收消息。它将它放入我的订单处理服务器正在侦听的队列中。我的订单处理服务器侦听2个队列(在一个线程中)。队列1 - 网关服务器,队列2 - 清除服务器。

所以在我的订单处理器中,我有工作线程。我使用ExecutorService来管理我的线程。问题出在工作者线程中。

在工作线程中,我制作了两个用于将消息发布到清除服务器或网关服务器的MQ实例。我基本上需要做一些处理,然后将这个消息发布到这些队列中。

我想知道的是,每当我处理完消息后,是否应关闭工作线程中的通道和连接?

如果我不处理8-900消息后处理消息,然后关闭后,在每个工作线程的MQ连接,我开始得到以下异常间歇性:

java.lang.OutOfMemoryError: unable to create new native thread 
    at java.lang.Thread.start0(Native Method) 
    at java.lang.Thread.start(Unknown Source) 
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:307) 
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516) 
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:533) 
    at interfaces.MQ.<init>(MQ.java:41) 
    at orderProcessor.ProcessOrders.<init>(ProcessOrders.java:109) 
    at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:52) 
    at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70) 
    at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70) 
    at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70) 
    at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70) 
    at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70) 
    at orderProcessor.ProcessIncomingCSThread.routeIncoming(ProcessIncomingCSThread.java:45) 
    at interfaces.ProcessIncomingThread.run(ProcessIncomingThread.java:47) 

如果我做关闭连接处理味精那么一段时间,我得到以下异常后间歇后,每工作线程:]

Exception in thread "AMQP Connection 127.0.0.1:5672" java.lang.OutOfMemoryError: unable to create new native thread 
    at java.lang.Thread.start0(Native Method) 
    at java.lang.Thread.start(Unknown Source) 
    at com.rabbitmq.client.impl.ChannelManager.scheduleShutdownProcessing(ChannelManager.java:108) 
    at com.rabbitmq.client.impl.ChannelManager.handleSignal(ChannelManager.java:94) 
    at com.rabbitmq.client.impl.AMQConnection.finishShutdown(AMQConnection.java:696) 
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:669) 
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550) 

我使用basicQos当我创建消费保持的RabbitMQ的内部队列中合理的一个MQ连接。

创建我的MQ连接的消费方式如下:

_channel.queueDeclare(this._mqName.toString(), true, false, false, null); 
_channel.basicConsume(this._mqName.toString(), true, _consumer); 
_channel.basicQos(50); 

感谢看着这个和任何建议或帮助将非常感激。它很可能是我没有正确处理我的上下文..

+0

每次发送消息时都不需要关闭连接和通道。另外,我们在这里讨论的是什么负载?服务器在发送时是否进入流动状态? – hveiga

+0

流动状态是什么意思?我已经尝试了关闭和不关闭队列的方法。我让我的网关服务器产生2k个虚拟订单,并将其推送到订单处理器正在监听的队列中。如果我的工作线程不需要随后将消息推送到队列上,那么事情会变得更容易,因为那样我就可以避免为了在我的工作线程中发布MQ连接。 – OriginalCliche

+0

当代理无法将消息足够快地从交换路由到队列时,流状态发生在RabbitMQ中。发生这种情况时,生产者会受到限制,他们会在内部开始累积消息,等待代理处理负载。如果您不断尝试发送消息,但代理处于流动状态,则消息将存储在JVM内存中,这意味着在某些时候,您将耗尽内存。要检查您是否正在进入流状态,您需要在发送消息时检查代理中的流连接或阻塞状态中的连接。 – hveiga

回答

0

感谢您的输入家伙。我解决了这个问题。

我正在每个工作线程中创建连接。现在,我在主线程上创建连接,并将其传递给从该连接创建其通道的工作线程。这似乎是一种享受。

然而这意味着,我将不得不重新设计我的MQ类来处理这个工作流程。

0

好像你有内存泄漏。使用分析器。

0

我对RabbitMQ并不熟悉,但我怀疑你或RabbitMQ试图创建更多的线程来配置操作系统。 也许这两个链接可以帮助您:

- http://stackoverflow.com/questions/16789288/java-lang-outofmemoryerror-unable-to-create-new-native-thread 
- http://javaeesupportpatterns.blogspot.de/2012/09/outofmemoryerror-unable-to-create-new.html 
相关问题