2015-10-31 34 views
8

我在Android应用使用RetrofitRxJava,和我的代码:改造默认线程

public void getConfig(NetworkSubscriber subscriber) { 
    Observable<Config> observable = mApi.getConfig(); 
    observable.subscribeOn(Schedulers.newThread()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(subscriber); 
} 

public void getCode(String mobile, int type, NetworkSubscriber subscriber) { 
    Observable<BaseMessageEntity> observable = mApi.getCode(mobile, type); 
    observable.subscribeOn(Schedulers.newThread()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(subscriber); 
} 

而且我不想写.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread())每一个业务方法

我该怎么办?

回答

17

如果您不想在每次调用时指定所需的线程,可以创建一个围绕RxJavaCallAdapterFactory的包装,以便为您的线程默认设置。

public class RxThreadCallAdapter extends CallAdapter.Factory { 

    RxJavaCallAdapterFactory rxFactory = RxJavaCallAdapterFactory.create(); 
    private Scheduler subscribeScheduler; 
    private Scheduler observerScheduler; 

    public RxThreadCallAdapter(Scheduler subscribeScheduler, Scheduler observerScheduler) { 
     this.subscribeScheduler = subscribeScheduler; 
     this.observerScheduler = observerScheduler; 
    } 

    @Override 
    public CallAdapter<?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) { 
     CallAdapter<Observable<?>> callAdapter = (CallAdapter<Observable<?>>) rxFactory.get(returnType, annotations, retrofit); 
     return callAdapter != null ? new ThreadCallAdapter(callAdapter) : null; 
    } 

    final class ThreadCallAdapter implements CallAdapter<Observable<?>> { 
     CallAdapter<Observable<?>> delegateAdapter; 

     ThreadCallAdapter(CallAdapter<Observable<?>> delegateAdapter) { 
      this.delegateAdapter = delegateAdapter; 
     } 

     @Override public Type responseType() { 
      return delegateAdapter.responseType(); 
     } 

     @Override 
     public <T> Observable<?> adapt(Call<T> call) { 
      return delegateAdapter.adapt(call).subscribeOn(subscribeScheduler) 
       .observeOn(observerScheduler); 
     } 
    } 
} 

,然后使用它,而不是在你的建设者RxJavaCallAdapterFactory.create() -

Retrofit retrofit = new Retrofit.Builder() 
    .baseUrl("https://api.github.com/") 
    .addConverterFactory(GsonConverterFactory.create()) 
    .addCallAdapterFactory(new RxThreadCallAdapter(Schedulers.io(), AndroidSchedulers.mainThread())) 
    .build(); 
+0

认为兄弟!你有我! – xuyanjun

+0

这个错误如果你在线程内部调用(IE自定义访问管理)'''return delegateAdapter.responseType();'''会返回空指针。 –

6

您可以使用compose()将其减少到一行。例如,下面是您的getConfig()方法的修改版本。它假定你正在使用retrolambda。

public void getConfig(NetworkSubscriber subscriber) { 
    Observable<Config> observable = mApi.getConfig(); 
    observable 
      .compose(this::setupThreads) 
      .subscribe(subscriber); 
} 

setupThreads()方法是这样的:

private <T> Observable<T> setupThreads(final Observable<T> observable) { 
    return observable 
      .subscribeOn(Schedulers.newThread()) 
      .observeOn(AndroidSchedulers.mainThread()); 
} 

一些参考:

+0

谢谢,我明白了。但是,那不是由rx自己来解决它的方法吗? – xuyanjun

+0

@xuyanjun - 'compose()'是RxJava的一个重要组成部分,所以我不太清楚你的意思是“由rx self解决它”。在这种情况下,'compose()'将允许您重复使用'setupThreads()'而不必添加'.subscribeOn(Schedulers.newThread())。observeOn(AndroidSchedulers.mainThread())'到每个observable 。 – kjones

6

要对subscribeOn默认调度程序,你可以创建Retrofit例如,当把它作为一个参数直奔RxJavaCallAdapterFactory

new Retrofit.Builder() 
      .client(okHttpClient) 
      .addCallAdapterFactory(RxJavaCallAdapterFactory.createWithScheduler(Schedulers.io())) 
      .build(); 

得到了在改造推出2.0.0