2016-11-02 42 views
0

我有时必须在Observable中执行一些清理任务(例如关闭打开的文件),我想知道什么是最好的方法。 到目前为止,我已经看到了两个,但我很难理解它们有什么不同:你能解释一下这些差异吗?是否有更好的方法来实现这一点?RxJava Observable.doOnUnsubscribe()vs Subscriber.add()

1)

// MyObject will take care of calling onNext(), onError() and onCompleted() 
    // on the subscriber. 
    final MyObject o = new MyObject(); 

    Observable obs = Observable.create(new Observable.OnSubscribe<Object>() { 
     @Override 
     public void call(Subscriber<? super Object> subscriber) { 
      try { 
       if (!subscriber.isUnsubscribed()) { 

        o.setSubscriber(subscriber); 

        // This will tell MyObject to start allocating resources and do its job. 
        o.start(); 

       } 
      } catch (Exception e) { 
       subscriber.onError(e); 
      } 
     } 
    }).doOnUnsubscribe(new Action0() { 
     @Override 
     public void call() { 
      // This will tell MyObject to finish its job and deallocate any resources. 
      o.stop(); 
     } 
    }); 

2)

Observable obs = Observable.create(new Observable.OnSubscribe<Object>() { 
     @Override 
     public void call(Subscriber<? super Object> subscriber) { 
      try { 
       if (!subscriber.isUnsubscribed()) { 

        // MyObject will take care of calling onNext(), onError() and onCompleted() 
        // on the subscriber. 
        final MyObject o = new MyObject(subscriber); 

        subscriber.add(Subscriptions.create(new Action0() { 
         @Override 
         public void call() { 
          // This will tell MyObject to finish its job and deallocate any resources. 
          o.stop(); 
         } 
        })); 

        // This will tell MyObject to start allocating resources and do its job. 
        o.start(); 

       } 
      } catch (Exception e) { 
       subscriber.onError(e); 
      } 
     } 
    }); 
+1

您已经使用'doOnSubscribe资源'在示例1中。它应该是问题标题中提到的'doOnUnsubscribe'。 –

+0

@PraveerGupta谢谢我修复了错字! –

回答

1

要回答你的原问题,doOnUnSubscribe和加到Subscriber是相同的。实际上,当您致电doOnUnSubscribe时,它只会将您的Action作为Subscription添加到您的Subscriber。所以,doOnUnSubscribe在后台使用你的第二个例子。

doOnUnSubscribe代码:

public class OperatorDoOnUnsubscribe<T> implements Operator<T, T> { 
    private final Action0 unsubscribe; 

/** 
* Constructs an instance of the operator with the callback that gets invoked when the modified Observable is unsubscribed 
* @param unsubscribe The action that gets invoked when the modified {@link rx.Observable} is unsubscribed 
*/ 
public OperatorDoOnUnsubscribe(Action0 unsubscribe) { 
    this.unsubscribe = unsubscribe; 
} 

@Override 
public Subscriber<? super T> call(final Subscriber<? super T> child) { 
    child.add(Subscriptions.create(unsubscribe)); 

    // Pass through since this operator is for notification only, there is 
    // no change to the stream whatsoever. 
    return Subscribers.wrap(child); 
    } 
} 
0

什么是使用Observable.create如果你不把与subscriber.onNext下游的任何值的点?

第一个问题是一个巨大的禁忌,因为您正在对已关闭的对象执行副作用。如果您同时从两个不同的线程订阅创建的observable,会发生什么?

第二个看起来更好,因为你添加了subscriber.add,如果订阅已经处理,它将调用o.stop()。唯一缺少的是onNext的值,即值向下游传递。

有一个用于从资源创建Observables的操作符,称为“using”。请看看http://reactivex.io/RxJava/javadoc/rx/Observable.html#using(rx.functions.Func0,%20rx.functions.Func1,%20rx.functions.Action1)

+0

关于你的第一句话:MyObject将在使用setSubscriber()启动start()后立即调用onNext/onCompleted/onError。 关于你的第二句话:我不明白,你能详细阐述一下吗? –

+0

您正在使用哪个版本的RxJava?我想重构你的例子。 MyObject的类型是什么? –

+0

使用rxJava 1.2.0 –

0

首先是不使用Observable.create(OnSubscribe)如果你能帮助它,因为你可以轻松突破的东西(如背压或可观察的合同有关的事情)。您应该使用许多静态工厂方法之一。

除了直接解决你的问题,我建议Observable.using这是明确设计释放终止或取消订阅资源。

例如:

Observable<byte[]> bytes = 
    Observable.using(
    () -> new FileInputStream(file), //resource factory 
    is -> Bytes.from(is), // observable factory 
    is -> is.close() // close action 
);  

上面的例子错过一些尝试副渔获物(例如大约is.close())如果你使用RxJava 2.

试图以充实你的情况将不会出现:

Observable.using(
() -> new MyObject(), //resource factory 
    myObject -> makeObservable(myObject), // observable factory 
    myObject -> myObject.stop() // close action 
);  
+0

我试过你的方法,虽然它不适用于我,因为调用onError/onNext/onCompleted的逻辑是在MyObject(在你的情况下由FileInputStream表示)的内部。 我想这次我无法逃避'Observable.create(OnSubscribe)'。 –

+0

您的可观察创作可以独立于关闭方面。我在答案中添加了更多细节。最终,您应该探讨在另一个StackOverflow问题中可能不使用'Observable.create'。 –

1

决定这两个解决方案,前提是你已经提到,使用取决于ü请确认您尝试使用/关闭/处置的资源是否意图在多个订阅之间共享。

  1. 使用subscriber.add(...)当资源用于生成事件。在这种情况下,你不会想分享资源。

    • 这就是你的例子MyObject的情况。这有利于资源不会暴露在Observable.create()方法之外,从而使资源免受意外的副作用。
  2. 使用doOnUnsubscribe当你要共享多个订阅东西。

    • 举个例子,如果你想有多少次使用的可观察到一个柜台,你可以有一个共同的计数器,并继续在doOnUnsubscribedoOnSubscribe递增。
    • 另一个例子是,如果您想要计算当前有多少个连接打开资源,则可以使用doOnSubscribedoOnUnsubscribe中的递增和递减组合来实现该功能。
在你的榜样

此外,而不是创建MyObject抽象,它是管理开放和资源,产生事件的最后,你可以用Observable.using()方法来达到同样的更换。这需要在三个参数:

  • resourceFactory,这将打开资源,
  • observableFactory,这将产生事件和
  • disposeAction,这将关闭