2017-03-28 95 views
2

我在just()运算符中传递了两个参数。我的代码片段是:在Rxjava中创建线程

Observable<Integer> observable = Observable.just(1,2); 
observable.subscribeOn(Schedulers.newThread()) 
         .subscribe(
           new Observer<Integer>() { 
            @Override 
            public void onSubscribe(Disposable d) { 

            } 

            @Override 
            public void onError(Throwable e) { 

            } 

            @Override 
            public void onComplete() { 

            } 

            @Override 
            public void onNext(Integer e) { 
             System.out.println(e); 
             //request web service 

           }); 

我观察到的是它没有为每个发射物品制作单独的线程。显示为just参数的项目按顺序运行。如何为每个发射物品创建单独的线程?

回答

2

您可以使用flatMap和flatMap内创建新的观察到的,并使用subscribeOn

@Test 
public void test() { 
    Observable.just(1, 2) 
      .flatMap(item -> Observable.just(item) 
        .subscribeOn(Schedulers.newThread()) 
        .doOnNext(i -> System.out.println("Thread:" + Thread.currentThread()))) 
      .subscribe(System.out::println); 
} 

你可以看到异步观察到这里https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java

+0

感谢@保罗更多的例子。有效。但为什么我以前的方法不起作用? –

+0

因为你不用observable.subscribeOn创建一个新的observable,这个observable中发出的两个项目都在同一个线程中运行 – paul