首先我很抱歉,如果这个问题已经在其他地方处理过了,我只是没有找到解决我的特定问题的东西。RabbitMQ java.lang.OutOfMemoryError
我有一个网关服务器,它从外部接收消息。它将它放入我的订单处理服务器正在侦听的队列中。我的订单处理服务器侦听2个队列(在一个线程中)。队列1 - 网关服务器,队列2 - 清除服务器。
所以在我的订单处理器中,我有工作线程。我使用ExecutorService来管理我的线程。问题出在工作者线程中。
在工作线程中,我制作了两个用于将消息发布到清除服务器或网关服务器的MQ实例。我基本上需要做一些处理,然后将这个消息发布到这些队列中。
我想知道的是,每当我处理完消息后,是否应关闭工作线程中的通道和连接?
如果我不处理8-900消息后处理消息,然后关闭后,在每个工作线程的MQ连接,我开始得到以下异常间歇性:
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Unknown Source)
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:307)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:533)
at interfaces.MQ.<init>(MQ.java:41)
at orderProcessor.ProcessOrders.<init>(ProcessOrders.java:109)
at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:52)
at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70)
at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70)
at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70)
at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70)
at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70)
at orderProcessor.ProcessIncomingCSThread.routeIncoming(ProcessIncomingCSThread.java:45)
at interfaces.ProcessIncomingThread.run(ProcessIncomingThread.java:47)
如果我做关闭连接处理味精那么一段时间,我得到以下异常后间歇后,每工作线程:]
Exception in thread "AMQP Connection 127.0.0.1:5672" java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Unknown Source)
at com.rabbitmq.client.impl.ChannelManager.scheduleShutdownProcessing(ChannelManager.java:108)
at com.rabbitmq.client.impl.ChannelManager.handleSignal(ChannelManager.java:94)
at com.rabbitmq.client.impl.AMQConnection.finishShutdown(AMQConnection.java:696)
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:669)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
我使用basicQos
当我创建消费保持的RabbitMQ的内部队列中合理的一个MQ连接。
创建我的MQ连接的消费方式如下:
_channel.queueDeclare(this._mqName.toString(), true, false, false, null);
_channel.basicConsume(this._mqName.toString(), true, _consumer);
_channel.basicQos(50);
感谢看着这个和任何建议或帮助将非常感激。它很可能是我没有正确处理我的上下文..
每次发送消息时都不需要关闭连接和通道。另外,我们在这里讨论的是什么负载?服务器在发送时是否进入流动状态? – hveiga
流动状态是什么意思?我已经尝试了关闭和不关闭队列的方法。我让我的网关服务器产生2k个虚拟订单,并将其推送到订单处理器正在监听的队列中。如果我的工作线程不需要随后将消息推送到队列上,那么事情会变得更容易,因为那样我就可以避免为了在我的工作线程中发布MQ连接。 – OriginalCliche
当代理无法将消息足够快地从交换路由到队列时,流状态发生在RabbitMQ中。发生这种情况时,生产者会受到限制,他们会在内部开始累积消息,等待代理处理负载。如果您不断尝试发送消息,但代理处于流动状态,则消息将存储在JVM内存中,这意味着在某些时候,您将耗尽内存。要检查您是否正在进入流状态,您需要在发送消息时检查代理中的流连接或阻塞状态中的连接。 – hveiga