2017-10-17 15 views
0

有人可以帮助我想出一个函数,我可以将其应用于可以防止排放快速发生的可观察项。Rx斯卡拉 - 如何实现排放之间的最短时间

val later = Subject[Int]() 
val combined = Observable.from(List(1,2,3,4,5,6,7,8)) ++ later 
combined.delayMinTime(1 second).doOnEach(...) 

通常情况下,这会发出1-8,然后'随后'任何时间收到onNext。但我希望发出1-8,每秒一次,然后在任何时候接收到一些东西,以及如果接收到任何东西太快,它会延迟每个条目。 所以在Rxmarbles - 它看起来像这样

1 2 3 4    5 6   7 
▼ 
1 2 3 4   5 6  7 

BTW,如果有更好的函数名扔在,以及...

回答

0

我道歉,这是C#,但这是越来越密切:

var query = source.Publish(xs => xs.Window(() => 
{ 
    var n = DateTimeOffset.Now; 
    return xs.Select((x, i) => Observable.Timer(TimeSpan.FromSeconds(System.Math.Max(1.0 + (double)i - DateTimeOffset.Now.Subtract(n).TotalSeconds, 1.0)))).Switch(); 
})).Select(xs => xs.Zip(Observable.Interval(TimeSpan.FromSeconds(1.0)), (x, _) => x)).Merge(); 

它仍然通过一些价值观,让有差距小于1.0秒,但它可能是一个良好的开端,以帮助您制定出扭结。

+0

不是我在找的东西,有趣的,但我不认为这是在我的应用程序中可以正常工作的方法 –