2016-11-04 138 views
1

我们有搜索字段的窗口。每次用户输入搜索内容时都会执行。RxJava一次只能执行一个Observable

  1. 将搜索事件转换为数据流。

  2. 在每次新的搜索中,我们需要启动异步网络操作并关闭上一个。如何存档这个效果?

编辑1:这是我试过的。它执行所有可观察的事物!不是唯一的最后一个,哪里出错?

PublishSubject<Integer> subject = PublishSubject.create(); 

subject.switchMap(integer -> Observable.fromCallable(() -> { 
    try { 
     Thread.sleep(500); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    return "-" + integer + "-"; 
})).subscribe(s -> System.out.print(s)); 
for (int i = 0; i < 30; i++) subject.onNext(i); 
+0

我自己还是个初学者,但是难道你不是在观察用户搜索输入,而是反弹,然后用去抖用户输入执行搜索? – Andreas

+0

@Andreas是的,这是我需要的一半。此外,一次只能执行一次执行等等。每个新值都取消以前的 – dooplaye

+0

您可能希望同时执行'debounce'和'switchMap',以避免每次按键后搜索以及取消旧搜索。 – akarnokd

回答

3

使用PublishSubjectswitchMap

PublishSubject<String> subject = PublishSubject.create(); 
subject 
    .switchMap(
    x -> networkOperationObservable(x) 
      .subscribeOn(Schedulers.io)) 
    .subscribe(subscriber); 

为了充分利用此networkOperationObservable将明智地回应unsubscribe呼叫(如关闭Socket或其他)。一般而言,Observable.using是首选工具。

+0

请检查我的编辑。 – dooplaye

0

PublishSubject正是你所需要的。这是一个Observable您可以随时添加事件。

创建它:

private Subject<String, String> filteringSubject = PublishSubject.create(); 

然后使用Observable.throttleWithTimeout()操作:

Subscriber<String> filteringSubscriber = new DefaultSubscriber<>(); 
filteringSubject 
    .throttleWithTimeout(250, TimeUnit.MILLISECONDS) 
    .doOnNext(new Action1<String>() { 
     @Override 
     public void call(String text) { 
      //make a network call 
     } 
    }) 
    .subscribe(filteringSubscriber); 

用法:

filteringSubject.onNext(text); 
+0

它不是。网络调用必须返回一些值。 – dooplaye

+0

好吧,然后'FlatMap''String'网络调用和订阅整个'Obsevable'真正的'Subscriber',而不是假的。 –