2017-08-02 29 views
1

我想用这样的代码(斯卡拉)登录一个RDD每个mapPartition操作的执行时间:阿帕奇星火mapPartition奇怪的行为(懒惰的评价是什么?)

rdd.mapPartitions{partition => 
    val startTime = Calendar.getInstance().getTimeInMillis 
    result = partition.map{element => 
     [...] 
    } 
    val endTime = Calendar.getInstance().getTimeInMillis 
    logger.info("Partition time "+(startTime-endTime)+ "ms") 
    result 
} 

的问题是,它在开始执行映射操作之前立即记录“分区时间”,所以我总是获得2毫秒的时间。

我注意到了Spark Web UI,在日志文件中,有关执行时间的行在任务开始后立即出现,而不是如预期的那样结束。

有人能解释我为什么?在mapPartitions中,代码应该线性执行,否则我错了?

由于

问候 卢卡

+0

转换被懒惰地评估。 – philantrovert

+0

好的,谢谢! 我解决了在结束时间之前放置“result.size”。 我认为默认情况下,mapPartitions中的地图是一个Scala操作,不是懒惰的。 – Gaglia88

+0

@philantrovert不,这不是原因,map里面的mapPartitions不是Spark转换,这是纯粹的scala相关 –

回答

3

partitionsmapPartitions内部是一个Iterator[Row]Iterator是Scala懒惰地评估(即,当迭代器被消耗)。这与Spark的懒惰评论无关!

调用partitions.size将触发评估您的映射,但会消耗迭代器(因为它只能迭代一次)。一个例子

val it = Iterator(1,2,3) 
it.size // 3 
it.isEmpty // true 

你能做什么是迭代器转换为无延迟的集合类型:

rdd.mapPartitions{partition => 
    val startTime = Calendar.getInstance().getTimeInMillis 
    result = partition.map{element => 
     [...] 
    }.toVector // now the statements are evaluated 
    val endTime = Calendar.getInstance().getTimeInMillis 
    logger.info("Partition time "+(startTime-endTime)+ "ms") 
    result.toIterator 
} 

编辑:请注意,您可以使用System.currentTimeMillis()(甚至System.nanoTime())而不是使用Calendar