2017-02-20 29 views
0

在反应式Java中,我们被告知.subscribe()调用返回“一个订阅参考”。但是Subscription是一个接口,不是一个类。那么我们递交的是什么样的对象来实现这个接口?我们是否有任何控制权?什么样的对象是反应式Java订阅?

Subscriptions类可以创建和返回几种不同种类的Subscription,但是用它们做什么?如果我写

Subscription mSub = Subscriptions.create(<some Action0>); 
mSub = someObservable.subscribe(); 

不会我刚刚创建的Subscription只需无论.subscribe()调用返回被改写?你如何使用你创建的Subscription

(在一定程度上相关的说明,什么是Subscriptions.unsubscribed()点,其中“返回订阅到退订什么都不做,因为它已经退订。咦?)

回答

2

简短的回答:你不应该在乎。

再回应:签约为您提供了两种方法:

  • unsubscribe(),这将导致订阅终止。
  • isUnsubscribed(),检查是否已经发生。

您可以使用这些方法a)检查Observable链是否终止,以及b)是否导致它过早终止,例如,如果用户切换到不同的活动。

就是这样。你不会接触的目的。另外,你注意到没有resubscribe方法吗?这是因为如果您想重新启动操作,您需要重新订阅Observable,为您提供新的订阅。

1

正如你所知道Subscription s的使用(例如在Android应用程序中,当您更改Activity(屏幕)时,请刷新旧的ActivityObservable s。在此方案中,Subscription实例由.subscribe()(如您所述)和所以,出于这个原因,我们会直接创建一个Subscription,特别是Subscriptions.unsubscribed() ?我遇到两种情况:

  • 默认实现;避免像Subscription mSub;这样的声明,将填补后者,并可能创建一个NPE。如果您使用需要属性初始化的Kotlin,则尤其如此。

  • 测试

+0

谢谢。问题:在活动重新启动时,以某种方式存储Android Activity订阅是否允许重新连接到Observable?你能给个例子吗?我如何在默认实现中使用创建的订阅?我已经在Android应用程序中声明了一些订阅(对于RxAndroidBle),并且它似乎没有引起问题。 –

+0

保持'Subscription'引用的主要用法是稍后调用'.dispose()'。所以,如果你不处理取消订阅,只需删除'订阅'引用,写'myObservable.subscribe();'而不分配给变量。一个典型的用法是在'onResume()'方法中订阅,存储'Subscription',并在'onPause()'中调用'subscription.dispose();' –

0

在一个有点相关的说明,什么是Subscriptions.unsubscribed(),其中“返回订阅到退订什么都不做,因为它已经退订。咦点?

在1 。x,Subscriptions.unsubscribed()用于返回一个Subscription实例操作已完成(或从未在第一个地方运行),当控制从RxJava返回到您的代码。由于取消订阅是无状态和常量状态,返回的Subscription是一个单例,因为仅通过查看接口Subscription就没有(合理的)方法来区分一个已完成/未订阅的Subscription与另一个。

在2.x中,其等效接口有一个公共和内部版本,Disposable。内部版本主要用于替换已终止的Disposable,避免NullPointerException和空检查,并有助于GC。

与他们做什么?

通常你不需要担心Subscriptions.create();它提供了你有你想要的资源附加到您的最终用户的生命周期的情况下:

FileReader file = new FileReader ("file.txt"); 

readLines(file) 
.map(line -> line.length()) 
.reduce(0, (a, b) -> a + b) 
.subscribe(new Subscriber<Integer>() { 
    { 
     add(Subscriptions.create(() -> { 
      Closeables.closeSilently(file); // utility from Guava 
     }); 
    } 
    @Override public void onNext(Integer) { 
     // process 
    } 
    // onError(), onCompleted() 
}); 

这个例子,证明使用的一种方式,可以通过using,而不是仍然表示:

Observable.using(
    () -> new FileReader("file.txt"), // + try { } catch { } 
    file -> readLines(file).map(...).reduce(...), 
    file -> Closeables.closeSilently(file) 
) 
.subscribe(...)