2017-06-19 40 views
0

我有以下代码,它使用PublishSubject与RxJava2意外的行为PublishSubject

val subject = PublishSubject.create<Int>() 

val o1: Observable<String> = 
     subject.observeOn(Schedulers.newThread()).map { i: Int -> 
      println("${Thread.currentThread()} | ${Date()} | map => $i") 
      i.toString() 
     } 

o1.subscribe { 
    println("${Thread.currentThread()} | ${Date()} | direct subscription (1) => $it") 
} 

o1.subscribe { 
    println("${Thread.currentThread()} | ${Date()} | direct subscription (2) => $it") 
} 

o1.subscribe { 
    println("${Thread.currentThread()} | ${Date()} | direct subscription (3) => $it") 
} 

println("${Thread.currentThread()} | ${Date()} | submitting 1") 

subject.onNext(1) 

1)I从它创建一个Observable并将其映射(对于此示例的目的,我只是转换为String)=>o1

2)然后我订阅o1 3次。

3)最后,我通过调用subject.onNext(1)“发布”一个事件。

出乎我的意料,我得到以下的输出:

Thread[main,5,main] | Mon Jun 19 09:46:37 PDT 2017 | submitting 1 
Thread[RxNewThreadScheduler-1,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1 
Thread[RxNewThreadScheduler-2,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1 
Thread[RxNewThreadScheduler-3,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1 
Thread[RxNewThreadScheduler-1,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (1) => 1 
Thread[RxNewThreadScheduler-2,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (2) => 1 
Thread[RxNewThreadScheduler-3,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (3) => 1 

map最终被称为3次,我不明白为什么,因为我订阅o1发生map之后应该发生。我肯定错过了什么。任何帮助,将不胜感激。

感谢 燕

+0

您订阅了'o1'三次,每次创建一个独立的序列,直到'PublishSubject'将发布onNext'到所有3个链。 – akarnokd

+0

你说“直到'PublishSubject'”:为什么它一直到主题?你能否指出我会在哪里解释?如果这是正常的行为有没有一种方法来转换地图后的流,以便它不这样做? – yan

+0

因为从所有3个订阅者的角度来看,PublishSubject是一个多播源,它通过subscribe()调用建立的独立链向他们发送事件。 – akarnokd

回答

1

从评论:

您订阅o1三次,每次创建一个独立的序列,直到PublishSubject将派遣onNext所有3支链。

从所有3个用户的角度来看,PublishSubject是一个组播源,它通过由subscribe()调用建立的独立链向他们发送事件。

Subject上应用运算符通常不会使整个链变热,因为这些运算符元素仅在订阅时才附加到源Subject。因此,多个订阅将产生多个通道到相同的上游Subject

使用publish得到一个ConnectableObservable(或另一个PublishSubject在最后),使序列从那时起变得很热。

+0

我确认在map之后加上'.publish.autoConnect()'就可以了。我不是100%确定你是指另一个'PublishSubject''的意思,但是我的工作方式是添加'.subscribe {otherSubject.onNext(it)}',并在其他主题上进行订阅。 – yan

+0

要关闭这个循环,我创建了一个要点来演示2个选项https://gist.github.com/ypujante/2ab3c3a135272ea4bc4554cbfc287ca7。最后我选择了第二个主题,因为它对我来说似乎更清洁。 – yan