2016-11-05 59 views
7

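Spark and Java: Exception thrown in awaitResult

I am trying to connect from a Java application to a Spark cluster running in a virtual machine with IP 10.20.30.50 and port 7077, and to run the word count example: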

SparkConf conf = new SparkConf().setMaster("spark://10.20.30.50:7077").setAppName("wordCount"); 
JavaSparkContext sc = new JavaSparkContext(conf); 
JavaRDD<String> textFile = sc.textFile("hdfs://localhost:8020/README.md"); 
String result = Long.toString(textFile.count()); 
JavaRDD<String> words = textFile.flatMap((FlatMapFunction<String, String>) s -> Arrays.asList(s.split(" ")).iterator()); 
JavaPairRDD<String, Integer> pairs = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1)); 
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((Function2<Integer, Integer, Integer>) (a, b) -> a + b); 
counts.saveAsTextFile("hdfs://localhost:8020/tmp/output"); 
sc.stop(); 
return result; 
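To see what the job is supposed to compute, independently of the cluster connection, the flatMap / mapToPair / reduceByKey pipeline can be mirrored in plain Java. This is a minimal sketch: the class LocalWordCount is hypothetical, uses only the JDK (no Spark, no HDFS), and only illustrates the logic of the job.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local mirror of the word-count job above: plain Java, no Spark or HDFS.
final class LocalWordCount {

    // Splits every line on a single space (as s.split(" ") does in the flatMap)
    // and adds 1 per word (as mapToPair + reduceByKey((a, b) -> a + b) do).
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted only for readable output
        for (String line : lines) {
            for (String word : line.split(" ")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("to be or", "not to be");
        // textFile.count() in the question counts lines (RDD elements), not words.
        System.out.println(lines.size() + " lines");
        System.out.println(count(lines));
    }
}
```

If this local version and the Spark job disagree on the same input, the problem is in the job; if only the cluster run fails, the problem is in the connection or setup.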

The Java application shows the following stack trace:

Running Spark version 2.0.1 
Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
Changing view acls to: lii5ka 
Changing modify acls to: lii5ka 
Changing view acls groups to: 
Changing modify acls groups to: 
SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(lii5ka); groups with view permissions: Set(); users with modify permissions: Set(lii5ka); groups with modify permissions: Set() 
Successfully started service 'sparkDriver' on port 61267. 
Registering MapOutputTracker 
Registering BlockManagerMaster 
Created local directory at /private/var/folders/4k/h0sl02993_99bzt0dzv759000000gn/T/blockmgr-51de868d-3ba7-40be-8c53-f881f97ced63 
MemoryStore started with capacity 2004.6 MB 
Registering OutputCommitCoordinator 
Logging initialized @48403ms 
jetty-9.2.z-SNAPSHOT 
Started …{/jobs,null,AVAILABLE}
Started …{/jobs/json,null,AVAILABLE}
Started …{/jobs/job,null,AVAILABLE}
Started …{/jobs/job/json,null,AVAILABLE}
Started …{/stages,null,AVAILABLE}
Started …{/stages/json,null,AVAILABLE}
Started …{/stages/stage,null,AVAILABLE}
Started …{/stages/stage/json,null,AVAILABLE}
Started …{/stages/pool,null,AVAILABLE}
Started …{/stages/pool/json,null,AVAILABLE}
Started …{/storage,null,AVAILABLE}
Started …{/storage/json,null,AVAILABLE}
Started …{/storage/rdd,null,AVAILABLE}
Started …{/storage/rdd/json,null,AVAILABLE}
Started …{/environment,null,AVAILABLE}
Started …{/environment/json,null,AVAILABLE}
Started …{/executors,null,AVAILABLE}
Started …{/executors/json,null,AVAILABLE}
Started …{/executors/threadDump,null,AVAILABLE}
Started …{/executors/threadDump/json,null,AVAILABLE}
Started …{/static,null,AVAILABLE}
Started …{/,null,AVAILABLE}
Started …{/api,null,AVAILABLE}
Started …{/stages/stage/kill,null,AVAILABLE}
Started …{HTTP/1.1}{0.0.0.0:4040}
Started @48698ms 
Successfully started service 'SparkUI' on port 4040. 
Bound SparkUI to 0.0.0.0, and started at http://192.168.0.104:4040 
Connecting to master spark://10.20.30.50:7077... 
Successfully created connection to /10.20.30.50:7077 after 25 ms (0 ms spent in bootstraps) 
Connecting to master spark://10.20.30.50:7077... 
Still have 2 requests outstanding when connection from /10.20.30.50:7077 is closed 
Failed to connect to master 10.20.30.50:7077 

org.apache.spark.SparkException: Exception thrown in awaitResult 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77) ~[spark-core_2.11-2.0.1.jar:2.0.1] 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75) ~[spark-core_2.11-2.0.1.jar:2.0.1] 
     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) ~[scala-library-2.11.8.jar:na] 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) ~[spark-core_2.11-2.0.1.jar:2.0.1] 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) ~[spark-core_2.11-2.0.1.jar:2.0.1] 
     at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167) ~[scala-library-2.11.8.jar:na] 
     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83) ~[spark-core_2.11-2.0.1.jar:2.0.1] 
     at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:88) ~[spark-core_2.11-2.0.1.jar:2.0.1] 
     at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:96) ~[spark-core_2.11-2.0.1.jar:2.0.1] 
     at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1$$anon$1.run(StandaloneAppClient.scala:106) ~[spark-core_2.11-2.0.1.jar:2.0.1] 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_102] 
     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_102] 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_102] 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_102] 
     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_102] 
Caused by: java.io.IOException: Connection from /10.20.30.50:7077 closed 
     at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:128) ~[spark-network-common_2.11-2.0.1.jar:2.0.1] 
     at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:109) ~[spark-network-common_2.11-2.0.1.jar:2.0.1] 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:257) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:182) ~[spark-network-common_2.11-2.0.1.jar:2.0.1] 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:828) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:621) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     ... 1 common frames omitted 

In the Spark master log on 10.20.30.50, I get the following error message:

    16/11/05 14:47:20 ERROR OneForOneStrategy: Error while decoding incoming Akka PDU of length: 1298
    akka.remote.transport.AkkaProtocolException: Error while decoding incoming Akka PDU of length: 1298
    Caused by: akka.remote.transport.PduCodecException: Decoding PDU failed.
        at akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:167)
        at akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:580)
        at akka.remote.transport.ProtocolStateActor$$anonfun$4.applyOrElse(AkkaProtocolTransport.scala:375)
        at akka.remote.transport.ProtocolStateActor$$anonfun$4.applyOrElse(AkkaProtocolTransport.scala:343)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
        at akka.actor.FSM$class.processEvent(FSM.scala:604)
        at akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:269)
        at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:598)
        at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:592)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
        at akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:269)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
        at com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:89)
        at com.google.protobuf.CodedInputStream.readTag(CodedInputStream.java:108)
        at akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:6643)
        at akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:6607)
        at akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:6703)
        at akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:6698)
        at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:141)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
        at akka.remote.WireFormats$AkkaProtocolMessage.parseFrom(WireFormats.java:6821)
        at akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:168)
        ... 19 more

    Additional information

    • When I use new SparkConf().setMaster("local") instead, the example works fine.
    • I can connect to the Spark master with spark-shell --master spark://10.20.30.50:7077 from the very same machine.

    +0

    You cannot connect using this IP '10.20.30.50:7077'. – pamu

    +0

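    Why can't I connect to a node on my local machine? Spark is running in a virtual machine on my host and is reachable via this IP, so I don't understand why I can't connect to it. Is there some special restriction in Spark? –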

    +0

    You never told me there was a virtual machine in between. – pamu

    Answers

    7

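    At first it looks like a network error, but it actually isn't: it is a Spark version mismatch in disguise. Point your application to the correct version of the Spark jars, mostly the assembly jars.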
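The client log in the question already contains "Successfully created connection to /10.20.30.50:7077", which fits this diagnosis: the TCP path works, and it is the exchange on top of it that fails. A plain TCP probe can confirm reachability separately from Spark. This is a minimal sketch with a hypothetical PortProbe class using only java.net; a true result rules out basic network or firewall problems, but says nothing about whether the Spark versions on both sides match.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks only that a TCP connection can be opened.
// It does not speak Spark's RPC protocol, so it cannot detect a version mismatch.
final class PortProbe {

    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) { // refused, timed out, unknown host, ...
            return false;
        }
    }

    public static void main(String[] args) {
        // Master address and port taken from the question.
        System.out.println(canConnect("10.20.30.50", 7077, 2000));
    }
}
```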

    This issue may happen because of a version mismatch in a Hadoop RPC call that uses Protobuf.

    InvalidProtocolBufferException is thrown "when a protocol message being parsed is invalid in some way, e.g. it contains a malformed varint or a negative byte length."

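    • In my experience with protobuf, InvalidProtocolBufferException occurs only when a message cannot be parsed (for example, if you parse a protobuf message programmatically, the message length may be zero or the message may be corrupted).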

    • Spark uses Akka actors for communication between the master/driver and the workers. Internally, Akka uses Google's protobuf to communicate; see the method below from AkkaPduCodec.scala:

      override def decodePdu(raw: ByteString): AkkaPdu = {
        try {
          val pdu = AkkaProtocolMessage.parseFrom(raw.toArray)
          if (pdu.hasPayload) Payload(ByteString(pdu.getPayload.asReadOnlyByteBuffer()))
          else if (pdu.hasInstruction) decodeControlPdu(pdu.getInstruction)
          else throw new PduCodecException("Error decoding Akka PDU: Neither message nor control message were contained", null)
        } catch {
          case e: InvalidProtocolBufferException ⇒ throw new PduCodecException("Decoding PDU failed.", e)
        }
      }
      

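    In your case, though, the versions on the two sides do not match, so a message produced by one version cannot be parsed by the other version's parser... or something along those lines.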
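The master-side message "Protocol message contained an invalid tag (zero)" can be illustrated with a simplified decoder. In protobuf, each field starts with a varint tag equal to (fieldNumber << 3) | wireType, and field number 0 is never valid, so bytes that were not written as the expected protobuf message (for example, a frame from a different RPC framing that happens to begin with a zero byte) fail at the very first tag. The sketch below uses a hypothetical TagReader class and only the JDK; it is not the protobuf library, it only reproduces that one check.

```java
// Hypothetical, simplified reader showing why protobuf rejects a tag of zero.
// A tag is a varint holding (fieldNumber << 3) | wireType; field number 0 is invalid.
final class TagReader {

    // Reads one base-128 varint (7 data bits per byte, high bit = "more bytes follow").
    static int readVarint(byte[] buf, int offset) {
        int result = 0;
        for (int shift = 0; shift < 32; shift += 7) {
            if (offset >= buf.length) {
                throw new IllegalArgumentException("truncated varint");
            }
            byte b = buf[offset++];
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
        }
        throw new IllegalArgumentException("malformed varint");
    }

    // Returns the field number of the first tag; mimics the zero-tag check in protobuf's readTag().
    static int firstFieldNumber(byte[] buf) {
        int tag = readVarint(buf, 0);
        int fieldNumber = tag >>> 3;
        if (fieldNumber == 0) {
            throw new IllegalArgumentException("Protocol message contained an invalid tag (zero).");
        }
        return fieldNumber;
    }

    public static void main(String[] args) {
        // 0x0A = field 1, wire type 2 (length-delimited): a plausible start of a protobuf message.
        System.out.println(firstFieldNumber(new byte[] {0x0A, 0x03}));
        // Bytes that begin with 0x00 (as many binary length prefixes do) decode to tag 0.
        try {
            firstFieldNumber(new byte[] {0x00, 0x00, 0x05, 0x12});
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```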

    If you are using Maven for other dependencies, please review them.

    +1

    For me, it was the Scala version (patch). Thanks! – combinatorist

    2

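    It turned out that I was running Spark 1.5.2 in the VM, but using version 2.0.1 of the Spark libraries in my Java application. I solved the problem by using the matching Spark library version in my pom.xml: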

    <dependency> 
        <groupId>org.apache.spark</groupId> 
        <artifactId>spark-core_2.10</artifactId> 
        <version>1.5.2</version> 
    </dependency> 
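A mismatch like 2.0.1 on the client against 1.5.2 on the cluster can be caught before connecting by comparing the two version strings. This is a minimal sketch with a hypothetical SparkVersionCheck class; where the two strings come from (for example the version in pom.xml and the version reported by spark-submit --version on the VM) is up to you. Treating major.minor as the compatibility boundary is an assumption of this sketch, not a documented Spark guarantee; matching the exact version, as in the pom.xml above, is the conservative choice.

```java
// Hypothetical check: compare the Spark version the client was built against
// with the version running on the cluster, at the major.minor level.
final class SparkVersionCheck {

    // "2.0.1" -> "2.0"; assumes a dotted numeric version string.
    static String majorMinor(String version) {
        String[] parts = version.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("unexpected version: " + version);
        }
        return parts[0] + "." + parts[1];
    }

    static boolean sameMajorMinor(String clientVersion, String clusterVersion) {
        return majorMinor(clientVersion).equals(majorMinor(clusterVersion));
    }

    public static void main(String[] args) {
        // The combination from this question: 2.0.1 client library, 1.5.2 cluster.
        System.out.println(sameMajorMinor("2.0.1", "1.5.2"));
        // After switching the dependency to 1.5.2.
        System.out.println(sameMajorMinor("1.5.2", "1.5.2"));
    }
}
```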
    

    Another problem (which came up later) was that I also had to pin the Scala version the library was built with. That is the _2.10 suffix in the artifactId.
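The Scala suffix check can be sketched the same way: take the suffix from the artifactId and compare it with the Scala binary version (major.minor for Scala 2.x) of the cluster, for example as printed in the spark-shell start-up banner. The ScalaSuffixCheck class below is hypothetical and uses only the JDK.

```java
// Hypothetical helper: read the Scala binary version from a Spark artifactId
// such as "spark-core_2.10" and compare it with a full Scala version such as "2.10.4".
final class ScalaSuffixCheck {

    // "spark-core_2.10" -> "2.10"
    static String scalaSuffix(String artifactId) {
        int idx = artifactId.lastIndexOf('_');
        if (idx < 0 || idx == artifactId.length() - 1) {
            throw new IllegalArgumentException("no Scala suffix in: " + artifactId);
        }
        return artifactId.substring(idx + 1);
    }

    // For Scala 2.x the binary version is major.minor: "2.10.4" -> "2.10".
    static String binaryVersion(String scalaVersion) {
        String[] parts = scalaVersion.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("unexpected Scala version: " + scalaVersion);
        }
        return parts[0] + "." + parts[1];
    }

    static boolean matches(String artifactId, String clusterScalaVersion) {
        return scalaSuffix(artifactId).equals(binaryVersion(clusterScalaVersion));
    }

    public static void main(String[] args) {
        System.out.println(matches("spark-core_2.10", "2.10.4"));
        System.out.println(matches("spark-core_2.11", "2.10.4"));
    }
}
```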

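    Basically, @RamPrassad's answer pointed me in the right direction, but it did not say explicitly what I needed to do to fix my problem. By the way: I could not update Spark in the VM, because it came with the HortonWorks distribution...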

    +0

    "If you are using Maven for other dependencies, please check."... I didn't even know you were using Maven, and I still made that suggestion in my answer. –
