2017-07-13 105 views
0

我无法并行Scala中的一个列表,让显示java.lang.NullPointerException无法在斯卡拉

并行列表
messages.foreachRDD(rdd => { 
     for(avroLine <- rdd){ 
     val record = Injection.injection.invert(avroLine.getBytes).get 
     val field1Value = record.get("username") 
     val jsonStrings=Seq(record.toString()) 
     val newRow = sqlContext.sparkContext.parallelize(Seq(record.toString())) 
      } 
      }) 

输出

jsonStrings...List({"username": "user_118", "tweet": "tweet_218", "timestamp": 18}) 

异常

Caused by: java.lang.NullPointerException 
at com.capitalone.AvroConsumer$$anonfun$main$1$$anonfun$apply$1.apply(AvroConsumer.scala:83) 
at com.capitalone.AvroConsumer$$anonfun$main$1$$anonfun$apply$1.apply(AvroConsumer.scala:74) 
at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
at org.apache.spark.util.CompletionIterator.foreach(CompletionIterator.scala:26) 
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917) 
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
at org.apache.spark.scheduler.Task.run(Task.scala:99) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 

Thanks in Adv ANCE!

回答

0

您正试图在spark worker上下文中创建RDD。虽然foreachRDD在驱动程序中运行,但您在每个RDD上执行的操作foreach将分发给工作人员。您似乎不太可能想要为输入流的每一行创建一个新的RDD。评论后

更新:

很难有评论线程那里是代码没有格式的讨论。我的基本问题是,你为什么不这样做这样的事情:

val messages: ReceiverInputDStream[String] = RabbitMQUtils.createStream(ssc, rabbitParams) 
def toJsonString(message: String): String = SparkUtils.getRecordInjection(QUEUE_NAME).invert(message.getBytes()).get 
val jsonStrings: DStream[String] = messages map toJsonString 

我没有打扰找出并追查你使用的所有库(请,下一次,提交MCVE)所以我没有试图编译它。但它看起来像你想要的是将每个输入消息映射到一个JSON字符串。也许你想对Strings产生的DStream做一些幻想,但这可能是一个不同的问题。

+0

感谢您的回复。我正在尝试使用av​​ro invert转换我的RabbitMQ AVRO二进制流,然后将其保存为文件系统的.csv文件。 val messages = RabbitMQUtils.createStream(ssc,rabbitParams); messages.foreachRDD(RDD => { \t为(avroLine < - 消息){ VAL记录= SparkUtils.getRecordInjection(QUEUE_NAME).invert(rdd.getBytes)。获得; VAL jsonStrings:RDD [字符串] =皮下。 parallelize(Seq(record.toString())); val result = sqlContext.read.json(jsonStrings).toDF(); result.write.mode(“Append”)。csv(“/ Users/Documents/rabbitmq/consumer-out /“); }}) – Mg2729

+0

它看起来非常像您试图为每个输入记录生成一个输出记录。那是对的吗?是否有一个原因,你不能只使用'地图'这个翻译? –

+0

是的,我的消费者每15分钟运行一次并消耗所有流。此外,我尝试使用map函数,val消息= RabbitMQUtils.createStream(ssc,rabbitParams); messages.foreachRDD(rdd => {val record = rdd.map(message => SparkUtils.getRecordInjection(QUEUE_NAME).invert(message。 getBytes).get); val jsonStrings:RDD [String] = sqlContext.sparkContext.parallelize(Seq(record.toString()));但是,至少在我的prev过程中,我可以将我的二进制流转换为可读,但是我'获取下面的输出与地图。jsonStrings ... ParallelCollectionRDD [42]并行在AVROMqStreaming.scala:62 – Mg2729

0
def toJsonString(message: String): String = {val record = 

SparkUtils.getRecordInjection(QUEUE_NAME).invert(message.getBytes()).get } 
dStreams.foreachRDD(rdd => { 
val jsonStrings = rdd.map (stream =>toJsonString(stream)) 
val df = sqlContext.read.json(jsonStrings) 
df.write.mode("Append").csv("/Users/Documents/kafka-poc/consumer-out/def/")} 
+0

请指出解决您问题的部分代码。 – jwvh

+0

谢谢Joe为您提供的所有帮助。 – Mg2729