一点点上下文: 在这个链接:https://github.com/ReactiveX/RxJava/issues/448 @ ben-lesh提出了一个手动递归实现来使用Observable进行轮询。 但是在最新的RxJava版本中没有OnSubscribeFunc
。使用Observable进行轮询的手动递归的更新版本是什么?
这是我目前的执行情况:
Observable.create(new Observable.OnSubscribe<Item>() {
@Override
public void call(final Subscriber<? super Item> innerSubscriber) {
Schedulers.io().createWorker()
.schedulePeriodically(new Action0() {
@Override
public void call() {
searchObservable()
.doOnNext(new Action1<Item>() {
@Override
public void call(Item item) {
innerSubscriber.onNext(item);
}
})
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
if (throwable != null) {
innerSubscriber.onError(throwable);
}
}
})
.doOnCompleted(new Action0() {
@Override
public void call() {
innerSubscriber.onCompleted();
}
}).subscribe(); // Set subscriber?
}
}, initialDelay, pollingInterval, TimeUnit.MINUTES);
}
})
.subscribeOn(Schedulers.io()) // performs networking on background thread
.observeOn(observeOnScheduler) // sends notifications to another Scheduler, usually the UI thread
.subscribe(subscriber); // The subscriber
searchObservable
执行服务请求。这是第一次运行良好,这是数据传递到subscriber
。但是,在等待pollingInterval
后,数据将返回并执行doOnNext
,但数据不会传递到UI。我是否需要在中设置schedulePeriodically
需要的用户?