2017-02-09 23 views
2

我试图实现某种RxBus,它允许发布特定类型的事件并根据对象类来侦听它们。我使用BehaviorSubject来支持粘性事件(甚至可以在订阅之前发布)。每个类型都应该保留粘性事件。这里是代码:RxJava:特定类型的BehaviorSubject

private final Subject<BaseEvent, BaseEvent> bus = new SerializedSubject<>(BehaviorSubject.create()); 

public <E extends BaseEvent> void post(E event) { 
    bus.onNext(event); 
} 

public <E extends BaseEvent> Observable<E> observe(Class<E> eventClass) { 
    return bus.asObservable().ofType(eventClass); 
} 

它适用于一种类型的事件。但是,如果有更多不同的事件,并且最后发布的事件与我订阅的事件类型不同,则它会被ofType()过滤,因为BehaviorSubject只保留最后一个不依赖于类型的事件。

我想两种解决方案:

  1. 要创建地图对象为每种类型的,但也有亚型有问题。
  2. 使用ReplaySubject并使用ofType()和distinct()过滤事件。但是我找不到一种方法来区分订阅前后发出的事件。

您认为,有没有办法让这些解决方案工作,或者我错过了一些东西,并有更好的实施方法?

回答

1

首先,你不想要一个主题,而是一个Relay

我向你提出第三种解决方案:编写你自己的Relay(或Subject)实现,它将记住所有不同的事件。你不需要重写一个中继或主题从头开始,你可以依靠现有的实现(在这里,PublishRelay):

import com.jakewharton.rxrelay2.PublishRelay; 
import com.jakewharton.rxrelay2.Relay; 

import java.util.HashMap; 
import java.util.Map; 

import io.reactivex.Observer; 

public class RxBus extends Relay<Object> { 

    private PublishRelay<Object> concreteRelay = PublishRelay.create(); 
    private Map<Class, Object> stickyEvents = new HashMap<>(); 

    public <T> T getSticky(Class<T> type) { 
     return (T)stickyEvents.get(type); 
    } 

    @Override 
    public void accept(Object value) { 
     stickyEvents.put(value.getClass(), value); 
     concreteRelay.accept(value); 
    } 

    @Override 
    public boolean hasObservers() { 
     return concreteRelay.hasObservers(); 
    } 

    @Override 
    protected void subscribeActual(Observer<? super Object> observer) { 
     concreteRelay.subscribeActual(observer); 
    } 
} 
+0

谢谢,保持粘滞事件与主要主题分开的想法没有发生对我来说。 :) –