2017-07-03 213 views
1

我们正在使用Springmessage-driven-channel-adapter订阅MQTT的话题。但我们经常遇到错误。我已经使用JavaScript客户端(mqttws31.js)测试了连接,它工作正常。意味着没有连接问题。Spring MqttPahoMessageDrivenChannelAdapter丢失连接:连接丢失;正在重试

错误: -

org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter connectionLost 
SEVERE: Lost connection:Connection lost; retrying... 

MQTT消息: -

[payload=6483D03E4C75BA943148F18D73,1.00,1E, headers={mqtt_retained=false, mqtt_qos=0, 
id=5fa41168-34c6-1e3d-a775-e3146842990a, mqtt_topic=TEST/GATEWAY2, mqtt_duplicate=false, timestamp=1499067757559}] 

配置: -

<bean id="clientFactory" 
    class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory"> 
    <property name="userName" value="${mqtt.username}" /> 
    <property name="password" value="${mqtt.password}" /> 
</bean> 

<int-mqtt:message-driven-channel-adapter 
    id="mqttInbound" client-id="${mqtt.default.client.id}" url="${mqtt.url}" 
    topics="${topics}" client-factory="clientFactory" auto-startup="true" 
    channel="output" error-channel="errorChannel" /> 


<int:channel id="output" /> 
<int:channel id="errorChannel" /> 

<int:service-activator input-channel="errorChannel" 
    ref="errorMessageLogger" method="logError" /> 
<bean id="errorMessageLogger" class="com.mqtt.ErrorMessageLogger" /> 

<int:service-activator input-channel="output" 
    method="handleMessage" ref="mqttLogger" /> 
<bean id="mqttLogger" class="com.mqtt.MqttReciever" /> 

的pom.xml:

<dependency> 
    <groupId>org.eclipse.paho</groupId> 
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId> 
    <version>1.1.1</version> 
</dependency> 
<dependency> 
    <groupId>org.springframework.integration</groupId> 
    <artifactId>spring-integration-mqtt</artifactId> 
    <version>4.2.2.RELEASE</version> 
</dependency> 

在调试org.eclipse.paho.client.mqttv3-1.1.1-sources.jar: -

CommsReceiver.Java

public void run() { 
     final String methodName = "run"; 
     MqttToken token = null; 

     while (running && (in != null)) { 
      try { 
       //@TRACE 852=network read message 
       log.fine(CLASS_NAME,methodName,"852"); 
       receiving = in.available() > 0; 
       MqttWireMessage message = in.readMqttWireMessage(); 
       receiving = false; 

       // instanceof checks if message is null 
       if (message instanceof MqttAck) { 
        token = tokenStore.getToken(message); 
        if (token!=null) { 
         synchronized (token) { 
          // Ensure the notify processing is done under a lock on the token 
          // This ensures that the send processing can complete before the 
          // receive processing starts! (request and ack and ack processing 
          // can occur before request processing is complete if not! 
          clientState.notifyReceivedAck((MqttAck)message); 
         } 
        } else if(message instanceof MqttPubRec || message instanceof MqttPubComp || message instanceof MqttPubAck) { 
         //This is an ack for a message we no longer have a ticket for. 
         //This probably means we already received this message and it's being send again 
         //because of timeouts, crashes, disconnects, restarts etc. 
         //It should be safe to ignore these unexpected messages. 
         log.fine(CLASS_NAME, methodName, "857"); 
        } else { 
         // It its an ack and there is no token then something is not right. 
         // An ack should always have a token assoicated with it. 
         throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR); 
        } 
       } else { 
        if (message != null) { 
         // A new message has arrived 
         clientState.notifyReceivedMsg(message); 
        } 
       } 
      } 
      catch (MqttException ex) { 
       //@TRACE 856=Stopping, MQttException 
       log.fine(CLASS_NAME,methodName,"856",null,ex); 
       running = false; 
       // Token maybe null but that is handled in shutdown 
       clientComms.shutdownConnection(token, ex); 
      } 
      catch (IOException ioe) { 
       //@TRACE 853=Stopping due to IOException 
       log.fine(CLASS_NAME,methodName,"853"); 

       running = false; 
       // An EOFException could be raised if the broker processes the 
       // DISCONNECT and ends the socket before we complete. As such, 
       // only shutdown the connection if we're not already shutting down. 
       if (!clientComms.isDisconnecting()) { 
        clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe)); 
       } 
      } 
      finally { 
       receiving = false; 
      } 
     } 

     //@TRACE 854=< 
     log.fine(CLASS_NAME,methodName,"854"); 
    } 

在上述方法中,有时in.readMqttWireMessage()IOException。所以根据catch块重新连接使用clientComms.shutdownConnection(token, ...

+1

你的问题不明确。如果连接丢失,则意味着连接出现问题,可能是由于网络错误。适配器将尝试重新连接。 –

+0

请查找更新后的问题。我已经测试了与JavaScript客户端的连接,它工作正常。 – HybrisFreelance

回答

1

但是你仍然没有真正描述一个问题。你在上面显示一条消息,所以它必须为你工作。 Paho正在检测连接问题;它会通知将重新连接的Spring集成。

通过将ApplicationListener添加到您的应用程序中,您可以获得有关该例外的完整信息。

@Bean 
public ApplicationListener<?> eventListener() { 
    return new ApplicationListener<MqttConnectionFailedEvent>() { 

     @Override 
     public void onApplicationEvent(MqttConnectionFailedEvent event) { 
      event.getCause().printStackTrace(); 
     } 

    }; 
} 

结果:

Connection lost (32109) - java.io.EOFException 
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:164) 
    at java.lang.Thread.run(Thread.java:748) 
Caused by: java.io.EOFException 
    at java.io.DataInputStream.readByte(DataInputStream.java:267) 
    at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92) 
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:116) 
    ... 1 more 

(当我关闭代理)。

如果您认为paho客户端存在问题,那么您应该为该项目提出问题。

+0

'但你还没有真正描述一个问题' - 问题经常是“连接丢失;重试...”,这并不是我期待的。但现在它的工作正常。我已经使用了最新的mqtt(4.3.10.RELEASE)并且做了maven clean/install。现在它按预期工作。没有连接丢失的错误。谢谢你的时间。 – HybrisFreelance