2016-04-10

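How to perform an outer join on two or more Streams

In my application I use several Streams that provide elements of the form (ID, value). An element is defined by the following class: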

static final class Element<T> implements Comparable<Element<T>> { 
    final long id; 
    final T value; 

    Element(int id, T value) { 
     this.id = id; 
     this.value = value; 
    } 

    @Override 
    public int compareTo(Element<T> o) { 
     return Long.compare(id, o.id); 
    } 
} 

My goal is to join two or more streams by the element IDs (within each stream the IDs are sorted and strictly increasing), for example:

Stream<Element> colour = Arrays.stream(new Element[]{new Element(1, "red"), new Element(2, "green"), new Element(4, "red"), new Element(6, "blue")}); 
Stream<Element> length = Arrays.stream(new Element[]{new Element(2, 28), new Element(3, 9), new Element(4, 17), new Element(6, 11)}); 
Stream<Element> mass = Arrays.stream(new Element[]{new Element(1, 87.9f), new Element(2, 21.0f), new Element(3, 107f)}); 

into a single stream containing elements of the form (ID, [T1, T2, T3]), by applying a method such as:

Stream<Element<Object[]>> allProps = joinStreams(colour, length, mass); 

public Stream<Element<Object[]>> joinStreams(Stream<Element>... streams) { 
    return ...; 
} 

The resulting stream should provide a FULL OUTER JOIN, i.e. for the example above:

1, "red", null, 87.9 
2, "green", 28, 21.0 
3, null, 9, 107.0 
4, "red", 17, null 
6, "blue", 11, null 

My experience with the Java Stream API is quite basic; so far I have usually used iterators for tasks like this.

Is there an idiomatic (and efficient) way to perform this kind of join on Streams? Is there a utility library I could use?

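Note: the example is simplified. The application receives its data from something similar to a column-oriented data store (not a real DBMS); the data is several gigabytes in size and does not easily fit into memory. The store also has no built-in support for this kind of join.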


'myElementsStream.collect(Collectors.groupingBy(e -> e.id))'? – fge


I have three streams here; how would you define myElementsStream? – Matthias
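One way to apply fge's groupingBy suggestion to several streams is to tag each element with the index of the stream it came from, concatenate everything, and group by id. The sketch below assumes a simplified Element with only id and value; the names GroupingJoinSketch, Tagged and joinInMemory are hypothetical. Because groupingBy must see every element before it returns, the whole input is held in memory, so this only suits data that fits in the heap, which the question says is not the case for the real data set.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class GroupingJoinSketch {
    // Simplified stand-in for the question's Element class (id and value only).
    static final class Element<T> {
        final long id;
        final T value;
        Element(long id, T value) { this.id = id; this.value = value; }
    }

    // A value tagged with the index ("column") of the stream it came from.
    static final class Tagged {
        final int column;
        final long id;
        final Object value;
        Tagged(int column, long id, Object value) {
            this.column = column; this.id = id; this.value = value;
        }
    }

    // Full outer join by id, done entirely in memory: group all tagged values by id
    // into a TreeMap (so ids come out sorted), then build one row per id with one
    // slot per input stream. Slots for streams that lack the id stay null.
    @SafeVarargs
    static Map<Long, Object[]> joinInMemory(Stream<? extends Element<?>>... streams) {
        int n = streams.length;
        Map<Long, List<Tagged>> groups = IntStream.range(0, n).boxed()
            .flatMap(i -> streams[i].map(e -> new Tagged(i, e.id, e.value)))
            .collect(Collectors.groupingBy(t -> t.id, TreeMap::new, Collectors.toList()));
        Map<Long, Object[]> rows = new TreeMap<>();
        groups.forEach((id, group) -> {
            Object[] row = new Object[n];
            for (Tagged t : group) row[t.column] = t.value;
            rows.put(id, row);
        });
        return rows;
    }

    public static void main(String[] args) {
        Stream<Element<String>> colour = Stream.of(new Element<>(1, "red"), new Element<>(2, "green"),
                new Element<>(4, "red"), new Element<>(6, "blue"));
        Stream<Element<Integer>> length = Stream.of(new Element<>(2, 28), new Element<>(3, 9),
                new Element<>(4, 17), new Element<>(6, 11));
        Stream<Element<Float>> mass = Stream.of(new Element<>(1, 87.9f), new Element<>(2, 21.0f),
                new Element<>(3, 107f));
        joinInMemory(colour, length, mass)
            .forEach((id, row) -> System.out.println(id + ", " + Arrays.toString(row)));
    }
}
```

For the three example streams this prints the five rows listed in the question, starting with `1, [red, null, 87.9]`.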

Answers


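To build a full outer join stream implementation, I used two blocking queues. One queue is associated with each stream, and a Filler class (a Runnable implementation) reads data from its stream and writes it into the queue. When the filler runs out of data, it writes an end-of-stream marker into the queue. I then build a spliterator on top of AbstractSpliterator. Its tryAdvance method takes a value from the left queue and one from the right queue and, depending on how they compare, consumes or retains each of them. I use a variant of the Element class. See the code below: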

import java.util.ArrayList; 
import java.util.Collection; 

public final class Element<T> implements Comparable<Element<T>> { 
    final long id; 
    final Collection<T> value; 

    public Element(int id, T value) { 
     this.id = id; 
     // Order preserving 
     this.value = new ArrayList<T>(); 
     this.value.add(value); 
    } 

    Element(long id, Element<T> e1, Element<T> e2) { 
     this.id = id; 
     this.value = new ArrayList<T>(); 
     add(e1); 
     add(e2); 
    } 

    private void add(Element<T> e1) { 
     if(e1 == null) { 
      this.value.add(null);   
     } else { 
      this.value.addAll(e1.value); 
     } 
    } 

    /** 
    * Used as End-of-Stream marker 
    */ 
    Element() { 
     id = -1; 
     value = null; 
    } 

    @Override 
    public int compareTo(Element<T> o) { 
     return Long.compare(id, o.id); 
    } 
} 

The join implementation:

import java.util.Comparator; 
import java.util.Spliterator; 
import java.util.Spliterators; 
import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.BlockingQueue; 
import java.util.function.Consumer; 
import java.util.stream.Stream; 
import java.util.stream.StreamSupport; 

public class OuterJoinSpliterator<T> extends Spliterators.AbstractSpliterator<Element<T>> { 

    private final class Filler implements Runnable { 
     private final Stream<Element<T>> stream; 
     private final BlockingQueue<Element<T>> queue; 

     private Filler(Stream<Element<T>> stream, BlockingQueue<Element<T>> queue) { 
      this.stream = stream; 
      this.queue = queue; 
     } 

     @Override 
     public void run() { 
      try { 
       // Iterate explicitly so that an interrupt stops the producer instead of 
       // silently dropping the element that was being put. 
       java.util.Iterator<Element<T>> it = stream.iterator(); 
       while (it.hasNext()) { 
        queue.put(it.next()); 
       } 
       queue.put(EOS); // end-of-stream marker 
      } catch (final InterruptedException e) { 
       // Restore the interrupt flag and stop producing. 
       Thread.currentThread().interrupt(); 
      } 
     } 
    } 

    public final Element<T> EOS = new Element<T>(); 
    private final int queueSize; 
    private final BlockingQueue<Element<T>> leftQueue; 
    private final BlockingQueue<Element<T>> rightQueue; 
    protected Element<T> leftValue; 
    protected Element<T> rightValue; 

    private OuterJoinSpliterator(long estSize, int characteristics, int queueSize, 
      Stream<Element<T>> leftStream, Stream<Element<T>> rightStream) { 
     super(estSize, characteristics); 
     this.queueSize = queueSize; 
     leftQueue = createQueue(); 
     rightQueue = createQueue(); 
     createFillerThread(leftStream, leftQueue).start(); 
     createFillerThread(rightStream, rightQueue).start(); 
    } 

    private Element<T> acceptBoth(long id, Element<T> left, Element<T> right) { 
     return new Element<T>(id, left, right); 
    } 

    private final Element<T> acceptLeft(Element<T> left) { 
     return acceptBoth(left.id, left, null); 
    } 

    private final Element<T> acceptRight(Element<T> right) { 
     return acceptBoth(right.id, null, right); 
    } 

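    private final Thread createFillerThread(Stream<Element<T>> stream, BlockingQueue<Element<T>> queue) { 
     Thread thread = new Thread(new Filler(stream, queue)); 
     // Daemon threads do not keep the JVM alive if the joined stream is abandoned early. 
     thread.setDaemon(true); 
     return thread; 
    } 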

    private final ArrayBlockingQueue<Element<T>> createQueue() { 
     return new ArrayBlockingQueue<>(queueSize); 
    } 

    @Override 
    public Comparator<? super Element<T>> getComparator() { 
     return null; 
    } 

    private final boolean isFinished() { 
     return leftValue == EOS && rightValue == EOS; 
    } 

    @Override 
    public final boolean tryAdvance(Consumer<? super Element<T>> action) { 
     try { 
      updateLeft(); 

      updateRight(); 

      if (isFinished()) { 
       return false; 
      } 

      if (leftValue == EOS) { 
       action.accept(acceptRight(rightValue)); 
       rightValue = null; 
      } else if (rightValue == EOS) { 
       action.accept(acceptLeft(leftValue)); 
       leftValue = null; 
      } else { 
       // compareTo only guarantees the sign of the result, not -1/0/1. 
       final int cmp = leftValue.compareTo(rightValue); 
       if (cmp < 0) { 
        action.accept(acceptLeft(leftValue)); 
        leftValue = null; 
       } else if (cmp > 0) { 
        action.accept(acceptRight(rightValue)); 
        rightValue = null; 
       } else { 
        action.accept(acceptBoth(leftValue.id, leftValue, rightValue)); 
        leftValue = null; 
        rightValue = null; 
       } 
      } 
     } catch (final InterruptedException e) { 
      Thread.currentThread().interrupt(); // preserve the interrupt status 
      return false; 
     } 
     return true; 
    } 

    private final void updateLeft() throws InterruptedException { 
     if (leftValue == null) { 
      leftValue = leftQueue.take(); 
     } 
    } 

    private final void updateRight() throws InterruptedException { 
     if (rightValue == null) { 
      rightValue = rightQueue.take(); 
     } 
    } 

    public static <T> Stream<Element<T>> join(long estSize, int characteristics, int queueSize, boolean parallel, Stream<Element<T>> leftStream, Stream<Element<T>> rightStream) { 
     Spliterator<Element<T>> spliterator = new OuterJoinSpliterator<>(estSize, characteristics, queueSize, leftStream, rightStream); 
     return StreamSupport.stream(spliterator, parallel); 
    } 
} 

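You can use Long.MAX_VALUE as the size estimate (it means "unknown size"). See the Spliterator interface for a description of the various characteristics, and the Javadoc of AbstractSpliterator for further information.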


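The simplest solution is to write an Iterator, wrap it in a Spliterator with Spliterators.spliteratorUnknownSize, and create the stream with StreamSupport.stream. However, if you want to use parallel streams you may run into performance problems, because a spliterator built from an iterator of unknown size only permits limited parallelism.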
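A minimal sketch of that iterator approach, assuming every input is sorted by strictly increasing id as the question states (this is not validated): the iterator keeps one "head" element per input, emits the smallest current id with one slot per input (null where an input has no element for that id), and advances only the inputs that contributed. The names MergeJoinSketch and FullOuterJoinIterator are hypothetical, and joinStreams here only follows the shape proposed in the question; neither comes from a library. It runs on the caller's thread and holds just one element per input in memory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class MergeJoinSketch {
    // Simplified stand-in for the question's Element class (id and value only).
    static final class Element<T> {
        final long id;
        final T value;
        Element(long id, T value) { this.id = id; this.value = value; }
    }

    // N-way full outer merge join over inputs sorted by strictly increasing id.
    static final class FullOuterJoinIterator implements Iterator<Element<Object[]>> {
        private final List<Iterator<? extends Element<?>>> inputs = new ArrayList<>();
        private final Element<?>[] heads; // current element per input; null = exhausted

        FullOuterJoinIterator(Stream<? extends Element<?>>[] streams) {
            heads = new Element<?>[streams.length];
            for (int i = 0; i < streams.length; i++) {
                inputs.add(streams[i].iterator());
                advance(i); // pull the first element of each input
            }
        }

        private void advance(int i) {
            Iterator<? extends Element<?>> it = inputs.get(i);
            heads[i] = it.hasNext() ? it.next() : null;
        }

        @Override
        public boolean hasNext() {
            for (Element<?> head : heads) {
                if (head != null) return true;
            }
            return false;
        }

        @Override
        public Element<Object[]> next() {
            if (!hasNext()) throw new NoSuchElementException();
            long minId = Long.MAX_VALUE; // smallest id among the current heads
            for (Element<?> head : heads) {
                if (head != null) minId = Math.min(minId, head.id);
            }
            Object[] row = new Object[heads.length]; // inputs without this id stay null
            for (int i = 0; i < heads.length; i++) {
                if (heads[i] != null && heads[i].id == minId) {
                    row[i] = heads[i].value;
                    advance(i); // consume only the inputs that matched
                }
            }
            return new Element<>(minId, row);
        }
    }

    @SafeVarargs
    static Stream<Element<Object[]>> joinStreams(Stream<? extends Element<?>>... streams) {
        Iterator<Element<Object[]>> it = new FullOuterJoinIterator(streams);
        Spliterator<Element<Object[]>> split =
            Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED | Spliterator.NONNULL);
        // Closing the joined stream also closes the input streams.
        return StreamSupport.stream(split, false)
            .onClose(() -> { for (Stream<?> s : streams) s.close(); });
    }

    public static void main(String[] args) {
        Stream<Element<String>> colour = Stream.of(new Element<>(1, "red"), new Element<>(2, "green"),
                new Element<>(4, "red"), new Element<>(6, "blue"));
        Stream<Element<Integer>> length = Stream.of(new Element<>(2, 28), new Element<>(3, 9),
                new Element<>(4, 17), new Element<>(6, 11));
        Stream<Element<Float>> mass = Stream.of(new Element<>(1, 87.9f), new Element<>(2, 21.0f),
                new Element<>(3, 107f));
        try (Stream<Element<Object[]>> all = joinStreams(colour, length, mass)) {
            all.forEach(e -> System.out.println(e.id + ", " + Arrays.toString(e.value)));
        }
    }
}
```

Because every row has exactly one slot per input stream, a missing id yields one null per missing stream, so the columns stay aligned for any number of inputs without chaining two-way joins.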
