2017-09-25

I am trying to write a simple, vanilla collaborative filtering application that runs on Google Cloud Dataproc. The data lives in BigQuery. I implemented it following this tutorial: https://cloud.google.com/dataproc/docs/tutorials/bigquery-sparkml

The problem is that when I run this (slightly modified) example, I get an IllegalStateException. More specifically, here is the stack trace:

17/09/25 10:55:37 ERROR org.apache.spark.scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job 
Traceback (most recent call last): 
File "/tmp/af84ad68-0259-4ca1-b464-a118a96f0742/marketing-pages-collaborative-filtering.py", line 109, in <module> 
compute_recommendations() 
File "/tmp/af84ad68-0259-4ca1-b464-a118a96f0742/marketing-pages-collaborative-filtering.py", line 59, in compute_recommendations 
conf=conf) 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 646, in newAPIHadoopRDD 
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__ 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco 
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, marketing-pages-collaborative-filtering-w-1.c.dg-dev-personalization.internal): java.lang.IllegalStateException: Found known file 'data-000000000002.json' with index 2, which isn't less than or equal to than endFileNumber 1! 
    at com.google.cloud.hadoop.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:197) 
    at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.setEndFileMarkerFile(DynamicFileListRecordReader.java:327) 
    at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.nextKeyValue(DynamicFileListRecordReader.java:177) 
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:182) 
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:748) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1899) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1324) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
    at org.apache.spark.rdd.RDD.take(RDD.scala:1298) 
    at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:203) 
    at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:582) 
    at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
    at py4j.Gateway.invoke(Gateway.java:280) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:214) 
    at java.lang.Thread.run(Thread.java:748) 
Caused by: java.lang.IllegalStateException: Found known file 'data-000000000002.json' with index 2, which isn't less than or equal to than endFileNumber 1! 
    at com.google.cloud.hadoop.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:197) 
    at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.setEndFileMarkerFile(DynamicFileListRecordReader.java:327) 
    at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.nextKeyValue(DynamicFileListRecordReader.java:177) 
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:182) 
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    ... 1 more 

17/09/25 10:55:37 INFO org.spark_project.jetty.server.ServerConnector: Stopped [email protected]{HTTP/1.1}{0.0.0.0:4040} 
ERROR: (gcloud.dataproc.jobs.submit.pyspark) Job [af84ad68-0259-4ca1-b464-a118a96f0742] entered state [ERROR] while waiting for [DONE]. 

I think I have pinpointed the problem, but I can't find its cause. The relevant code snippet is this:

# Read the BigQuery export through the Hadoop connector as an RDD of
# (LongWritable, JSON string) records.
table_rdd = spark.sparkContext.newAPIHadoopRDD(
    "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "com.google.gson.JsonObject",
    conf=conf)

# Keep only the JSON value of each record and load it into a DataFrame.
table_json = table_rdd.map(lambda x: x[1])
visit_data = sparkSession.read.json(table_json)

First I create the RDD following Google's tutorial. The next step is to extract the JSON elements from the RDD and read them into a table so that it can be queried. The stack trace shows the exception occurring where conf is passed in, but the code really only fails once I call sparkSession.read.json(table_json), because as far as I understand it, Spark evaluates lazily and only at that point tries to access the actual JSON files exported from BigQuery.
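
For completeness, conf is the job configuration from the tutorial; a minimal sketch of what it typically contains is below (project, bucket, dataset and table IDs are placeholders, not my actual values):

conf = {
    # BigQuery connector input settings (all values below are placeholders).
    "mapred.bq.project.id": "my-project",
    "mapred.bq.gcs.bucket": "my-bucket",
    "mapred.bq.temp.gcs.path": "gs://my-bucket/hadoop/tmp/bigquery/pyspark_input",
    "mapred.bq.input.project.id": "my-project",
    "mapred.bq.input.dataset.id": "my_dataset",
    "mapred.bq.input.table.id": "visits",
}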

The problem is that Spark finds more JSON files than it should. According to a comment in the BigQuery Hadoop library code, the minimum is 2 even if everything fits into one shard, so that BigQuery can tell the export is done. It also says there that it generates a so-called end-marker file, which as far as I can tell is just an empty JSON file.
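
To illustrate, with a single shard the export directory (the mapred.bq.temp.gcs.path location; the path below is just a placeholder) should only ever contain something like the following, which matches the endFileNumber 1 in the stack trace:

gs://my-bucket/hadoop/tmp/bigquery/pyspark_input/data-000000000000.json   (the exported rows)
gs://my-bucket/hadoop/tmp/bigquery/pyspark_input/data-000000000001.json   (zero-length end marker)

Instead, the reader also encounters data-000000000002.json and higher, which triggers the precondition failure.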

However, when the code runs, the export produced by BigQuery contains more than the 2 required files (1 with the data, 1 as the end marker). It generates up to 5 JSON files, which sometimes contain only 1 or 2 rows from BigQuery.

I am fairly sure this is the problem, that the export is somehow wrong, but I can't figure out why it happens or how to fix it. Any help is appreciated.

Update:

I tried something else: I deleted the table in BigQuery and populated it again from scratch. That fixed the export problem, and now there are only two files. But I think the underlying issue is still there. I will try adding some rows via a Cloud Function (which is what happens in my application) and then check the behavior again.

Update 2:

So after waiting a day and adding some rows via streaming inserts from a Cloud Function, the problem occurred again. Somehow the exports are partitioned by day. That would not be an issue if each day got its own shard, but that doesn't happen.


Do you have a BigQuery job ID you can share? You can also reach the Google team directly at [email protected] to share your project ID. Your assessment is correct: there should not be any files numbered after the zero-length "end-marker" file. Are you adding rows with BigQuery "streaming inserts", or with heavyweight "load" jobs? –


@DennisHuo Thanks for your reply. I have already contacted the Google team, but they are taking quite a while to deal with the issue. For inserts we use streaming inserts, triggered via a Cloud Function. Somehow the additional files only show up after waiting a day and adding more rows. –


I'm not sure that running queries against BQ to build an ML system is a good approach. In [this project](https://github.com/WillianFuks/PySpark-RecSys) I created, you can see I have some exporters that run queries in BQ and export the results to GCS, and I then read them in Spark. I never had any problems with it and it works very fast (and it avoids queries that always cost money; with this approach they run only once). Coincidentally, it also implements a recommender system, but using the DIMSUM algorithm. –

Answer


This is a bug in BigQuery (it returns output file count statistics that do not include zero-record files). A fix for the issue has been committed, and its rollout will be complete within a week.

In the meantime, a workaround for the issue is to set the flag "mapred.bq.input.sharded.export.enable" (also known as ENABLE_SHARDED_EXPORT_KEY) to false in the Hadoop configuration when configuring the Dataproc job.
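
For example (just a sketch that adds the key to the tutorial-style conf dictionary shown in the question, not an official snippet from the connector docs), the flag can be set before creating the RDD:

# Workaround: disable the sharded export so the dynamic file-list reader
# that performs the end-marker check is not used.
conf["mapred.bq.input.sharded.export.enable"] = "false"

table_rdd = spark.sparkContext.newAPIHadoopRDD(
    "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "com.google.gson.JsonObject",
    conf=conf)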

UPDATE:
As of today, October 6, 2017, the fix is 100% rolled out in BigQuery.


Is there a ticket or bug report for this? I ran into this problem when trying to read data from a table with the BigQuery connector using PySpark on Dataproc. – zo7


We haven't filed a public bug report yet, but if you're still seeing this, it may be a corner case that produces the same error once a table reaches a certain state. We have an internal bug tracking it, but feel free to file a bug report against BigQuery if you think that would be useful. In the meantime, you can unblock yourself with the workaround suggested in the answer. Thanks! –
