2016-11-13 40 views
0

我有一个org.reactivestreams.Processor,我希望在RxJava 2.0中使用它。然而,尽管有转换来将org.reactivestreams.Publisher与RxJava集成,如io.reactivex.Flowable#fromPublisher,但我不清楚如何最好地集成org.reactivestreams.Processor(或org.reactivestreams.Subscriber)。任何人都可以对此发光一些?在RxJava 2.0中使用Reactive-Streams处理器

回答

0

您缠绕Publisher侧并保持Subscriber一侧是:

Processor proc = ... 

Subscriber sub = proc; 
Flowable flow = Flowable.fromPublisher(proc); 

flow.map(v -> v.toString()).subscribe(System.out::println); 

sub.onNext(1); 
+0

嗯,但后来我可能违反反应流合同,指定了'onNext'不能更经常被称为然后通过请求'订阅#请求(长)'。 –

+0

这取决于您获得该处理器的位置,还是它协调下游请求与否。 RxJava的处理器不会协调,如果您向他们发送Subscription,他们总是会请求Long.MAX_VALUE。 – akarnokd

+0

处理器确认[spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.0/README.md#specification),并且已成功与akka流集成。我认为与RxJava 1.x不同,RxJava 2.0支持背压... –