0
我正在寻找订阅特定主题的MQTT客户端的Java代码,发布在该主题上的每条消息都只应达到客户端一次。我写了许多代码,并且在所有情况下消息在连接到代理时正确传递给客户端,但如果订阅客户端与代理断开连接一段时间,然后再次连接回来,则它不会收到在未连接的时间内发送的消息,以及我已经设置了干净的会话标志还为假,但它仍然没有工作,我用的代码如下维持持久性的正确MQTT订阅代码
import org.fusesource.hawtbuf.*;
import org.fusesource.mqtt.client.*;
/**
* Uses an callback based interface to MQTT. Callback based interfaces
* are harder to use but are slightly more efficient.
*/
class Listener {
public static void main(String []args) throws Exception {
String user = env("APOLLO_USER", "admin");
String password = env("APOLLO_PASSWORD", "password");
String host = env("APOLLO_HOST", "localhost");
int port = Integer.parseInt(env("APOLLO_PORT", "61613"));
final String destination = arg(args, 1, "subject");
MQTT mqtt = new MQTT();
mqtt.setHost(host, port);
mqtt.setUserName(user);
mqtt.setPassword(password);
mqtt.setCleanSession(false);
mqtt.setClientId("newclient");
final CallbackConnection connection = mqtt.callbackConnection();
connection.listener(new org.fusesource.mqtt.client.Listener() {
long count = 0;
long start = System.currentTimeMillis();
public void onConnected() {
}
public void onDisconnected() {
}
public void onFailure(Throwable value) {
value.printStackTrace();
System.exit(-2);
}
public void onPublish(UTF8Buffer topic, Buffer msg, Runnable ack) {
System.out.println("Nisha Messages : " + msg);
System.out.println("Nisha topic" + topic);
System.out.println("Nisha Receive acknowledgement : " + ack);
String body = msg.utf8().toString();
if("SHUTDOWN".equals(body)) {
long diff = System.currentTimeMillis() - start;
System.out.println(String.format("Received %d in %.2f seconds", count, (1.0*diff/1000.0)));
connection.disconnect(new Callback<Void>() {
@Override
public void onSuccess(Void value) {
System.exit(0);
}
@Override
public void onFailure(Throwable value) {
value.printStackTrace();
System.exit(-2);
}
});
} else {
if(count == 0) {
start = System.currentTimeMillis();
}
if(count % 1000 == 0) {
System.out.println(String.format("Received %d messages.", count));
}
count ++;
}
}
});
connection.connect(new Callback<Void>() {
@Override
public void onSuccess(Void value) {
System.out.println("connected in :::: ");
Topic[] topics = {new Topic(destination, QoS.AT_MOST_ONCE)};
connection.subscribe(topics, new Callback<byte[]>() {
public void onSuccess(byte[] qoses) {
}
public void onFailure(Throwable value) {
value.printStackTrace();
System.exit(-2);
}
});
}
@Override
public void onFailure(Throwable value) {
value.printStackTrace();
System.exit(-2);
}
});
// Wait forever..
synchronized (Listener.class) {
while(true)
Listener.class.wait();
}
}
private static String env(String key, String defaultValue) {
String rc = System.getenv(key);
if(rc== null)
return defaultValue;
return rc;
}
private static String arg(String []args, int index, String defaultValue) {
if(index < args.length)
return args[index];
else
return defaultValue;
}
}
给我是不是做错了什么吗?
如果客户端设置为false“干净会话”连接,然后QOS 1/2消息应该得到排队它,而它处于脱机状态。 – knolleary
@knolleary你有没有这方面的参考? AFAIK'清除会话'仅保留对主题的订阅而不保留邮件(请参阅:http://www-01.ibm.com/support/knowledgecenter/SSFKSJ_7.1.0/com.ibm.mq.doc/tt60370_.htm ) –
请参阅标准中的一致性声明MQTT-3.1.2-5:http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028 – knolleary