2013-07-31 76 views
21

我一直在使用Java 8中的CompletionStage/CompletableFuture进行异步处理,效果很好。然而,有时我想要一个阶段来执行迭代器/项目流的异步处理,而且似乎没有办法做到这一点。是否可以使用Java 8 Streams API进行异步处理?

具体来说,Stream.forEach()具有在调用所有项目之后的语义。我想同样的事情,但有CompletionStage代替,例如:

CompletionStage<Void> done = stream.forEach(...); 
done.thenRun(...); 

如果流是通过异步流结果的支持,这将是比等待它是在上面的代码本身完整的好得多。

是否有可能以某种方式用当前的Java 8 API来构造它?解决方法?

+0

嗨!我已经更新了我的答案。如果它会满足你,请投票,我试图赢得更多的声誉;) –

+2

@Rickard可能已经太晚了:)但我找到了你想要的东西,如果我正确理解你的话。 http://www.reactive-streams.org/。 https://github.com/reactor/reactor –

回答

20

据我所知,streams API不支持异步事件处理。听起来你想要的是类似于Reactive Extensions for .NET的东西,并且有一个名为RxJava的Java端口,由Netflix创建。

RxJava支持许多与Java 8流(如映射和过滤器)相同的高级操作,并且是异步的。

更新:现在有一个reactive streams主动权的作品,它看起来像JDK 9将包括它至少部分虽然Flow级的支持。

+19

的确,RxJava就是你想要的。 Streams的设计中心主要是关于无延迟访问(无论是来自数据结构还是生成函数)的数据; Rx的设计中心关于可能异步到达的无限事件流。 –

7

正如@KarolKrol所暗示的那样,您可以使用CompletableFuture流来完成。

有一个基于JDK8流构建的库,以便于使用名为cyclops-reactCompletableFuture流。

要组成你的流,你可以使用独眼巨人反应流畅的承诺ike API或者你可以使用简单反应的Stages

+0

我忘了现在投票:) –

+0

@KarolKról而不是游说upvotes,只是写出最好的答案来解决更多的问题。 –

2

cyclops-react(我是该库的作者),提供了一个用于处理Streams的类StreamUtils。 它提供的功能之一是futureOperations,它提供了对标准Stream终端操作(然后是一些)的访问,并带有扭曲 - Stream异步执行,结果在CompletableFuture内返回。 .eg

Stream<Integer> stream = Stream.of(1,2,3,4,5,6) 
             .map(i->i+2); 
CompletableFuture<List<Integer>> asyncResult = StreamUtils.futureOperations(stream, 
              Executors.newFixedThreadPool(1)) 
             .collect(Collectors.toList()); 

还有一个包装流,并提供相同的功能,具有很好的流利API一便于学习类ReactiveSeq

CompletableFuture<List<Integer>> asyncResult = ReactiveSeq.of(1,2,3,4,5,6) 
             .map(i->i+2) 
             .futureOperations(
              Executors.newFixedThreadPool(1)) 
             .collect(Collectors.toList()); 

正如亚当指出cyclops-react FutureStreams被设计为异步处理数据(通过将Futures和Streams混合在一起) - 它特别适用于涉及阻塞I/O(如读取文件,进行db调用,进行其他调用等)的多线程操作。

1

可以生成一个流,将每个元素映射到CompletionStage,然后使用CompletionStage.thenCombine()收集结果,但是如果使用这样的简单代码,结果代码将不会更具可读性。

CompletionStage<Collection<Result>> collectionStage = 
     CompletableFuture.completedFuture(
      new LinkedList<>() 
     ); 

    for (Request request : requests) { 
     CompletionStage<Result> resultStage = performRequest(request); 
     collectionStage = collectionStage.thenCombine(
      resultStage, 
      (collection, result) -> { 
       collection.add(result); 
       return collection; 
      } 
     ); 
    } 

    return collectionStage; 

这个例子可以很容易地转换为功能forEach不失去可读性。但使用流的reducecollect需要额外的不太好的代码。

更新:CompletableFuture.allOfCompletableFuture.join提供了另一种更可读的方法,将未来结果的收集转换为未来的结果收集。

相关问题