2017-03-12 29 views
0

我的观察者过度生产,我需要处理所有元素。当我第一次订阅我想尽快收到元素,但我想缓冲其他人,直到 a)处理第一个元素 AND b)1秒超时。RxJava发出第一个元素并缓冲其他

我目前的实现是:

connectionService.subscribe(request) 
.buffer(1, TimeUnit.SECONDS) 
.flatMap(merger) 
.subscribe(...) 

的 “合并” 做的工作。有2个问题是:

1)的第一个元素将有1秒的延迟太多,但它应该可以立刻

2)如果合并时间超过1秒我收到下一行时元素以前不被处理(注意,合并的第一个元素时,只有一个问题)

+0

这是对的:你有一个可观察的元素。每个元素必须一个接一个地处理。如果一个元素的“处理”花费的时间超过一秒,那么下一个元素可以被处理并被处理?如果它不到一秒钟,你想采取下一个并处理。这是对的吗? –

+0

@HansWurst每个元素都依赖于前一个元素(它们是json补丁字符串http://jsonpatch.com/)。每个元素都必须按顺序处理。第一个元素是初始json,因此需要分别计算,但下一个元素(json拼贴元素)可以合并。 – adam0404

+1

好吧,第一个emmited元素是init。 JSON。以下所有是必须应用于init的补丁。 JSON。看起来,您会使用reduce/scan将每个发出的补丁应用到初始json,最后您将获得应用了所有补丁的json。但它可能是,制作人产生的速度比你应用这些补丁更快。为此,您可以使用rxjava的背压。这是对的吗? –

回答

0

使用这样的(假设RxJava 1):

connectionService.subscribe(request) 
.onBackpressureBuffer(...) 
.flatMap(merger, 1) 
.subscribe(...) 

使用onBackpressureBuffer操作的参数来指定发生了什么当它溢出时。

.flatMap(..., 1)确保从上游请求正好1件物品。

相关问题