3

一点点上下文: 在这个链接:https://github.com/ReactiveX/RxJava/issues/448 @ ben-lesh提出了一个手动递归实现来使用Observable进行轮询。 但是在最新的RxJava版本中没有OnSubscribeFunc使用Observable进行轮询的手动递归的更新版本是什么?

这是我目前的执行情况:

Observable.create(new Observable.OnSubscribe<Item>() { 
     @Override 
     public void call(final Subscriber<? super Item> innerSubscriber) { 

      Schedulers.io().createWorker() 
         .schedulePeriodically(new Action0() { 
          @Override 
          public void call() { 
           searchObservable() 
             .doOnNext(new Action1<Item>() { 
              @Override 
              public void call(Item item) { 
               innerSubscriber.onNext(item); 
              } 
             }) 
             .doOnError(new Action1<Throwable>() { 
              @Override 
              public void call(Throwable throwable) { 
               if (throwable != null) { 
                innerSubscriber.onError(throwable); 
               } 
              } 
             }) 
             .doOnCompleted(new Action0() { 
              @Override 
              public void call() { 
               innerSubscriber.onCompleted(); 
              } 
             }).subscribe(); // Set subscriber? 
          } 
         }, initialDelay, pollingInterval, TimeUnit.MINUTES); 
     } 
    }) 
      .subscribeOn(Schedulers.io()) // performs networking on background thread 
      .observeOn(observeOnScheduler) // sends notifications to another Scheduler, usually the UI thread 
      .subscribe(subscriber); // The subscriber 

searchObservable执行服务请求。这是第一次运行良好,这是数据传递到subscriber。但是,在等待pollingInterval后,数据将返回并执行doOnNext,但数据不会传递到UI。我是否需要在中设置schedulePeriodically需要的用户?

回答

0

这是什么工作对我罚款,并遵循什么手册递归的是范式:

public void manualRecursionPollingStrategy(Subscriber<Item> subscriber, Scheduler observeOnScheduler, long initialDelay, long pollingInterval) { 
      Observable.create(new Observable.OnSubscribe<Item>() { 
       @Override 
       public void call(final Subscriber<? super Item> innerSubscriber) { 
        Schedulers.newThread().createWorker() 
           .schedulePeriodically(new Action0() { 
            @Override 
            public void call() { 
               searchByHashtagObservable() 
                 .subscribeOn(Schedulers.io()) // performs networking on background thread 
                 .observeOn(observeOnScheduler) // sends notifications to a Scheduler, usually the 
                   // UI thread 
                 .subscribe(
                   new Action1<Item>() { 
                    @Override 
                    public void call(Item item) { 
                     subscriber.onNext(item); 
                    } 
                   }, 
                   new Action1<Throwable>() { 
                    @Override 
                    public void call(Throwable throwable) { 
                     if (throwable != null) { 
                      subscriber.onError(throwable); 
                     } 
                    } 
                   }, 
                   new Action0() { 
                    @Override 
                    public void call() { 
                     subscriber.onCompleted(); 
                    } 
                   } 
                ) 
            } 
           }, initialDelay, pollingInterval, TimeUnit.MINUTES); 
       } 
      }) 
       .observeOn(observeOnScheduler) // sends notifications to a Scheduler, usually the UI thread 
       .subscribe(subscriber); 

请注意,我订阅searchByHashtagObservable()并调用onNextonError和作为参数传递的Subscriber的。

谢谢!

0

它停止,因为您打电话innerSubscriber.onCompleted它终止第一次运行序列。有标准运营商,可以让你同样的效果,而无需创建可观测自定义:

Observable.interval(initialDelay, pollingInterval, TimeUnit.MINUTES, Schedulers.io()) 
.onBackpressureBuffer() 
.concatMap(v -> searchObservable()) 
.observeOn(AndroidSchedulers.mainThread()) 
.subscribe(subscriber); 

(注:无需subscribeOn()在这里,因为间隔无论如何都会发出对Schedulers.io())。

相关问题