2015-06-02 132 views
5

我想分配我的输入的每一行id - 应该是从0N - 1的数字,其中N是输入中的行数。zipWithIndex Apache Flink

粗略地说,我希望能够做到像下面这样:

val data = sc.textFile(textFilePath, numPartitions) 
val rdd = data.map(line => process(line)) 
val rddMatrixLike = rdd.zipWithIndex.map { case (v, idx) => someStuffWithIndex(idx, v) } 

但在Apache的弗林克。可能吗?

+0

这是一个有趣的问题。我会试着想出一个实现。 –

回答

6

这现在是Apache Flink 0.10-SNAPSHOT版本的一部分。 zipWithIndex(in)zipWithUniqueId(in)的示例可在官方Flink documentation中获得。

5

下面是一个简单的实现功能:

public class ZipWithIndex { 

public static void main(String[] args) throws Exception { 

    ExecutionEnvironment ee = ExecutionEnvironment.getExecutionEnvironment(); 

    DataSet<String> in = ee.readTextFile("/home/robert/flink-workdir/debug/input"); 

    // count elements in each partition 
    DataSet<Tuple2<Integer, Long>> counts = in.mapPartition(new RichMapPartitionFunction<String, Tuple2<Integer, Long>>() { 
     @Override 
     public void mapPartition(Iterable<String> values, Collector<Tuple2<Integer, Long>> out) throws Exception { 
      long cnt = 0; 
      for (String v : values) { 
       cnt++; 
      } 
      out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), cnt)); 
     } 
    }); 

    DataSet<Tuple2<Long, String>> result = in.mapPartition(new RichMapPartitionFunction<String, Tuple2<Long, String>>() { 
     long start = 0; 

     @Override 
     public void open(Configuration parameters) throws Exception { 
      super.open(parameters); 
      List<Tuple2<Integer, Long>> offsets = getRuntimeContext().getBroadcastVariable("counts"); 
      Collections.sort(offsets, new Comparator<Tuple2<Integer, Long>>() { 
       @Override 
       public int compare(Tuple2<Integer, Long> o1, Tuple2<Integer, Long> o2) { 
        return ZipWithIndex.compare(o1.f0, o2.f0); 
       } 
      }); 
      for(int i = 0; i < getRuntimeContext().getIndexOfThisSubtask(); i++) { 
       start += offsets.get(i).f1; 
      } 
     } 

     @Override 
     public void mapPartition(Iterable<String> values, Collector<Tuple2<Long, String>> out) throws Exception { 
      for(String v: values) { 
       out.collect(new Tuple2<Long, String>(start++, v)); 
      } 
     } 
    }).withBroadcastSet(counts, "counts"); 
    result.print(); 

} 

public static int compare(int x, int y) { 
    return (x < y) ? -1 : ((x == y) ? 0 : 1); 
} 
} 

这是它如何工作的:我使用的是第一mapPartition()操作去了分区中的所有元素来算多少元素都在那里。 我需要知道每个分区中元素的数量,以便在将元素分配给元素时正确设置偏移量。 第一个mapPartition的结果是一个包含映射的DataSet。我将这个DataSet广播给所有第二个运算符,它们将ID分配给输入中的元素。 在第二个mapPartition()open()方法中,我正在计算每个分区的偏移量。

我可能会将代码贡献给Flink(与其他提交者讨论后)。

+0

谢谢罗伯特!你能否也许用几句话解释这是如何工作的?例如。为什么我们使用'getRuntimeContext()。getIndexOfThisSubtask()'和为什么每个分区的广播计数可以帮助? –

+0

好点。我会尽快添加一些说明。 –

+0

已添加描述 –