我有以下代码,它使用PublishSubject
。与RxJava2意外的行为PublishSubject
val subject = PublishSubject.create<Int>()
val o1: Observable<String> =
subject.observeOn(Schedulers.newThread()).map { i: Int ->
println("${Thread.currentThread()} | ${Date()} | map => $i")
i.toString()
}
o1.subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (1) => $it")
}
o1.subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (2) => $it")
}
o1.subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (3) => $it")
}
println("${Thread.currentThread()} | ${Date()} | submitting 1")
subject.onNext(1)
1)I从它创建一个Observable
并将其映射(对于此示例的目的,我只是转换为String
)=>o1
。
2)然后我订阅o1
3次。
3)最后,我通过调用subject.onNext(1)
“发布”一个事件。
出乎我的意料,我得到以下的输出:
Thread[main,5,main] | Mon Jun 19 09:46:37 PDT 2017 | submitting 1
Thread[RxNewThreadScheduler-1,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1
Thread[RxNewThreadScheduler-2,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1
Thread[RxNewThreadScheduler-3,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1
Thread[RxNewThreadScheduler-1,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (1) => 1
Thread[RxNewThreadScheduler-2,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (2) => 1
Thread[RxNewThreadScheduler-3,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (3) => 1
map
最终被称为3次,我不明白为什么,因为我订阅o1
发生map
之后应该发生。我肯定错过了什么。任何帮助,将不胜感激。
感谢 燕
您订阅了'o1'三次,每次创建一个独立的序列,直到'PublishSubject'将发布onNext'到所有3个链。 – akarnokd
你说“直到'PublishSubject'”:为什么它一直到主题?你能否指出我会在哪里解释?如果这是正常的行为有没有一种方法来转换地图后的流,以便它不这样做? – yan
因为从所有3个订阅者的角度来看,PublishSubject是一个多播源,它通过subscribe()调用建立的独立链向他们发送事件。 – akarnokd