0
我想使用RxJava 1.1.5与Spring WebFlux(即反应堆核心3.1.0.M3),但我无法适应Observable
Flux
。适应RxJava 1.1.5反应堆核心3.1.0.M3
我想这将是相对简单的,但我的适配器不工作:
import reactor.core.publisher.Flux;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
public static <T> Flux<T> toFlux(Observable<T> observable) {
return Flux.create(emitter -> {
final Subscription subscription = observable.subscribe(new Subscriber<T>() {
@Override
public void onNext(T value) {
emitter.next(value);
}
@Override
public void onCompleted() {
emitter.complete();
}
@Override
public void onError(Throwable throwable) {
emitter.error(throwable);
}
});
emitter.onDispose(subscription::unsubscribe);
});
}
我已验证onNext
和onCompleted
都获取调用正确的顺序,但我Flux
总是空空的。有没有人看到我在做什么错了?
在相关说明中,为什么在reactor-addons中没有用于RxJava 1的适配器?
在akarnokd之前不敢称它为“老”:p但是这就是精神:Rx1到发布者的适配器路径非常直截了当,提供直接适配器并不值得麻烦,再加上Reactor想要鼓励使用反应性流 –
无论如何,这没有什么窍门。谢谢你,先生。 –