2017-07-28 72 views
1

我有一个缓冲的流,等待的静默时间预定量,发布已经缓存元素的列表之前:如何动态缩放爆发排放流的去抖动?

INTEGERS 
    .share() 
    .buffer(INTEGERS.debounce(DEBOUNCE_TIME,TimeUnit.MILLISECONDS,scheduler)) 
    .map { durations -> 
     ... 
    } 

我想打DEBOUNCE_TIME动态取决于平均调整的缓冲项目,但我很难找出如何实现这一目标。

回答

1

你可以推迟去抖,一旦新的防抖动值已经确定利用它和触发重复一个项目:

int DEBOUNCE_TIME = 100; 
AtomicInteger debounceTime = new AtomicInteger(DEBOUNCE_TIME); 
PublishSubject<Integer> mayRepeat = PublishSubject.create(); 

AtomicInteger counter = new AtomicInteger(); 

Observable<Integer> INTEGERS = 
     Observable.fromArray(10, 20, 200, 250, 300, 550, 600, 650, 700, 1200) 
     .flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS) 
       .map(w -> counter.incrementAndGet())); 

INTEGERS.publish(o -> 
     o.buffer(
      Observable.defer(() -> 
       o.debounce(
        debounceTime.get(), TimeUnit.MILLISECONDS) 
      ) 
      .take(1) 
      .repeatWhen(v -> v.zipWith(mayRepeat, (a, b) -> b)) 
     ) 
    ) 
    .map(list -> { 
     int nextDebounce = Math.min(100, list.size() * 100); 
     debounceTime.set(nextDebounce); 
     mayRepeat.onNext(1); 
     return list; 
    }) 
    .blockingSubscribe(System.out::println); 

此打印:

[1, 2] 
[3, 4, 5] 
[6, 7, 8, 9] 
[10] 
+0

在我的项目实施之后,我不得不说这非常酷,而且实际上效果很好。一旦我开始编写一些测试,它帮助我确定,由于数据集的限制,我所要完成的核心并不是真正可行的。我实际上有点难过,我不再有用例来保证包括这个整洁的小模式。感谢您花时间回复! :) – saganaut