I cannot parallelize a list in Scala; it throws java.lang.NullPointerException.

Parallelizing the list:

messages.foreachRDD(rdd => {
for(avroLine <- rdd){
val record = Injection.injection.invert(avroLine.getBytes).get
val field1Value = record.get("username")
val jsonStrings=Seq(record.toString())
val newRow = sqlContext.sparkContext.parallelize(Seq(record.toString()))
}
})
Output
jsonStrings...List({"username": "user_118", "tweet": "tweet_218", "timestamp": 18})
Exception
Caused by: java.lang.NullPointerException
at com.capitalone.AvroConsumer$$anonfun$main$1$$anonfun$apply$1.apply(AvroConsumer.scala:83)
at com.capitalone.AvroConsumer$$anonfun$main$1$$anonfun$apply$1.apply(AvroConsumer.scala:74)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.util.CompletionIterator.foreach(CompletionIterator.scala:26)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
Thanks in advance!
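A likely cause, judging from the stack trace: `sqlContext.sparkContext.parallelize` is called inside the loop body that `foreachRDD` runs on the executors, and the `SparkContext` only exists on the driver, so it is null there. A minimal sketch of the usual workaround, decoding the records with `map` on the RDD itself instead of re-parallelizing on an executor (identifiers such as `messages` and `Injection.injection` are taken from the question; the surrounding `StreamingContext`/`SQLContext` setup is assumed):

```scala
// Sketch: keep driver-only objects (SparkContext, SQLContext) out of code
// that executes on the executors.
messages.foreachRDD { rdd =>
  // map runs on the executors; each Avro line is decoded there
  val jsonStrings = rdd.map { avroLine =>
    Injection.injection.invert(avroLine.getBytes).get.toString
  }
  // back on the driver: jsonStrings is already an RDD[String],
  // so it can be handed straight to the JSON reader
  val df = sqlContext.read.json(jsonStrings)
  df.show()
}
```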
Thanks for your reply. I am trying to convert my RabbitMQ AVRO binary stream using avro invert and then save it to the file system as .csv files. `val messages = RabbitMQUtils.createStream(ssc, rabbitParams); messages.foreachRDD(rdd => { for(avroLine <- rdd){ val record = SparkUtils.getRecordInjection(QUEUE_NAME).invert(avroLine.getBytes).get; val jsonStrings: RDD[String] = sc.parallelize(Seq(record.toString())); val result = sqlContext.read.json(jsonStrings).toDF(); result.write.mode("Append").csv("/Users/Documents/rabbitmq/consumer-out/"); }})` – Mg2729
It looks very much like you are trying to produce one output record per input record. Is that right? Is there a reason you can't just use `map` for this transformation? –
Yes, my consumer runs every 15 minutes and consumes the whole stream. Also, I tried the map function: `val messages = RabbitMQUtils.createStream(ssc, rabbitParams); messages.foreachRDD(rdd => { val record = rdd.map(message => SparkUtils.getRecordInjection(QUEUE_NAME).invert(message.getBytes).get); val jsonStrings: RDD[String] = sqlContext.sparkContext.parallelize(Seq(record.toString())); ... })`. At least in my previous approach I could convert my binary stream into something readable, but with map I'm getting the output below. jsonStrings... ParallelCollectionRDD[42] at parallelize at AVROMqStreaming.scala:62 – Mg2729
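In that map-based attempt, `record` is an `RDD` (of decoded Avro records), so `record.toString()` returns the RDD's debug description ("ParallelCollectionRDD[42] at parallelize at ..."), not the data. Mapping `toString` over the RDD instead yields one JSON string per record. A sketch under the same assumptions as the comment (identifiers `SparkUtils.getRecordInjection`, `QUEUE_NAME`, and the output path are taken from the comment; `sqlContext` is assumed to exist on the driver):

```scala
messages.foreachRDD { rdd =>
  // Decode each message on the executors
  val records = rdd.map { message =>
    SparkUtils.getRecordInjection(QUEUE_NAME).invert(message.getBytes).get
  }
  // One JSON string per record, not the RDD's own toString
  val jsonStrings = records.map(_.toString)
  // Driver side: parse the JSON strings and append to CSV
  val result = sqlContext.read.json(jsonStrings)
  result.write.mode("Append").csv("/Users/Documents/rabbitmq/consumer-out/")
}
```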