2015-04-28 66 views
4

我想用rxJava来实现这个工作流程,但是我确定如果我滥用或做错了什么东西。用RXJava处理缓存

  • 用户要求登录
  • 如果loginResult在高速缓存中可用,那么“发出”缓冲LoginResult
  • 否则实际执行请求的web服务和缓存结果,如果一切全成
  • 如果最多发生3次重试错误,如果有第4次,则清除缓存。

这是我完整的代码片段。

public class LoginTask extends BaseBackground<LoginResult> { 
    private static CachedLoginResult cachedLoginResult = new CachedLoginResult(); 
    private XMLRPCClient xmlrpcClient; 
    private UserCredentialsHolder userCredentialsHolder; 

    @Inject 
    public LoginTask(XMLRPCClient client, UserCredentialsHolder userCredentialsHolder) { 
    this.xmlrpcClient = client; 
    this.userCredentialsHolder = userCredentialsHolder; 
    } 

    @Override 
    public LoginResult performRequest() throws Exception { 
    return UserApi.login(
     xmlrpcClient, 
     userCredentialsHolder.getUserName(), 
     userCredentialsHolder.getPlainPassword()); 


    } 

    @Override 
    public Observable<LoginResult> getObservable() { 
    return cachedLoginResult.getObservable() 
     .onErrorResumeNext(
      Observable.create(
       ((Observable.OnSubscribe<LoginResult>) subscriber -> { 
        try { 
        if (!subscriber.isUnsubscribed()) { 
         subscriber.onNext(performRequest()); // actually performRequest 
        } 
        subscriber.onCompleted(); 
        } catch (Exception e) { 
        subscriber.onError(e); 
        } 
       }) 
      ) 
       .doOnNext(cachedLoginResult::setLoginResult) 
       .retry((attempts, t) -> attempts < 3) 
       .doOnError(throwable -> cachedLoginResult.purgeCache()) 
     ); 
    } 


    private static class CachedLoginResult { 
    private LoginResult lr = null; 
    private long when = 0; 

    private CachedLoginResult() { 
    } 

    public boolean hasCache() { 
     return lr != null && when + TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES) > System.currentTimeMillis(); 
    } 

    public void setLoginResult(LoginResult lr) { 
     if (lr != null) { 
      this.lr = lr; 
      this.when = System.currentTimeMillis(); 
     } 
    } 

    public void purgeCache() { 
     this.lr = null; 
     this.when = 0; 
    } 

    public Observable<LoginResult> getObservable() { 
     return Observable.create(new Observable.OnSubscribe<LoginResult>() { 
     @Override 
     public void call(Subscriber<? super LoginResult> subscriber) { 
      if (!subscriber.isUnsubscribed()) { 
      if (hasCache()) { 
       subscriber.onNext(lr); 
       subscriber.onCompleted(); 
      } else { 
       subscriber.onError(new RuntimeException("No cache")); 
      } 
      } 
     } 
     }); 
    } 
    } 
} 

因为我wan't能找到任何类似的例子,我开始“耍” rxjava仅有1天前我不能确定我的执行。

谢谢你的时间。

回答

2

我觉得这个代码是好的,好工作:)

你是使用权Observable.createLoginTask,否则造成的通话可能被内部缓存,然后retry将没有太大的帮助......

这是我认为无论如何是CachedLoginResultObservable。在这里,您可以通过使用Observable.justObservable.error实用方法,像简化代码:

public Observable<LoginResult> getObservable() { 
    if (hasCache()) { 
     return Observable.just(lr); 
    } else { 
     return Observable.error(new RuntimeException("No cache")); 
    } 
} 

注:just商店,你告诉它的价值在内部散发,使resubscriptions总是会产生这个值。这就是我上面所暗示的,例如你不应该这样做Observable.just(performRequest()).retry(3),因为performRequest只会被调用一次。

+0

您好西蒙,正确的,如果“错了,但是用你。只是和.error会使数值在观察到创建发射。如果我创建主观察值并在缓存过期后使用它,会发生什么?我想它会让我回到过去30分钟的旧缓存,现在应该过期了吗? –

+0

你是对的,只是在你调用getObservable()的时候捕获缓存的值,所以'Observable.create'或'Observable.defer'实际上是有意义的。也看看Akarnokd的答案;) –

0

如果我理解正确,您想要执行一次登录并以反应方式缓存结果?如果是这样,这里是一个例子,我会怎么做:

import java.util.concurrent.ThreadLocalRandom; 

import rx.*; 
import rx.schedulers.Schedulers; 
import rx.subjects.AsyncSubject; 


public class CachingLogin { 
    static class LoginResult { 

    } 
    /** Guarded by this. */ 
    AsyncSubject<LoginResult> cache; 
    public Observable<LoginResult> login(String username, String password) { 
     AsyncSubject<LoginResult> c; 
     boolean doLogin = false; 
     synchronized (this) { 
      if (cache == null || cache.hasThrowable()) { 
       cache = AsyncSubject.create(); 
       doLogin = true; 
      } 
      c = cache; 
     } 
     if (doLogin) { 
      Observable.just(1).subscribeOn(Schedulers.io()) 
      .map(v -> loginAPI(username, password)) 
      .retry(3).subscribe(c); 
     } 
     return c; 
    } 
    public void purgeCache() { 
     synchronized (this) { 
      cache = null; 
     } 
    } 
    static LoginResult loginAPI(String username, String password) { 
     if (ThreadLocalRandom.current().nextDouble() < 0.3) { 
      throw new RuntimeException("Failed"); 
     } 
     return new LoginResult(); 
    } 
} 
+0

你好akarnokd,谢谢你的回答。它看起来像在rxjava中做事情的好方法:)如果我在调用“登录”方法时正确理解它,它将立即触发loginAPI()(如果缓存不存在)。虽然这在大多数情况下可能是有意义的,但我希望获得支持真正http调用的observable,以便我可以根据用例使用observeOn和subscribeOn。 –