2015-12-29 43 views
0

我认为org.apache.synapse.transport.passthru.PassThroughHttpSender是wso2esb 4.8.1中默认的http传输发送者(不确定约4.9.0稍后会检查它)在某些情况下不会将异常借用线程返回到工作池。WSO2ESB 4.8.1 PassThroughHttpSender不会返回异常的线程到工作池

在我看来,这种情况发生在外部序列发生异常时(而不是在直接接受传入请求的代理中)。例如,它在您使用存储和消息处理器实现存储转发模式时发生。

这是我简单的WSO2ESB 4.8.1配置重现该问题:

<?xml version="1.0" encoding="UTF-8"?> 
<definitions xmlns="http://ws.apache.org/ns/synapse"> 
    <registry provider="org.wso2.carbon.mediation.registry.WSO2Registry"> 
     <parameter name="cachableDuration">15000</parameter> 
    </registry> 
    <import name="fileconnector" 
      package="org.wso2.carbon.connector" 
      status="enabled"/> 
    <proxy name="ProxyTest" 
      transports="http https" 
      startOnLoad="true" 
      trace="disable"> 
     <target> 
     <inSequence> 
      <property name="FORCE_SC_ACCEPTED" 
         value="true" 
         scope="axis2" 
         type="STRING"/> 
      <log level="custom"> 
       <property name="text" value="store message"/> 
      </log> 
      <store messageStore="TestXMS"/> 
      <log level="custom"> 
       <property name="text" value="message stored"/> 
      </log> 
     </inSequence> 
     <outSequence/> 
     <faultSequence/> 
     </target> 
    </proxy> 
    <localEntry key="ESBInstance">Test<description/> 
    </localEntry> 
    <endpoint name="HTTPEndpoint"> 
     <http method="post" uri-template="http://localhost/index.php"> 
     <timeout> 
      <duration>10</duration> 
      <responseAction>fault</responseAction> 
     </timeout> 
     <suspendOnFailure> 
      <errorCodes>-1</errorCodes> 
      <initialDuration>0</initialDuration> 
      <progressionFactor>1.0</progressionFactor> 
      <maximumDuration>0</maximumDuration> 
     </suspendOnFailure> 
     <markForSuspension> 
      <errorCodes>-1</errorCodes> 
     </markForSuspension> 
     </http> 
    </endpoint> 
    <sequence name="fault"> 
     <log level="full"> 
     <property name="MESSAGE" value="Executing default 'fault' sequence"/> 
     <property name="ERROR_CODE" expression="get-property('ERROR_CODE')"/> 
     <property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/> 
     </log> 
     <drop/> 
    </sequence> 
    <sequence name="TestSequence"> 
     <log level="full"> 
     <property name="text" value="message recieved"/> 
     </log> 
     <call> 
     <endpoint key="HTTPEndpoint"/> 
     </call> 
     <log level="full"> 
     <property name="text" value="message processed"/> 
     </log> 
    </sequence> 
    <sequence name="main"> 
     <in> 
     <log level="full"/> 
     <filter source="get-property('To')" regex="http://localhost:9000.*"> 
      <send/> 
     </filter> 
     </in> 
     <out> 
     <send/> 
     </out> 
     <description>The main sequence for the message mediation</description> 
    </sequence> 
    <messageStore name="TestXMS"/> 
    <messageProcessor class="org.apache.synapse.message.processor.impl.sampler.SamplingProcessor" 
        name="TestMP" 
        messageStore="TestXMS"> 
     <parameter name="interval">1000</parameter> 
     <parameter name="sequence">TestSequence</parameter> 
     <parameter name="concurrency">1</parameter> 
     <parameter name="is.active">true</parameter> 
    </messageProcessor> 
</definitions> 

配置端点以便排出工作线程的内部池无处发送请求,并调用TestProxy 20倍以上。 ..

收到第20个请求后,新的请求将被接受并存储在存储中,但工作池将被用尽,并且消息不会从消息存储重试。

我认为必须有可用于PassThroughHttpSender的源代码......但反编译repository/components/plugins/synapse-nhttp-transport_2.1.2.wso2v4.jar并查看内部原因以更快问题。

我们应该看看sendRequestContent方法中:

synchronized (msgContext) 
     { 
     while ((!Boolean.TRUE.equals(msgContext.getProperty("WAIT_BUILDER_IN_STREAM_COMPLETE"))) && (!Boolean.TRUE.equals(msgContext.getProperty("PASSTHRU_CONNECT_ERROR")))) { 
      try 
      { 
      log.info("msgContext before wait"); 
      msgContext.wait(); 
      log.info("msgContext after wait"); 
      } 
      catch (InterruptedException e) 
      { 
      e.printStackTrace(); 
      } 
     } 

这是哪里出错处理堆栈。当发生异常时(在另一个线程中),它会“忘记”通知当前线程并且msgContext会一直等待(实际上直到服务器重启)。

所以我稍微修改了同一个包中的另一个类 - DeliveryAgent,方法errorConnecting。此方法用于捕获用于连接到目标主机的线程的回调...因此,当我们捕获回调时,我们通知msgContext并通知它在targetErrorHandler之后添加新的同步块以继续...

public void errorConnecting(HttpRoute route, int errorCode, String message) 
    { 
    Queue<MessageContext> queue = (Queue)this.waitingMessages.get(route); 
    if (queue != null) 
    { 
     MessageContext msgCtx = (MessageContext)queue.poll(); 
     if (msgCtx != null) { 
     this.targetErrorHandler.handleError(msgCtx, errorCode, "Error connecting to the back end", null, ProtocolState.REQUEST_READY); 
     synchronized (msgCtx) 
     { 
      log.info("errorConnecting: notify message context about error"); 
      msgCtx.setProperty("PASSTHRU_CONNECT_ERROR", Boolean.TRUE); 
      msgCtx.notifyAll(); 
     } 
     } 
    } 
    else 
    { 
     throw new IllegalStateException("Queue cannot be null for: " + route); 
    } 
    } 

我已经做了一些测试,它看起来像修复了“死”线程的问题。不过,我不知道这是否是一个适当的修复或不...任何建议,欢迎..

链接,反编译和修改后的源文件 - src.zip

回答

0

我想通了这个问题的原因。如果后端应用程序接受请求并且不向ESB回复任何内容,则会发生这种情况。同样,由于超时关闭了连接。在这种情况下,ESB工作线程不会返回到线程池。

为了克服这个问题,是修补org.apache.synapse.transport.passthru.TargetErrorHandler java类

synchronized (mc){ 
    log.info("notify message context about error"); 
    mc.setProperty("PASSTHRU_CONNECT_ERROR", Boolean.TRUE); 
    mc.notifyAll(); 
} 

把它catch块前右必要的。

它会正确地将消息上下文设置为有错误,并通知所有等待的对象,反过来又允许ESB返回线程池中的请求处理线程。 4.8.1都是如此。

我已经打补丁并将其放到每月处理数百万个请求的生产环境中(2016年)。自那时以来一直没有问题

相关问题