0

我在玩着实现我自己的观察者或从其他语言移植他们为乐趣和利润。如何在一般情况下测试非RxJava observables或异步代码?

我遇到的问题是,关于如何正确测试observables或异步代码的信息很少。一旦

// Create a stream of values emitted every 100 milliseconds 
// `interval` uses Timer internally 
final Stream<Number> stream = 
    Streams.interval(100).map(number -> number.intValue() * 10); 

ArrayList<Number> expected = new ArrayList<>(); 

expected.add(0); 
expected.add(10); 
expected.add(20); 

IObserver<Number> observer = new IObserver<Number>() { 
    public void next(Number x) { 
    assertEquals(x, expected.get(0)); 
    expected.remove(0); 
    if(expected.size() == 0) { 
     stream.unsubscribe(this); 
    } 
    } 
    public void error(Exception e) {} 
    public void complete() {} 
}; 

stream.subscribe(observer); 

作为流订阅了,它发射的第一个值:

考虑以下测试代码。调用onNext ...然后测试成功退出。

在JavaScript中,现在大多数测试框架都为测试用例提供了可选的Promise,您可以在成功/失败时异步调用该测试用例。有什么类似的Java可用吗?

回答

1

由于执行异常,您必须等到完成。您只需等待一段时间在一个古老的时尚之路

your_code 
wait(1000) 
check results. 

或者,如果您使用的观测量,你可以使用TestSubscriber 在这个例子中,你可以看到有一个异步操作如何,我们等到观察者消耗的所有项目。

@Test 
public void testObservableAsync() throws InterruptedException { 
    Subscription subscription = Observable.from(numbers) 
      .doOnNext(increaseTotalItemsEmitted()) 
      .subscribeOn(Schedulers.newThread()) 
      .subscribe(number -> System.out.println("Items emitted:" + total)); 
    System.out.println("I finish before the observable finish. Items emitted:" + total); 


    new TestSubscriber((Observer) subscription) 
      .awaitTerminalEvent(100, TimeUnit.MILLISECONDS); 
} 

您可以在这里看到更多的异步例子https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java

+0

的问题是,'TestSubscriber'需要一个RxJava兼容'Observer'这可能是我:( – Mamut

+1

检查API中,看到一只承包商TestSubscriber不需要Observable;),所以它基本上就像是等待 – paul

+0

谢谢!有用 :) – Mamut

相关问题