2017-04-11 83 views
1

我是很新,RxJava和有图案等 我使用下面的代码创建一个可观察到的一些问题:RxJava错误处理热点观察到

public Observable<Volume> getVolumeObservable(Epic epic) { 
     return Observable.create(event -> { 
      try { 
       listeners.add(streamingAPI.subscribeForChartCandles(epic.getName(), MINUTE, new HandyTableListenerAdapter() { 
        @Override 
        public void onUpdate(int i, String s, UpdateInfo updateInfo) { 
         if (updateInfo.getNewValue(CONS_END).equals(ONE)) { 
          event.onNext(new Volume(Integer.parseInt(updateInfo.getNewValue(LAST_TRADED_VOLUME)))); 
         } 
        } 
       })); 
      } catch (Exception e) { 
       LOG.error("Error from volume observable", e); 
      } 
     }); 
    } 

一切工作正常,但我有一些关于错误处理的问题。 如果我理解正确,这将被视为“热点观察”,即无论订阅与否,事件都会发生(onUpdate是由我无法控制的远程服务器使用的回调)。

我选择不要在这里调用onError,因为我不希望observable在单个异常情况下停止发射事件。有没有更好的模式可供使用? .retry()出现在脑海中,但我不确定这是否适合热门的可观察性?

另外,在创建订阅时,但在第一个onNext被调用之前,observable如何表示?它只是一个Observable.empty()

+0

你认为错误来自哪里?从'listeners.add()'或者'onUpdate()'?你能给出一个错误情况的例子,你想要通知订阅者。 –

+1

我猜你有点误解热/冷Observable。这并不热,每个用户都有自己的监听器来发送事件。即使你没有注销你的听众处置。由于Observable.create机制,observable在处置后不会发出事件。 –

+0

可能是listeners.add()和onUpdate()。不幸的是,我使用的API是非常不明确的。 – Daniel

回答

2

1)你的可观察性不是。区分因素是多个订户是否共享相同的订阅。 Observable.create()为每个用户调用订阅功能,即它是

虽然很容易使它。只需添加share()运营商。它将订阅第一位订阅者并取消订阅最后一位订阅者。不要忘记实现退订功能,像这样的东西:

event.setCancellable(() -> listeners.remove(...)); 

2)错误可能是可恢复无法恢复

如果你认为一个错误是可以自我修复的(不需要你的行为),你不应该打电话onError,因为这会杀死你的可观察的(不会发生进一步的事件)。您可能会通过发出特殊的Volume消息并附上错误详细信息来通知您的订户。

如果错误是致命的,例如你没有添加监听器,所以可能没有更多的消息,你不应该默默地忽略这个。发射onError因为你的可观察性无论如何都不起作用。

如果错误需要您采取措施,通常是重试或超时重试,您可以添加retryXxx()运算符中的一个。在create()之后执行此操作,但在之前执行share()

3)Observable是具有subscribe()方法的对象。它是如何表示取决于你创建它的方法。例如,请参阅源代码create()