2016-04-08 23 views
5

我有下面的类(简体):如何测试observable在RxJava中使用正确的调度程序?

public class Usecase<T> { 
    private final Observable<T> get; 
    private final Scheduler observeScheduler; 

    public Usecase(Observable<T> get, Scheduler observeScheduler) { 
     this.get = get; 
     this.observeScheduler = observeScheduler; 
    } 

    public Observable<T> execute() { 
     return get.subscribeOn(Schedulers.io()).observeOn(observeScheduler); 
    } 
} 

而且我编写单元测试它。我如何测试subscribeOnobserveOn是否被调用了正确的值?

我尝试以下方法:

Observable<String> observable = mock(Observable.class); 
    Usecase<String> usecase = new Usecase(observable, Schedulers.computation()); 
    usecase.execute(); 

    verify(observable).subscribeOn(Schedulers.computation()); // should fail here, but passes 
    verify(observable).observeOn(Schedulers.computation()); // should pass, but fails: Missing method call for verify(mock) here 

上述失败(我认为),因为subscribeOnobserveOnfinal方法。那么可能有其他方法可以确保observable使用正确的调度程序?

回答

3

There is a way可以间接访问观察点正在运行和被观察的两个线程,这意味着您实际上可以验证Observable使用正确的调度程序。

我们仅限于通过名称验证线程。幸运的是,Schedulers.io()使用的线程以一致的前缀命名,我们可以匹配。下面是一个有独特的前缀参考调度的(满了吗?)名单:

  • Schedulers.io() - RxCachedThreadScheduler
  • Schedulers.newThread() - RxNewThreadScheduler
  • Schedulers.computation() - RxComputationThreadPool

要验证Observable是订阅IO线程上:

// Calling Thread.currentThread() inside Observer.OnSubscribe will return the 
// thread the Observable is running on (Schedulers.io() in our case) 
Observable<String> obs = Observable.create((Subscriber<? super String> s) -> { 
    s.onNext(Thread.currentThread().getName()); 
    s.onCompleted(); 
}) 

// Schedule the Observable 
Usecase usecase = new Usecase(obs, Schedulers.immediate()); 
Observable usecaseObservable = usecase.execute(); 

// Verify the Observable emitted the name of the IO thread 
String subscribingThread = usecaseObservable.toBlocking().first(); 
assertThat(subscribingThread).startsWith("RxCachedThreadScheduler"); 

要验证可观察到的是计算线程观察到,您可以使用TestSubscriber#getLastSeenThread访问用于观察的最后一个线程。

TestSubscriber<Object> subscriber = TestSubscriber.create(); 
UseCase usecase = new UseCase(Observable.empty(), Schedulers.computation()) 
usecase.execute().subscribe(subscriber); 

// The observable runs asynchronously, so wait for it to complete 
subscriber.awaitTerminalEvent(); 
subscriber.assertNoErrors(); 

// Verify the observable was observed on the computation thread 
String observingThread = subscriber.getLastSeenThread().getName(); 
assertThat(observingThread).startsWith("RxComputationThreadPool"); 

没有第三方库或嘲讽是必要的,虽然我使用AssertJ的流畅startsWith断言。

2

Observable上应用运算符会返回一个新的Observable,因此任何后续的运算符应用程序都将发生在不同的对象上。您必须遵循Observable中的组合图来发现应用了哪些运算符以及哪些参数。

这在RxJava中不受支持,您必须依赖内部细节和反思。

一般来说,你不能假设任何关于事件发生的位置,但你可以申请observeOn以确保他们进入你选择的线程。

相关问题