2016-12-28 15 views
0

我使用RxJava和concat()first()运营商:使用CONCAT()和第()来实现与RxJava高速缓存 - 并发呼叫

public Observable<List<Entity>> getEntities() { 
    invalidateCacheIfNeeded(); 
    return Observable 
      .concat(cachedEntities(), networkEntities()) 
      .first(); 
} 

cachedEntities返回可观察到的从缓存列表同时内置networkEntities方法使用Retrofit来获取实体。

这很好用,除非两个用户快速订阅getEntities()返回的观察值。我猜想第一次订阅的网络请求在第二次订阅时没有完成。在这种情况下,执行两个网络请求。我想要避免。

我试图创建一个单独的线程调度所以第二个呼叫的执行只执行在第一次调用结束,但没有运气:

mSingleThreadScheduler = Schedulers.from(Executors.newSingleThreadExecutor()); 

和:

public Observable<List<Entity>> getEntities() { 
    invalidateCacheIfNeeded(); 
    return Observable 
      .concat(cachedEntities(), networkEntities()) 
      .subscribeOn(mSingleThreadScheduler) 
      .first(); 
} 

我试图在Observable链中调用subscribeOn调用较低,但我得到相同的结果。

任何提示?

+0

我相信你可以简单地使用您的方法的getEntities()'声明中的'synchronized'关键字,它应该等待以前的在允许另一个线程锁定之前调用完成。 –

+0

'concat()'调用没有被阻塞,所以'getEntities'几乎立即返回。在这种情况下,该方法中的'synchronized'将不起作用。 – fstephany

+0

但即使'concat()'没有被阻塞,无论是调用'getEntities()',在等待来自呼叫的返回时都会被阻塞吗?所以如果你试图阻止它被多次调用,你会认为'synchronized'对于这个调用是可以的。除非我错过了一些东西。如果它没有完成任务,它如何立即返回?也许我可以在这里收集珍闻。 –

回答

0

鉴于

public Observable<List<Entity>> getEntities() { 
    invalidateCacheIfNeeded(); 
    return Observable 
      .concat(cachedEntities(), networkEntities()) 
      .first(); 
} 

您应该创建一个AsyncSubject<Data> mSubject,并以此为遵循

private Observable<List<Entity>> networkEntities() { 
    return mSubject 
      .map(Data::getEntities); 
} 

和网络电话应该是这样的

public Observable<Data> getDataFromNetwork() { 
    return networkOperation() 
      .subscribeOn(mSingleThreadScheduler) 
      .subscribe(mSubject); 
} 
1

我认为这是不是一个好主意,使一个方法线程安全。因为它阻止了整个方法,从而降低了性能。所以建议使数据结构是线程安全的。 在你的情况下,你使用的是在你的方法

public Observable<List<Entity>> getEntities() { 

}

使用的CopyOnWriteArrayList代替名单名单。它是线程安全的。

public Observable<CopyOnWriteArrayList<Entity>> getEntities() { 

}

希望它会工作。

+0

我已更新问题以反映问题的订阅性质,而不是方法调用。 – fstephany

0

这似乎是一个相对常见的多用户可观察用例。你需要像

public Observable<List<Entity>> getEntities() { 
    invalidateCacheIfNeeded(); 
    return Observable 
      .concat(cachedEntities(), networkEntities()) 
      .first() 
      .replay(1)    
} 

查看答案this question为 深入解释了。

+0

看起来不错。一位同事也提到了“重放()”和“发布/主题”。我会深入研究它。 – fstephany