2016-02-23 47 views
2

最近我一直试图从Kinesis获取Spark读取事件,但在接收事件时遇到问题。虽然Spark能够连接到Kinesis并能够从Kinesis获取元数据,但无法从中获取事件。它总是取回零元素。Spark无法从Amazon Kinesis获取事件

没有错误,只是空的结果回来。 Spark能够获取元数据(例如kinesis等中的碎片数量)。

我已经使用这些[1 & 2]指南获得它的工作,但还没有得到太多的运气。我也尝试了几个来自SO [3]的建议。群集有足够的资源/核心可用。

我们已经看到Spark和Kinesis之间Protobuf版本中的版本冲突,这也可能是导致此行为的原因。 Spark使用protobuf-java版本2.5.0,kinesis可能使用protobuf-java-2.6.1.jar。

只是想知道是否有人遇到过这种行为,或者已经得到了与kinesis一起工作的火花。

已经试过用Spark 1.5.0,Spark 1.6.0。

  1. http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
  2. https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala

  3. Apache Spark Kinesis Sample not working

回答

1

回答我的问题 -

我有一些成功的星火室壁运动集成和键为unionStreams .foreachRDD。

有可用的foreachRDD

  • unionStreams.foreachRDD
  • unionStreams.foreachRDD((RDD的2个版本:RDD [数组[字节]],时间:时间)

对于某种原因,第一个不能得到我的结果,但改变到第二个取回我的结果如预期。但要探究其原因。

添加代码snipp以下供参考。

也考虑改变这一点。这让我作为良好

"org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.6.0", // Doesnt work 
"org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.4.1", // Works 

希望它可以帮助别人:)

谢谢大家的帮助。

val kinesisStreams = (0 until numStreams).map { 
    count => 
    val stream = KinesisUtils.createStream(
     ssc, 
     consumerName, 
     streamName, 
     endpointUrl, 
     regionName, 
     InitialPositionInStream.TRIM_HORIZON, 
     kinesisCheckpointInterval, 
     StorageLevel.MEMORY_AND_DISK_2 
    ) 

    stream 
} 
val unionStreams = ssc.union(kinesisStreams) 

println(s"========================") 
println(s"Num of streams: ${numStreams}") 
println(s"========================") 

/*unionStreams.foreachRDD{ // Doesn't Work !! 
    rdd => 
    println(rdd.count) 
    println("rdd isempty:" + rdd.isEmpty) 
}*/ 
unionStreams.foreachRDD ((rdd: RDD[Array[Byte]], time: Time) => { // Works, Yeah !! 
    println(rdd.count) 
    println("rdd isempty:" + rdd.isEmpty) 
    } 
) 

ssc.start() 
ssc.awaitTermination() 
相关问题