2017-10-18 57 views
0

转型我有一个RDD(combinerRDD),关于这一点我在下面加改造结果在一个空RDD

JavaPairRDD<String, Integer> counts = combinerRDD.mapToPair(
      new PairFunction<Tuple2<LongWritable, Text>, String, Integer>() { 
       String filename; 
       Integer count; 
       Message message; 

       @Override 
       public Tuple2<String, Integer> call(Tuple2<LongWritable, Text> tuple) throws Exception { 
        xlhrCount = 0; 
        filename = ""; 

         filename = "New_File"; 
         for (JobStep js : message.getJobStep()) { 
          if (js.getStepName().equals(StepName.NEW_STEP)) { 
           count += 1; 
          } 
         } 

        return new Tuple2<String, Integer>(filename, xlhrCount); 
       } 
      }).reduceByKey(new Function2<Integer, Integer, Integer>() { 
           @Override 
           public Integer call(Integer count1, Integer count2) throws Exception { 
            return (count1 + count2); 
           } 
          } 
    ); 

我的问题是,当combinerRDD里面有一些数据,我得到正确的结果。但当combinerRDD是空的写入HDFS的结果只是一个空的_SUCCESS文件。我期待2个文件在空RDD上转换,例如_SUCCESS和空白部分00000文件。我对吗?我应该得到多少个输出文件。

我之所以问这是因为我在2个集群中得到了不同的结果,在集群1上运行的代码导致了_SUCCESS文件,并且集群2导致了_SUCCESS和空的部分00000。我现在很困惑。结果是否依赖于任何群集设置?

注意:我在newRDD.leftOuterJoin(combinerRDD)上做了一个左连接,这没有给出结果(当combinerRDD只有_SUCCESS时)并且newRDD包含值。

回答

0

好的,所以我找到了解决方案。我正在使用spark-1.3.0,它有以下问题:ie。一个emptyRDD的左外连接会给出空结果。

https://issues.apache.org/jira/browse/SPARK-9236

我创建对空RDD象下面这样:

JavaRDD<Tuple2<LongWritable, Text>> emptyRDD = context.emptyRDD(); 
myRDD = JavaPairRDD.fromJavaRDD(emptyRDD); 

现在使用:

List<Tuple2<LongWritable, Text>> data = Arrays.asList(); 
JavaRDD<Tuple2<LongWritable, Text>> emptyRDD = context.parallelize(data); 
myRDD = JavaPairRDD.fromJavaRDD(emptyRDD); 

它现在,即我的RDD是没有更多的空。修正版本有: 1.3.2,1.4.2,1.5.0(参考上面的链接)。

相关问题