Spark: StreamCorruptedException when deploying to OpenShift in cluster mode

2017-09-15

I am trying to submit a Spark application to my Spark master. The master and several workers run in an OpenShift environment, and the Spark master's web UI shows the connected workers.

The application jar is deployed to /jars in every Spark pod.

This is my submit script:

spark-submit2.cmd --conf "spark.driver.extraClassPath=/jars" ^
        --conf "spark.executor.extraClassPath=/jars" ^
        --conf "spark.submit.deployMode=cluster" ^
        --master spark://******:31824 ^
        --class Main local:/jars/SparkHelloWorld-1.0-SNAPSHOT.jar
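(For reference: Spark's local:/ URL scheme means the jar is expected to already exist at that absolute path on every node, which matches deploying it into /jars in each pod rather than uploading it at submit time.)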

The application itself is simple, the standard Monte Carlo estimate of pi (the fraction of random points in the unit square that fall inside the unit circle approaches pi/4):

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class Main {

    private static final String MASTER = "spark://******:31824";

    public static void main(String[] args) throws Exception {

        // Create a SparkContext to initialize
        SparkConf conf = new SparkConf()
                .setMaster(MASTER)
                .setAppName("SparkPi");

        // Create a Java version of the Spark Context
        JavaSparkContext sc = new JavaSparkContext(conf);

        final int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 4;
        final int n = 100000 * slices;
        final List<Integer> l = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            l.add(i);
        }

        final JavaRDD<Integer> dataSet = sc.parallelize(l, slices);

        // Count how many random points in [-1, 1] x [-1, 1] land inside the unit circle
        final int count = dataSet.map(integer -> {
            double x = Math.random() * 2 - 1;
            double y = Math.random() * 2 - 1;
            return (x * x + y * y < 1) ? 1 : 0;
        }).reduce((a, b) -> a + b);

        System.out.println("Pi is roughly " + 4.0 * count / n);

        sc.stop();
    }
}

Every time I run this script, I get the following exception:

Exception in thread "main" org.apache.spark.SparkException: Exception thrown in awaitResult: 
     at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205) 
     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) 
     at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100) 
     at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:108) 
     at org.apache.spark.deploy.Client$$anonfun$7.apply(Client.scala:233) 
     at org.apache.spark.deploy.Client$$anonfun$7.apply(Client.scala:233) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) 
     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) 
     at org.apache.spark.deploy.Client$.main(Client.scala:233) 
     at org.apache.spark.deploy.Client.main(Client.scala) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
     at java.lang.reflect.Method.invoke(Unknown Source) 
     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755) 
     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 
     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119) 
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.RuntimeException: java.io.StreamCorruptedException: invalid stream header: 01000D31 
     at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:857) 
     at java.io.ObjectInputStream.<init>(ObjectInputStream.java:349) 
     at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63) 
     at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63) 
     at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122) 
     at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:107) 
     at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1$$anonfun$apply$1.apply(NettyRpcEnv.scala:259) 
     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
     at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:308) 
     at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1.apply(NettyRpcEnv.scala:258) 
     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
     at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:257) 
     at org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:577) 
     at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:562) 
     at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:159) 
     at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:107) 
     at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346) 
     at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346) 
     at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346) 
     at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346) 
     at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353) 
     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911) 
     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) 
     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652) 
     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575) 
     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489) 
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451) 
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140) 
     at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) 
     at java.lang.Thread.run(Thread.java:748) 

     at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:207) 
     at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336) 
     at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336) 
     at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336) 
     at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336) 
     at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343) 
     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911) 
     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) 
     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643) 
     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566) 
     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480) 
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442) 
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131) 
     at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) 
     at java.lang.Thread.run(Unknown Source) 

I cannot find anything about this problem in the Spark documentation. Am I missing something?

Answers

Answer (2 votes):

It is probably related to different Spark versions on the master and the workers.
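A minimal sketch for checking this on the driver side (the class name VersionCheck is hypothetical; it assumes JavaSparkContext.version() and that the scala.util.Properties static forwarder is callable from Java). It runs against a local master so no cluster connection is needed; compare the output with what the master's web UI reports:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class VersionCheck {
    public static void main(String[] args) {
        // Run locally so no cluster connection (and no version handshake) is needed
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("VersionCheck");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Spark version bundled on the driver classpath
        System.out.println("Driver Spark version: " + sc.version());
        // Scala library version on the driver classpath
        System.out.println("Driver Scala version: " + scala.util.Properties.versionString());
        sc.stop();
    }
}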

Answer (1 vote):

One of the following is probably happening:

  1. The Spark version used in your code differs from the one running on the cluster.
  2. The Scala version your code was built with does not match the Scala version the cluster's Spark was built with.

In my case, I was using Spark dependencies built with Scala 2.10, while the cluster was running Spark built with Scala 2.11 (see the dependency sketch below). Hope this helps someone. Cheers!
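For reference, Spark's Maven artifact IDs carry the Scala version as a suffix, so the fix is to pick the suffix that matches the cluster's build. A hedged pom.xml sketch (the 2.2.0 version number is an assumption; use whatever the master's web UI reports):

<!-- The _2.11 suffix must match the Scala build of the cluster's Spark -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version> <!-- assumed; use the exact version the cluster runs -->
</dependency>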
