2015-05-25 87 views
0

我有一个数组,它在封闭内(它有一些值),但在循环外,数组大小为0.我想知道是什么原因导致行为如此?数组火花关闭

我需要hArr可以在批处理HBase之外访问。

val hArr = new ArrayBuffer[Put]() 

rdd.foreach(row => { 
    val hConf = HBaseConfiguration.create() 
    val hTable = new HTable(hConf, tablename) 
    val hRow = new Put(Bytes.toBytes(row._1.toString)) 
    hRow.add(...) 
    hArr += hRow 
    println("hArr: " + hArr.toArray.mkString(",")) 
}) 

println("hArr.size: " + hArr.size) 
+0

我今天见过类似的东西http://stackoverflow.com/q/30437856/210905 – Odomontois

回答

0

问题是,rdd闭包中的任何项目都被复制并使用本地版本。 foreach只能用于保存到磁盘或沿着这些线路的东西。

如果你想要这个数组中,那么你可以map然后collect

rdd.map(row=> { 
    val hConf = HBaseConfiguration.create() 
    val hTable = new HTable(hConf, tablename) 
    val hRow = new Put(Bytes.toBytes(row._1.toString)) 
    hRow.add(...) 
    hRow 
}).collect() 
0

我发现了相当长的一段新的Spark用户感到困惑,他们是如何映射器和减速功能得到运行,如何与在驱动程序中定义的东西。一般来说,您所定义并通过map或foreach或reduceByKey或许多其他变体进行了注册的mapper/reducer功能都不会在您的驱动程序中执行。在您的驱动程序中,您只需将它们注册为Spark即可远程分布式运行它们。当这些函数引用您在驱动程序中实例化的某些对象时,您实际上创建了一个“Closure”,它会在大多数时间编译OK。但通常情况下,这不是你想要的,你会在运行时遇到问题,通过查看NotSerializable或ClassNotFound异常。

您可以通过foreach()变体远程执行所有输出工作,也可以尝试通过调用collect()将所有数据回收到驱动程序中输出。但是请注意collect(),因为它会将来自分布式节点的所有数据收集到您的驱动程序中。只有当你确定你的最终汇总数据很小时,你才会这样做。