我正在实现一个可观察的来自Resource
的线。从正确的线程调用RxJava2可以取消/抛弃
问题是,这个资源真的不喜欢从它创建的不同线程中关闭(它会杀死一只小狗并在发生这种情况时抛出一个异常)。
当我处理认购,资源Cancellable
/Disposable
从main
线程中调用,而观察到的是订阅的Schedulers.io()
。
这里是科特林代码:
fun lines(): Observable<String> =
Observable.create { emitter ->
val resource = NetworkResource()
emitter.setCancellable {
resource.close() // <-- main thread :(
}
try {
while (!emitter.isDisposed)
emitter.onNext(resource.readLine()) // <-- blocked here!
} catch (ioe: IOException) {
emitter.tryOnError(ioe) // <-- this also triggers the cancellable
}
}
val disposable = lines()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { Log.i(TAG, "Line: $it" }
disposable.dispose() // <-- main thread :)
问题:是否可以调用从正确的*线程Cancellable
,考虑到订阅线程被阻塞在resource.readLine()
?
*正确的线程意味着从subscribeOn(Schedures.io())
。
编辑:恐怕这个问题并没有一个正确的答案,除非resource.close()
是线程安全的或resource.dataReady
某种投票的实施,使得线程没有被阻塞。
我想你应该尝试'unsubscribeOn'方法来定义'cancellable'的执行位置。 – masp
@masp谢谢你的评论,但它不起作用。请参阅我对以下答案的评论。任何其他想法? – ESala