2014-11-02 21 views
16

我试图将一些基于侦听器模式的API包装到Observable中。我的代码大致如下所示。如何在RxJava中的自定义Observable中获得观察者的取消订阅操作的通知

def myObservable = Observable.create({ aSubscriber -> 
    val listener = {event -> 
     aSubscriber.onNext(event);     
    } 
    existingEventSource.addListener(listener) 
}) 

不过,我想我观察到立即从底层existingEventSource删除侦听当观察者调用subscription.unscribe()。我怎么能实现这个目标?

回答

29

Subscriber抽象类实际上有一个方法add,它允许您添加Subscription s将取消订阅订户。

def myObservable = Observable.create({ aSubscriber -> 
    val listener = {event -> 
     aSubscriber.onNext(event);     
    } 
    existingEventSource.addListener(listener) 

    // Adds a lambda to be executed when the Subscriber un-subscribes from your Observable 
    aSubscriber.add(Subscriptions.create(() -> existingEventSource.removeListener(listener))); 
}) 

aSubscriber为订阅您ObservableObserver想;我们将其称为Subscriber。只要Subscriber仍然订阅了ObservableObservable就可以发出值。但是当这个Subscriber取消订阅,那么它应该停止。但是如果我们想要在Subscriber退订时收到通知,我们可以注册Action以便在发生时运行。这就是add方法的用途。正如@dwursteisen在评论中提到的那样;你基本上注册了一个lambda,当订阅者取消订阅时将会执行这个lambda。

也可以在不同的调度程序上取消订阅订阅。请参阅rxanroid项目中的MainThreadSubscription,以获取如何实现该目标的示例。

这里是你如何使用它退订主线程

aSubscriber.add(new MainThreadSubscription() { 
    @Override 
    protected void onUnsubscribe() { 
     existingEventSource.removeListener(listener); 
    } 
}); 
+0

米格尔的一个例子,谢谢您的回答,但我不能得到它。你介意再详细一点,还是指向我可以阅读的一些相关文档或源代码? – xwk 2014-11-02 06:04:38

+0

使用add方法,您注册一个lambda,当Observable退订时将执行该lambda。 – dwursteisen 2014-11-02 13:44:50

+0

@xwk,我加了一点解释,希望能给你一个更好的想法。 – 2014-11-03 12:42:34

相关问题