回答我的问题 -
我有一些成功的星火室壁运动集成和键为unionStreams .foreachRDD。
有可用的foreachRDD
- unionStreams.foreachRDD
- unionStreams.foreachRDD((RDD的2个版本:RDD [数组[字节]],时间:时间)
对于某种原因,第一个不能得到我的结果,但改变到第二个取回我的结果如预期。但要探究其原因。
添加代码snipp以下供参考。
也考虑改变这一点。这让我作为良好
"org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.6.0", // Doesnt work
"org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.4.1", // Works
希望它可以帮助别人:)
谢谢大家的帮助。
val kinesisStreams = (0 until numStreams).map {
count =>
val stream = KinesisUtils.createStream(
ssc,
consumerName,
streamName,
endpointUrl,
regionName,
InitialPositionInStream.TRIM_HORIZON,
kinesisCheckpointInterval,
StorageLevel.MEMORY_AND_DISK_2
)
stream
}
val unionStreams = ssc.union(kinesisStreams)
println(s"========================")
println(s"Num of streams: ${numStreams}")
println(s"========================")
/*unionStreams.foreachRDD{ // Doesn't Work !!
rdd =>
println(rdd.count)
println("rdd isempty:" + rdd.isEmpty)
}*/
unionStreams.foreachRDD ((rdd: RDD[Array[Byte]], time: Time) => { // Works, Yeah !!
println(rdd.count)
println("rdd isempty:" + rdd.isEmpty)
}
)
ssc.start()
ssc.awaitTermination()