2016-08-23 45 views
0

我想了解如何取消订阅从活动订阅源创建的Observable。 这里或多或少代码:如何取消订阅实时流

SomeFeed feed = new SomeFeed(); 
Observable<PriceTick> observable = Observable.create(s -> 
    feed.register(new SomeListener() { 
    @Override 
    public void priceTick(PriceTick event) { 
     s.onNext(event); 
    } 

    @Override 
    public void error(Throwable throwable) { 
     s.onError(throwable); 
    } 
    }) 
); 
Subscription subscription = observable.subscribe(System.out::println); 
subscription.unsubscribe(); 
System.out.println("Is unsubscribed:" + subscription.isUnsubscribed()); // prints true 

我发现后认购退订是,用户仍然输出事件流。

如何取消订阅以删除订阅者接收更多通知?

回答

1

您正在创建的可观察项不服从Observable合约。你没有处理退订和退款。 Observable.create可以很容易地创建这样的observables。

相反,您可以使用其他技术来创建您的observables。您可以使用defer运营商或Observable.fromCallable创建观测值。

为了您的代码,你可以使用:

Observable<PriceTick> observable = Observable.create(s -> 
    feed.register(new SomeListener() { 
    @Override 
    public void priceTick(PriceTick event) { 
     if(!s.isUnsbscribed()) { 
     s.onNext(event); 
     } 
    } 

    @Override 
    public void error(Throwable throwable) { 
     if(!s.isUnsbscribed()) { 
     s.onError(throwable); 
     } 
    } 
    }) 
); 

这仍然不处理backpreasure。

更多关于创建观测值here