2017-06-30 39 views
0

我有一个代码,我正在做一个间隔,直到一个条件成立,然后在订阅中发回结果。发射后取消订阅间隔

但是由于是一个间隔订阅继续。

我在想,如果there's任何退订方式可观察到的间隔一次emmit东西

这里的代码

Subscription subscriber = Observable.interval(0, 5, TimeUnit.MILLISECONDS) 
       .map(i -> eventHandler.getProcessedEvents()) 
       .filter(eventsProcessed -> eventsProcessed >= 10) 
       .doOnNext(eventsProcessed -> eventHandler.initProcessedEvents()) 
       .doOnNext(eventsProcessed -> logger.info(null, "Total number of events processed:" + eventsProcessed)) 
       .subscribe(t -> resumeRequest(asyncResponse)); 
     new TestSubscriber((Observer) subscriber).awaitTerminalEvent(10, TimeUnit.SECONDS); 
subscriber.unsubscribe(); 

现在作为一个黑客我使用一个计时器,然后取消,但它糟糕!

问候

+0

你的状况如何? – JohnWowUs

+0

时间间隔是要求事件的数量。如果我们通过过滤器,只能去onNext – paul

回答

1

可以使用first操作

Subscription subscriber = Observable.interval(0, 5, TimeUnit.MILLISECONDS) 
       .map(i -> eventHandler.getProcessedEvents()) 
       .first(eventsProcessed -> eventsProcessed >= 10) 
       .doOnNext(eventsProcessed -> eventHandler.initProcessedEvents()) 
       .doOnNext(eventsProcessed -> logger.info(null, "Total number of events processed:" + eventsProcessed)) 
       .subscribe(t -> resumeRequest(asyncResponse)); 

,而不是过滤器。这可确保您只有在符合条件的情况下才能获得一次排放。请注意,如果条件时间间隔Observable在没有满足条件的情况下终止,您将收到异常。

+0

谢谢你是一个很好的解决方案! – paul