2015-10-27 61 views
1

我编写了一个服务来订阅来自mqtt服务器的消息。当互联网连接开/关时,服务自动重新连接。MQTT Android客户端调用CalBackListener问题

此代码:

public class NotificationService extends Service{ 

    private Handler connectBrokerHandler; 
    private ConnectBrokerTimerTask connectBrokerTimerTask; 
    private Timer connectBrokerTimer; 

    private MqttAndroidClient androidClient; 
    private MqttConnectOptions connectOptions; 
    private MemoryPersistence memPer; 
    private IMqttToken connectToken; 
    private CustomActionListener customActionListener; 
    private Context context; 
    private String CLIENT_STATUS = CLIENT_INIT; 

    private static final String TAG = "NotificationService"; 
    private static final String USER_NAME = "chuxuanhy"; 
    private static final String PASS_WORD = "0936160721"; 
    private static final String URL_BROKER = "tcp://ntm.diyoracle.com:1883"; 
    private static final String TOPIC_STAFF = "PROSHIP_STAFF"; 
    private static final String DEVICE_ID = "FUCKYOU_002"; 
    private static final int CONNECTION_TIMEOUT = 60; 
    private static final int KEEP_ALIVE_INTERVAL = 60*60; 
    private static final int TRY_CONNECTION_DELAY = 30; //seconds 

    private static final String CLIENT_INIT = "CLIENT_INIT"; 
    private static final String CLIENT_CONNECTION_BROKER_START = "CLIENT_CONNECTION_BROKER_START"; 
    private static final String CLIENT_CONNECTION_BROKER_FAIL = "CLIENT_CONNECTION_BROKER_FAIL"; 
    private static final String CLIENT_CONNECTION_BROKER_SUCCESS = "CLIENT_CONNECTION_BROKER_SUCCESS"; 
    private static final String CLIENT_LOST_CONNECTION = "CLIENT_LOST_CONNECTION"; 
    private static final String CLIENT_SUBSCRIBE_TOPIC_START = "CLIENT_SUBSCRIBE_TOPIC_START"; 
    private static final String CLIENT_SUBSCRIBE_TOPIC_FAIL = "CLIENT_SUBSCRIBE_TOPIC_FAIL"; 
    private static final String CLIENT_SUBSCRIBE_TOPIC_SUCCESS = "CLIENT_SUBSCRIBE_TOPIC_SUCCESS"; 

    private IntentFilter FilterNetwork; 
    private BroadcastReceiver ReceiverCheckInternet = new BroadcastReceiver() { 
     @Override 
     public void onReceive(Context context, Intent intent) { 
      if(isOnline()){ 
       if(CLIENT_STATUS == CLIENT_LOST_CONNECTION || CLIENT_STATUS == CLIENT_CONNECTION_BROKER_FAIL || CLIENT_STATUS == CLIENT_SUBSCRIBE_TOPIC_FAIL) { 
        ConnectBroker(); 
       } 
      } 
     } 
    }; 

    private class ConnectBrokerTimerTask extends TimerTask{ 
     @Override 
     public void run() { 
      connectBrokerHandler.post(ConnectBrokerRunable); 
     } 
    } 

    private Runnable ConnectBrokerRunable = new Runnable() { 
     @Override 
     public void run() { 
      ConnectBroker(); 
     } 
    }; 

    public boolean isOnline() { 
     ConnectivityManager cm = (ConnectivityManager) getSystemService(Context.CONNECTIVITY_SERVICE); 
     NetworkInfo netInfo = cm.getActiveNetworkInfo(); 
     return netInfo != null && netInfo.isConnectedOrConnecting(); 
    } 

    public void RestartNotificationService(){ 
     Intent restartServiceIntent = new Intent(getApplicationContext(), this.getClass()); 
     restartServiceIntent.setPackage(getPackageName()); 

     PendingIntent restartServicePendingIntent = PendingIntent.getService(getApplicationContext(), 1, restartServiceIntent, PendingIntent.FLAG_ONE_SHOT); 
     AlarmManager alarmService = (AlarmManager) getApplicationContext().getSystemService(Context.ALARM_SERVICE); 
     alarmService.set(AlarmManager.ELAPSED_REALTIME, SystemClock.elapsedRealtime() + 1000, restartServicePendingIntent); 
    } 

    public void createNotification(String title, String body) { 
     Intent intent = new Intent(this, NotificationReceiver.class); 
     PendingIntent pIntent = PendingIntent.getActivity(this, (int) System.currentTimeMillis(), intent, 0); 

     NotificationCompat.Builder mBuilder = new NotificationCompat.Builder(getApplicationContext()); 
     Notification notification = mBuilder.setSmallIcon(R.drawable.train).setTicker(title).setWhen(0) 
       .setAutoCancel(true) 
       .setContentTitle(title) 
       .setStyle(new NotificationCompat.BigTextStyle().bigText(body)) 
       .setContentIntent(pIntent) 
       .setSound(RingtoneManager.getDefaultUri(RingtoneManager.TYPE_NOTIFICATION)) 
       .setVibrate(new long[]{1000, 1000}) 
       .setLargeIcon(BitmapFactory.decodeResource(getApplicationContext().getResources(), R.mipmap.ic_launcher)) 
       .setContentText(body).build(); 

     Random random = new Random(); 
     int m = random.nextInt(9999 - 1000) + 1000; 

     NotificationManager notificationManager = (NotificationManager) getApplicationContext().getSystemService(Context.NOTIFICATION_SERVICE); 
     notificationManager.notify(m, notification); 
    } 

    @Nullable 
    @Override 
    public IBinder onBind(Intent intent) { 
     return null; 
    } 

    @Override 
    public void onTaskRemoved(Intent rootIntent) { 
     try{ 
      RestartNotificationService(); 
     }catch (Exception e){ 
      e.printStackTrace(); 
     } 
     super.onTaskRemoved(rootIntent); 
    } 

    @Override 
    public void onDestroy() { 
     super.onDestroy(); 
     unregisterReceiver(ReceiverCheckInternet); 
     Log.e(TAG, "onDestroy"); 
     try{ 
      if(androidClient.isConnected()){ 
       this.androidClient.unsubscribe(TOPIC_STAFF); 
       this.androidClient.disconnect(); 
       this.androidClient = null; 
      } 
     }catch (Exception e){ 
      e.printStackTrace(); 
     } 
    } 

    private void ConnectBroker(){ 
     try { 
      CLIENT_STATUS = CLIENT_CONNECTION_BROKER_START; 
      connectToken = this.androidClient.connect(connectOptions,null,customActionListener); 
     }catch (Exception e){ 
      //e.printStackTrace(); 
      Log.e(TAG, "ConnectBroker Fail Exception"); 
     } 
    } 

    private void SubscribeBrokerTopic(String topic){ 
     try { 
      CLIENT_STATUS = CLIENT_SUBSCRIBE_TOPIC_START; 
      androidClient.subscribe(topic, 0, null, customActionListener); 
     } catch (Exception e) { 
      //e.printStackTrace(); 
      Log.e(TAG, "SubscribeBrokerTopic Fail Exception"); 
     } 
    } 

    @Override 
    public int onStartCommand(Intent intent, int flags, int startId) { 
     Log.e("NOTIFI","CALL ME onStartCommand"); 
     this.connectBrokerHandler = new Handler(); 
     this.connectBrokerTimer = new Timer(); 
     this.connectBrokerTimerTask = new ConnectBrokerTimerTask(); 

     this.customActionListener = new CustomActionListener(); 

     this.memPer = new MemoryPersistence(); 
     this.connectOptions = new MqttConnectOptions(); 
     this.connectOptions.setCleanSession(true); 
     this.connectOptions.setUserName(USER_NAME); 
     this.connectOptions.setPassword(PASS_WORD.toCharArray()); 
     this.connectOptions.setConnectionTimeout(CONNECTION_TIMEOUT); 
     this.connectOptions.setKeepAliveInterval(KEEP_ALIVE_INTERVAL); 

     this.androidClient = new MqttAndroidClient(getApplicationContext(),URL_BROKER,DEVICE_ID,memPer); 
     this.androidClient.setCallback(new CustomCallBack()); 
     this.androidClient.setTraceCallback(new CustomTraceHandler()); 

     //this.androidClient. 

     this.FilterNetwork = new IntentFilter(); 
     this.FilterNetwork.addAction("android.net.conn.CONNECTIVITY_CHANGE"); 
     registerReceiver(ReceiverCheckInternet, FilterNetwork); 

     //this.androidClient.registerResources(getApplicationContext()); 
     ConnectBroker(); 

     return START_NOT_STICKY; 
    } 

    private class CustomTraceHandler implements MqttTraceHandler{ 
     @Override 
     public void traceDebug(String source, String message) { 
      Log.e(TAG,"CustomConnectHandler - traceDebug\n"+source+" - "+message); 
     } 

     @Override 
     public void traceError(String source, String message) { 
      Log.e(TAG, "CustomConnectHandler - traceError\n" + source + " - " + message); 
     } 

     @Override 
     public void traceException(String source, String message, Exception e) { 
      Log.e(TAG, "CustomConnectHandler - traceException\n" + source + " - " + message); 
      e.printStackTrace(); 
     } 
    } 

    private class CustomCallBack implements MqttCallback{ 
     @Override 
     public void connectionLost(Throwable throwable) { 
      CLIENT_STATUS = CLIENT_LOST_CONNECTION; 

      if(!isOnline()){ 
       Log.e(TAG, "ConnectCallBack connectionLost - No Internet Connection "); 
      }else { 
       //throwable.printStackTrace(); 
      } 

     } 
     @Override 
     public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { 
      Log.e(TAG,"ConnectCallBack messageArrived\n"+s+" - "+mqttMessage.toString()); 
      createNotification(s, mqttMessage.toString()); 
     } 

     @Override 
     public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { 
      Log.e(TAG, "ConnectCallBack deliveryComplete"); 
     } 
    } 

    private class CustomActionListener implements IMqttActionListener { 

     @Override 
     public void onSuccess(IMqttToken iMqttToken) { 
      if (CLIENT_STATUS == CLIENT_CONNECTION_BROKER_START) { 
       CLIENT_STATUS = CLIENT_CONNECTION_BROKER_SUCCESS; 
       if (iMqttToken.getTopics() == null) { 
        SubscribeBrokerTopic(TOPIC_STAFF); 
       } 
       Log.e(TAG, "CustomActionListener - CONNECT BROKER onSuccess"); 
      } else if (CLIENT_STATUS == CLIENT_SUBSCRIBE_TOPIC_START) { 
       CLIENT_STATUS = CLIENT_SUBSCRIBE_TOPIC_SUCCESS; 
       Log.e(TAG, "CustomActionListener - SUBSCRIBE TOPIC onSuccess"); 
      } 
     } 

     @Override 
     public void onFailure(IMqttToken iMqttToken, Throwable throwable) { 
      if (CLIENT_STATUS == CLIENT_CONNECTION_BROKER_START) { 
       CLIENT_STATUS = CLIENT_CONNECTION_BROKER_FAIL; 
       Log.e(TAG, "CustomActionListener - CONNECT BROKER onFailure "); 
      } else if (CLIENT_STATUS == CLIENT_SUBSCRIBE_TOPIC_START) { 
       CLIENT_STATUS = CLIENT_SUBSCRIBE_TOPIC_FAIL; 
       Log.e(TAG, "CustomActionListener - SUBSCRIBE TOPIC onFailure "); 
      } 
      if (isOnline()) { 
       throwable.printStackTrace(); 
       //connectBrokerTimer.scheduleAtFixedRate(connectBrokerTimerTask, 5000, TRY_CONNECTION_DELAY * 1000); 
      }else { 
       Log.e(TAG, "CustomActionListener onFailure - No Internet Connection "); 
      } 

     } 
    } 
} 

但我有一个问题,每一次互联网连接丢失,我又重新连接服务器,回拨电话。 例子:

Log.e(TAG, "ConnectCallBack connectionLost - No Internet Connection "); 

通话两次,3次,4次每次断开互联网 对不起我的英语也不好,感谢帮助。

回答

0

不要手动处理重新连接,新版本的paho可以选择在MqttConnectOptions类中自动重新连接。

compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.1' 
compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'