2012-11-27 31 views
0

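Why is my stream being closed by the peer?

I have the following simple console program that listens on a destination (queue or topic, it doesn't matter) on an ActiveMQ Stomp server and logs the messages it receives to the console: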

using System;
using Apache.NMS.Stomp;
using Apache.NMS;
using Apache.NMS.Util;

namespace StompTest
{
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                var connectionFactory = new ConnectionFactory("stomp:tcp://mybroker:61613");

                var connection = connectionFactory.CreateConnection();
                connection.ExceptionListener += new ExceptionListener(connection_ExceptionListener);
                connection.Start();

                var session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);

                IDestination dest = SessionUtil.GetDestination(session, "queue://MyQueue");

                var consumer = session.CreateConsumer(dest);
                consumer.Listener += new MessageListener(consumer_Listener);

                Console.ReadKey();
            }
            catch (NMSException ex)
            {
                Console.WriteLine("NMSException !! ==> " + ex.Message);
            }
        }

        static void connection_ExceptionListener(Exception exception)
        {
            Console.WriteLine("Exception!! ==> " + exception.ToString());
        }

        static void consumer_Listener(IMessage message)
        {
            var textMessage = message as ITextMessage;
            if (textMessage == null)
                Console.WriteLine("No ITextMessage...");
            else
                Console.WriteLine("Received => " + textMessage.Text);
        }
    }
}

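When I start it, it works fine as long as I keep sending messages. But after 30 seconds of inactivity, I get an exception. This happens even if no message was ever sent to the queue: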

Exception!! ==> Apache.NMS.Stomp.IOException: Peer closed the stream.
    at Apache.NMS.Stomp.Protocol.StompFrame.ReadLine(BinaryReader dataIn) in c:\dev\NMS.Stomp\src\main\csharp\Protocol\StompFrame.cs:line 284
    at Apache.NMS.Stomp.Protocol.StompFrame.ReadCommandHeader(BinaryReader dataIn) in c:\dev\NMS.Stomp\src\main\csharp\Protocol\StompFrame.cs:line 208
    at Apache.NMS.Stomp.Protocol.StompFrame.FromStream(BinaryReader dataIn) in c:\dev\NMS.Stomp\src\main\csharp\Protocol\StompFrame.cs:line 197
    at Apache.NMS.Stomp.Protocol.StompWireFormat.Unmarshal(BinaryReader dataIn) in c:\dev\NMS.Stomp\src\main\csharp\Protocol\StompWireFormat.cs:line 121
    at Apache.NMS.Stomp.Transport.Tcp.TcpTransport.ReadLoop() in c:\dev\NMS.Stomp\src\main\csharp\Transport\Tcp\TcpTransport.cs:line 279

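From googling around and reading the Apache.NMS source code, I figured out that it has to do with the 'InactivityMonitor'. Indeed, when I specify the parameter transport.useInactivityMonitor=false on the connection string, I don't get the exception and everything works fine.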
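For reference, a minimal sketch of what that workaround looks like in code. It assumes the parameter is appended as a query-string option on the same broker URI used in the program above; the class name is only illustrative.

using System;
using Apache.NMS;
using Apache.NMS.Stomp;

class InactivityMonitorWorkaround
{
    static void Main()
    {
        // Same broker URI as above, with the inactivity monitor switched off
        // (assumption: passed as a URI query option).
        const string brokerUri =
            "stomp:tcp://mybroker:61613?transport.useInactivityMonitor=false";

        var connectionFactory = new ConnectionFactory(brokerUri);

        using (IConnection connection = connectionFactory.CreateConnection())
        {
            connection.Start();
            Console.WriteLine("Connected without the inactivity monitor.");
        }
    }
}

This only hides the symptom: with the monitor disabled, the client no longer checks whether the connection is still alive.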

But as far as I understand, this InactivityMonitor has a purpose: making sure that "dead" connections are detected and properly cleaned up.

So something else must be wrong! I added a simple console tracer to the code above, and this is the output when connecting without the transport.useInactivityMonitor=false parameter:

Info: Connecting to: tcp://mybroker:61613/
Debug: Opening socket to: mybroker on port: 61613
Debug: Connected to mybroker:61613 using InterNetwork protocol.
Debug: Creating Inactivity Monitor: 1
Debug: StompWireFormat - Marshaling: ConnectionInfo[ConnectionId=ID:mypc-3096-634896376801461632-0:0, Host=mybroker, MaxInactivityDuration=30000, ReadCheckInterval=30000, WriteCheckInterval=10000, MaxInactivityDurationInitialDelay=0, ClientId=ID:mypc-3096-634896376801461632-1:0, Password=, UserName=]
Debug: StompWireFormat - Writing StompFrame[ Command=CONNECT, Properties={ heart-beat=10000,30000 client-id=ID:mypc-3096-634896376801461632-1:0 accept-version=1.0,1.1 host=mybroker}, Content=]
Debug: StompWireFormat - Received StompFrame[ Command=CONNECTED, Properties={ version=1.1 server=ActiveMQ/5.6.0 session=ID:mypc-3096-634896376801461632-1:0 heart-beat=0,0}, Content=System.Byte[]]
Debug: InactivityMonitor[1]: Read Check time interval: 30000
Debug: InactivityMonitor[1]: Initial Delay time interval: 10000
Debug: InactivityMonitor[1]: Write Check time interval: 10000
Debug: InactivityMonitor[1]: Starting the Monitor Timer.
Debug: StompWireFormat - Received StompFrame[ Command=KEEPALIVE, Properties={}, Content=]
Debug: InactivityMonitor[1]: New Keep Alive Received at -> 18:28:00.419
Debug: StompWireFormat - Marshaling: ConsumerInfo[ConsumerId=ID:mypc-3096-634896376801461632-0:0:1:1, Destination=queue://MyQueue, Ack Mode=AutoAcknowledge, PrefetchSize=1000, MaximumPendingMessageLimit=0, DispatchAsync=True, Selector=, SubscriptionName=, NoLocal=False, Exclusive=False, Retroactive=False, Priority=0, Transformation]
Debug: StompWireFormat - Writing StompFrame[ Command=SUBSCRIBE, Properties={ id=ID:mypc-3096-634896376801461632-0:0:1:1 receipt=2 activemq.dispatchAsync=True activemq.maximumPendingMessageLimit=0 activemq.priority=0 ack=client activemq.prefetchSize=1000 transformation=jms-xml destination=/queue/MyQueue}, Content=]
Debug: StompWireFormat - Received StompFrame[ Command=RECEIPT, Properties={ receipt-id=2}, Content=System.Byte[]]
Debug: StompWireFormat - Received StompFrame[ Command=KEEPALIVE, Properties={}, Content=]
Debug: InactivityMonitor[1]: New Keep Alive Received at -> 18:28:00.508
Debug: CheckConnection: Timer Elapsed at 27/11/2012 18:28:10
Debug: InactivityMonitor[1]: Message sent since last write check. Resetting flag.
Debug: InactivityMonitor[1]: A receive is in progress or already failed.
Debug: CheckConnection: Timer Elapsed at 27/11/2012 18:28:20
Debug: InactivityMonitor[1]: No Message sent since last write check. Sending a KeepAliveInfo.
Debug: InactivityMonitor[1]: A read check is not currently allowed.
Debug: InactivityMonitor[1] perparing for another Write Check
Debug: InactivityMonitor[1] Write Check required sending KeepAlive.
Debug: StompWireFormat - Marshaling: KeepAliveInfo[ commandId = 0, responseRequired = False, ]
Debug: StompWireFormat - Writing StompFrame[ Command=KEEPALIVE, Properties={}, Content=]
Debug: CheckConnection: Timer Elapsed at 27/11/2012 18:28:30
Debug: InactivityMonitor[1]: No Message sent since last write check. Sending a KeepAliveInfo.
Debug: InactivityMonitor[1]: A read check is not currently allowed.
Debug: InactivityMonitor[1] perparing for another Write Check
Debug: InactivityMonitor[1] Write Check required sending KeepAlive.
Debug: StompWireFormat - Marshaling: KeepAliveInfo[ commandId = 0, responseRequired = False, ]
Debug: StompWireFormat - Writing StompFrame[ Command=KEEPALIVE, Properties={}, Content=]
Debug: Exception received in the Inactivity Monitor: Apache.NMS.Stomp.IOException: Peer closed the stream.
    at Apache.NMS.Stomp.Protocol.StompFrame.ReadLine(BinaryReader dataIn) in c:\dev\NMS.Stomp\src\main\csharp\Protocol\StompFrame.cs:line 284
    at Apache.NMS.Stomp.Protocol.StompFrame.ReadCommandHeader(BinaryReader dataIn) in c:\dev\NMS.Stomp\src\main\csharp\Protocol\StompFrame.cs:line 208
    at Apache.NMS.Stomp.Protocol.StompFrame.FromStream(BinaryReader dataIn) in c:\dev\NMS.Stomp\src\main\csharp\Protocol\StompFrame.cs:line 197
    at Apache.NMS.Stomp.Protocol.StompWireFormat.Unmarshal(BinaryReader dataIn) in c:\dev\NMS.Stomp\src\main\csharp\Protocol\StompWireFormat.cs:line 121
    at Apache.NMS.Stomp.Transport.Tcp.TcpTransport.ReadLoop() in c:\dev\NMS.Stomp\src\main\csharp\Transport\Tcp\TcpTransport.cs:line 279
Exception!! ==> Apache.NMS.Stomp.IOException: Peer closed the stream.
    at Apache.NMS.Stomp.Protocol.StompFrame.ReadLine(BinaryReader dataIn) in c:\dev\NMS.Stomp\src\main\csharp\Protocol\StompFrame.cs:line 284
    at Apache.NMS.Stomp.Protocol.StompFrame.ReadCommandHeader(BinaryReader dataIn) in c:\dev\NMS.Stomp\src\main\csharp\Protocol\StompFrame.cs:line 208
    at Apache.NMS.Stomp.Protocol.StompFrame.FromStream(BinaryReader dataIn) in c:\dev\NMS.Stomp\src\main\csharp\Protocol\StompFrame.cs:line 197
    at Apache.NMS.Stomp.Protocol.StompWireFormat.Unmarshal(BinaryReader dataIn) in c:\dev\NMS.Stomp\src\main\csharp\Protocol\StompWireFormat.cs:line 121
    at Apache.NMS.Stomp.Transport.Tcp.TcpTransport.ReadLoop() in c:\dev\NMS.Stomp\src\main\csharp\Transport\Tcp\TcpTransport.cs:line 279
Debug: TransportFilter disposing of next Transport: MutexTransport
Debug: TransportFilter disposing of next Transport: InactivityMonitor
Debug: TransportFilter disposing of next Transport: TcpTransport
Info: Closing The Session with Id ID:mypc-3096-634896376801461632-0:0:1
Debug: Closing down the Consumer
Error: Error during session close: Apache.NMS.Stomp.IOException: Channel was inactive for too long: tcp://mybroker:61613/
    at Apache.NMS.Stomp.Connection.Oneway(Command command) in c:\dev\NMS.Stomp\src\main\csharp\Connection.cs:line 539
    at Apache.NMS.Stomp.MessageConsumer.DoClose() in c:\dev\NMS.Stomp\src\main\csharp\MessageConsumer.cs:line 252
    at Apache.NMS.Stomp.Session.DoClose() in c:\dev\NMS.Stomp\src\main\csharp\Session.cs:line 307
Info: Closed The Session with Id ID:mypc-3096-634896376801461632-0:0:1
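
For anyone who wants to reproduce a trace like the one above: the tracer itself is not shown in the post, so the following is only a sketch of how such a console tracer could look. It assumes the Apache.NMS ITrace interface and the static Tracer.Trace property; the ConsoleTracer class name is made up for illustration.

using System;
using Apache.NMS;

// Hypothetical tracer: writes every trace level to the console, prefixed
// with the level name ("Info: ...", "Debug: ...").
class ConsoleTracer : ITrace
{
    public bool IsDebugEnabled { get { return true; } }
    public bool IsInfoEnabled { get { return true; } }
    public bool IsWarnEnabled { get { return true; } }
    public bool IsErrorEnabled { get { return true; } }
    public bool IsFatalEnabled { get { return true; } }

    public void Debug(string message) { Console.WriteLine("Debug: " + message); }
    public void Info(string message) { Console.WriteLine("Info: " + message); }
    public void Warn(string message) { Console.WriteLine("Warn: " + message); }
    public void Error(string message) { Console.WriteLine("Error: " + message); }
    public void Fatal(string message) { Console.WriteLine("Fatal: " + message); }
}

class TracerSetup
{
    static void Main()
    {
        // Install the tracer once, before creating the connection factory,
        // so that the connection setup is traced as well.
        Tracer.Trace = new ConsoleTracer();
    }
}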

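So it looks like my client is being "kicked" by the broker for being inactive too long. But I don't understand why: judging from the log above, my client is actually sending "keepalive" messages, as it should. So it shouldn't be considered inactive.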
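
One detail in the trace that may be worth checking is the heart-beat exchange: the client's CONNECT frame offers heart-beat=10000,30000, and the broker's CONNECTED frame answers heart-beat=0,0. The sketch below applies the negotiation rule from the STOMP 1.1 specification to those two headers. It only illustrates the rule in the spec, not what Apache.NMS.Stomp or ActiveMQ do internally, and it does not by itself explain why the broker closes the socket. The HeartBeat class and its Negotiate method are hypothetical names.

using System;

static class HeartBeat
{
    // STOMP 1.1 rule for one direction: the sender announces the smallest
    // interval it can guarantee (0 = cannot send), the receiver announces the
    // interval it would like (0 = does not want any). If either value is 0,
    // no heart-beats flow in that direction; otherwise the interval is the
    // larger of the two values. Returns milliseconds, or 0 for "none".
    public static int Negotiate(int senderCanSendMs, int receiverWantsMs)
    {
        if (senderCanSendMs == 0 || receiverWantsMs == 0)
            return 0;
        return Math.Max(senderCanSendMs, receiverWantsMs);
    }

    static void Main()
    {
        // Values taken from the trace: client "10000,30000", broker "0,0".
        int clientToBroker = Negotiate(10000, 0);
        int brokerToClient = Negotiate(0, 30000);

        Console.WriteLine("client -> broker: " + clientToBroker + " ms"); // client -> broker: 0 ms
        Console.WriteLine("broker -> client: " + brokerToClient + " ms"); // broker -> client: 0 ms
    }
}

Read this way, the two headers agree on no heart-beats in either direction, while the client's monitor still runs with its own 10000/30000 intervals. Whether that mismatch is related to the disconnect is an open question here.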

I'm out of options on how to proceed with this. If anyone has any insight, it would be much appreciated!

Update
Version numbers:

  • Library: Apache.NMS.Stomp v1.5.3
  • Broker: Apache ActiveMQ 5.6.0
+0

You should add the library and broker version details. –

+0

@TimBish: I've added the requested information. – fretje

+0

@fretje: Did you find a solution? I'm running into this problem too, and the connection can't recover from it. – PeterBelm

Answers

3

Try using the Stomp failover transport. So instead of...

var connectionFactory = new ConnectionFactory("stomp:tcp://mybroker:61613"); 

use...

var connectionFactory = new ConnectionFactory("failover:tcp://mybroker:61613"); 

Now, instead of the ExceptionListener event being raised and leaving it up to you to sort things out, the transport will reconnect automatically.

Tip: if you need to be notified when a disconnect/reconnect occurs, the ConnectionInterruptedListener and ConnectionResumedListener events are raised on the Connection object.
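
A minimal sketch of hooking those two events, assuming the failover URI from above and the parameterless delegate types that Apache.NMS declares for them. The class and handler names are illustrative.

using System;
using Apache.NMS;
using Apache.NMS.Stomp;

class FailoverNotifications
{
    static void Main()
    {
        var connectionFactory = new ConnectionFactory("failover:tcp://mybroker:61613");

        using (IConnection connection = connectionFactory.CreateConnection())
        {
            // Raised when the transport loses the broker connection
            // and again once it has been re-established.
            connection.ConnectionInterruptedListener += OnInterrupted;
            connection.ConnectionResumedListener += OnResumed;
            connection.Start();

            Console.ReadKey();
        }
    }

    static void OnInterrupted()
    {
        Console.WriteLine("Connection interrupted, waiting for reconnect...");
    }

    static void OnResumed()
    {
        Console.WriteLine("Connection resumed.");
    }
}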

Also:

I recommend using Session.CreateDurableConsumer instead of just Session.CreateConsumer. That way you won't lose any messages during a disconnect/reconnect.
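
A sketch of what that could look like. Note that CreateDurableConsumer takes an ITopic, so this applies to topic subscriptions rather than to the queue used in the question. The topic name, client id and subscription name below are placeholders, and the sketch assumes the client id has to be set before the connection is started.

using System;
using Apache.NMS;
using Apache.NMS.Stomp;
using Apache.NMS.Util;

class DurableConsumerExample
{
    static void Main()
    {
        var connectionFactory = new ConnectionFactory("failover:tcp://mybroker:61613");

        using (IConnection connection = connectionFactory.CreateConnection())
        {
            // A durable subscription is identified by client id plus
            // subscription name, so both must stay the same across restarts.
            connection.ClientId = "stomp-test-client"; // placeholder
            connection.Start();

            using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
            {
                ITopic topic = SessionUtil.GetTopic(session, "topic://MyTopic"); // placeholder

                // Arguments: topic, subscription name, selector (none), noLocal.
                using (IMessageConsumer consumer =
                    session.CreateDurableConsumer(topic, "my-subscription", null, false))
                {
                    consumer.Listener += message =>
                    {
                        var textMessage = message as ITextMessage;
                        Console.WriteLine(textMessage == null
                            ? "No ITextMessage..."
                            : "Received => " + textMessage.Text);
                    };

                    Console.ReadKey();
                }
            }
        }
    }
}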

Hope this helps someone.
