RxJava Observable: alternative to create for an async call

2016-04-21

I watched this talk: https://www.youtube.com/watch?v=QdmkXL7XikQ&feature=youtu.be&t=274

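From what I heard, I should avoid using the create method to build an Observable, because it does not automatically handle unsubscription and backpressure. However, I can't find an alternative to use in the code below.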

compositeSubscription.add(
    Observable.create(new Observable.OnSubscribe<DTOCompaniesCallback>() {
        @Override
        public void call(final Subscriber<? super DTOCompaniesCallback> subscriber) {
            modelTrainStrike.getCompaniesFromServer(new CompaniesCallback() {
                @Override
                public void onResult(DTOCompaniesCallback dtoCompaniesCallback) {
                    try {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onNext(dtoCompaniesCallback);
                            subscriber.onCompleted();
                        }
                    } catch (Exception e) {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onError(e);
                        }
                    }
                }
            });
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<DTOCompaniesCallback>() {
        @Override
        public void call(DTOCompaniesCallback dtoCompaniesCallback) {
            Log.i("TAG", "onResult: " + dtoCompaniesCallback.getCompaniesList().size());
        }
    }, new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            throw new OnErrorNotImplementedException("Source!", throwable);
        }
    })
);

I call clear on the CompositeSubscription in the onDestroy method:

@Override
public void onDestroy() {
    if (compositeSubscription != null) {
        compositeSubscription.clear();
    }
}

Do you see any alternative to the create method that I could use here? Do you see any potential danger, or is this approach safe? Thanks.

Answers

You can use defer + AsyncSubject:

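Observable.defer(() -> {
    // AsyncSubject caches the last value and emits it once onCompleted is called
    AsyncSubject<DTOCompaniesCallback> async = AsyncSubject.create();
    modelTrainStrike.getCompaniesFromServer(v -> {
        async.onNext(v);
        async.onCompleted(); // RxJava 1.x name, matching the API used in the question
    });
    return async;
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
...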

If getCompaniesFromServer supports cancellation, you can do this:

Observable.defer(() -> {
    AsyncSubject<DTOCompaniesCallback> async = AsyncSubject.create();
    Closeable c = modelTrainStrike.getCompaniesFromServer(v -> {
        async.onNext(v);
        async.onCompleted(); // RxJava 1.x name, matching the API used in the question
    });
    // Cancel the pending server call when the subscriber unsubscribes
    return async.doOnUnsubscribe(() -> {
        try { c.close(); } catch (IOException ex) { }
    });
})
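
One reason AsyncSubject suits the defer pattern above: the callback may fire before the downstream subscriber attaches to the returned subject. AsyncSubject keeps the last value until completion and hands it to subscribers that arrive late. Below is a minimal, stdlib-only Java model of that caching behaviour. It is not the RxJava class; MiniAsyncSubject and both demo methods are hypothetical names made up for illustration, and error handling and unsubscription are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stdlib-only model of AsyncSubject-style caching (NOT the RxJava class).
final class MiniAsyncSubject<T> {
    private final List<Consumer<T>> listeners = new ArrayList<>();
    private T last;             // most recent value passed to onNext
    private boolean hasValue;   // true once onNext has been called
    private boolean completed;  // true once onCompleted has been called

    // Remember the value; nothing is delivered until completion.
    synchronized void onNext(T value) {
        if (completed) return;
        last = value;
        hasValue = true;
    }

    // Deliver the cached value to everyone who subscribed so far.
    void onCompleted() {
        List<Consumer<T>> toNotify;
        T value;
        synchronized (this) {
            if (completed) return;
            completed = true;
            toNotify = new ArrayList<>(listeners);
            listeners.clear();
            if (!hasValue) return;
            value = last;
        }
        for (Consumer<T> listener : toNotify) listener.accept(value);
    }

    // Late subscribers get the cached value immediately.
    void subscribe(Consumer<T> listener) {
        T value;
        synchronized (this) {
            if (!completed) { listeners.add(listener); return; }
            if (!hasValue) return;
            value = last;
        }
        listener.accept(value);
    }

    // The callback fires BEFORE anyone subscribes; the value is not lost.
    static List<String> lateSubscriberDemo() {
        MiniAsyncSubject<String> subject = new MiniAsyncSubject<>();
        subject.onNext("companies");
        subject.onCompleted();
        List<String> received = new ArrayList<>();
        subject.subscribe(received::add);
        return received;
    }

    // The subscriber attaches first; it is notified on completion.
    static List<String> earlySubscriberDemo() {
        MiniAsyncSubject<String> subject = new MiniAsyncSubject<>();
        List<String> received = new ArrayList<>();
        subject.subscribe(received::add);
        subject.onNext("companies");
        subject.onCompleted();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(lateSubscriberDemo());  // prints [companies]
        System.out.println(earlySubscriberDemo()); // prints [companies]
    }
}
```

Both orderings deliver the single result exactly once, which is why the subject can safely be created and wired to the callback inside defer before the subscriber is connected.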

Works great, thanks :)