
ArrayIndexOutOfBoundsException with Spark, Spark-Avro and Google Analytics data

I'm trying to use spark-avro to read Google Analytics Avro data files from one of our clients. I'm also new to Spark/Scala, so my apologies if I've got anything wrong or done anything silly. I'm using Spark 1.3.1.

I'm experimenting with the data in the spark-shell, which I kick off like this:

spark-shell --packages com.databricks:spark-avro_2.10:1.0.0 

Then I run the following commands:

import com.databricks.spark.avro._ 
import scala.collection.mutable._ 

val gadata = sqlContext.avroFile("[client]/data") 
gadata: org.apache.spark.sql.DataFrame = [visitorId: bigint, visitNumber: bigint, visitId: bigint, visitStartTime: bigint, date: string, totals: struct<visits:bigint,hits:bigint,pageviews:bigint,timeOnSite:bigint,bounces:bigint,transactions:bigint,transactionRevenue:bigint,newVisits:bigint,screenviews:bigint,uniqueScreenviews:bigint,timeOnScreen:bigint,totalTransactionRevenue:bigint>, trafficSource: struct<referralPath:string,campaign:string,source:string,medium:string,keyword:string,adContent:string>, device: struct<browser:string,browserVersion:string,operatingSystem:string,operatingSystemVersion:string,isMobile:boolean,mobileDeviceBranding:string,flashVersion:string,javaEnabled:boolean,language:string,screenColors:string,screenResolution:string,deviceCategory:string>, geoNetwork: str... 

val gaIds = gadata.map(ga => ga.getString(11)).collect() 

And I get the following error:

[Stage 2:=>                       (8 + 4)/430]15/05/14 11:14:04 ERROR Executor: Exception in task 12.0 in stage 2.0 (TID 27) 
java.lang.ArrayIndexOutOfBoundsException 
15/05/14 11:14:04 WARN TaskSetManager: Lost task 12.0 in stage 2.0 (TID 27, localhost): java.lang.ArrayIndexOutOfBoundsException 

15/05/14 11:14:04 ERROR TaskSetManager: Task 12 in stage 2.0 failed 1 times; aborting job 
15/05/14 11:14:04 WARN TaskSetManager: Lost task 11.0 in stage 2.0 (TID 26, localhost): TaskKilled (killed intentionally) 
15/05/14 11:14:04 WARN TaskSetManager: Lost task 10.0 in stage 2.0 (TID 25, localhost): TaskKilled (killed intentionally) 
15/05/14 11:14:04 WARN TaskSetManager: Lost task 9.0 in stage 2.0 (TID 24, localhost): TaskKilled (killed intentionally) 
15/05/14 11:14:04 WARN TaskSetManager: Lost task 13.0 in stage 2.0 (TID 28, localhost): TaskKilled (killed intentionally) 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 2.0 failed 1 times, most recent failure: Lost task 12.0 in stage 2.0 (TID 27, localhost): java.lang.ArrayIndexOutOfBoundsException 

Driver stacktrace: 
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) 
     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 

I thought this might be something to do with the index I was using, but the following statement works fine.

scala> gadata.first().getString(11) 
res12: String = 29456309767885 

So I thought that maybe some of the records might be null or have a different number of columns... so I tried to run the following statement to get a list of all the record lengths:

scala> gadata.map(ga => ga.length).collect() 

But I get a similar error:

[Stage 4:=>                       (8 + 4)/430]15/05/14 11:20:04 ERROR Executor: Exception in task 12.0 in stage 4.0 (TID 42) 
java.lang.ArrayIndexOutOfBoundsException 
15/05/14 11:20:04 WARN TaskSetManager: Lost task 12.0 in stage 4.0 (TID 42, localhost): java.lang.ArrayIndexOutOfBoundsException 

15/05/14 11:20:04 ERROR TaskSetManager: Task 12 in stage 4.0 failed 1 times; aborting job 
15/05/14 11:20:04 WARN TaskSetManager: Lost task 11.0 in stage 4.0 (TID 41, localhost): TaskKilled (killed intentionally) 
15/05/14 11:20:04 ERROR Executor: Exception in task 13.0 in stage 4.0 (TID 43) 
org.apache.spark.TaskKilledException 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 
15/05/14 11:20:04 WARN TaskSetManager: Lost task 9.0 in stage 4.0 (TID 39, localhost): TaskKilled (killed intentionally) 
15/05/14 11:20:04 WARN TaskSetManager: Lost task 10.0 in stage 4.0 (TID 40, localhost): TaskKilled (killed intentionally) 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 4.0 failed 1 times, most recent failure: Lost task 12.0 in stage 4.0 (TID 42, localhost): java.lang.ArrayIndexOutOfBoundsException 

Driver stacktrace: 
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) 
     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
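
For reference, here is a slightly more defensive version of that length check I could try. It is only a sketch: it tallies the distinct record lengths on the executors instead of collecting every value, and marks rows whose fields can't be read with -1. If the exception is actually being thrown inside the Avro reader itself rather than by the Row access, I'd expect this to fail the same way.

import scala.util.Try

// Tally distinct record lengths; rows that throw while being read are marked -1.
// Note: a Try around row.length can only catch errors raised by the Row access,
// not errors raised while the Avro record is being deserialised.
val lengths = gadata.map(row => Try(row.length).getOrElse(-1))
lengths.countByValue().foreach(println)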

Is this an issue with spark-avro or with Spark?

Answer


Not sure what the underlying issue was, but I've managed to work around the error by breaking my data down into monthly sets. I had four months of GA data in one folder and was processing all of it together. The data ranges from 70MB to 150MB per day.

After creating four folders for January through April and loading them individually, the maps complete without any problem. Once loaded, I can join the datasets (I've only tried two so far) and work on them without issue.

I'm using Spark on a pseudo-distributed Hadoop installation, so I'm not sure whether that affects how much data Spark can handle.
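
For anyone curious, this is roughly what the workaround looks like. It's a minimal sketch: the monthly folder paths are placeholders for however your data is laid out, and I'm treating "joining" the two sets as a unionAll.

import com.databricks.spark.avro._

// Load each month from its own folder (paths here are hypothetical placeholders).
val jan = sqlContext.avroFile("[client]/data/2015-01")
val feb = sqlContext.avroFile("[client]/data/2015-02")

// unionAll only works if both DataFrames share the same schema,
// which is exactly what breaks for the March/April data (see the update below).
val janFeb = jan.unionAll(feb)
janFeb.map(ga => ga.getString(11)).collect()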

Update:

I've found the root cause of the error. I loaded each month's data individually and printed out the schema. January and February are identical, but after that one field goes missing in the March and April schemas:

root 
|-- visitorId: long (nullable = true) 
|-- visitNumber: long (nullable = true) 
|-- visitId: long (nullable = true) 
|-- visitStartTime: long (nullable = true) 
|-- date: string (nullable = true) 
|-- totals: struct (nullable = true) 
| |-- visits: long (nullable = true) 
| |-- hits: long (nullable = true) 
| |-- pageviews: long (nullable = true) 
| |-- timeOnSite: long (nullable = true) 
| |-- bounces: long (nullable = true) 
| |-- transactions: long (nullable = true) 
| |-- transactionRevenue: long (nullable = true) 
| |-- newVisits: long (nullable = true) 
| |-- screenviews: long (nullable = true) 
| |-- uniqueScreenviews: long (nullable = true) 
| |-- timeOnScreen: long (nullable = true) 
| |-- totalTransactionRevenue: long (nullable = true) 
(snipped) 

The totalTransactionRevenue field at the bottom is no longer present after February. So I assume this is what is causing the error, and it is related to this issue
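
If it helps anyone else, this is the kind of check I used to spot the difference. It's a rough sketch: the monthly paths are placeholders, and given the schemas above the diff should come out as Set(totalTransactionRevenue).

import com.databricks.spark.avro._
import org.apache.spark.sql.types.StructType

// Print the schemas side by side...
val feb = sqlContext.avroFile("[client]/data/2015-02")
val mar = sqlContext.avroFile("[client]/data/2015-03")
feb.printSchema()
mar.printSchema()

// ...or diff the nested "totals" fields programmatically.
val febTotals = feb.schema("totals").dataType.asInstanceOf[StructType].fieldNames.toSet
val marTotals = mar.schema("totals").dataType.asInstanceOf[StructType].fieldNames.toSet
println(febTotals diff marTotals)   // expected to show Set(totalTransactionRevenue)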