2012-02-24 28 views
2

我使用 org.fusesource.mqtt(mqtt-client-1.0-20120208.162159-18-uber),并基于其中的非阻塞示例用 Java 编写了一个监听器。标题:Java 中的 MQTT 客户端 - 在线程中启动我的监听器

我按以下方式使用我的 Listener 类: Listener mqList = new Listener("tcp://localhost:1883", "mytopic/#", "c:/test.log", true);
new Thread(mqList).start();

这样运行得非常完美。但如果我创建两个实例/线程,似乎就会出现冲突,并且我会不断收到连接/断开连接的消息。

这里是一个失败的用法:

// First listener: topic filter "mytopic/#", log file c:/test.log, debug logging enabled.
Listener mqList = new Listener("tcp://localhost:1883", "mytopic/#", "c:/test.log", true);  
new Thread(mqList).start();         

// Second listener on the same broker URI, with its own topic filter and log file.
Listener mqList1 = new Listener("tcp://localhost:1883", "mytopic1/#", "c:/test1.log", true);   
new Thread(mqList1).start(); 

我的 Listener 类非常简单,我百思不得其解,为什么它在多线程下不能正常工作。有什么想法或提示吗?

这里是我的类定义:

import org.fusesource.hawtbuf.Buffer; 
import org.fusesource.hawtbuf.UTF8Buffer; 
import org.fusesource.mqtt.client.*; 

import java.io.IOException; 
import java.util.ArrayList; 
import java.util.concurrent.CountDownLatch; 
import java.util.logging.*; 
import java.io.*; 
import java.net.URISyntaxException; 

public class Listener implements Runnable{ 
    private static final long DEFAULT_SLEEP_BEFORE_RE_ATTEMPT_IN_SECONDS = 5000; 
    private static final long DEFAULT_MAX_RE_ATTEMPT_DURATION_IN_SECONDS = 3600 * 3; 

    private long listenerSleepBeforeReAttemptInSeconds; 
    private long listenerMaxReAttemptDurationInSeconds;  
    private MQTT mqtt; 

    private ArrayList<Topic> topics; 
    private boolean listenerDebug; 
    private String listenerHostURI; 
    private String listenerTopic; 
    private String listenerLogFile; 
    private long listenerLastSuccessfulSubscription; 

    private Logger fLogger; 
    private String NEW_LINE = System.getProperty("line.separator"); 

    public Listener(String listenerHostURI, String listenerTopic, String logFile, boolean debug) { 
     this(listenerHostURI, listenerTopic, logFile, DEFAULT_SLEEP_BEFORE_RE_ATTEMPT_IN_SECONDS, DEFAULT_MAX_RE_ATTEMPT_DURATION_IN_SECONDS, debug); 
    } 

    public Listener(String listenerHostURI, String listenerTopic, String logFile, long listenerSleepBeforeReAttemptInSeconds, long listenerMaxReAttemptDurationInSeconds, boolean debug) { 
     init(listenerHostURI, listenerTopic, logFile, listenerSleepBeforeReAttemptInSeconds, listenerMaxReAttemptDurationInSeconds, debug); 
    } 

    private void init(String listenerHostURI, String listenerTopic, String logFile, long listenerSleepBeforeReAttemptInSeconds, long listenerMaxReAttemptDurationInSeconds, boolean debug) {   
     this.listenerHostURI = listenerHostURI; 
     this.listenerTopic = listenerTopic; 
     this.listenerLogFile = logFile; 
     this.listenerSleepBeforeReAttemptInSeconds = listenerSleepBeforeReAttemptInSeconds; 
     this.listenerMaxReAttemptDurationInSeconds = listenerMaxReAttemptDurationInSeconds; 
     this.listenerDebug = debug; 
     initMQTT(); 
    } 

    private void initMQTT() { 
     mqtt = new MQTT(); 
     listenerLastSuccessfulSubscription = System.currentTimeMillis(); 

     try { 
      fLogger = Logger.getLogger("eTactica.mqtt.listener"); 
      FileHandler handler = new FileHandler(listenerLogFile); 
      fLogger.addHandler(handler); 
     } catch (IOException e) { 
      System.out.println("Logger - Failed"); 
     }      

     try { 
      mqtt.setHost(listenerHostURI); 
     } catch (URISyntaxException e) { 
      stderr("setHost failed: " + e); 
      stderr(e); 
     }  
     QoS qos = QoS.AT_MOST_ONCE; 
     topics = new ArrayList<Topic>(); 
     topics.add(new Topic(listenerTopic, qos));    
    } 

    private void stdout(String x) { 
     if (listenerDebug) { 
      fLogger.log(Level.INFO, x + NEW_LINE); 
     } 
    } 

    private void stderr(String x) { 
     if (listenerDebug) { 
      fLogger.log(Level.SEVERE, x + NEW_LINE); 
     } 
    } 

    private void stderr(Throwable e) { 
     if (listenerDebug) {    
      StringWriter sw = new StringWriter(); 
      PrintWriter pw = new PrintWriter(sw); 
      e.printStackTrace(pw); 

      fLogger.log(Level.SEVERE, sw.toString() + NEW_LINE); 
     } 
    } 

    private void subscriptionSuccessful() { 
     listenerLastSuccessfulSubscription = System.currentTimeMillis();  
    }  

    private boolean tryToListen() {    
     return ((System.currentTimeMillis() - listenerLastSuccessfulSubscription) < listenerMaxReAttemptDurationInSeconds * 1000); 
    } 

    private void sleepBeforeReAttempt() throws InterruptedException {  
     stdout(String.format(("Listener stopped, re-attempt in %s seconds."), listenerSleepBeforeReAttemptInSeconds)); 
     Thread.sleep(listenerSleepBeforeReAttemptInSeconds); 
    } 

    private void listenerReAttemptsOver() { 
     stdout(String.format(("Listener stopped since reattempts have failed for %s seconds."), listenerMaxReAttemptDurationInSeconds));   
    } 

    private void listen() { 
     final CallbackConnection connection = mqtt.callbackConnection(); 
     final CountDownLatch done = new CountDownLatch(1); 



     /* Runtime.getRuntime().addShutdownHook(new Thread(){ 
      @Override 
      public void run() { 
       setName("MQTT client shutdown"); 
       stderr("Disconnecting the client."); 

       connection.getDispatchQueue().execute(new Runnable() { 
        public void run() { 
         connection.disconnect(new Callback<Void>() { 
          public void onSuccess(Void value) { 
           stdout("Disconnecting onSuccess."); 
           done.countDown(); 
          } 
          public void onFailure(Throwable value) { 
           stderr("Disconnecting onFailure: " + value); 
           stderr(value); 
           done.countDown(); 
          } 
         }); 
        } 
       }); 
      } 
     }); 
     */ 

     connection.listener(new org.fusesource.mqtt.client.Listener() { 

      public void onConnected() { 
       stdout("Listener onConnected");     
      } 

      public void onDisconnected() { 
       stdout("Listener onDisconnected"); 
      } 

      public void onPublish(UTF8Buffer topic, Buffer body, Runnable ack) { 
       stdout(topic + " --> " + body.toString());      
       ack.run(); 
      } 

      public void onFailure(Throwable value) { 
       stdout("Listener onFailure: " + value);        
       stderr(value); 
       done.countDown(); 
      } 
     }); 

     connection.resume(); 

     connection.connect(new Callback<Void>() { 
      public void onFailure(Throwable value) { 
       stderr("Connect onFailure...: " + value);       
       stderr(value); 
       done.countDown();     
      } 

      public void onSuccess(Void value) { 
       final Topic[] ta = topics.toArray(new Topic[topics.size()]); 
       connection.subscribe(ta, new Callback<byte[]>() { 
        public void onSuccess(byte[] value) { 
         for (int i = 0; i < value.length; i++) { 
          stdout("Subscribed to Topic: " + ta[i].name() + " with QoS: " + QoS.values()[value[i]]); 
         } 
         subscriptionSuccessful(); 
        } 
        public void onFailure(Throwable value) { 
         stderr("Subscribe failed: " + value);       
         stderr(value); 
         done.countDown(); 
        } 
       }); 
      } 
     }); 

     try { 
      done.await(); 
     } catch (Exception e) { 
      stderr(e); 
     } 
    } 

    @Override 
    public void run() { 
     while (tryToListen()) { 
      initMQTT(); 
      listen(); 
      try { 
       sleepBeforeReAttempt(); 
      } catch (InterruptedException e) { 
       stderr("Sleep failed:" + e); 
       stderr(e); 
      } 
     } 

     listenerReAttemptsOver();  
    } 

} 

回答

0

一个TCP端口只能有一个监听器。“tcp://localhost:1883”中的端口号对每个侦听器都必须是唯一的。另外(我不熟悉这个特定的API),你可能还在某个地方用同一个端口号启动了客户端;客户端和服务器使用的端口号必须一致。

+0

这个应用程序运行在客户端,监听的是服务器。多个客户端/线程应该能够以这种方式连接。我可以在同一台工作站上的不同DOS窗口中分别运行mosquitto_sub。 – Gummi 2012-02-24 15:32:36