0
我的观察者过度生产,我需要处理所有元素。当我第一次订阅我想尽快收到元素,但我想缓冲其他人,直到 a)处理第一个元素 AND b)1秒超时。RxJava发出第一个元素并缓冲其他
我目前的实现是:
connectionService.subscribe(request)
.buffer(1, TimeUnit.SECONDS)
.flatMap(merger)
.subscribe(...)
的 “合并” 做的工作。有2个问题是:
1)的第一个元素将有1秒的延迟太多,但它应该可以立刻
2)如果合并时间超过1秒我收到下一行时元素以前不被处理(注意,合并的第一个元素时,只有一个问题)
这是对的:你有一个可观察的元素。每个元素必须一个接一个地处理。如果一个元素的“处理”花费的时间超过一秒,那么下一个元素可以被处理并被处理?如果它不到一秒钟,你想采取下一个并处理。这是对的吗? –
@HansWurst每个元素都依赖于前一个元素(它们是json补丁字符串http://jsonpatch.com/)。每个元素都必须按顺序处理。第一个元素是初始json,因此需要分别计算,但下一个元素(json拼贴元素)可以合并。 – adam0404
好吧,第一个emmited元素是init。 JSON。以下所有是必须应用于init的补丁。 JSON。看起来,您会使用reduce/scan将每个发出的补丁应用到初始json,最后您将获得应用了所有补丁的json。但它可能是,制作人产生的速度比你应用这些补丁更快。为此,您可以使用rxjava的背压。这是对的吗? –