2017-07-29 218 views
0

您好,我正在使用wso2 esb并使用Active MQ进行消息队列。活动MQ连接问题

我有一个简单的服务来放置一个消息,其中它调用自定义java类,它创建一个tcp连接并将消息放入队列中。

Java代码看起来像下面

package in.esb.custommediators; 

import javax.jms.*; 

import org.apache.activemq.ActiveMQConnection; 
import org.apache.activemq.ActiveMQConnectionFactory; 


import org.apache.synapse.ManagedLifecycle; 
import org.apache.synapse.MessageContext; 
import org.apache.synapse.core.SynapseEnvironment; 
import org.apache.synapse.mediators.AbstractMediator; 

import org.apache.synapse.core.axis2.Axis2MessageContext; 
import org.apache.synapse.transport.nhttp.NhttpConstants; 

import org.json.JSONObject; 
import org.json.XML; 

public class JMSStoreMediator extends AbstractMediator implements 
ManagedLifecycle { 

    Connection connection; 
    Session session; 

    public boolean mediate(MessageContext msgCtx) { 

     log.info("LogLocation = "+getClass().getName()+",ProxyName = "+msgCtx.getProperty("proxy.name")+ 
       ",Usercode = "+msgCtx.getProperty("usercode")+",Clientid = "+msgCtx.getProperty("clientid")+ 
       ",requestMsgId = "+msgCtx.getProperty("requestMsgId")+",Position = START"); 


     try { 
      boolean topic=false; 
      String jmsuri=""+msgCtx.getProperty("jmsuri"); 
      String t=""+msgCtx.getProperty("topic"); 
      if(t.isEmpty()){ 
       topic=false; 
      } 
      else { 
       topic=Boolean.valueOf(t); 
      } 
      ConnectionFactory factory= new ActiveMQConnectionFactory(jmsuri); 
      connection = factory.createConnection(); 
       connection.start(); 

      log.info("LogLocation = "+getClass().getName()+",JMS connection created :"+connection); 
      this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
      Destination destination=null; 
      if(!topic)destination= session.createQueue(""+msgCtx.getProperty("jmsqueue")); 
      else destination= session.createTopic(""+msgCtx.getProperty("jmsqueue")); 
      MessageProducer producer = session.createProducer(destination); 
      producer.setDeliveryMode(DeliveryMode.PERSISTENT); 

      String xml = ""+msgCtx.getEnvelope().getBody().toStringWithConsume(); 

      if(topic){ 

       JSONObject obj=XML.toJSONObject(xml); 
       JSONObject ar=obj.getJSONObject("soapenv:Body"); 
       ar.remove("xmlns:soapenv"); 
       xml=ar.toString(); 
      } 
      TextMessage message = session.createTextMessage(xml); 
      producer.send(message); 


     } catch (Exception e) { 

      log.info("LogLocation = "+getClass().getName()+",Error in storing message in JMS stacktrace is :"+e.toString()+"message is :"+e.getMessage()); 
      e.printStackTrace(); 

      ((Axis2MessageContext) msgCtx).setProperty(NhttpConstants.HTTP_SC, 500); 
      handleException("Error while storing in the message store", msgCtx); 

     } 
     finally { 
      try { 
       session.close(); 
       if (connection!=null){ 
        log.info("LogLocation = "+getClass().getName()+",JMS connection closing :"+connection); 
        connection.close(); 
       } 

      } catch (JMSException e) { 
       log.info("LogLocation = "+getClass().getName()+",Error in closing JMS connection stacktrace is :"+e.toString()); 
       e.printStackTrace(); 
      } 
     } 

     return true; 
    } 

    @Override 
    public void destroy() { 
     // TODO Auto-generated method stub 

    } 

    @Override 
    public void init(SynapseEnvironment arg0) { 
     // TODO Auto-generated method stub 

    } 

} 

当我把这种服务,在队列中发送消息下方得到生成的日志。

[2017-07-29 11:18:35,962] INFO - JMSStoreMediator LogLocation = in.esb.custommediators.JMSStoreMediator,JMS connection created :ActiveMQConnection {id=ID:my-desktop-36442-1501307315570-3:1,clientId=ID:my-desktop-36442-1501307315570-2:1,started=true} 

截至目前每一件事情是工作良好,但是当两个用户试图在同一轮胎一些奇怪的事情发生提交信息如下图所示

[2017-07-29 11:43:11,948] INFO - JMSStoreMediator LogLocation = in.my.esb.custommediators.JMSStoreMediator,JMS connection created :ActiveMQConnection {id=ID:my-desktop-36442-1501307315570-11:1,clientId=ID:my-desktop-36442-1501307315570-10:1,started=false} 
[2017-07-29 11:43:11,963] INFO - JMSStoreMediator LogLocation = in.my.esb.custommediators.JMSStoreMediator,JMS connection created :ActiveMQConnection {id=ID:my-desktop-36442-1501307315570-11:1,clientId=ID:my-desktop-36442-1501307315570-10:1,started=true} 

[2017-07-29 11:43:12,068] INFO - JMSStoreMediator LogLocation = in.my.esb.custommediators.JMSStoreMediator,Error in closing JMS connection stacktrace is :org.apache.activemq.ConnectionClosedException: The connection is already closed 

主动MQ是创建两个连接,但使用一个连接对于这两个调用以及一个连接在一个服务调用中关闭并且在另一个服务调用中抛出已经关闭的错误,另一个连接在活动状态为true的活动mq的连接列表中永远等待,如下面的图像,这也可以在ESB线程列表中看到。

enter image description here

这种连接堆积,造成挂起ESB服务器。即使我从Active MQ重置此连接ESB线程携带此连接信息,并且只有在重新启动ESB后,问题才会得到解决。

+0

哪里是连接变量初始化?它看起来像是对另一个线程中可用的连接的引用 – simar

+0

这是自定义介体实现吗?你能提供完整的课程代码吗? 这里显然有些多线程问题 – simar

+0

嗨@simar是的,我正在使用一个自定义中介,我已经编辑了我正在使用的完全自定义类的问题,它有一些多线程问题,正如你所说的。 – user4045063

回答

0

你看过文章Extending the Functionality of WSO2 Enterprise Service Bus - Part 1吗?

重要部件是螺纹安全。它指出,每个中介,包括自定义,都是在传入消息之间共享的。我建议类变量

Connection connection; 
Session session; 

移到方法公共布尔中介(MessageContext的msgCtx)因为局部变量是线程安全的

public class JMSStoreMediator extends AbstractMediator implements 
ManagedLifecycle {  

    public boolean mediate(MessageContext msgCtx) { 
      Connection connection; 
      Session session; 
    .... 
    .... 
    rest the same 
+0

根据您的建议进行了更改并进行了检查,但力度也有同样的问题。我将阅读文章。 – user4045063