2017-04-15 75 views
0

我试图从“断开连接”部分here复制示例代码。RxJava - ConnectableObservable,断开连接并重新连接

断开

正如我们在连接的签名看到,这个方法返回一个订阅,就像Observable.subscribe一样。您可以使用该引用来终止ConnectableObservable的订阅。这将阻止事件传播给观察者,但它不会从ConnectableObservable中取消订阅。 如果再次调用connect,ConnectableObservable将启动一个新的订阅,旧观察者将再次开始接收值。

ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); 
Subscription s = connectable.connect(); 

connectable.subscribe(i -> System.out.println(i)); 

Thread.sleep(1000); 
System.out.println("Closing connection"); 
s.unsubscribe(); 

Thread.sleep(1000); 
System.out.println("Reconnecting"); 
s = connectable.connect(); 

输出

0 
1 
2 
3 
4 
Closing connection 
Reconnecting 
0 
1 
2 
... 

使用RxJava 2.0.8,我有:

ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); 
    Disposable s = connectable.connect(); 

    connectable.subscribe(new Observer<Long>() { 
     @Override 
     public void onSubscribe(Disposable d) { 

     } 

     @Override 
     public void onNext(Long aLong) { 
      Log.d("test", "Num: " + aLong); 
     } 

     @Override 
     public void onError(Throwable e) { 

     } 

     @Override 
     public void onComplete() { 

     } 
    }); 

    try { 
     Thread.sleep(1000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    Log.d("test", "Closing connection"); 
    s.dispose(); 

    try { 
     Thread.sleep(1000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    Log.d("test", "Reconnecting..."); 
    connectable.connect(); 

输出

Num: 0 
Num: 1 
Num: 2 
Num: 3 
Num: 4 
Closing connection 
Reconnecting... 

在此先感谢....

+0

我不会理解您的问题 – Cochi

+0

@Cochi在我的代码中,我的订阅服务器在可连接源断开连接后再次连接后没有收到值。 – veritas1

回答

4

看来这种行为还没有被RxJava采用。工作示例来自Rx.NET。请参见https://github.com/ReactiveX/RxJava/issues/4771

+0

可能是的。我能够使其工作的唯一时间是当我再次调用'connect()'之前'再次订阅()'时。这是一旦'dispose()'被调用后应该如何工作? –

相关问题