我有2个流,第一个流是从数据库获取数据并在完成数据后调用onCompleted()
的流。第二个流是从服务器获取实时数据的流,并且不会调用onCompleted()
。我想要做的是创建一个操作符,如果第一个流(上游)是空流,就可以执行操作。下面是示例:在RxJava中创建doIfEmpty运算符
getItemFromDatabase()
.lift(new DoIfEmptyOperator<Item>(new Action0() {
@Override
public void call() {
//Database is empty
System.out.println("Yeay successfully do an action");
}
}))
.concatWith(getItemFromServer()) // -----> intentionally never complete
.subscribe(new Subscriber<StoryItem>() {
@Override
public void onCompleted() {
//dosomething...
}
@Override
public void onError(Throwable e) {
//dosomething...
}
@Override
public void onNext(StoryItem storyItem) {
//dosomething
}
}));
下面是DoIfEmptyOperator的代码:
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
public class DoIfEmptyOperator<T> implements Observable.Operator<T,T>{
private Action0 action;
private boolean isEmpty = true;
public DoIfEmptyOperator(Action0 action) {
this.action = action;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> childSubscriber) {
Subscriber<T> parentSubscriber = new Subscriber<T>() {
@Override
public void onCompleted() {
if(isEmpty) {
action.call();
}
childSubscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
childSubscriber.onError(e);
}
@Override
public void onNext(T t) {
isEmpty = false;
childSubscriber.onNext(t);
}
};
childSubscriber.add(parentSubscriber);
return parentSubscriber;
}
}
但是因为parentSubscriber onCompleted()
在不触发永远不会执行的动作,由于下游没有完成。如果我删除
.concatWith(getItemFromServer())
,则执行该操作。有关如何解决问题的任何线索?我已经潜入Observable.switchIfEmpty()的源代码,但仍然不知道它是如何工作的。
啊从来没有想过这个,它的工作原理,谢谢:) – SalacceoVanz