2012-10-18 81 views
8

1.)从.net客户端,我如何测试客户端是否连接到服务器(即可以发送和接收)是的,我可以在try块中发送消息并捕获随后的异常,但我希望能有一个更优雅的解决方案。如何获得Websphere MQ连接状态,以及如何重置连接:

2)如何打开,关闭和重新打开连接?在我试图解决上面的问题1时,我发现如果我打开一个连接,然后调用connection.Close()我无法从连接工厂获得另一个连接(请参阅下面的代码片段)。我收到错误消息XMSCC0008

我正在使用非常标准的vanilla MQ配置。以下是我的客户端如何连接:

ISession session = MQAccess.GetSession(MQAccess.Connection); 
IDestination destination = session.CreateTopic(SubTopicName); 
Consumer = MQAccess.GetConsumer(session, destination); 
Consumer.MessageListener = new MessageListener(HandleMQSubEvent); 
MQAccess.Connection.Start(); 

其中MQAccess是一个小工具类。

编辑的问题补充MQAccess代码:

public static class MQAccess 
{ 
    public static readonly MQConfigurationSectionHandler ConfigSettings; 
    public static readonly IConnectionFactory ConnectionFactory; 

    private static readonly IConnection connection; 
    public static IConnection Connection 
    { 
     get { return connection; } 
    } 

    static MQAccess() 
    { 
     ConfigSettings = (MQConfigurationSectionHandler) 
      ConfigurationManager.GetSection("mq-configuration"); 

     XMSFactoryFactory factory = XMSFactoryFactory.GetInstance(XMSC.CT_WMQ); 
     ConnectionFactory = factory.CreateConnectionFactory(); 
     ConnectionFactory.SetStringProperty(XMSC.WMQ_HOST_NAME, ConfigSettings.Hostname); 
     ConnectionFactory.SetIntProperty(XMSC.WMQ_PORT, ConfigSettings.Port); 
     ConnectionFactory.SetStringProperty(XMSC.WMQ_CHANNEL, ConfigSettings.Channel); 

     if (ConfigSettings.QueueManager == string.Empty) 
     { 
      ConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, ""); 
     } 
     else 
     { 
      ConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, ConfigSettings.QueueManager); 
     } 

     connection = GetConnection(); 
    } 

    public static IConnection GetConnection() 
    { 
     return ConnectionFactory.CreateConnection(); 
    } 

    public static ISession GetSession(IConnection connection) 
    { 
     return connection.CreateSession(false, AcknowledgeMode.AutoAcknowledge); 
    } 

    public static IMessageProducer GetProducer(ISession session, IDestination destination) 
    { 
     return session.CreateProducer(destination); 
    } 

    public static IMessageConsumer GetConsumer(ISession session, IDestination destination) 
    { 
     return session.CreateConsumer(destination); 
    } 

    public static void MQPub(string TopicURI, string message) 
    { 
     using (var session = GetSession(Connection)) 
     { 
      using (var destination = session.CreateTopic(TopicURI)) 
      { 
       using (var producer = GetProducer(session, destination)) 
       { 
        producer.Send(session.CreateTextMessage(message)); 
       } 
      } 
     } 
    } 

    public static void MQPub(string TopicURI, IEnumerable<string> messages) 
    { 
     using (var session = GetSession(Connection)) 
     { 
      using (var destination = session.CreateTopic(TopicURI)) 
      { 
       using (var producer = GetProducer(session, destination)) 
       { 
        foreach (var message in messages) 
        { 
         producer.Send(session.CreateTextMessage(message)); 
        } 
       } 
      } 
     } 
    } 
} 

编辑:重命名MQAccess类MQClient。根据T Rob的建议,将它作为实例类。 Disconnect方法仍与上述

public class MQClient : IDisposable 
{ 
    public MQConfigurationSectionHandler ConfigSettings { get; private set; } 
    public IConnectionFactory ConnectionFactory { get; private set; } 

    public IConnection Connection { get; private set; } 

    public IMessageConsumer Consumer { get; private set; } 
    public IMessageProducer Producer { get; private set; } 
    // Save sessions as fields for disposing and future subscription functionality 
    private ISession ProducerSession; 
    private ISession ConsumerSession; 
    public string SubTopicName { get; private set; } 
    public string PubTopicName { get; private set; } 
    public bool IsConnected { get; private set; } 
    public event Action<Exception> ConnectionError; 
    private Action<IMessage> IncomingMessageHandler; 

    public MQClient(string subTopicName, string pubTopicName, Action<IMessage> incomingMessageHandler) 
    { 
     // Dont put connect logic in the constructor. If we lose the connection we may need to connect again. 
     SubTopicName = subTopicName; 
     PubTopicName = pubTopicName; 
     IncomingMessageHandler = incomingMessageHandler; 
    } 

    public string Connect() 
    { 
     IsConnected = false; 
     string errorMsg = string.Empty; 

     ConfigSettings = (MQConfigurationSectionHandler) 
       ConfigurationManager.GetSection("mq-configuration"); 

     XMSFactoryFactory factory = XMSFactoryFactory.GetInstance(XMSC.CT_WMQ); 
     ConnectionFactory = factory.CreateConnectionFactory(); 
     ConnectionFactory.SetStringProperty(XMSC.WMQ_HOST_NAME, ConfigSettings.Hostname); 
     ConnectionFactory.SetIntProperty(XMSC.WMQ_PORT, ConfigSettings.Port); 
     ConnectionFactory.SetStringProperty(XMSC.WMQ_CHANNEL, ConfigSettings.Channel); 

     if (ConfigSettings.QueueManager == string.Empty) 
      ConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, ""); 
     else 
      ConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, ConfigSettings.QueueManager); 

     Connection = ConnectionFactory.CreateConnection(); 


     if (!string.IsNullOrEmpty(PubTopicName)) 
     { 
      ProducerSession = Connection.CreateSession(false, AcknowledgeMode.AutoAcknowledge); 
      Producer = ProducerSession.CreateProducer(ProducerSession.CreateTopic(PubTopicName)); 
     } 

     if (!string.IsNullOrEmpty(SubTopicName) && IncomingMessageHandler != null) 
     { 
      ConsumerSession = Connection.CreateSession(false, AcknowledgeMode.AutoAcknowledge); 
      Consumer = ConsumerSession.CreateConsumer(ConsumerSession.CreateTopic(SubTopicName)); 
      Consumer.MessageListener = new MessageListener(IncomingMessageHandler); 
     } 

     try 
     { 
      Connection.Start(); 
      Connection.ExceptionListener = new ExceptionListener(ConnectionExceptionHandler); 
      IsConnected = true; 
     } 
     catch (TypeInitializationException ex) 
     { 
      errorMsg = "A TypeInitializationException error occured while attempting to connect to MQ. Check the Queue configuration in App.config. The error message is: " + ex.Message; 
     } 
     catch (IllegalStateException ex) 
     { 
      errorMsg = "An IllegalStateException error occured while attempting to connect to MQ. Check the Queue configuration in App.config. The error message is: " + ex.Message; 
     } 

     return errorMsg; 
    } 

    public void Disconnect() 
    { 
     if (Producer != null) 
     { 
      Producer.Close(); 
      Producer.Dispose(); 
      Producer = null; 
     } 

     if (ProducerSession != null) 
     { 
      // Call Unsubscribe here if subscription is durable 

      ProducerSession.Close(); 
      ProducerSession.Dispose(); 
      ProducerSession = null; 
     } 

     if (Connection != null) 
     { 
      Connection.Stop(); 

      //if (Connection.ExceptionListener != null) 
      // Connection.ExceptionListener = null; 

      // Per Shashi............ 
      //if (Consumer.MessageListener != null) 
      // Consumer.MessageListener = null; 

      Connection.Close(); 
      Connection.Dispose(); 
      Connection = null; 
     } 

     if (Consumer != null) 
     { 

      if (Consumer.MessageListener != null) 
       Consumer.MessageListener = null; 

      Consumer.Close(); 
      Consumer.Dispose(); 
      Consumer = null; 
     } 


     if (ConsumerSession != null) 
     { 
      // Call Unsubscribe here if subscription is durable 
      ConsumerSession.Close(); 
      ConsumerSession.Dispose(); 
      ConsumerSession = null; 
     } 

     IsConnected = false; 
    } 


    public void Publish(string message) 
    { 
     Producer.Send(ProducerSession.CreateTextMessage(message)); 
    } 


    public void Publish(string[] messages) 
    { 
     foreach (string msg in messages) 
      Publish(msg); 
    } 

    public void ConnectionExceptionHandler(Exception ex) 
    { 
     Disconnect(); // Clean up 

     if (ConnectionError != null) 
      ConnectionError(ex); 
    } 

    #region IDisposable Members 
    private bool disposed; 

    public void Dispose() 
    { 
     Dispose(true); 
     GC.SuppressFinalize(this); 
    } 

    protected virtual void Dispose(bool disposing) 
    { 
     if (!this.disposed) 
     { 
      if (disposing) 
       Disconnect(); 

      disposed = true; 
     } 
    } 
    #endregion 

} 

回答

8

问题列为错误的MSG崩溃就在这里 - >where MQAccess is a small utility class.

问题的第一部分询问如何判断连接处于活动状态。 WebSphere MQ的XMS类是非Java平台的JMS规范的实现。他们非常密切地遵循JMS规范,并且JMS规范没有关于连接或会话等同于isConnected的方法,因此XMS也没有。但是,所有GET和PUT活动都应该发生在try/catch块中,以便捕获JMS异常。 (从中你总是打印linkedException,对不对?)当引发JMS异常时,应用程序将其视为致命并死亡,否则它会关闭除Connection Factory之外的所有JMS对象,等待几秒钟,驱动连接顺序。

根据问题中的新信息更新:
感谢您发布MQAccess类。这提供了对发生的事情的深入了解,但仍然没有任何代码显示连接关闭并重新打开的位置,如问题的第2部分所示。

但是,代码显示MQAccess类创建了一个私人实例ICONNECTION connection作为类实例的构造,然后公开作为MQAccess.GetConnection公开。目前发布的MQAccess类没有公共或私有类方法,它们将永远替换由connection持有的连接句柄,因此如果调用了MQAccess.Connection.Close(),那么MQAccess类中的IConnection对象实例将在持有无效连接句柄后永远保留。一旦连接关闭,那个MQAccess实例就已经死亡。您必须删除并重新实例MQAccess才能获得新的连接。

MQAccess类不公开暴露连接工厂因此理论上将有可能调用从类的外部MQAccess.GetConnection并获得有效的新IConnection对象,即使在关闭原来的后。但是,该实例将存在于MQAccess类的范围之外,因此后续对MQAccess的调用将引用其已停用的实例变量connection,而不是在类外部创建的新连接实例。

如果您需要关闭并重新创建连接,则可以考虑从MQAccess内部管理该连接。一种低技术方法可能是编写连接方法MQAccess.Close(),该方法将关闭现有连接,然后立即呼叫connection = GetConnection();,以便专用connection变量始终保持有效的连接句柄。

如果这样不能解决问题,请张贴正在关闭并重新创建连接的代码。

顺便说一句,通过网络连接的非事务处理会话可能会丢失或重复任何JMS提供程序(包括WMQ)的消息。这是你的意图吗?我已经解释了为什么这是另一个SO帖子here

+0

谢谢T Rob这真的很有帮助。你是对的 - 类不应该在contstructor中启动MQ。 linkedException是什么意思? – Sam

+0

JMS异常(以及因为JMS建模而导致的XMS异常)是一种多级结构。在JMS中,顶层包含“未能连接”等通用异常,但随后的层次包含特定于提供者的数据,如“找不到QMgr”或“错误的QMgr名称”。如果存在链接的异常,它可能具有有助于诊断的信息。请参阅['LinkedException'](http://pic.dhe.ibm.com/infocenter/wmqv7/v7r5/topic/com.ibm.mq.msc.doc/sapiexcpt.html#sapiexcpt_getlkex) –

+0

哦,不介意我看到linkedException是IllegalStateException的一个属性。 – Sam

5

添加评论T.Rob。

问题1:
我希望你可以访问源代码MQAccess。如果是的话,你可以在MQAccess中公开一个属性,指出连接是否处于活动状态。如果您没有访问权限,那么您可能不得不要求该类的作者添加此属性。您可以执行以下操作来设置/重置属性。

1)设置createConnection方法成功返回后的属性。
2)为连接设置一个Exception侦听器。
3)重置异常处理程序中的属性。检查原因代码并重置属性,如果它是连接断开错误(XMSWMQ1107和链接的异常可以具有MQRC 2009)。

问题2
如果你能告诉我们你是如何与closingreopening连接的话,这将有所帮助。我建议关闭连接是:
1)先做一个connection.Stop()。
2)删除任何消息侦听器,基本上做一个consumer.MessageListener = null。
3)然后做connection.Close()。
4)做一个连接=空

附加资料 这里是我已经用于测试样品。

private void OnException(Exception ex) 
    { 
     XMSException xmsex = (XMSException)ex; 
     Console.WriteLine("Got exception"); 
     // Check the error code. 
     if (xmsex.ErrorCode == "XMSWMQ1107") 
     { 
      Console.WriteLine("This is a connection broken error"); 
      stopProcessing = true; // This is a class member variable 
     } 
    } 

在创建连接的方法中,设置异常侦听器。

 // Create connection. 
     connectionWMQ = cf.CreateConnection(); 
     connectionWMQ.ExceptionListener = new ExceptionListener(OnException); 

每当出现连接错误时,将调用异常侦听器并将标志设置为true。

当不再需要对象时,处理对象是一种很好的做法。有父母关系,消费者,生产者等是Session的孩子,这些孩子又是Connection的孩子。因此,处置的顺序可以是儿童第一和父母。但是,如果父母被处置,孩子也会自动处理。

+0

谢谢Shashi!我希望我不要问太多,但是你有没有任何示例代码可以说明你对问题1的回答?谷歌搜索XMSWMQ1107只有这个帖子! – Sam

+0

是否有必要处置生产者,消费者,会话和听众?看到这篇文章:http://stackoverflow.com/questions/12508473/ibm-mq-xms-subscription-not-closing什么是正确的顺序来做到这一点? – Sam

+0

我实现了一个ExceptionListener,但是它不检测连接何时中断,因此我的IsConnected标志没有被设置。 – Sam