2017-03-06 20 views
0

我试图运行2个Dstreams,在第一个生成DataFrame中注册df作为tmp视图,然后在另一个Dstream中使用它,如下所示:spark streaming - 在一个流中创建tmp视图并在另一个流中使用

dstream1.foreachRDD { rdd => 
    import org.apache.spark.sql._ 
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate 
    import spark.implicits._ 
    import spark.sql 

    val records = rdd.toDF("record") 
    records.createOrReplaceTempView("records") 
} 
dstream2.foreachRDD { rdd => 
    import org.apache.spark.sql._ 
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate 
    import spark.implicits._ 
    import spark.sql 

    val records2 = rdd.toDF("record2") 
    val oldRecord = spark.table("records") 
    records2.join(oldRecod).write.json(...) 
} 
streamingContext.remember(Seconds(60)) 
    streamingContext.start() 
    streamingContext.awaitTermination() 

我一直在收到一个org.apache.spark.sql.catalyst.analysis.NoSuchTableException 所以显然我没有做正确的事情。

有没有办法做到这一点?

谢谢!

回答

0

这实际上工作, 问题是,当在本地进行测试时,您需要为计算留下额外的核心,然后从流中引入数据。

我使用了master = local [2],因此每个核心都用于处理每个流,而不用任何其他操作。 一旦我将其更改为master = local [4],它可以正常工作

相关问题