我是很新,RxJava和有图案等 我使用下面的代码创建一个可观察到的一些问题:RxJava错误处理热点观察到
public Observable<Volume> getVolumeObservable(Epic epic) {
return Observable.create(event -> {
try {
listeners.add(streamingAPI.subscribeForChartCandles(epic.getName(), MINUTE, new HandyTableListenerAdapter() {
@Override
public void onUpdate(int i, String s, UpdateInfo updateInfo) {
if (updateInfo.getNewValue(CONS_END).equals(ONE)) {
event.onNext(new Volume(Integer.parseInt(updateInfo.getNewValue(LAST_TRADED_VOLUME))));
}
}
}));
} catch (Exception e) {
LOG.error("Error from volume observable", e);
}
});
}
一切工作正常,但我有一些关于错误处理的问题。 如果我理解正确,这将被视为“热点观察”,即无论订阅与否,事件都会发生(onUpdate是由我无法控制的远程服务器使用的回调)。
我选择不要在这里调用onError,因为我不希望observable在单个异常情况下停止发射事件。有没有更好的模式可供使用? .retry()出现在脑海中,但我不确定这是否适合热门的可观察性?
另外,在创建订阅时,但在第一个onNext被调用之前,observable如何表示?它只是一个Observable.empty()
你认为错误来自哪里?从'listeners.add()'或者'onUpdate()'?你能给出一个错误情况的例子,你想要通知订阅者。 –
我猜你有点误解热/冷Observable。这并不热,每个用户都有自己的监听器来发送事件。即使你没有注销你的听众处置。由于Observable.create机制,observable在处置后不会发出事件。 –
可能是listeners.add()和onUpdate()。不幸的是,我使用的API是非常不明确的。 – Daniel