2017-10-18 39 views
2

我正在实现一个可观察的来自Resource的线。从正确的线程调用RxJava2可以取消/抛弃

问题是,这个资源真的不喜欢从它创建的不同线程中关闭(它会杀死一只小狗并在发生这种情况时抛出一个异常)。

当我处理认购,资源Cancellable/Disposablemain线程中调用,而观察到的是订阅的Schedulers.io()

这里是科特林代码:

fun lines(): Observable<String> = 
     Observable.create { emitter -> 
      val resource = NetworkResource() 
      emitter.setCancellable { 
       resource.close() // <-- main thread :(
      } 
      try { 
       while (!emitter.isDisposed) 
        emitter.onNext(resource.readLine()) // <-- blocked here! 
      } catch (ioe: IOException) { 
       emitter.tryOnError(ioe) // <-- this also triggers the cancellable 
      } 
     } 

val disposable = lines() 
     .subscribeOn(Schedulers.io()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe { Log.i(TAG, "Line: $it" } 

disposable.dispose() // <-- main thread :) 

问题:是否可以调用从正确的*线程Cancellable,考虑到订阅线程被阻塞在resource.readLine()

*正确的线程意味着从subscribeOn(Schedures.io())

编辑:恐怕这个问题并没有一个正确的答案,除非resource.close()是线程安全的或resource.dataReady某种投票的实施,使得线程没有被阻塞。

+0

我想你应该尝试'unsubscribeOn'方法来定义'cancellable'的执行位置。 – masp

+0

@masp谢谢你的评论,但它不起作用。请参阅我对以下答案的评论。任何其他想法? – ESala

回答

2

Schedulers.io()管理线程池,所以它可能会或可能不会使用同一个线程来处理您的资源。您将不得不使用自定义调度程序和unsubscribeOn()运算符,以确保您的Observable已订阅并取消订阅到同一个线程中。喜欢的东西:

Scheduler customScheduler = Schedulers.from(Executors.newSingleThreadExecutor()); 

val disposable = lines() 
     .unsubscribeOn(customScheduler) 
     .subscribeOn(customScheduler) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe { Log.i(TAG, "Line: $it" } 
+0

谢谢你的回答,但它不起作用。来自执行程序的线程在'resource.readLine()'处被阻塞,因此使用'unsubscribeOn(...)'调度可取消的程序永远不会调用它,因为线程永远不会被释放。想法? – ESala

+0

可以从另一个线程中调用'readLine()'吗? –

1

如果你不介意的通话延迟到NetworkResource#close一点点,为什么不

fun lines(): Observable<String> = 
      Observable.create { emitter -> 
       val resource = NetworkResource() 
       try { 
        while (!emitter.isDisposed) { 
         emitter.onNext(resource.readLine()) 
        } 
        resource.close() 
       } catch (ioe: IOException) { 
        emitter.tryOnError(ioe) 
       } 
      } 

但仍然有一个问题,这一点:在IOException没有人的情况下,将永远呼叫NetworkResource#close(在我看来你的例子中也是这样)。

尝试解决这个问题:

fun lines(): Observable<String> = 
      Observable.create { emitter -> 
       val resource = NetworkResource() 
       try { 
        while (!emitter.isDisposed) { 
         emitter.onNext(resource.readLine()) 
        } 
       } catch (ioe: IOException) { 
        emitter.tryOnError(ioe) 
       } finally { 
        resource.close() // try-catch here, too? 
       } 
      } 

,或者使用 “科特林 - 尝试与 - 资源” 功能use

fun lines(): Observable<String> = 
      Observable.create { emitter -> 
       NetworkResource().use { resource -> 
        try { 
         while (!emitter.isDisposed) { 
          emitter.onNext(resource.readLine()) 
         } 
        } catch (ioe: IOException) { 
         emitter.tryOnError(ioe) 
        } 
       } 
      } 

我希望这有助于。希望你过个愉快的周末。

+0

嘿Peti,谢谢你的回答,但是它并没有解决问题,因为线程仍然会停留在'resource.readLine()',所以'emitter.isDisposed'标志将不会被检查,直到一行收到了,这可能是一段时间。除非从另一个线程调用'resource.close()'是安全的,否则恐怕没有答案。周末愉快! – ESala

+0

是的,专用的IO线程会调用'close'延迟...这是“如果你不介意延迟呼叫”这个答案的介绍:-)但这真的是一个问题吗?因为'NetworkResource'似乎被阻塞,所以专用的IO线程无论如何都会在正在进行的'lines()'订阅的生命周期中被浪费掉...... – Peti

+0

你说得对,被“浪费”的线程是一个小问题,我更担心的是无法关闭底层网络连接。 – ESala

0

如何选择替代路径?

一)使NetworkResource线程安全(如果你是通过与 “代理”包装NetworkResource的源代码的控制)

B)?对于“代理”,我的意思是一个内部使用专用线程的代理,它与NetworkResource(构建,readLine,关闭,......)

+0

再次感谢Peti。关于a)我无法控制它,关于b)已经尝试过它。 – ESala

相关问题