2015-11-03 46 views
3

当您有一个大的可观察图形(即使用merge,groupBy,join等组成的可观察组合多次),并抛出异常时,有时很难计算排除异常源自的地方。我想知道是否有可能找出源文件中的可见操作符被唤醒的地方。一个例子应该更清楚。追踪[rx]大图中的可观察异常到源代码

例如,给出以下IllegalStateException: Only one subscriber allowed!和堆栈跟踪,我想知道是否有可能找出行数operatorMergeoperatorFilteroperatorGroupBy等从我的源文件诱发。是否有可能做到这一点不知何故,无论是使用调试器,打印报表或以其他方式?

java.lang.IllegalStateException: Only one subscriber allowed! 
     at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilS 
ubscriber.java:124) 
     at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilS 
ubscriber.java:81) 
     at rx.Observable$1.call(Observable.java:144) 
     at rx.Observable$1.call(Observable.java:136) 
     at rx.Observable.unsafeSubscribe(Observable.java:7531) 
     at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$2.call(OperatorGroupBy. 
java:251) 
     at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$2.call(OperatorGroupBy. 
java:236) 
     at rx.Observable$1.call(Observable.java:144) 
     at rx.Observable$1.call(Observable.java:136) 
     at rx.Observable$1.call(Observable.java:144) 
     at rx.Observable$1.call(Observable.java:136) 
     at rx.Observable$1.call(Observable.java:144) 
     at rx.Observable$1.call(Observable.java:136) 
     at rx.Observable$1.call(Observable.java:144) 
     at rx.Observable$1.call(Observable.java:136) 
     at rx.Observable$1.call(Observable.java:144) 
     at rx.Observable$1.call(Observable.java:136) 
     at rx.Observable$1.call(Observable.java:144) 
     at rx.Observable$1.call(Observable.java:136) 
     at rx.Observable$1.call(Observable.java:144) 
     at rx.Observable$1.call(Observable.java:136) 
     at rx.Observable.unsafeSubscribe(Observable.java:7531) 
     at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMer 
ge.java:215) 
     at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:1 
85) 
     at rx.internal.operators.**OperatorMerge**$MergeSubscriber.onNext(OperatorMerge.java:1 
20) 
     at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55) 
     at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55) 
     at rx.internal.operators.SingleDelayedProducer.emit(SingleDelayedProducer.java:80) 

     at rx.internal.operators.SingleDelayedProducer.set(SingleDelayedProducer.java:63) 
     at rx.internal.operators.OperatorToObservableList$1.onCompleted(OperatorToObservab 
leList.java:93) 
     at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:44) 
     at rx.internal.operators.**OperatorFilter**$1.onCompleted(OperatorFilter.java:42) 
     at rx.internal.operators.OperatorTakeUntilPredicate$ParentSubscriber.onNext(Operat 
orTakeUntilPredicate.java:54) 
     at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:84) 
     at rx.internal.operators.**OperatorGroupBy**$GroupBySubscriber$2$2.onNext(OperatorGrou 
pBy.java:286) 
     at rx.internal.operators.BufferUntilSubscriber.onNext(BufferUntilSubscriber.java:1 
81) 
     at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150) 
     at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.emitItem(OperatorGroupB 
y.java:340) 
     at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.onNext(OperatorGroupBy. 
java:226) 
     at rx.internal.operators.OnSubscribeRefCount$2.onNext(OnSubscribeRefCount.java:124 
) 
     at rx.internal.operators.OperatorPublish$PublishSubscriber.dispatch(OperatorPublis 
h.java:560) 
     at rx.internal.operators.**OperatorPublish**$PublishSubscriber.onNext(OperatorPublish. 
java:258) 
     at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:676 
) 
     at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:5 
86) 
     at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:676 
) 
     at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:5 
86) 
     at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:676 
) 
     at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:5 
86) 
     at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscr 
ibeFromIterable.java:98) 
     at rx.Subscriber.setProducer(Subscriber.java:177) 
     at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java 
:50) 
     at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java 
:33) 
     at rx.Observable.unsafeSubscribe(Observable.java:7531) 
     at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMer 
ge.java:215) 
     at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:1 
85) 
     at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:1 
20) 
     at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55) 
     at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55) 
     at rx.internal.operators.OnSubscribeRefCount$2.onNext(OnSubscribeRefCount.java:124 
) 
     at rx.internal.operators.OperatorPublish$PublishSubscriber.dispatch(OperatorPublis 
h.java:560) 
     at rx.internal.operators.OperatorPublish$PublishSubscriber.onNext(OperatorPublish. 
java:258) 
     at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55) 
     at rx.internal.operators.OperatorScan$2.onNext(OperatorScan.java:112) 
     at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:676 
) 
     at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:5 
86) 
     at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$2$2.onNext(OperatorGrou 
pBy.java:286) 
     at rx.internal.operators.BufferUntilSubscriber.onNext(BufferUntilSubscriber.java:1 
81) 
     at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150) 
     at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.emitItem(OperatorGroupB 
y.java:340) 
     at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.onNext(OperatorGroupBy. 
java:226) 
     at rx.lang.scala.Subscriber$$anon$3.onNext(Subscriber.scala:198) 
... 

这个问题的出现本质上是因为Observable的整个观点是在执行时将a)代码从b)中分离出来。但对于调试程序来说,这是一场噩梦。因此,重复上面的问题,我想知道是否有可能在源代码中跟踪每个组合的原始行。

回答

1

有关于额外的调试信息,但整个库运行100X慢一些实验和被放弃了。

问题可能出现在您的groupBy中的flatMap中,您在此订阅GroupedObservable并将其交还给现在无法订阅它的flatMap:GroupedObservable只能使用一次。您需要使用publish()replay()运营商之一,并相应调整功能逻辑。

+0

感谢您的意见;它确实通过“共享”组和可嵌套观察量来修复。 – Luciano

0

一年之后我还在为此努力奋斗,并且仍然没有发现追踪执行的好方法。我发现我依靠在代码中放置打印语句来看看发生了什么。这是我能够掌握正在发生的事情的唯一途径。

我发现有用的唯一的事情是创造这样的模式,所以我没有写doOnNext(x => println(x))显示发生了什么事情,每一次:

implicit class ObservableTrace[T](o : rx.lang.scala.Observable[T]) { 
    import java.time.LocalTime 
    def trace(name : String) : rx.lang.scala.Observable[T] = { 
     def print(s: String) = println(s"${LocalTime.now} : $name : $s") 
     (o doOnNext (x => print("next:" + x)) 
     doOnSubscribe print("subscribed") 
     doOnCompleted print("completed") 
     doOnError (e => print("error: " + e)) 
     doOnUnsubscribe print("unsubscribed") 
     ) 
    } 

这使得编辑代码快速 - 只写一对夫妇的观察到的myobservable.trace("My Observable"),这可以很容易地看到,当不同的生命周期事件发生。

相关问题