2014-12-11 67 views
0

我以下列方式使用rxjava维护任务的定时器发出的结果:如何使用可观察与flatMap

在一类,我需要定期维护,我用下面的静态订阅,这导致可观测当班级被加载到内存中时,首次启动,然后按照指定的时间间隔进行。

private static Subscription subscription = Observable.timer(0, 5, TimeUnit.SECONDS) 
     .flatMap(new Func1<Long, Observable<String>>() { 
      @Override public Observable<String> call(Long aLong) { 

       // some code 

       return Observable.just(null); 
      } 
     }).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread()) 
     .subscribe(); 

现在我有一个情况,在这里我要举报我维护用户界面的结果。

通常情况下,我会用下面的架构

Observable.create(new Observable.OnSubscribe<String>() { 

     @Override public void call(Subscriber<? super String> subscriber) { 

      // some code 

      subscriber.onNext(result); 
      subscriber.onCompleted();   } 

    }).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(new Action1<String>() { 

       @Override public void call(String result) { 

        // write to the UI 

       } 

      }); 

但在这里,我们有刚刚执行一次可观察到的。

对于定期执行的Observable,我找不到在Subscriber中调用Action的方法,以便我可以使用subscriber.onNext()传递结果。看起来Observable没有合适的签名,它可以在timer()中占用很长的时间,同时允许使用action来订阅。但知道rxjava我相信有一个技巧;-)

我可以使用zip压缩一个Timer Observable和一次性Observable(基本上压缩两个版本在一起),但我宁愿使用第一个结构,因为它表现稍有不同。

-

我试着按以下方式两个版本合并成一个:

private static Subscription subscription = Observable.timer(0, 5, TimeUnit.SECONDS) 
     .flatMap(new Func1<Long, Observable<String>>() { 
      @Override public Observable<String> call(Long aLong) { 

       // some code // stays here to ensure there is no concurrency while executing 

       final String result = "result"; // I store the result in a final variable after some code has been finished 

       return Observable.create(new Observable.OnSubscribe<String>() { 
        @Override public void call(Subscriber<? super String> subscriber) { 
         subscriber.onNext(result); // then I use it in a new Observable and emit it 
         subscriber.onCompleted(); // not sure if this is needed here (haven't tested this yet) 
        } 
       }); 

      } 
     }).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread()) 
     .subscribe(new Action1<String>() { 
      @Override public void call(String result) { 
       // so I can finally consume the result on the UI thread 
      } 
     }); 

不是创建和发射“空”可观察的,我创建了一个,让我送导致订户。

很麻烦,但这应该工作,对吧?任何简单的解决方案?你怎么看?

+0

为什么'Observable.timer.flatMap'不起作用? – zsxwing 2014-12-11 09:01:50

+0

我相信这是行不通的,因为flatMap将其发射的Observable变成一个单一的Observable(我想要的),但是,我没有找到一个签名,它给了我一个与订阅者的接口[就像@Override public void call (Subscriber <?super String> subscriber)],所以我可以使用subscriber.onNext()。 – 2014-12-11 15:30:56

回答

1

我不是很想理解你的问题。

你想从你的Observable Timer和你的其他Observable使用相同的订阅吗?

也许你可以使用主体作为你的Observables之间的桥梁。

Subject<?, ?> bridge = PublishSubject.create(); 
Observable.timer(5, SECONDS).flatMap(/* whatever*/).mergeWith(bridge).subscribe(/* toDo */); 
Observable.create(/* subscription */).subscribe(bridge); 

你“有一次观察的”发出的每个项目将被推入brige是,将其推入“计时器”观测。

这是你在找什么?

+0

[我从未使用过主题/桥梁,所以我需要先阅读并试用,然后才能说出它是否有效。我会在一段时间内回到这个页面。] – 2014-12-11 15:33:12

+0

我已经在上面添加了一个版本,我已经合并了这两个代码片段。没有测试,我相信这应该工作。通过PublishSubject,我认为我们也处于一个很好的方式,也许能够比上面做的更简单地解决问题。但是从你的解释中可以看出,“每一件物品都会被你的”一次可观察“发射出去,将会被推入布里奇,这将推动它进入”计时器“可观测的状态。它看起来像你误解了,因为我不想结合两个Observables。我只打算使用Timer Observable作为时钟,我不需要它的任何值。 – 2014-12-11 17:07:11

+0

好的。也许你想把你的计时器与其他可观察的东西拉在一起。无论如何,我想你明白了这个诀窍。 ;) – dwursteisen 2014-12-11 17:10:36