1
[我们正在测试一个驱动程序,它可以在优化时提供出色的并行性。诀窍是,它不会在Spark分区内并行(访问DB2),所以要求我们告诉它我们需要多少个并行线程,并且为每个线程引入一个查询。虽然我曾希望在一个DataFrame对象数组的循环中做到这一点,但我无法弄清楚如何用一组DataFrame对象编写一个scala。对于我做的蛮力测试:Scala/Spark数据框数组
val DF1 = sqlContext.read.format("jdbc"). ...yada yada
val DF2 = sqlContext.read.format("jdbc"). ...yada yada
val DF3 = sqlContext.read.format("jdbc"). ...yada yada
val DF4 = sqlContext.read.format("jdbc"). ...yada yada
val unionDF=(((DF1.unionAll(DF2)).unionAll(DF3)).unionAll(DF4))
这对于并行化到4个分区非常有效。我宁愿做一个循环,但然后它会出现我需要像这样:
var myDF = new Array [DataFrame](parallelBreakdown)...并且DataFrame不是类型。任何关于这样做的想法都不会使用暴力方法吗?谢谢,
_it不在Spark分区_中并行化(在访问DB2中) - 为什么不简单地增加分区数?你想要的只是一个关于Scala集合标准操作的问题,但看起来像XY问题。 – zero323
一个循环...你在说什么像'Seq(DF1,DF2,DF3,DF4).reduce(_.unionAll(_))'? –
首先,感谢您的回复。我需要单独执行这些操作的原因是我正在测试一个新的驱动程序,该驱动程序适用于其自身的并行化形式。但是,与Spark不同,我可以指定分区,下限,yada yada,我需要识别#..并为每个分区提交一个单独的查询。我的想法是,定义一个DataFrame对象的数组然后通过1到NumPartition的范围来驱动一个循环会很好。然后unionAll数组的每个元素。当我在循环中写入同一个DF对象时,它有点奏效,但它并没有并行化。再次感谢 –