2016-08-26 51 views
2

可观察比方说,我们有源可观察到一个int:如何产生一个从另一个

val source:Observable[Int] 

我想创造另一个观察的,生产值,它们的区别在来首次出现值大于10 :

def detect() = Observable[Int](
    subscriber => 
    if (!subscriber.isUnsubscribed) { 
     var start:Option[Int] = None 
     source.subscribe(
     item => { 
      if (start.isEmpty) { 
      start = Option(item) 
      } 
      else { 
      start.filter(v => Math.abs(item - v) > 10).foreach { 
       item => subscriber.onNext(item) 
      } 
      } 
     } 
    ) 
     subscriber.onCompleted() 
    } 
) 

在这里,我用VAR 开始持有可观察到的第一个值。

有没有简化此代码的方法?我不喜欢的VAR值分配给这种方法

回答

3

这里是我想出了:

import rx.lang.scala.Observable 

val source = Observable.from(List(5, 2, 3, 16, -40, 2, -70, 50)) 

source.scan(Option.empty[(Int, Int)]) { (acc, next) => 
    acc.map(_.copy(_2 = next)) orElse Some((next, next)) 
}.collect { 
    case Some((start, current)) if math.abs(start - current) > 10 => current 
}.subscribe(x => println(x)) 

打印

16 
-40 
-70 
50 

基本扫描保持一个可以未初始化的累加器(None),或者可以保存一对:从源发出的第一个值和最后一个元素。然后我们只收集那些符合你谓词的元素。

0

你只需要应用filter运营商,其产生的新的可观察,反映观察到的源的排放,但跳过那些为其谓词测试错误:

val filtered = source.filter(v => Math.abs(item - v) > 10) 
+0

我该如何保持源的第一个值?我将来源的每个下一个值与第一个值比较 – Nyavro

+0

啊,我误解了你的问题;另一个答案似乎是正确的。 –