2017-01-14 42 views
4

我已经看到了一些用于Java 8流API的takeWhile实现,但它们似乎都将流转换为非并行流。例如this之一:如何在Java 8中为Stream API实现并行支持takeWhile?

static <T> Spliterator<T> takeWhile(
    Spliterator<T> splitr, Predicate<? super T> predicate) { 
    return new Spliterators.AbstractSpliterator<T>(splitr.estimateSize(), 0) { 
    boolean stillGoing = true; 
    @Override public boolean tryAdvance(Consumer<? super T> consumer) { 
     if (stillGoing) { 
     boolean hadNext = splitr.tryAdvance(elem -> { 
      if (predicate.test(elem)) { 
      consumer.accept(elem); 
      } else { 
      stillGoing = false; 
      } 
     }); 
     return hadNext && stillGoing; 
     } 
     return false; 
    } 
    }; 
} 

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) { 
    return StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false); 
} 

这里StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false);转动传递给takeWhile成连续流的流。是否有人知道支持并行流的实现,或者我如何修改此代码以使其维护/支持并行流?

+3

你不能,真的。对不起,但你必须解决这个问题。这实际上是一种固有的顺序操作。你可以使用默认的非常有限的并行性,这对所有的东西都适用,这就是你在流中使用'.parallel'所得到的结果,但是你可以得到。 –

+0

为了在这里提取任何真正的并行性,谓词必须非常昂贵(例如,试图分解非常大的数字)。这不是不可能的,但它不太可能。 –

回答

2

如果源被称为是无序的,那么下面的实现应该工作:

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) { 
    return StreamSupport.stream(UnorderedTakeWhileSpliterator<>(stream.spliterator(), predicate), stream.isParallel()); 
} 

有序实施将作为更棘手:

static final class UnorderedTakeWhileSpliterator<T> implements Spliterator<T>, Consumer<T>, Cloneable { 
    private final Predicate<? super T> predicate; 
    private final AtomicBoolean checked = new AtomicBoolean(); 
    private Spliterator<T> source; 
    private T cur; 

    UnorderedTakeWhileSpliterator(Spliterator<T> source, Predicate<? super T> predicate) { 
     this.predicate = predicate; 
     this.source = source; 
    } 

    @Override 
    public void accept(T t) { 
     this.cur = t; 
    } 

    @Override 
    public boolean tryAdvance(Consumer<? super T> action) { 
     if (!checked.get() && source.tryAdvance(this)) { 
      if (predicate.test(cur)) { 
       action.accept(cur); 
       return true; 
      } else { 
       checked.set(true); 
      } 
     } 
     return false; 
    } 

    @Override 
    public Spliterator<T> trySplit() { 
     Spliterator<T> prefix = source.trySplit(); 
     if(prefix == null) { 
      return null; 
     } 
     if(checked.get()) { 
      return Spliterators.emptySpliterator(); 
     } 
     UnorderedTakeWhileSpliterator<T> clone; 
     try { 
      clone = (UnorderedTakeWhileSpliterator<T>) clone(); 
     } catch (CloneNotSupportedException e) { 
      throw new InternalError(e); 
     } 
     clone.source = prefix; 
     return clone; 
    } 

    @Override 
    public long estimateSize() { 
     return source.estimateSize(); 
    } 

    @Override 
    public int characteristics() { 
     return source.characteristics() & (DISTINCT | SORTED | NONNULL); 
    } 

    @Override 
    public Comparator<? super T> getComparator() { 
     return source.getComparator(); 
    } 
} 

以下方法创建流它应该缓冲非前缀项并将取消传播到后缀。类似这样的东西在JDK-9中实现(不是作为分割器,而是作为普通的流操作),但我怀疑即使是这种棘手的实现在很多情况下都胜过顺序流。

+0

谢谢,但我似乎没有名为UnorderedTDOfRef的类?我在哪里可以找到这个? – Johan

+1

@Johan,修好了,对不起。 –

+0

它的工作原理:)脱离主题,但这是什么,你将包含在StreamEx库?那将是真棒! – Johan