2017-04-13 35 views
3

我已经使用RxJava(聚合服务)编写了一个Spring Boot微服务来实现以下简化用例。大的情况是,当教师上传课程内容文档时,应该生成并保存一组问题。使用RxJava并行创建对象

  • 用户上传文件到系统。
  • 系统调用文档服务将文档转换为文本。
  • 然后它会调用另一个生成服务的问题来生成给定上述文本内容的一组问题。
  • 最后,将这些问题发布到基本的CRUD微服务中进行保存。

当用户上传文档时,会产生很多问题(可能会有几百个问题)。这里的问题是我按顺序逐个发布问题,以便CRUD服务保存它们。这会由于IO密集型网络调用而急剧减慢运行速度,因此完成整个过程大约需要20秒。这里是目前的代码,假设所有的问题都是公式化的。

questions.flatMapIterable(list -> list).flatMap(q -> createQuestion(q)).toList(); 

private Observable<QuestionDTO> createQuestion(QuestionDTO question) { 
    return Observable.<QuestionDTO> create(sub -> { 
     QuestionDTO questionCreated = restTemplate.postForEntity(QUESTIONSERVICE_API, 
       new org.springframework.http.HttpEntity<QuestionDTO>(question), QuestionDTO.class).getBody(); 
     sub.onNext(questionCreated); 
     sub.onCompleted(); 
    }).doOnNext(s -> log.debug("Question was created successfully.")) 
      .doOnError(e -> log.error("An ERROR occurred while creating a question: " + e.getMessage())); 
} 

现在我的要求是将所有问题并行发布到CRUD服务,并在完成时合并结果。另请注意,CRUD服务一次只能接受一个问题对象,且不能更改。我知道我可以使用Observable.zip运算符来达到这个目的,但是我不知道如何在这种情况下应用它,因为问题的实际数量不是预先确定的。如何更改第1行中的代码,以便我可以提高应用程序的性能。任何帮助表示赞赏。

+1

http://stackoverflow.com/a/42823151/7045114 –

+0

不要使用create,而应该使用fromCallable/defer。如果您使用create操作符的方式不是预期的,那么您将终止背压(rx1) –

回答

1

默认情况下,flatMap中的observalbes在您订阅它的同一个调度程序上运行。为了并行运行你的观察对象,你必须在计算调度器上订阅它们。

questions.flatMapIterable(list -> list) 
     .flatMap(q -> createQuestion(q).subscribeOn(Schedulers.computation())) 
     .toList(); 

检查this article的完整说明。

+0

非常感谢您的回应。我在阅读那篇文章的同时尝试了一些例子,其中一些没有按预期工作。无论如何,我现在有另一个问题。 createQuestion是异步执行的。以前它是在一个线程中调用的。随着新的修改,它将在并行线程中调用。在每个问题的两种情况下,都会有一个单独的线索来处理封面背后的创作。由于这种变化会有巨大的收益吗?如果是这样如何? –

+0

@RavindraRanwala所以flatMap的工作就是为每个物品创建一个新的Observable,并将这些observable合并为一个。所以如果你的物品发射很快(这是你的列表),你的createQuestion工作真的很慢。然后你的createQuestion将以“并行”的方式工作。 –

+0

@PhoenixWang谢谢,我会试一试,让你发布性能增益。离开我的办公室需要几天的时间。 –