2017-08-08 215 views
0

我想转换下面的代码运行在火花1.6上,但在我面临的某些问题。而sparksession转换为背景火花2到火花1.6

object TestData { 
    def makeIntegerDf(spark: SparkSession, numbers: Seq[Int]): DataFrame = 
    spark.createDataFrame(
     spark.sparkContext.makeRDD(numbers.map(Row(_))), 
     StructType(List(StructField("column", IntegerType, nullable = false))) 
    ) 

} 

如何转换它,使其在火花塞1.6

回答

2

SparkSession支持从spark 2.0仅限于向内。所以如果你想使用spark 1.6那么你需要在驱动类中创建SparkContextsqlContext并将它们传递给函数。

这样你就可以创建

val conf = new SparkConf().setAppName("simple") 
val sparkContext = new SparkContext(conf) 
val sqlContext = new SQLContext(sparkContext) 

,然后调用功能

val callFunction = makeIntegerDf(sparkContext, sqlContext, numbers) 

而且你的功能应该是为

def makeIntegerDf(sparkContext: SparkContext, sqlContext: SQLContext, numbers: Seq[Int]): DataFrame = 
    sqlContext.createDataFrame(
     sparkContext.makeRDD(numbers.map(Row(_))), 
     StructType(List(StructField("column", IntegerType, nullable = false))) 
    ) 
1

这里唯一的主要区别运行是利用火花的是火花会话,而不是火花上下文。

所以,你会做这样的事情:

object TestData { 
    def makeIntegerDf(sc: SparkContext, sqlContext: SQLContext, numbers: Seq[Int]): DataFrame = 
    sqlContext.createDataFrame(
     sc.makeRDD(numbers.map(Row(_))), 
     StructType(List(StructField("column", IntegerType, nullable = false))) 
    ) 
} 

当然,你需要为它提供的功能产生火花背景下,而不是火花会议。

+1

我认为有必要sqlContext太多。 :)你不这么认为吗? –

+0

@Assaf我认为'spark.createDataFrame'应该是'sc.createDataFrame',我试过了,但没有解决。还有它有createDataFrame或rdd.toDf? – Freeman

+0

@Freeman你是对的,我错过了那一个。我更新了答案,您需要使用sqlContext。 –