2017-01-17 95 views
2

尝试进行升级时遇到问题。将io.projectreactor版本从2.0.x升级到3.0.4 - 使用Spring框架

我目前使用2.0.x版本,特别是 -

reactor.bus 
reactor.rx.Stream 
reactor.rx.Streams 
reactor.core.processor.RingBufferProcessor 
reactor.fn.Consumer 

我使用maven,和我有一个关于 'projectreactor' 单一依赖 -

<groupId>io.projectreactor.spring</groupId> 
<artifactId>reactor-spring-core</artifactId> 

当升级到版本3.0.4.RELEASE,为了继续使用我之前使用过的所有东西,我需要明确导入 -

<groupId>io.projectreactor</groupId> 
<artifactId>reactor-bus</artifactId> 

而且

<groupId>io.projectreactor</groupId> 
<artifactId>reactor-stream</artifactId> 

但我仍然失踪

reactor.core.processor.RingBufferProcessor 
reactor.fn.Consumer 

,我不知道该怎么做。

回答

0
reactor.rx.Stream -> reactor.core.publisher.Flux 
reactor.rx.Streams -> reactor.core.publisher.Flux 
reactor.rx.Promise -> reactor.core.publisher.Mono and reactor.core.publisher.MonoProcessor 
reactor.core.processor.RingBufferProcessor -> reactor.core.publisher.TopicProcessor 
reactor.fn.Consumer -> java.until.function.Consumer (Java 8) 

没有新的弹簧模块,因为弹簧5直接包含这些新类型的反应器支持。

至于reactor-bus: 按照设计,现在所有的流路径(Flux/Mono链)都是键入的,所以动态路由并不是我们功能的一部分。还是有一个类型化的方式替代,比如:

ReplayProcessor<MyEvent> rp = ReplayProcessor.create(); 
Flux<MyEvent> interest1 = rp.filter(ev -> filterInterest1(ev)); 
Flux<MyEvent> interest2 = rp.filter(ev -> filterInterest2(ev)); 
Flux<MyEvent> interest1_2 = rp.filter(ev -> filterInterest1(ev) || filterInterest2(ev)); 

interest1.subscribe(doSomethingForInterest1); 
interest2.subscribe(doSomethingForInterest2); 
interest1_2.subscribe(doSomethingForInterest1_2); 

rp.onNext(new MyEvent("interest1")); //subscriber 1 and 3 react 
rp.onNext(new MyEvent("interest1")); //subscriber 1 and 3 react 
rp.onNext(new MyEvent("interest2")); //subscriber 2 and 3 react 
rp.onNext(new MyEvent("interest4")); // buffered until interest subscriber because ReplayProcessor 

//shutdown/cleanup/close 
rp.onComplete(); 

我发现这个在github上,这似乎满足您的需求

1

reactor.fn.Consumer被Java 8 java.util.function.Consumer所取代。

至于RingBufferProcessor你必须选择一个new processors所有使用环形缓冲区。

Dispatcher s现在Schedulers使用Java的Executor s在引擎盖下。

+0

什么abount RingBufferDispatcher?我正在使用EventBus,似乎所有的东西都会很快被弃用。让我想到这个库。 –

+0

@AdamBerlin我已经添加了关于调度员的注释。不确定EventBus,但是核心库已经放弃了许多自定义实现转移到标准Java 8类。 –

+0

@AdmBerlin注意到3.x与2.x相比是一个非常大的范式转换,现在更加关注反应式编程和反应流规范。 –