2016-11-01 78 views
0

我有以下代码来修改我的源observable,以便它在应用程序状态在ENABLED和DISABLED之间切换时断开连接并重新连接。但是,如果我的sourceObservable本身有一个调用完成的情况,我怀疑它会立即重新连接,因为这个变换器的重复。如果应用程序状态仍然是ENABLED。repeatWhen基于触发完成的事件

是否有一种解决这个问题的优雅方式,或许将takeUntil分组并重复,以便它们不会捕获来自上面的任何完整事件?

N.B. applicationStatusObservable根据请求重复其最后一个值。

帮助是非常赞赏:)

/** 
* Transforms the source observable so that it defers initial subscription until the service becomes available, 
* unsubscribes when the service becomes unavailable, and resubscribes when the service becomes available again. 
*/ 
public class AvailabilityTransformer<T> implements Observable.Transformer<T, T> { 

    private final Observable<ApplicationStatus> applicationStatusObservable; 

    AvailabilityTransformer(final Observable<ApplicationStatus> applicationStatusObservable) { 
     this.applicationStatusObservable = applicationStatusObservable; 
    } 

    @Override 
    public Observable<T> call(final Observable<T> sourceObservable) { 
     final Observable<ApplicationStatus> applicationEnabledObservable = 
       applicationStatusObservable.filter(applicationStatus -> applicationStatus == ENABLED); 
     final Observable<ApplicationStatus> applicationDisabledObservable = 
       applicationStatusObservable.filter(applicationStatus -> applicationStatus != ENABLED); 
     return sourceObservable 
       .takeUntil(applicationDisabledObservable) // Unsubscribe whenever the application is disabled 
       .repeatWhen(repeatObservable -> repeatObservable.flatMap(repeat -> 
         applicationEnabledObservable.flatMap(applicationStatus -> just(repeat)))) // Resubscribe when enabled again 
       .delaySubscription(applicationEnabledObservable.first()); // Delay the initial subscription until the application is first enabled 
    } 
} 
+0

applicationStatusObservable是热还是冷?那么,你想用AvailabilityTransformer实现什么?我不明白。 –

+0

applicationStatusObservable对于重播(1).refCount()是冷的 - 在任何情况下,它总是会返回当前的应用程序状态,并且如果观察者保持订阅状态,则会通知应用程序状态的任何更改。 这个想法是,如果应用程序被禁用,我们取消源可观察订阅。当它再次启用时,它将重新订阅源观察值。 –

+0

纵观OperatorTakeUntil和OnSubscribeDelay等的实现我认为它可能更适合将它写成运营商而不是Transformer ..现在关闭。 –

回答

0

他,

我花了一些时间来了解你在做什么。我希望我终于明白了。您有一个热点可观察的isApplicationAvailable,它会告知订阅者该应用程序是启用还是禁用(true/false)。你还有其他的可观察者,他们正在做一些工作(可能是热/冷),并希望他们只产生值,如果observable isApplicationAvailable返回true。如果对于某些应用程序而言,应用程序变为禁用状态,则可观察对象不应生成任何值。

我使用RxJava2进行测试。

在这个例子中,如果observable isApplicationActive返回true,那么stringObservable只会将值传递给订阅者。

@Test 
public void name() throws Exception { 
    Subject<Boolean> isApplicationActive = BehaviorSubject.<Boolean>create() 
      .toSerialized(); 

    Observable<Boolean> isApplicationActiveObservable = isApplicationActive 
      .hide() 
      .doOnNext(s -> System.out.println("isApplicationActive: " + s)); 

    isApplicationActive.onNext(false); 

    Thread.sleep(1_000); 

    Observable<String> stringObservable = Observable.interval(1_000, TimeUnit.MILLISECONDS) 
      .map(Objects::toString) 
      .doOnNext(s -> System.out.println("NextIntervalValue")) 
      .compose(createSwitchMapCompose(isApplicationActiveObservable)); 

    stringObservable.subscribe(s -> { 
     System.out.println("stringObservable: " + s); 
    }); 

    Thread.sleep(2_000); 

    isApplicationActive.onNext(true); 

    Thread.sleep(2_000); 

    isApplicationActive.onNext(false); 

    Thread.sleep(2_000); 

    isApplicationActive.onNext(true); 

    Thread.sleep(6_000); 

    isApplicationActive.onNext(false); 

    Thread.sleep(20_000); 
} 

private ObservableTransformer<String, String> createSwitchMapCompose(Observable<Boolean> isApplicationActiveObservable) { 
    return upstream -> upstream.switchMap(s -> isApplicationActiveObservable.take(1).flatMap(aBoolean -> { 
       if (aBoolean) { 
        return Observable.just(s); 
       } 
       return Observable.empty(); 
      }) 
    ); 
} 
相关问题