
org.apache.spark.SparkException: Job aborted due to stage failure when integrating with MongoDB

I am trying to integrate MongoDB with Apache Spark to process some data. Here is my (dummy) code:

import java.util.Arrays; 
import java.util.Collections; 
import java.util.Map; 
import java.util.HashMap; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.FlatMapFunction; 
import org.apache.spark.api.java.function.Function2; 
import org.apache.spark.api.java.function.PairFunction; 
import org.bson.BSONObject; 
import org.bson.BasicBSONObject; 
import java.util.Comparator; 

import scala.Tuple2; 

import com.mongodb.hadoop.MongoOutputFormat; 
import com.mongodb.hadoop.MongoInputFormat; 
import com.mongodb.hadoop.BSONFileOutputFormat; 

public final class JavaWordCount { 

    public static void main(String[] args) { 

     String input = args[0];
     String output = args[1];

     JavaSparkContext sc = new JavaSparkContext();

     Configuration config = new Configuration();
     config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/" + input);
     config.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");

     // I have tried using the same configuration for both, too
     Configuration outputConfig = new Configuration();
     outputConfig.set("mongo.output.format", "com.mongodb.hadoop.MongoOutputFormat");
     outputConfig.set("mongo.output.uri", "mongodb://localhost:27017/" + output);

     JavaPairRDD<Object, BSONObject> mongoRDD = sc.newAPIHadoopRDD(config, com.mongodb.hadoop.MongoInputFormat.class, Object.class, BSONObject.class); 

     // Input contains tuples of (ObjectId, BSONObject) 
     JavaRDD<String> words = mongoRDD.flatMap(new FlatMapFunction<Tuple2<Object, BSONObject>, String>() { 
      @Override 
      public Iterable<String> call(Tuple2<Object, BSONObject> arg) { 
       Object o = arg._2.get("user"); 
       if (o instanceof String) { 
        String str = (String) o; 
        return Arrays.asList(str); 
       } else { 
        return Collections.emptyList(); 
       } 
      } 
     }); 

     JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() { 
      public Tuple2<String, Integer> call(String s) { 
       return new Tuple2<>(s, 1); 
      } 
     }); 

     // Output contains tuples of (null, BSONObject) - ObjectId will be generated by Mongo driver if null 
     JavaPairRDD<Object, BSONObject> save = ones.mapToPair(new PairFunction<Tuple2<String, Integer>, Object, BSONObject>() { 
      @Override 
      public Tuple2<Object, BSONObject> call(Tuple2<String, Integer> tuple) { 
       BSONObject bson = new BasicBSONObject(); 
       bson.put("word", tuple._1); 
       //bson.put("count", tuple._2); 
       return new Tuple2<>(null, bson); 
      } 
     }); 

     // Only MongoOutputFormat and config are relevant 

     save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, BSONObject.class, MongoOutputFormat.class, outputConfig); 

    } 
} 

It compiles fine with SBT, and it works correctly when I run it with

../spark-1.2.1-bin-hadoop2.4/bin/spark-submit --master local --jars $(echo /home/luis/mongodb_old/mongo-spark/mongo-spark-master-3/lib/*.jar | tr ' ' ',') --class "JavaWordCount" target/scala-2.10/mongo-spark_2.10-1.0.jar mydb.testCollection mydb.output 

That works fine, but if I try

../spark-1.2.1-bin-hadoop2.4/bin/spark-submit --master spark://luis:7077 --jars $(echo /home/luis/mongodb_old/mongo-spark/mongo-spark-master-3/lib/*.jar | tr ' ' ',') --class "JavaWordCount" target/scala-2.10/mongo-spark_2.10-1.0.jar mydb.testCollection mydb.output 

(i.e., running it on the standalone cluster instead of locally) I get the following error:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 0.0 failed 4 times, most recent failure: Lost task 6.3 in stage 0.0 (TID 23, ip184.com.uvigo.es): java.lang.IllegalStateException: open 
    at org.bson.util.Assertions.isTrue(Assertions.java:36) 
    at com.mongodb.DBTCPConnector.getPrimaryPort(DBTCPConnector.java:406) 
    at com.mongodb.DBCollectionImpl.insert(DBCollectionImpl.java:184) 
    at com.mongodb.DBCollectionImpl.insert(DBCollectionImpl.java:167) 
    at com.mongodb.DBCollection.insert(DBCollection.java:161) 
    at com.mongodb.DBCollection.insert(DBCollection.java:107) 
    at com.mongodb.DBCollection.save(DBCollection.java:1049) 
    at com.mongodb.DBCollection.save(DBCollection.java:1014) 
    at com.mongodb.hadoop.output.MongoRecordWriter.write(MongoRecordWriter.java:105) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:993) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:969) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
    at org.apache.spark.scheduler.Task.run(Task.scala:56) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:220) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
15/03/02 13:31:26 INFO TaskSetManager: Lost task 8.1 in stage 0.0 (TID 22) on executor ip184.com.uvigo.es: java.lang.IllegalStateException (open) [duplicate 2] 

I have tried the solution posted in Spark-Submit exception SparkException: Job aborted due to stage failure, but it did not solve the problem. I have also read many other posts, but I could not find a solution.

Any help would be appreciated.

P.S.: I tried to follow all the rules before posting, but this is my first post on Stack Overflow, so if I have made any mistake I apologize and promise not to do it again.

Thanks in advance.

EDIT: I have upgraded to the latest versions of Spark and MongoDB. I keep getting the same exception, but now it seems to be caught internally, so the process does not stop. However, the data that triggers the exception is not processed, so I get a different result after each execution. This is what I get now:

15/03/23 17:05:34 WARN TaskSetManager: Lost task 0.1 in stage 0.0 (TID 4, 10.0.2.15): java.lang.IllegalStateException: open 
at org.bson.util.Assertions.isTrue(Assertions.java:36) 
at com.mongodb.DBTCPConnector.getPrimaryPort(DBTCPConnector.java:406) 
at com.mongodb.DBCollectionImpl.insert(DBCollectionImpl.java:184) 
at com.mongodb.DBCollectionImpl.insert(DBCollectionImpl.java:167) 
at com.mongodb.DBCollection.insert(DBCollection.java:161) 
at com.mongodb.DBCollection.insert(DBCollection.java:107) 
at com.mongodb.DBCollection.save(DBCollection.java:1049) 
at com.mongodb.DBCollection.save(DBCollection.java:1014) 
at com.mongodb.hadoop.output.MongoRecordWriter.write(MongoRecordWriter.java:105) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:1000) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
at org.apache.spark.scheduler.Task.run(Task.scala:64) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 

EDIT 2: The MongoDB collection I am trying to read from has about 30,000 documents. I have just tried one with only 10,000 and it works like a charm, but that does not seem like a proper solution. What could the problem be? Thanks in advance.

Answer


Your output MongoDB server seems to be configured as:

"mongodb://localhost:27017/" 

Are you sure about that? Shouldn't it be a proper remote address that all of your workers can reach?
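As a minimal sketch, the output configuration might instead look something like this (the hostname mongo-host.example.com is only a placeholder for whatever address your MongoDB server is actually reachable at from the worker nodes):

    // Sketch only: replace the placeholder host with the address of your MongoDB
    // server as seen from the worker machines, not the driver's loopback interface.
    Configuration outputConfig = new Configuration();
    outputConfig.set("mongo.output.format", "com.mongodb.hadoop.MongoOutputFormat");
    outputConfig.set("mongo.output.uri", "mongodb://mongo-host.example.com:27017/" + output);

With --master local everything runs in a single JVM on your machine, so localhost happens to work; on the standalone cluster each executor opens its own connection to MongoDB, and localhost then refers to the worker node itself.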

Regards,


@Oliver Girardot: I have tried "mongodb://my-ip:27017/" and commented out bind_ip = 127.0.0.1 in /etc/mongod.conf. I still get the same error. Thanks. – user3307590 2015-03-09 11:21:54
