2014-10-20 100 views
4

有人可以给出在Java中正确使用mapPartitionsWithIndex的示例吗?我发现了很多Scala示例,但缺乏Java示例。 我的理解是正确的,使用此函数时,单独的分区将由单独的节点处理。Apache Spark mapPartitionsWithIndex

我收到以下错误

method mapPartitionsWithIndex in class JavaRDD<T> cannot be applied to given types; 
    JavaRDD<String> rdd = sc.textFile(filename).mapPartitionsWithIndex 
    required: Function2<Integer,Iterator<String>,Iterator<R>>,boolean 
    found: <anonymous Function2<Integer,Iterator<String>,Iterator<JavaRDD<String>>>> 

在做

JavaRDD<String> rdd = sc.textFile(filename).mapPartitionsWithIndex(
    new Function2<Integer, Iterator<String>, Iterator<JavaRDD<String>> >() { 

    @Override 
    public Iterator<JavaRDD<String>> call(Integer ind, String s) { 
+0

它不是没有清楚你所得到的。你有什么尝试?什么不起作用?与其他操作相同,Java API与Scala API直接类似。分区将由不同的任务处理,这些任务可能会或可能不在不同的计算机上,因为您的数据分区可能会或可能不会位于不同的计算机上。 – 2014-10-20 13:22:21

+0

好吧,我没有Scala知识,所以我很难读取Scala代码。我只需要将要传入mapPartitionsWithIndex的Function2实现的Java示例。我跟着唯一找到的例子,但有构建错误。 – 2014-10-20 14:06:30

+0

为什么不发布你正在问的错误呢? – 2014-10-20 14:43:31

回答

7

下面是我用它来删除一个CSV文件的第一行代码:

JavaRDD<String> rawInputRdd = sparkContext.textFile(dataFile); 

Function2 removeHeader= new Function2<Integer, Iterator<String>, Iterator<String>>(){ 
    @Override 
    public Iterator<String> call(Integer ind, Iterator<String> iterator) throws Exception { 
     if(ind==0 && iterator.hasNext()){ 
      iterator.next(); 
      return iterator; 
     }else 
      return iterator; 
    } 
}; 
JavaRDD<String> inputRdd = rawInputRdd.mapPartitionsWithIndex(removeHeader, false); 
+0

太棒了!帮助我得到这一个正确的:[Apache Spark Function2,没有得到正确的声明](http://stackoverflow.com/q/38468390/3255525) – JimLohse 2016-07-19 22:40:38

+0

这将删除分区右侧的第一行,而不是在文件内? – AKC 2017-03-22 22:42:21

+0

对。只需删除活动分区中的行。 – 2017-03-23 07:17:16