2015-09-22 37 views
7

假设我有以下RxJava代码(访问一个数据块,但具体使用情况是不相关的):在RxJava中取消订阅线程安全吗?

public Observable<List<DbPlaceDto>> getPlaceByStringId(final List<String> stringIds) { 
    return Observable.create(new Observable.OnSubscribe<List<DbPlaceDto>>() { 
     @Override 
     public void call(Subscriber<? super List<DbPlaceDto>> subscriber) { 
      try { 
       Cursor c = getPlacseDb(stringIds); 

       List<DbPlaceDto> dbPlaceDtoList = new ArrayList<>(); 
       while (c.moveToNext()) { 
        dbPlaceDtoList.add(getDbPlaceDto(c)); 
       } 
       c.close(); 

       if (!subscriber.isUnsubscribed()) { 
        subscriber.onNext(dbPlaceDtoList); 
        subscriber.onCompleted(); 
       } 
      } catch (Exception e) { 
       if (!subscriber.isUnsubscribed()) { 
        subscriber.onError(e); 
       } 
      } 
     } 
    }); 
} 

鉴于此代码,我有以下问题:

  1. 如果有人取消订阅从此方法返回的观察值(之前的订阅),是否该操作是线程安全的?无论调度如何,我的'isUnsubscribed()'检查是否正确?

  2. 是否有一个更简洁的方式与更少的样板代码来检查未订阅的状态比我在这里使用?我在框架中找不到任何东西。我认为SafeSubscriber解决了用户未订阅时不转发事件的问题,但显然它没有。

回答

8

是,操作线程安全的吗?

是。您正在接收一个rx.Subscriber(最终)检查订阅者订阅未订阅时设置为true的volatile布尔值。

用更少的样板代码清洁器的方法来检查未订阅状态

SyncOnSubscribeAsyncOnSubscribe(作为@Experimental API作为释放1.0.15的)用于该用途的情况下被创建。它们可以作为调用Observable.create的安全替代方案。这是同步案例的一个(人为的)例子。

public static class FooState { 
    public Integer next() { 
     return 1; 
    } 
    public void shutdown() { 

    } 
    public FooState nextState() { 
     return new FooState(); 
    } 
} 
public static void main(String[] args) { 
    OnSubscribe<Integer> sos = SyncOnSubscribe.createStateful(FooState::new, 
      (state, o) -> { 
       o.onNext(state.next()); 
       return state.nextState(); 
      }, 
      state -> state.shutdown()); 
    Observable<Integer> obs = Observable.create(sos); 
} 

注意,SyncOnSubscribe下一个功能是不允许调用不止一次observer.onNext以上每次迭代也不能同时打电话到该观测。以下是1.x分支头上的​​SyncOnSubscribeimplementationtests的几个链接。它的主要用途是简化编写可观测数据,以同步和onNext方式对数据进行迭代或解析,但在支持背压和检查是否退订的框架中这样做。实质上,您可以创建一个next函数,每次下游运算符需要一个新的数据元素onNexted时,该函数都会被调用。你的下一个函数可以在0或1次的时候调用onNext。

AsyncOnSubscribe设计用于良好地反压异步操作的可观察源(如离箱呼叫)的背压。您的下一个函数的参数包括请求计数,并且您提供的可观察值应该提供一个可观察的数据,以满足所请求数量的数据。这种行为的一个例子是来自外部数据源的分页查询。

此前,将您的OnSubscribe转换为Iterable并使用Observable.from(Iterable)是安全做法。此实现获取迭代器并为您检查subscriber.isUnsubscribed()

+0

谢谢,您实际上回答了我的另一个关于创建具有适当背压支持的“自定义”可观察的问题!我检查了SyncSubscriber,它看起来非常好。在很多情况下,我会将操作转换为Iterable在语义上有点尴尬,但是知道我们可以通过这种方式轻松实现背压支持,这仍然很好。我发现这个班级仍然被标记为@Experimental,你认为它什么时候可以粗略地考虑到生产准备? –

+0

好听!下一步是将其发布到一个版本中(1.0.15版即将推出)。之后,一般情况下,它会进入'@ Beta'状态或直接进入公共状态,因为我们会获得信心。 – Aaron