我已经使用RxJava(聚合服务)编写了一个Spring Boot微服务来实现以下简化用例。大的情况是,当教师上传课程内容文档时,应该生成并保存一组问题。使用RxJava并行创建对象
- 用户上传文件到系统。
- 系统调用文档服务将文档转换为文本。
- 然后它会调用另一个生成服务的问题来生成给定上述文本内容的一组问题。
- 最后,将这些问题发布到基本的CRUD微服务中进行保存。
当用户上传文档时,会产生很多问题(可能会有几百个问题)。这里的问题是我按顺序逐个发布问题,以便CRUD服务保存它们。这会由于IO密集型网络调用而急剧减慢运行速度,因此完成整个过程大约需要20秒。这里是目前的代码,假设所有的问题都是公式化的。
questions.flatMapIterable(list -> list).flatMap(q -> createQuestion(q)).toList();
private Observable<QuestionDTO> createQuestion(QuestionDTO question) {
return Observable.<QuestionDTO> create(sub -> {
QuestionDTO questionCreated = restTemplate.postForEntity(QUESTIONSERVICE_API,
new org.springframework.http.HttpEntity<QuestionDTO>(question), QuestionDTO.class).getBody();
sub.onNext(questionCreated);
sub.onCompleted();
}).doOnNext(s -> log.debug("Question was created successfully."))
.doOnError(e -> log.error("An ERROR occurred while creating a question: " + e.getMessage()));
}
现在我的要求是将所有问题并行发布到CRUD服务,并在完成时合并结果。另请注意,CRUD服务一次只能接受一个问题对象,且不能更改。我知道我可以使用Observable.zip
运算符来达到这个目的,但是我不知道如何在这种情况下应用它,因为问题的实际数量不是预先确定的。如何更改第1行中的代码,以便我可以提高应用程序的性能。任何帮助表示赞赏。
http://stackoverflow.com/a/42823151/7045114 –
不要使用create,而应该使用fromCallable/defer。如果您使用create操作符的方式不是预期的,那么您将终止背压(rx1) –