在我的应用程序中,我有一项服务跟踪用户位置,然后使用RxJava
将其发送到服务器。如果请求成功,我收到插入的ID,然后我可以从我的本地数据库中删除它们。使RxJava异步任务线程安全
- 查询了点数据库发送
- 如果不是空的,我张贴从数据库
- 所有的聚集点。如果请求成功,我从数据库中删除张贴的点
我的问题是,我在前一个任务结束前快速调用该observable,以便服务器接收重复点(两个请求都是相同的点)。我需要在单线程上执行Observable
以避免在前一个任务结束之前让另一个任务查询数据库。我创建了一个Looper
线程,但仍然,我发送重复,我不知道为什么。服务器请求现在似乎要等到它在执行下一个请求之前结束,但仍然在下一个请求中发送相同的点! Gahh
final StoreChangeEvent finalEvent = event;
Observable
.defer(() -> Observable.just(database.getAllPoints()))
.flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList)
.map(result -> deletePoint(result))
.doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage())))
.doOnCompleted(() -> emitStoreChange(finalEvent))
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.from(backgroundLooper)))
.subscribe();
好像database.getAllPoints()
被称为太快......我应该添加.blocking()?
比方说,我有5点发布到服务器(A,B,C,d)
- 我查询数据库和发送ABCD服务器
- 我reveive另一点从该装置(点Ë)
- 我查询数据库和发送(ABCDE)
- 我成功接收来自服务器,然后我从L删除ABCD OCAL DB
- 我收到第二请求成功,那么我从本地DB
结果删除ABCDE:ABCD是有两倍的服务器数据库上因为有两个请求具有相同点发
不太清楚你的意思。你可能会多次订阅,并希望确定它们随后被执行? –
是的,只有在前一个Observable完成时才发送点,然后才查询数据库...我在问题底部添加了详细信息 – Jaythaking