2016-09-09 42 views
2

我有一个来自Kafka的Spark消费者。 我正在尝试管理偏移量的精确一次语义。从RDD访问KafkaOffset时出现异常

然而,在访问抵消它抛出以下异常:

“java.lang.ClassCastException:org.apache.spark.rdd.MapPartitionsRDD 不能转换为org.apache.spark.streaming .kafka.HasOffsetRanges”

,这是否是如下代码的一部分:

var offsetRanges = Array[OffsetRange]() 
dataStream 
    .transform { 
    rdd => 
     offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
     rdd 
    } 
    .foreachRDD(rdd => { }) 

这里dataStrea

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+"_"+t)).map(_._2) 

如果有人能帮助我理解我在做什么错在这里:M是直接流(DSTREAM [字符串])使用像KafkaUtils API创建的东西。 转换是在官方文档中提到的在数据流上执行的方法链中的第一种方法以及

谢谢。

回答

6

您的问题是:

.map(._2) 

它创建了一个MapPartitionedDStream,而不是由KafkaUtils.createKafkaStream创建的DirectKafkaInputDStream

您需要maptransform后:

val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+""+t)) 

kafkaStream 
    .transform { 
    rdd => 
     offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
     rdd 
    } 
    .map(_._2) 
    .foreachRDD(rdd => // stuff) 
+0

AAH!傻我。 感谢队友。干杯! :) – taransaini43

+0

@ user1521672您欢迎。 –

+0

也尝试使用偏移量创建直接流,但遇到错误。
val fromOffsets :(TopicAndPartition,Long)= TopicAndPartition(metrics_rs.getString(1),metrics_rs.getInt(2)) - > metrics_rs.getLong(3)
KafkaUtils.createDirectStream [String,String,StringDecoder,StringDecoder,(字符串,字符串)](SSC,kafkaParams,fromOffsets,的MessageHandler)
其中, VAL的MessageHandler = (MMD:MessageAndMetadata [字符串,字符串])=> mmd.message.length 和metrics_rs是结果集从该我正在获取偏移地图。它说太多类型参数错误 – taransaini43

相关问题