2016-03-02 34 views
5

我有一个重要的数据集,并且想要调用缓慢而干净的方法,并且比第一个调用结果有副作用的快速方法要快。我对中间结果不感兴趣,所以我不想收集它们。在并行流上调用顺序使所有以前的操作顺序

明显的解决方案是创建并行流,进行慢速呼叫,再次使流顺序,并进行快速调用。问题是,在单线程中执行的所有代码都没有实际的并行性。

示例代码:

@Test 
public void testParallelStream() throws ExecutionException, InterruptedException 
{ 
    ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2); 
    Set<String> threads = forkJoinPool.submit(()-> new Random().ints(100).boxed() 
      .parallel() 
      .map(this::slowOperation) 
      .sequential() 
      .map(Function.identity())//some fast operation, but must be in single thread 
      .collect(Collectors.toSet()) 
    ).get(); 
    System.out.println(threads); 
    Assert.assertEquals(Runtime.getRuntime().availableProcessors() * 2, threads.size()); 
} 

private String slowOperation(int value) 
{ 
    try 
    { 
     Thread.sleep(100); 
    } 
    catch (InterruptedException e) 
    { 
     e.printStackTrace(); 
    } 
    return Thread.currentThread().getName(); 
} 

如果删除sequential,代码执行如预期的,但,很明显,不平行的操作将在多个线程中调用。

您能否推荐一些有关此类行为的参考文献,或者可能采用某种方法来避免临时收集?

回答

5

切换从parallel()的流sequential()曾在初始流API的设计,但造成了许多问题,最终的落实是changed,所以它只是变成并行标志和关闭整个管道。当前文档确实是模糊的,但它是在Java-9改进:

流管道取决于在其上调用的终端操作的流的模式顺序地或并行地执行。可以使用BaseStream.isParallel()方法确定流的顺序或并行模式,并且可以使用BaseStream.sequential()BaseStream.parallel()操作修改流的模式。最新的顺序或并行模式设置适用于整个流管线的执行。

至于你的问题,你可以收集到一切中间List并开始新的顺序管道:

new Random().ints(100).boxed() 
     .parallel() 
     .map(this::slowOperation) 
     .collect(Collectors.toList()) 
     // Start new stream here 
     .stream() 
     .map(Function.identity())//some fast operation, but must be in single thread 
     .collect(Collectors.toSet()); 
+1

您引用的句子在Java 8版本中完全相同,可以在类文档的最后一个段落中找到。通常,您可以在[包文档](https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html#StreamOps)中找到更多信息(请参阅“并行性” )而不是[特定方法](https://docs.oracle.com/javase/8/docs/api/java/util/stream/BaseStream.html#parallel--),而不仅限于并行/顺序模式(例如与减少量相比)。 – Holger

+0

斑点!我知道它是[更新](http://hg.openjdk.java.net/jdk9/dev/jdk/rev/d52b2d49bf04)(我甚至参加了讨论并[相信](http://mail.openjdk。 java.net/pipermail/core-libs-dev/2015-August/034773.html)Stuart为'concat'添加一个特别的注释),但由于某种原因找到了错误的地方。发布编辑。 –

1

在当前实现中,Stream可以全部并行或全部依次。虽然Javadoc不明确这一点,它可能在未来发生变化,但它确实表示这是可能的。

S并联()

返回的等效流平行。可能会返回自己,因为流已经是平行的,或者因为基础流状态被修改为平行。

如果你需要的功能是单线程,我建议你使用锁定或同步块/方法。

+0

感谢您的回复,但同步方法成为瓶颈和中间收集工作更快(确认由江铃控股)。在这种特殊情况下,我对性能和内存更感兴趣。 – the20login