2016-09-12 82 views
3

在我的应用程序中,我有一项服务跟踪用户位置,然后使用RxJava将其发送到服务器。如果请求成功,我收到插入的ID,然后我可以从我的本地数据库中删除它们。使RxJava异步任务线程安全

  1. 查询了点数据库发送
  2. 如果不是空的,我张贴从数据库
  3. 所有的聚集点。如果请求成功,我从数据库中删除张贴的点

我的问题是,我在前一个任务结束前快速调用该observable,以便服务器接收重复点(两个请求都是相同的点)。我需要在单线程上执行Observable以避免在前一个任务结束之前让另一个任务查询数据库。我创建了一个Looper线程,但仍然,我发送重复,我不知道为什么。服务器请求现在似乎要等到它在执行下一个请求之前结束,但仍然在下一个请求中发送相同的点! Gahh

final StoreChangeEvent finalEvent = event; 
    Observable 
      .defer(() -> Observable.just(database.getAllPoints())) 
      .flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList) 
        .map(result -> deletePoint(result)) 
        .doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage()))) 
        .doOnCompleted(() -> emitStoreChange(finalEvent)) 
        .observeOn(AndroidSchedulers.mainThread()) 
        .subscribeOn(AndroidSchedulers.from(backgroundLooper))) 
      .subscribe(); 

好像database.getAllPoints()被称为太快......我应该添加.blocking()?

比方说,我有5点发布到服务器(A,B,C,d

  1. 我查询数据库和发送ABCD服务器
  2. 我reveive另一点从该装置(点Ë
  3. 我查询数据库和发送(ABCDE
  4. 我成功接收来自服务器,然后我从L删除ABCD OCAL DB
  5. 我收到第二请求成功,那么我从本地DB

结果删除ABCDE:ABCD是有两倍的服务器数据库上因为有两个请求具有相同点

+1

不太清楚你的意思。你可能会多次订阅,并希望确定它们随后被执行? –

+0

是的,只有在前一个Observable完成时才发送点,然后才查询数据库...我在问题底部添加了详细信息 – Jaythaking

回答

2

选项1 - 除

您可以尝试使用Subject - 这既充当观察者和观察到的事情。

订阅它,并通知其他地方的新活动(onNext())。用户将随后处理事件。

我用SerializedSubject万一你会从不同的线程调用notifyNewEvent(),否则你可以使用BehaviourSubject

SerializedSubject<StoreChangeEvent, StoreChangeEvent> subject = new SerializedSubject(BehaviorSubject.create()) 

public void initialize() { 
    // Since you access your incoming event from doOnCompleted, 
    // need this extra flatMap function so that you can access your event 
    // outside rx java chain. 
    subject.flatMap(new Func1() { 
     @Override 
     public Observable call(StoreChangeEvent event) { 
      return Observable 
        .just(database.getAllPoints()) 
        .flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList) 
          .map(result -> deletePoint(result)) 
          .doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage()))) 
          .doOnCompleted(() -> emitStoreChange(finalEvent))); 
     } 
    }) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribeOn(AndroidSchedulers.from(backgroundLooper)) 
      .subscribe(); 
} 

public void notifyNewEvent(StoreChangeEvent event) { 
    subject.onNext(event); 
} 

选择2 - 执行人

如果你不访问UI,为什么还要用科目和observeOn,subscribeOn打扰。用一个线程创建一个执行程序(随后执行所有任务),并将任务提交给它,从而充分利用RxJava的有用功能。

ExecutorService executorService = Executors.newSingleThreadExecutor(); 

public void notifyNewEvent(final StoreChangeEvent event) { 
    executorService.execute(new Runnable() { 
     public void run() { 
      Observable.just(database.getAllPoints()) 
       // Blocking, so that the thread doesn't exit 
       // and blocks on subscribe() till completion. 
       .toBlocking() 
       .flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList) 
        .map(result -> deletePoint(result)) 
        .doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage()))) 
        .doOnCompleted(() -> emitStoreChange(finalEvent))) 
       .subscribe(); 
     } 
    }); 
} 
+0

传入事件不是来自.doOnComplete()。它来自我有一个调度员班,当从GPS获取一个定位点时。对不起,这并不清楚我的代码 – Jaythaking

+0

@Jaythaking当然,但你可以在'doOnComplete'中访问它。没有保存在某个地方,你无法访问它,因为在链中已经丢失(转换)。 –

+0

我有点迷失在这里我不得不承认......我尝试了两种都失败了,我必须在请求完成后通知视图,从而访问UI ...我使用StoreEventChange通知我的视图体系结构 – Jaythaking