1
我想能够模拟合并两个单独的流,它们发出一些对象(即扩展相同的父对象),使用缓冲区操作符缓冲它们并在10秒后发出收集的数据。我希望这种机制是无限的,这种合并/缓冲区总是在来自2个分离的流的排放时被调用。如何模拟发射2 infiite Observable流,并有其他Observable每10秒合并它们并缓冲?
以下是我迄今所做的:
val list1 = mutableListOf<SomeClass1>(
SomeClass1("1", 1), SomeClass1("2", 2), SomeClass1("3", 3),
SomeClass1("4", 4), SomeClass1("5", 5), SomeClass1("6", 6),
SomeClass1("7", 7), SomeClass1("8", 8), SomeClass1("9", 9)
)
val list2 = mutableListOf<SomeClass2>(
SomeClass2(1.00), SomeClass2(2.00), SomeClass2(3.00),
SomeClass2(4.00), SomeClass2(5.00), SomeClass2(6.00),
SomeClass2(7.00), SomeClass2(8.00), SomeClass2(9.00)
)
val someClass1Observable = Observable
.fromIterable(list1)
.zipWith(Observable.interval(2, TimeUnit.SECONDS),
BiFunction { item: SomeClass1, _: Long -> item })
val someClass2Observable = Observable
.fromIterable(list2)
.zipWith(Observable.interval(2, TimeUnit.SECONDS),
BiFunction { item: SomeClass2, _: Long -> item })
someClass1Observable.subscribe {
Log.v("someClass1", it.toString())
}
someClass2Observable.subscribe {
Log.v("someClass2", it.toString())
}
Observable.merge(someClass1Observable, someClass2Observable)
.buffer(10, TimeUnit.SECONDS)
.repeat()
.doOnSubscribe { Log.v("parentObservable", "STARTED") }
.subscribe { t: MutableList<Parent> ->
Log.v("parentObservable", "onNext")
t.forEach { Log.v("onNext", it.toString()) }
}
Thread.sleep(30000)
Log.v("AFTER_SLEEP", "AFTER_SLEEP")
someClass1Observable.subscribe {
Log.v("someClass1", it.toString())
}
someClass2Observable.subscribe {
Log.v("someClass2", it.toString())
}
2流的第一发射工作正常,合并/缓冲器可观察到从他们每次10秒后收集的排放。然而,当这两个流结束排放,我再次订阅他们,缓冲/合并Observable不是爱好者的工作。如何让这项工作像无限一样?有没有更好的方法来编写代码发送对象的2个单独的流,他们将不需要读取列表中的值,而不是他们会每隔2秒发出一个新的对象间隔? 如何使合并/缓冲区Observable的工作方式是无限的,我的意思是每当有2个Observable流发生新的辐射时?