我认为org.apache.synapse.transport.passthru.PassThroughHttpSender是wso2esb 4.8.1中默认的http传输发送者(不确定约4.9.0稍后会检查它)在某些情况下不会将异常借用线程返回到工作池。WSO2ESB 4.8.1 PassThroughHttpSender不会返回异常的线程到工作池
在我看来,这种情况发生在外部序列发生异常时(而不是在直接接受传入请求的代理中)。例如,它在您使用存储和消息处理器实现存储转发模式时发生。
这是我简单的WSO2ESB 4.8.1配置重现该问题:
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://ws.apache.org/ns/synapse">
<registry provider="org.wso2.carbon.mediation.registry.WSO2Registry">
<parameter name="cachableDuration">15000</parameter>
</registry>
<import name="fileconnector"
package="org.wso2.carbon.connector"
status="enabled"/>
<proxy name="ProxyTest"
transports="http https"
startOnLoad="true"
trace="disable">
<target>
<inSequence>
<property name="FORCE_SC_ACCEPTED"
value="true"
scope="axis2"
type="STRING"/>
<log level="custom">
<property name="text" value="store message"/>
</log>
<store messageStore="TestXMS"/>
<log level="custom">
<property name="text" value="message stored"/>
</log>
</inSequence>
<outSequence/>
<faultSequence/>
</target>
</proxy>
<localEntry key="ESBInstance">Test<description/>
</localEntry>
<endpoint name="HTTPEndpoint">
<http method="post" uri-template="http://localhost/index.php">
<timeout>
<duration>10</duration>
<responseAction>fault</responseAction>
</timeout>
<suspendOnFailure>
<errorCodes>-1</errorCodes>
<initialDuration>0</initialDuration>
<progressionFactor>1.0</progressionFactor>
<maximumDuration>0</maximumDuration>
</suspendOnFailure>
<markForSuspension>
<errorCodes>-1</errorCodes>
</markForSuspension>
</http>
</endpoint>
<sequence name="fault">
<log level="full">
<property name="MESSAGE" value="Executing default 'fault' sequence"/>
<property name="ERROR_CODE" expression="get-property('ERROR_CODE')"/>
<property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
</log>
<drop/>
</sequence>
<sequence name="TestSequence">
<log level="full">
<property name="text" value="message recieved"/>
</log>
<call>
<endpoint key="HTTPEndpoint"/>
</call>
<log level="full">
<property name="text" value="message processed"/>
</log>
</sequence>
<sequence name="main">
<in>
<log level="full"/>
<filter source="get-property('To')" regex="http://localhost:9000.*">
<send/>
</filter>
</in>
<out>
<send/>
</out>
<description>The main sequence for the message mediation</description>
</sequence>
<messageStore name="TestXMS"/>
<messageProcessor class="org.apache.synapse.message.processor.impl.sampler.SamplingProcessor"
name="TestMP"
messageStore="TestXMS">
<parameter name="interval">1000</parameter>
<parameter name="sequence">TestSequence</parameter>
<parameter name="concurrency">1</parameter>
<parameter name="is.active">true</parameter>
</messageProcessor>
</definitions>
配置端点以便排出工作线程的内部池无处发送请求,并调用TestProxy 20倍以上。 ..
收到第20个请求后,新的请求将被接受并存储在存储中,但工作池将被用尽,并且消息不会从消息存储重试。
我认为必须有可用于PassThroughHttpSender的源代码......但反编译repository/components/plugins/synapse-nhttp-transport_2.1.2.wso2v4.jar并查看内部原因以更快问题。
我们应该看看sendRequestContent方法中:
synchronized (msgContext)
{
while ((!Boolean.TRUE.equals(msgContext.getProperty("WAIT_BUILDER_IN_STREAM_COMPLETE"))) && (!Boolean.TRUE.equals(msgContext.getProperty("PASSTHRU_CONNECT_ERROR")))) {
try
{
log.info("msgContext before wait");
msgContext.wait();
log.info("msgContext after wait");
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
这是哪里出错处理堆栈。当发生异常时(在另一个线程中),它会“忘记”通知当前线程并且msgContext会一直等待(实际上直到服务器重启)。
所以我稍微修改了同一个包中的另一个类 - DeliveryAgent,方法errorConnecting。此方法用于捕获用于连接到目标主机的线程的回调...因此,当我们捕获回调时,我们通知msgContext并通知它在targetErrorHandler之后添加新的同步块以继续...
public void errorConnecting(HttpRoute route, int errorCode, String message)
{
Queue<MessageContext> queue = (Queue)this.waitingMessages.get(route);
if (queue != null)
{
MessageContext msgCtx = (MessageContext)queue.poll();
if (msgCtx != null) {
this.targetErrorHandler.handleError(msgCtx, errorCode, "Error connecting to the back end", null, ProtocolState.REQUEST_READY);
synchronized (msgCtx)
{
log.info("errorConnecting: notify message context about error");
msgCtx.setProperty("PASSTHRU_CONNECT_ERROR", Boolean.TRUE);
msgCtx.notifyAll();
}
}
}
else
{
throw new IllegalStateException("Queue cannot be null for: " + route);
}
}
我已经做了一些测试,它看起来像修复了“死”线程的问题。不过,我不知道这是否是一个适当的修复或不...任何建议,欢迎..
链接,反编译和修改后的源文件 - src.zip