2015-10-06 34 views
0

我正在寻找订阅特定主题的MQTT客户端的Java代码,发布在该主题上的每条消息都只应达到客户端一次。我写了许多代码,并且在所有情况下消息在连接到代理时正确传递给客户端,但如果订阅客户端与代理断开连接一段时间,然后再次连接回来,则它不会收到在未连接的时间内发送的消息,以及我已经设置了干净的会话标志还为假,但它仍然没有工作,我用的代码如下维持持久性的正确MQTT订阅代码

import org.fusesource.hawtbuf.*; 
import org.fusesource.mqtt.client.*; 

/** 
* Uses an callback based interface to MQTT. Callback based interfaces 
* are harder to use but are slightly more efficient. 
*/ 
class Listener { 

    public static void main(String []args) throws Exception { 

     String user = env("APOLLO_USER", "admin"); 
     String password = env("APOLLO_PASSWORD", "password"); 
     String host = env("APOLLO_HOST", "localhost"); 
     int port = Integer.parseInt(env("APOLLO_PORT", "61613")); 
     final String destination = arg(args, 1, "subject"); 


     MQTT mqtt = new MQTT(); 
     mqtt.setHost(host, port); 
     mqtt.setUserName(user); 
     mqtt.setPassword(password); 
    mqtt.setCleanSession(false); 
    mqtt.setClientId("newclient"); 

     final CallbackConnection connection = mqtt.callbackConnection(); 
     connection.listener(new org.fusesource.mqtt.client.Listener() { 
      long count = 0; 
      long start = System.currentTimeMillis(); 

      public void onConnected() { 
      } 
      public void onDisconnected() { 
      } 
      public void onFailure(Throwable value) { 
       value.printStackTrace(); 
       System.exit(-2); 
      } 
      public void onPublish(UTF8Buffer topic, Buffer msg, Runnable ack) { 
       System.out.println("Nisha Messages : " + msg); 
       System.out.println("Nisha topic" + topic); 
       System.out.println("Nisha Receive acknowledgement : " + ack); 
       String body = msg.utf8().toString(); 
       if("SHUTDOWN".equals(body)) { 
        long diff = System.currentTimeMillis() - start; 
        System.out.println(String.format("Received %d in %.2f seconds", count, (1.0*diff/1000.0))); 
        connection.disconnect(new Callback<Void>() { 
         @Override 
         public void onSuccess(Void value) { 
          System.exit(0); 
         } 
         @Override 
         public void onFailure(Throwable value) { 
          value.printStackTrace(); 
          System.exit(-2); 
         } 
        }); 
       } else { 
        if(count == 0) { 
         start = System.currentTimeMillis(); 
        } 
        if(count % 1000 == 0) { 
         System.out.println(String.format("Received %d messages.", count)); 
        } 
        count ++; 
       } 
      } 
     }); 
     connection.connect(new Callback<Void>() { 
      @Override 
      public void onSuccess(Void value) { 
       System.out.println("connected in :::: "); 
       Topic[] topics = {new Topic(destination, QoS.AT_MOST_ONCE)}; 
       connection.subscribe(topics, new Callback<byte[]>() { 
        public void onSuccess(byte[] qoses) { 
        } 
        public void onFailure(Throwable value) { 
         value.printStackTrace(); 
         System.exit(-2); 
        } 
       }); 
      } 
      @Override 
      public void onFailure(Throwable value) { 
       value.printStackTrace(); 
       System.exit(-2); 
      } 
     }); 

     // Wait forever.. 
     synchronized (Listener.class) { 
      while(true) 
       Listener.class.wait(); 
     } 
    } 

    private static String env(String key, String defaultValue) { 
     String rc = System.getenv(key); 
     if(rc== null) 
      return defaultValue; 
     return rc; 
    } 

    private static String arg(String []args, int index, String defaultValue) { 
     if(index < args.length) 
      return args[index]; 
     else 
      return defaultValue; 
    } 
} 

给我是不是做错了什么吗?

回答

-1

它不接收期间,它不连接

MQTT 保留所有消息的时间进行发送的消息。如果客户端脱机,未传送的消息将丢失。保留机制仅保留发布到主题的消息最后的消息。

可以read more in the specs3.3.1.3 RETAIN

+1

如果客户端设置为false“干净会话”连接,然后QOS 1/2消息应该得到排队它,而它处于脱机状态。 – knolleary

+0

@knolleary你有没有这方面的参考? AFAIK'清除会话'仅保留对主题的订阅而不保留邮件(请参阅:http://www-01.ibm.com/support/knowledgecenter/SSFKSJ_7.1.0/com.ibm.mq.doc/tt60370_.htm ) –

+1

请参阅标准中的一致性声明MQTT-3.1.2-5:http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028 – knolleary