我有这样一段代码:试图了解火花流流
val lines: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
lines.foreachRDD { rdd =>
val df = cassandraSQLContext.read.json(rdd.map(x => x._2))
sparkStreamingService.run(df)
}
ssc.start()
ssc.awaitTermination()
我的理解是,foreachRDD在驱动程序级别发生的呢?所以基本上所有的代码块:
lines.foreachRDD { rdd =>
val df = cassandraSQLContext.read.json(rdd.map(x => x._2))
sparkStreamingService.run(df)
}
发生在驱动程序级别?所述sparkStreamingService.run(DF)方法基本上没有对电流数据帧一些转换,以产生一个新的数据帧,然后调用存储数据帧到另一个卡桑德拉方法(在另一个罐)。 因此,如果这种情况都发生在驱动程序级别,我们没有利用火花执行程序,我怎样才能做到这一点,以便并行使用执行程序来并行处理RDD的每个分区
我的火花流服务运行方法:
var metadataDataframe = df.select("customer", "tableName", "messageContent", "initialLoadRunning").collect()
metadataDataframe.foreach(rowD => {
metaData = populateMetaDataService.populateSiteMetaData(rowD)
val headers = (rowD.getString(2).split(recordDelimiter)(0))
val fields = headers.split("\u0001").map(
fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
val listOfRawData = rowD.getString(2).indexOf(recordDelimiter)
val dataWithoutHeaders = rowD.getString(2).substring(listOfRawData + 1)
val rawData = sparkContext.parallelize(dataWithoutHeaders.split(recordDelimiter))
// val rawData = dataWithoutHeaders.split(recordDelimiter)
val rowRDD = rawData
.map(_.split("\u0001"))
.map(attributes => Row(attributes: _*))
val newDF = cassandraSQLContext.createDataFrame(rowRDD, schema)
dataFrameFilterService.processBasedOnOpType(metaData, newDF)
})
但在这里:http://spark.apache.org/docs/latest/streaming-programming-guide.html,如果你向下滚动到他们使用forEachRdd的地方,他们有一个评论说,一个特定的声明正在以@Ahmed司机 – Ahmed
执行我编辑它直接与比文档 –
是,这样是我的问题增加了更多的明确性解决这一问题,就收集。到现在为止,由于我正在收集,记录是按顺序处理的,并且这些记录中的每一个都会分发给执行者?然后删除收集,所有的记录将被并行处理而不是顺序处理,是吗? – Ahmed