2014-01-17 55 views
1

我正在尝试配置Mule连接到外部TCP服务器,然后通过已建立的连接接收流消息(XML文档)。我已经创建了一个从InputStream中抽取XML的自定义协议,但是,TCP Polling Connector似乎正在关闭套接字,然后重新打开套接字,因为我的TCP服务器不断通知我新的连接。下面是我的Mule Flow,任何人都可以建议如何让Mule建立一个TCP连接并保持连接打开,在打开的连接消息后轮询消息?具有持久套接字的Mule TCP轮询监听器?

<?xml version="1.0" encoding="UTF-8"?> 

<mule xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting" xmlns:mulexml="http://www.mulesoft.org/schema/mule/xml" xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns:amqp="http://www.mulesoft.org/schema/mule/amqp" xmlns:tcp="http://www.mulesoft.org/schema/mule/tcp" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" version="EE-3.4.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd 
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd 
http://www.mulesoft.org/schema/mule/tcp http://www.mulesoft.org/schema/mule/tcp/current/mule-tcp.xsd 
http://www.mulesoft.org/schema/mule/amqp http://www.mulesoft.org/schema/mule/amqp/current/mule-amqp.xsd 
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd 
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd 
http://www.mulesoft.org/schema/mule/xml http://www.mulesoft.org/schema/mule/xml/current/mule-xml.xsd"> 
    <tcp:polling-connector name="TCP_Polling" validateConnections="true" sendBufferSize="0" receiveBufferSize="0" receiveBacklog="0" reuseAddress="true" keepAlive="true" clientSoTimeout="10000" keepSendSocketOpen="true" serverSoTimeout="10000" socketSoLinger="0" doc:name="TCP Polling"> 
     <reconnect/> 
     <tcp:custom-protocol class="esb.mule.XMLTCPProtocol"/> 
    </tcp:polling-connector> 
    <amqp:connector name="local_rabbitmq" validateConnections="true" doc:name="AMQP Connector"  > 
     <reconnect frequency="5000" count="3"/> 
    </amqp:connector> 
    <amqp:endpoint exchangeName="xml.exchange" exchangeType="topic" exchangeDurable="true" name="xml_topic_endpoint" responseTimeout="10000" doc:name="AMQP"/> 
    <flow name="XML_TCP_ReaderFlow" doc:name="XML_TCP_ReaderFlow"> 
     <tcp:inbound-endpoint host="127.0.0.1" port="28001" responseTimeout="10000" doc:name="TCP" connector-ref="TCP_Polling"/> 
     <echo-component doc:name="Before Processing"/> 
     <scripting:transformer doc:name="Groovy"> 
      <scripting:script engine="Groovy"><![CDATA[p = message.payload; 
xml = p.substring (p.indexOf("<?xml")); 
rkey = "com.lmco."; 
matcher = (p =~ /<trackNumber>(.*)<\/trackNumber>/); 
if (matcher.getCount() > 0) { 
    rkey += "track.number."+ matcher[0][1]; 
}else{ 
    matcher = (p =~ /<arcid><identifier>([A-Za-z0-9]*)<\/identifier>/); 
    if (matcher.getCount() > 0) { 
    rkey += "flight.arcid."+ matcher[0][1]; 
    }else{ 
    rkey += "unknown"; 
    } 
} 
message.setOutboundProperty ("routing-key", rkey); 
message.payload = xml;]]></scripting:script> 
     </scripting:transformer> 
     <echo-component doc:name="After Processing"/> 
     <amqp:outbound-endpoint  responseTimeout="10000" connector-ref="local_rabbitmq" doc:name="AMQP" ref="xml_topic_endpoint"/> 
    </flow> 
</mule> 

UPDATE 按照要求我启用登录org.mule.transport.tcp和下面的输出结果(去掉了一些输出为简洁起见):

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 
+ Initializing app 'darts_tcp_reader'      + 
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 
INFO 2014-01-21 17:38:34,091 [main] org.mule.lifecycle.AbstractLifecycleManager: Initialising RegistryBroker 
INFO 2014-01-21 17:38:34,226 [main] org.mule.config.spring.MuleApplicationContext: Refreshing [email protected]: startup date [Tue Jan 21 17:38:34 GMT 2014]; root of context hierarchy 
INFO 2014-01-21 17:38:34,311 [main] org.springframework.beans.factory.xml.XmlBeanDefinitionReader: Loading XML bean definitions from URL [jar:file:/Applications/MuleStudio/plugins/org.mule.tooling.server.3.4.0.ee_3.4.0.201401021511/mule/mule/mule-module-spring-config-3.4.0.jar!/mule-spring-config.xml] 
INFO 2014-01-21 17:38:34,597 [main] org.springframework.beans.factory.xml.XmlBeanDefinitionReader: Loading XML bean definitions from URL [jar:file:/Applications/MuleStudio/plugins/org.mule.tooling.server.3.4.0.ee_3.4.0.201401021511/mule/mule/mule-module-spring-config-3.4.0.jar!/default-mule-config.xml] 
INFO 2014-01-21 17:38:34,919 [main] org.springframework.beans.factory.support.DefaultListableBeanFactory: Overriding bean definition for bean '_muleConfiguration': replacing [Generic bean: class [org.mule.config.spring.MuleConfigurationConfigurator]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=initialise; destroyMethodName=null; defined in URL [jar:file:/Applications/MuleStudio/plugins/org.mule.tooling.server.3.4.0.ee_3.4.0.201401021511/mule/mule/mule-module-spring-config-3.4.0.jar!/default-mule-config.xml]] with [Generic bean: class [org.mule.config.spring.MuleConfigurationConfigurator]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=initialise; destroyMethodName=null; defined in URL [jar:file:/Applications/MuleStudio/plugins/org.mule.tooling.server.3.4.0.ee_3.4.0.201401021511/mule/mule/mule-module-spring-config-3.4.0.jar!/default-mule-config.xml]] 
INFO 2014-01-21 17:38:34,919 [main] org.springframework.beans.factory.support.DefaultListableBeanFactory: Overriding bean definition for bean '_muleConfiguration': replacing [Generic bean: class [org.mule.config.spring.MuleConfigurationConfigurator]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=initialise; destroyMethodName=null; defined in URL [jar:file:/Applications/MuleStudio/plugins/org.mule.tooling.server.3.4.0.ee_3.4.0.201401021511/mule/mule/mule-module-spring-config-3.4.0.jar!/default-mule-config.xml]] with [Generic bean: class [org.mule.config.spring.MuleConfigurationConfigurator]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=initialise; destroyMethodName=null; defined in URL [jar:file:/Applications/MuleStudio/plugins/org.mule.tooling.server.3.4.0.ee_3.4.0.201401021511/mule/mule/mule-module-spring-config-3.4.0.jar!/default-mule-config.xml]] 
INFO 2014-01-21 17:38:34,921 [main] org.springframework.beans.factory.support.DefaultListableBeanFactory: Overriding bean definition for bean '_muleNotificationManager': replacing [Generic bean: class [org.mule.config.spring.ServerNotificationManagerConfigurator]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=initialise; destroyMethodName=null; defined in URL [jar:file:/Applications/MuleStudio/plugins/org.mule.tooling.server.3.4.0.ee_3.4.0.201401021511/mule/mule/mule-module-spring-config-3.4.0.jar!/default-mule-config.xml]] with [Generic bean: class [org.mule.config.spring.ServerNotificationManagerConfigurator]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=initialise; destroyMethodName=null; defined in URL [jar:file:/Applications/MuleStudio/plugins/org.mule.tooling.server.3.4.0.ee_3.4.0.201401021511/mule/mule/mule-module-spring-config-3.4.0.jar!/default-mule-config.xml]] 
INFO 2014-01-21 17:38:34,921 [main] org.springframework.beans.factory.support.DefaultListableBeanFactory: Overriding bean definition for bean '_muleNotificationManager': replacing [Generic bean: class [org.mule.config.spring.ServerNotificationManagerConfigurator]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=initialise; destroyMethodName=null; defined in URL [jar:file:/Applications/MuleStudio/plugins/org.mule.tooling.server.3.4.0.ee_3.4.0.201401021511/mule/mule/mule-module-spring-config-3.4.0.jar!/default-mule-config.xml]] with [Generic bean: class [org.mule.config.spring.ServerNotificationManagerConfigurator]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=initialise; destroyMethodName=null; defined in URL [jar:file:/Applications/MuleStudio/plugins/org.mule.tooling.server.3.4.0.ee_3.4.0.201401021511/mule/mule/mule-module-spring-config-3.4.0.jar!/default-mule-config.xml]] 
INFO 2014-01-21 17:38:34,929 [main] org.springframework.beans.factory.support.DefaultListableBeanFactory: Overriding bean definition for bean '_muleSystemModel': replacing [Root bean: class [org.mule.model.seda.SedaModel]; scope=singleton; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=initialise; destroyMethodName=dispose] with [Root bean: class [org.mule.model.seda.SedaModel]; scope=singleton; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=initialise; destroyMethodName=dispose] 
INFO 2014-01-21 17:38:34,936 [main] org.springframework.beans.factory.xml.XmlBeanDefinitionReader: Loading XML bean definitions from URL [file:/Users/jpbarto/MuleStudio/workspace/.mule/apps/darts_tcp_reader/DARTS TCP Reader.xml] 
INFO 2014-01-21 17:38:35,123 [main] org.springframework.beans.factory.support.DefaultListableBeanFactory: Overriding bean definition for bean 'TCP_Polling': replacing [Root bean: class [org.mule.transport.tcp.PollingTcpConnector]; scope=singleton; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=initialise; destroyMethodName=dispose] with [Root bean: class [org.mule.transport.tcp.PollingTcpConnector]; scope=singleton; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=initialise; destroyMethodName=dispose] 
INFO 2014-01-21 17:38:35,158 [main] org.springframework.beans.factory.support.DefaultListableBeanFactory: Overriding bean definition for bean 'local_rabbitmq': replacing [Root bean: class [org.mule.transport.amqp.AmqpConnector]; scope=singleton; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=initialise; destroyMethodName=dispose] with [Root bean: class [org.mule.transport.amqp.AmqpConnector]; scope=singleton; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=initialise; destroyMethodName=dispose] 
INFO 2014-01-21 17:38:35,166 [main] org.springframework.beans.factory.support.DefaultListableBeanFactory: Overriding bean definition for bean 'dart_topic_endpoint': replacing [Root bean: class [org.mule.endpoint.EndpointURIEndpointBuilder]; scope=prototype; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] with [Root bean: class [org.mule.endpoint.EndpointURIEndpointBuilder]; scope=prototype; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] 
INFO 2014-01-21 17:38:35,171 [main] org.springframework.beans.factory.support.DefaultListableBeanFactory: Overriding bean definition for bean 'DARTS_TCP_ReaderFlow': replacing [Root bean: class [org.mule.construct.Flow]; scope=singleton; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=initialise; destroyMethodName=dispose] with [Root bean: class [org.mule.construct.Flow]; scope=singleton; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=initialise; destroyMethodName=dispose] 
INFO 2014-01-21 17:38:35,357 [main] org.springframework.beans.factory.support.DefaultListableBeanFactory: Pre-instantiating singletons in org.s[email protected]1cecd92c: defining beans [_muleCustomEditorConfigurer,_muleObjectNameProcessor,_mulePropertyPlaceholderProcessor,_muleSimpleRegistryBootstrap,_muleNotificationManager,_muleAnnotationsProcessor,_muleTransformerAnnotationProcessor,_muleConfiguration,._muleNotificationManager:notification.1,._muleNotificationManager:notification.2,._muleNotificationManager:notification.3,._muleNotificationManager:notification.4,._muleNotificationManager:notification.5,._muleNotificationManager:notification.6,._muleNotificationManager:notification.7,._muleNotificationManager:notification.8,._muleNotificationManager:notification.9,._muleNotificationManager:notification.10,_muleSystemModel,_defaultInMemoryQueueStore,_defaultPersistentQueueStore,_defaultInMemoryObjectStore,_defaultPersistentObjectStore,_defaultUserObjectStore,_defaultTransientUserObjectStore,_muleQueueManager,_muleObjectStoreManager,_muleSecurityManager,_muleMessageProcessingManager,_muleProperties,_muleEndpointFactory,_muleStreamCloserService,_defaultThreadingProfile,_converterResolver,_defaultMessageDispatcherThreadingProfile,_defaultMessageRequesterThreadingProfile,_defaultMessageReceiverThreadingProfile,_defaultServiceThreadingProfile,_defaultRetryPolicyTemplate,_muleExpressionLanguage,_muleLockFactory,_muleLockProvider,_muleProcessingTimeWatcher,TCP_Polling,.TCP_Polling:reconnect.11,local_rabbitmq,.local_rabbitmq:reconnect.12,dart_topic_endpoint,DARTS_TCP_ReaderFlow,.DARTS_TCP_ReaderFlow:inbound-endpoint.13,.DARTS_TCP_ReaderFlow:echo-component.14,.DARTS_TCP_ReaderFlow:transformer.15,.DARTS_TCP_ReaderFlow:transformer.15:script.16,.DARTS_TCP_ReaderFlow:echo-component.17,ref:dart_topic_endpoint.18]; root of factory hierarchy 
INFO 2014-01-21 17:38:35,931 [main] org.mule.lifecycle.AbstractLifecycleManager: Initialising model: _muleSystemModel 
DEBUG 2014-01-21 17:38:35,991 [main] org.mule.transport.tcp.PollingTcpConnector: Set Connector name to: TCP_Polling 
INFO 2014-01-21 17:38:35,991 [main] org.mule.lifecycle.AbstractLifecycleManager: Initialising connector: TCP_Polling 
DEBUG 2014-01-21 17:38:36,002 [main] org.mule.transport.tcp.PollingTcpConnector: Loading DispatcherFactory for connector: TCP_Polling (org.mule.transport.tcp.PollingTcpConnector) 
DEBUG 2014-01-21 17:38:36,003 [main] org.mule.transport.tcp.PollingTcpConnector: Loading RequesterFactory for connector: TCP_Polling (org.mule.transport.tcp.PollingTcpConnector) 
INFO 2014-01-21 17:38:36,033 [main] org.mule.lifecycle.AbstractLifecycleManager: Initialising connector: local_rabbitmq 
WARN 2014-01-21 17:38:36,086 [main] org.springframework.beans.GenericTypeAwarePropertyDescriptor: Invalid JavaBean property 'port' being accessed! Ambiguous write methods found next to actually used [public void org.mule.endpoint.URIBuilder.setPort(java.lang.String)]: [public void org.mule.endpoint.URIBuilder.setPort(int)] 
INFO 2014-01-21 17:38:36,441 [main] org.mule.construct.FlowConstructLifecycleManager: Initialising flow: DARTS_TCP_ReaderFlow 
INFO 2014-01-21 17:38:36,441 [main] org.mule.exception.DefaultMessagingExceptionStrategy: Initialising exception listener: [email protected] 
INFO 2014-01-21 17:38:36,446 [main] org.mule.component.ComponentLifecycleManager: Initialising component: component.2011100149 
INFO 2014-01-21 17:38:36,447 [main] org.mule.component.ComponentLifecycleManager: Initialising component: component.532231462 
INFO 2014-01-21 17:38:36,457 [main] org.mule.config.builders.AutoConfigurationBuilder: Configured Mule using "org.mule.config.spring.SpringXmlConfigurationBuilder" with configuration resource(s): "[ConfigResource{resourceName='/Users/jpbarto/MuleStudio/workspace/.mule/apps/darts_tcp_reader/DARTS TCP Reader.xml'}]" 
INFO 2014-01-21 17:38:36,457 [main] org.mule.config.builders.AutoConfigurationBuilder: Configured Mule using "org.mule.config.builders.AutoConfigurationBuilder" with configuration resource(s): "[ConfigResource{resourceName='/Users/jpbarto/MuleStudio/workspace/.mule/apps/darts_tcp_reader/DARTS TCP Reader.xml'}]" 
INFO 2014-01-21 17:38:36,457 [main] org.mule.module.launcher.application.DefaultMuleApplication: Monitoring for hot-deployment: /Users/jpbarto/MuleStudio/workspace/.mule/apps/darts_tcp_reader/DARTS TCP Reader.xml 
INFO 2014-01-21 17:38:36,458 [main] org.mule.module.launcher.application.DefaultMuleApplication: 
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 
+ Starting app 'darts_tcp_reader'       + 
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 
INFO 2014-01-21 17:38:36,463 [main] org.mule.util.queue.TransactionalQueueManager: Starting ResourceManager 
INFO 2014-01-21 17:38:36,470 [main] org.mule.util.queue.TransactionalQueueManager: Started ResourceManager 
DEBUG 2014-01-21 17:38:36,472 [main] org.mule.transport.tcp.PollingTcpConnector: Connecting: PollingTcpConnector 
{ 
    name=TCP_Polling 
    lifecycle=initialise 
    this=7aa36771 
    numberOfConcurrentTransactedReceivers=4 
    createMultipleTransactedReceivers=true 
    connected=false 
    supportedProtocols=[tcp] 
    serviceOverrides= 
    message.receiver=org.mule.transport.tcp.PollingTcpMessageReceiver 
} 

INFO 2014-01-21 17:38:36,473 [main] org.mule.transport.tcp.PollingTcpConnector: Connected: PollingTcpConnector 
{ 
    name=TCP_Polling 
    lifecycle=initialise 
    this=7aa36771 
    numberOfConcurrentTransactedReceivers=4 
    createMultipleTransactedReceivers=true 
    connected=true 
    supportedProtocols=[tcp] 
    serviceOverrides= 
    message.receiver=org.mule.transport.tcp.PollingTcpMessageReceiver 
} 

INFO 2014-01-21 17:38:36,473 [main] org.mule.transport.tcp.PollingTcpConnector: Starting: PollingTcpConnector 
{ 
    name=TCP_Polling 
    lifecycle=initialise 
    this=7aa36771 
    numberOfConcurrentTransactedReceivers=4 
    createMultipleTransactedReceivers=true 
    connected=true 
    supportedProtocols=[tcp] 
    serviceOverrides= 
    message.receiver=org.mule.transport.tcp.PollingTcpMessageReceiver 
} 

INFO 2014-01-21 17:38:36,473 [main] org.mule.lifecycle.AbstractLifecycleManager: Starting connector: TCP_Polling 
INFO 2014-01-21 17:38:36,562 [main] org.mule.transport.amqp.AmqpConnector: Connected: AmqpConnector 
{ 
    name=local_rabbitmq 
    lifecycle=initialise 
    this=43886a34 
    numberOfConcurrentTransactedReceivers=4 
    createMultipleTransactedReceivers=true 
    connected=true 
    supportedProtocols=[amqp] 
    serviceOverrides=<none> 
} 

INFO 2014-01-21 17:38:36,562 [main] org.mule.transport.amqp.AmqpConnector: Starting: AmqpConnector 
{ 
    name=local_rabbitmq 
    lifecycle=initialise 
    this=43886a34 
    numberOfConcurrentTransactedReceivers=4 
    createMultipleTransactedReceivers=true 
    connected=true 
    supportedProtocols=[amqp] 
    serviceOverrides=<none> 
} 

INFO 2014-01-21 17:38:36,562 [main] org.mule.lifecycle.AbstractLifecycleManager: Starting connector: local_rabbitmq 
INFO 2014-01-21 17:38:36,567 [main] org.mule.lifecycle.AbstractLifecycleManager: Starting model: _muleSystemModel 
INFO 2014-01-21 17:38:36,568 [main] org.mule.construct.FlowConstructLifecycleManager: Starting flow: DARTS_TCP_ReaderFlow 
INFO 2014-01-21 17:38:36,569 [main] org.mule.component.ComponentLifecycleManager: Starting component: component.2011100149 
INFO 2014-01-21 17:38:36,573 [main] org.mule.component.ComponentLifecycleManager: Starting component: component.532231462 
INFO 2014-01-21 17:38:36,575 [main] org.mule.transport.tcp.PollingTcpConnector: Registering listener: DARTS_TCP_ReaderFlow on endpointUri: tcp://127.0.0.1:28001 
INFO 2014-01-21 17:38:36,582 [main] org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'null'. Object is: PollingTcpMessageReceiver 
DEBUG 2014-01-21 17:38:36,583 [main] org.mule.transport.tcp.PollingTcpMessageReceiver: Connecting: PollingTcpMessageReceiver{this=4e01c1f2, receiverKey=tcp://127.0.0.1:28001, endpoint=tcp://127.0.0.1:28001} 
INFO 2014-01-21 17:38:36,583 [main] org.mule.transport.tcp.PollingTcpMessageReceiver: Connecting clusterizable message receiver 
DEBUG 2014-01-21 17:38:36,583 [main] org.mule.transport.tcp.PollingTcpMessageReceiver: Connected: tcp://127.0.0.1:28001 
INFO 2014-01-21 17:38:36,583 [main] org.mule.lifecycle.AbstractLifecycleManager: Starting: 'null'. Object is: PollingTcpMessageReceiver 
INFO 2014-01-21 17:38:36,583 [main] org.mule.transport.tcp.PollingTcpMessageReceiver: Starting clusterizable message receiver 
DEBUG 2014-01-21 17:38:36,585 [main] org.mule.transport.tcp.PollingTcpMessageReceiver: [email protected] scheduled [email protected] with 1000 MILLISECONDS polling frequency 
INFO 2014-01-21 17:38:36,594 [main] org.mule.module.launcher.application.DefaultMuleApplication: Reload interval: 3000 
INFO 2014-01-21 17:38:36,595 [main] org.mule.module.management.agent.WrapperManagerAgent: This JVM hasn't been launched by the wrapper, the agent will not run. 
INFO 2014-01-21 17:38:36,621 [main] org.mule.module.management.agent.JmxAgent: Attempting to register service with name: Mule.darts_tcp_reader:type=Endpoint,service="DARTS_TCP_ReaderFlow",connector=TCP_Polling,name="endpoint.tcp.127.0.0.1.28001" 
INFO 2014-01-21 17:38:36,621 [main] org.mule.module.management.agent.JmxAgent: Registered Endpoint Service with name: Mule.darts_tcp_reader:type=Endpoint,service="DARTS_TCP_ReaderFlow",connector=TCP_Polling,name="endpoint.tcp.127.0.0.1.28001" 
INFO 2014-01-21 17:38:36,622 [main] org.mule.module.management.agent.JmxAgent: Registered Connector Service with name Mule.darts_tcp_reader:type=Connector,name="TCP.Polling" 
INFO 2014-01-21 17:38:36,622 [main] org.mule.module.management.agent.JmxAgent: Registered Connector Service with name Mule.darts_tcp_reader:type=Connector,name="local.rabbitmq" 
INFO 2014-01-21 17:38:36,628 [main] org.mule.DefaultMuleContext: 
********************************************************************** 
* Application: darts_tcp_reader          * 
* OS encoding: US-ASCII, Mule encoding: UTF-8      * 
*                 * 
* Agents Running:             * 
* Clustering Agent             * 
* JMX Agent              * 
********************************************************************** 
INFO 2014-01-21 17:38:36,629 [main] org.mule.module.launcher.MuleDeploymentService: 
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 
+ Started app 'darts_tcp_reader'       + 
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 
DEBUG 2014-01-21 17:38:37,588 [[darts_tcp_reader].TCP_Polling.receiver.01] org.mule.transport.tcp.PollingTcpConnector: borrowing socket for /127.0.0.1:28001/2130734434 
DEBUG 2014-01-21 17:38:37,592 [[darts_tcp_reader].TCP_Polling.receiver.01] org.mule.transport.tcp.PollingTcpConnector: borrowed socket, open; debt 1 
DEBUG 2014-01-21 17:38:37,595 [[darts_tcp_reader].TCP_Polling.receiver.01] org.mule.transport.tcp.PollingTcpConnector: returning socket for 2130734434 
DEBUG 2014-01-21 17:38:37,595 [[darts_tcp_reader].TCP_Polling.receiver.01] org.mule.transport.tcp.PollingTcpConnector: returned socket; debt 0 
INFO 2014-01-21 17:38:37,653 [[darts_tcp_reader].TCP_Polling.receiver.01] org.mule.component.simple.LogComponent: 
******************************************************************************** 
* Message received in service: DARTS_TCP_ReaderFlow. Content is: 'I am   * 
* transmission 0'                * 
******************************************************************************** 
DEBUG 2014-01-21 17:38:37,851 [[darts_tcp_reader].TCP_Polling.receiver.01] org.mule.transport.tcp.PollingTcpConnector: returning socket for 2130734434 
DEBUG 2014-01-21 17:38:37,851 [[darts_tcp_reader].TCP_Polling.receiver.01] org.mule.transport.tcp.PollingTcpConnector: returned socket; debt -1 
DEBUG 2014-01-21 17:38:38,588 [[darts_tcp_reader].TCP_Polling.receiver.01] org.mule.transport.tcp.PollingTcpConnector: borrowing socket for /127.0.0.1:28001/2130734434 
DEBUG 2014-01-21 17:38:38,588 [[darts_tcp_reader].TCP_Polling.receiver.01] org.mule.transport.tcp.PollingTcpConnector: same as 2130734434? true 
ERROR 2014-01-21 17:38:38,591 [[darts_tcp_reader].TCP_Polling.receiver.01] org.mule.exception.DefaultSystemExceptionStrategy: Caught exception in Exception Strategy: Connection refused 
java.net.ConnectException: Connection refused 
    at java.net.PlainSocketImpl.socketConnect(Native Method) 
    at java.net.PlainSocketImpl.doConnect(PlainSocketImpl.java:351) 
    at java.net.PlainSocketImpl.connectToAddress(PlainSocketImpl.java:213) 
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:200) 
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:432) 
    at java.net.Socket.connect(Socket.java:529) 
    at org.mule.transport.tcp.TcpSocketFactory.createSocket(TcpSocketFactory.java:22) 
    at org.mule.transport.tcp.AbstractTcpSocketFactory.makeObject(AbstractTcpSocketFactory.java:37) 
    at org.apache.commons.pool.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:1179) 
    at org.mule.transport.tcp.TcpConnector.getSocket(TcpConnector.java:201) 
    at org.mule.transport.tcp.PollingTcpMessageReceiver.poll(PollingTcpMessageReceiver.java:70) 
    at org.mule.transport.AbstractPollingMessageReceiver.performPoll(AbstractPollingMessageReceiver.java:219) 
    at org.mule.transport.PollingReceiverWorker.poll(PollingReceiverWorker.java:84) 
    at org.mule.transport.PollingReceiverWorker.run(PollingReceiverWorker.java:53) 
    at org.mule.work.WorkerContext.run(WorkerContext.java:311) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918) 
    at java.lang.Thread.run(Thread.java:680) 
+0

查看日志输出,似乎Socket Pool实际上是每次创建套接字,为什么会这样呢?什么是我看到的PollingTcpConnector提到的“债务”价值,落入负面价值? – user2229657

回答

0

我真的很惊讶,插座不留开:阅读3.4.0的源代码后,很明显,如果keepSendSocketOpen=true在连接器上,插座将返回到池,并保持开放的钝化(参见:https://github.com/mulesoft/mule/blob/mule-3.4.0/transports/tcp/src/main/java/org/mule/transport/tcp/TcpConnector.java#L211https://github.com/mulesoft/mule/blob/mule-3.4.0/transports/tcp/src/main/java/org/mule/transport/tcp/AbstractTcpSocketFactory.java#L68

您可以将org.mule.transport.tcp记录器设置为DEBUG并跟踪发生了什么?

0

在编写我自己的协议(从流中的每条消息中去掉前两个字节,其中第二个是消息长度)以及使用MuleStudio调试执行代码后,我注意到套接字是被“释放”到泳池两次,池第二次确定插座闲置并关闭所述插座。这当然不应该发生。套接字被“释放”两次的原因是因为MessageDispatcher确定套接字不是串流的,接下来的问题就变成了“如何将套接字设置为串流”。事实证明,在我的自定义协议中,当'read'方法传递给InputStream时,我需要将InputStream强制转换为TcpInputStream,然后使用布尔参数'true'将其称为'setStreaming'方法。这一切似乎非常混乱,因为我的协议,没有创建InputStream应该而不是在一般的做法是修改InputStream。我会预期连接器上会有一个XML标志来确定流式传输而不是流式传输,或者在协议本身上设置一些标识为流式传输协议的值。我会假定我做了一些错误的事情,而'读取'方法虽然有效,但并不适合干扰流式传输/不流式传输,但是在查看Mule Streaming协议的源代码时,这是它是如何完成的 - 所以我只能假定这是故意的,并且会要求在未来版本中以更干净,适当或至少明显的方式完成。