2017-07-30 21 views
0

我想使用RxJava 1.1.5与Spring WebFlux(即反应堆核心3.1.0.M3),但我无法适应ObservableFlux适应RxJava 1.1.5反应堆核心3.1.0.M3

我想这将是相对简单的,但我的适配器不工作:

import reactor.core.publisher.Flux; 
import rx.Observable; 
import rx.Subscriber; 
import rx.Subscription; 

public static <T> Flux<T> toFlux(Observable<T> observable) { 
    return Flux.create(emitter -> { 
     final Subscription subscription = observable.subscribe(new Subscriber<T>() { 
      @Override 
      public void onNext(T value) { 
       emitter.next(value); 
      } 
      @Override 
      public void onCompleted() { 
       emitter.complete(); 
      } 
      @Override 
      public void onError(Throwable throwable) { 
       emitter.error(throwable); 
      } 
     }); 
     emitter.onDispose(subscription::unsubscribe); 
    }); 
} 

我已验证onNextonCompleted都获取调用正确的顺序,但我Flux总是空空的。有没有人看到我在做什么错了?

在相关说明中,为什么在reactor-addons中没有用于RxJava 1的适配器?

回答

3

使用RxJavaReactiveStreams适配器将您的Observable变成Publisher,然后Flux.fromPublisher()消耗它。

compile 'io.reactivex:rxjava-reactive-streams:1.2.1' 

Observable<T> o = ... 

Flux.from(RxReactiveStreams.toPublisher(o)); 

在一个相关的说明,为什么没有在反应堆插件RxJava 1适配器?

他们不想支持或鼓励使用这项旧技术,我完全同意。

+0

在akarnokd之前不敢称它为“老”:p但是这就是精神:Rx1到发布者的适配器路径非常直截了当,提供直接适配器并不值得麻烦,再加上Reactor想要鼓励使用反应性流 –

+0

无论如何,这没有什么窍门。谢谢你,先生。 –