2016-03-03 98 views
1

[我们正在测试一个驱动程序,它可以在优化时提供出色的并行性。诀窍是,它不会在Spark分区内并行(访问DB2),所以要求我们告诉它我们需要多少个并行线程,并且为每个线程引入一个查询。虽然我曾希望在一个DataFrame对象数组的循环中做到这一点,但我无法弄清楚如何用一组DataFrame对象编写一个scala。对于我做的蛮力测试:Scala/Spark数据框数组

val DF1 = sqlContext.read.format("jdbc"). ...yada yada 
    val DF2 = sqlContext.read.format("jdbc"). ...yada yada 
    val DF3 = sqlContext.read.format("jdbc"). ...yada yada 
    val DF4 = sqlContext.read.format("jdbc"). ...yada yada 

    val unionDF=(((DF1.unionAll(DF2)).unionAll(DF3)).unionAll(DF4)) 

这对于并行化到4个分区非常有效。我宁愿做一个循环,但然后它会出现我需要像这样:

var myDF = new Array [DataFrame](parallelBreakdown)...并且DataFrame不是类型。任何关于这样做的想法都不会使用暴力方法吗?谢谢,

+2

_it不在Spark分区_中并行化(在访问DB2中) - 为什么不简单地增加分区数?你想要的只是一个关于Scala集合标准操作的问题,但看起来像XY问题。 – zero323

+0

一个循环...你在说什么像'Seq(DF1,DF2,DF3,DF4).reduce(_.unionAll(_))'? –

+0

首先,感谢您的回复。我需要单独执行这些操作的原因是我正在测试一个新的驱动程序,该驱动程序适用于其自身的并行化形式。但是,与Spark不同,我可以指定分区,下限,yada yada,我需要识别#..并为每个分区提交一个单独的查询。我的想法是,定义一个DataFrame对象的数组然后通过1到NumPartition的范围来驱动一个循环会很好。然后unionAll数组的每个元素。当我在循环中写入同一个DF对象时,它有点奏效,但它并没有并行化。再次感谢 –

回答

0

据帧确实是一个类型

import org.apache.spark.sql.DataFrame 

我能够定义一个函数

def querier(dim_vals: Array[String]): = { 
    dim_vals.flatMap(dim_val => 
     sql(MY_QUERY)) 
    } 

返回Array[DataFrame],我能够用Robert Congiu的回答创建一个单一的数据帧,并致电.show()就可以了。