I'm trying to aggregate a CSV file via Spark SQL and then show the result as JSON: read the CSV as a DataFrame and convert it to a JSON string.

val people = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").option("delimiter", ",").load("/tmp/people.csv") 
people.registerTempTable("people") 
val result = sqlContext.sql("select country, count(*) as cnt from people group by country") 

This is where I'm stuck. Calling result.schema.prettyJson works perfectly, but I haven't found a way to return result itself as JSON.
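
For reference, a minimal sketch of what I'm after, assuming the aggregation itself succeeds: toJSON yields one JSON string per row, so the collected strings could be joined into a single JSON array by hand.

// Sketch: collect the per-row JSON strings and wrap them in a JSON array.
// The output shown is a hypothetical shape, not actual data.
val json: String = result.toJSON.collect().mkString("[", ",", "]")
// e.g. [{"country":"DE","cnt":2},{"country":"US","cnt":3}]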

I assumed result.toJSON.collect() would do what I want, but it fails with the following

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 101.0 failed 1 times, most recent failure: Lost task 1.0 in stage 101.0 (TID 159, localhost): java.lang.NegativeArraySizeException 
    at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$6.apply(CsvRelation.scala:171) 
    at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$6.apply(CsvRelation.scala:162) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:511) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:704) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:704) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

error. Can somebody guide me?

Answers

It turns out the error occurred because of a "malformed" CSV file: it contained some rows that had more columns than the others (with no header field name for them)... hence the rather odd error message.
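
For anyone hitting the same thing, a quick way to spot such rows is to count separators per raw line before the CSV parser gets involved. This is only a sketch: it assumes a plain comma delimiter with no quoted fields, and that the SparkContext is available as sc.

// Count fields per raw line; any count that differs from the header's
// field count points at a malformed row. Naive split, so this breaks
// on quoted fields that themselves contain commas.
val lineWidths = sc.textFile("/tmp/people.csv")
  .map(_.split(",", -1).length)
  .countByValue()
println(lineWidths) // e.g. Map(4 -> 9998, 5 -> 2) would flag two bad rows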

You should mark this as the accepted answer. – zero323

Indeed, but it's only possible to accept your own answer after two days. – Tobi

It turns out this can also happen when the file itself is well-formed but the schema you specify doesn't have enough fields. I just ran into a file that had an extra comma at the end of every line and solved it by adding an extra string field to the schema. –
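
For illustration, a sketch of that workaround on a hypothetical file whose lines look like name,country, (note the trailing comma); the extra StringType column exists only to absorb the empty trailing field:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical explicit schema: the last column absorbs the empty
// field produced by the trailing comma on every line.
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("country", StringType, nullable = true),
  StructField("_trailing", StringType, nullable = true)))

val people = sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "true")
    .schema(schema)
    .load("/tmp/people.csv")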

The error you're getting is strange; it sounds like result might be empty?

You may want to try this command on the DataFrame to get each row printed out instead:

result.toJSON.foreach(println) 

For a bit more information, see the DataFrame API.

Thank you for your answer. I'm looking for a way to receive the complete 'DataFrame' as a single JSON, if I understood that correctly (Spark/Scala beginner here...) – Tobi

That's certainly possible; what does it output? As I said, I suspect based on the error you're getting that your result may be empty. result.toJSON.collect() should return something like the following, based on the Spark people.json test file: Array[String] = Array({"name":"Andy","cnt":1}, {"name":"Michael","cnt":1}, {"name":"Justin","cnt":1}) –

I can do 'people.show()' but not 'result.show()' – Tobi

Try

val people = sqlContext.read.format("com.databricks.spark.csv") 
    .option("header", "true") 
    .option("inferSchema", "true") 
    .option("mode", "DROPMALFORMED") 
    .option("delimiter", ",") 
    .load("/tmp/people.csv") 
people.registerTempTable("people") 
val result = sqlContext.sql("select country, count(*) as cnt from people group by country")
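
DROPMALFORMED makes spark-csv silently skip lines whose field count doesn't match the schema (the other parse modes are PERMISSIVE, the default, and FAILFAST). With the bad rows gone, result.toJSON.collect() should no longer hit the NegativeArraySizeException. As a sketch, the aggregation could also be written straight to newline-delimited JSON files instead of being collected on the driver (the output path is just an example):

// Sketch: persist the result as JSON files; Spark writes one JSON
// object per line into the target directory.
result.write.json("/tmp/people_by_country.json")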