2016-04-12 97 views
1

我需要你们的帮助和建议为一个独立的火花集群上运行的Apache星火KafkaWordCount例如运行:星火流 - KafkaWordCount无法在星火独立集群

我可以通过

运行星火例如,KafkaWordCount,以本地模式
spark-submit .... --master local[4] .... 

我可以从另一个节点(虚拟机)中的Kafka服务器获取消息并获得打印在终端控制台上的结果。

然而,递交申请时,火花独立集群(通过

spark-submit .... --master spark://master:7077 .... 

),我发现异常的$ SPARK_HOME /工作的每个工人节点的目录/../../标准错误目录。 并且每个字计数批次的结果是不是打印到每个工作节点中的$ SPARK_HOME/work /../ .. stdout。

这里是/ conf目录/ spark-env.sh我在$ SPARK_HOME资源的每一个火花工作节点的设置:

export SPARK_MASTER_IP=master 
export SPARK_WORKER_CORES=4 
export SPARK_WORKER_MEMORY=3g 
export SPARK_WORKER_INSTANCES=2 

我有5个虚拟机节点(在这里主机名):mykafka,硕士,DATA1 ,data2和data3。

感谢您提前给予的任何帮助和建议。

以下是RpcTimeoutException在每个工人发现了异常:

16/04/11 23:07:30 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(5,[Lscala.Tuple2;@2628a359,BlockManagerId(5, data3, 34838))] in 1 attempts 
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 10 seconds. This timeout is controlled by spark.executor.heartbeatInterval 
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48) 
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63) 
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) 
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) 
    at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185) 
    at scala.util.Try$.apply(Try.scala:161) 
    at scala.util.Failure.recover(Try.scala:185) 
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324) 
    .... 
    .... 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 10 seconds 
    at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:242) 
    ... 7 more 
16/04/11 23:07:31 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM 
beat(5,[Lscala.Tuple2;@2628a359,BlockManagerId(5, data3, 34838))] in 1 attempts 
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 10 seconds. This timeout is controlled by spark.executor.heartbeatInterval 
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48) 
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63) 
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) 
    .... 
    .... 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 10 seconds 
    at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:242) 
    ... 7 more 
+0

你说,你有5个虚拟机是他们能够连接到每个其他? 有防火墙阻止他们访问某些端口吗? 你是否将服务器绑定到公共接口? – Vishnu667

+0

是的,他们可以通过SSH密码设置相互连接。我可以成功地在纱线客户端模式下提交其他火花作业。同样来自主节点,我可以连接到“mykafka”节点。 – Alan

+0

我以纱线客户端模式提交了它,发现其中一个执行程序无法启动: {{JAVA_HOME}}/bin/java -server -XX:OnOutOfMemoryError ='kill%p'-Xms4096m -Xmx4096m -Djava.io .tmpdir = {{PWD}}/tmp'-Dspark.driver.port = 44618'-Dspark.yarn.app.container.log.dir = -XX:MaxPermSize = 256m org.apache.spark.executor。CoarseGrainedExecutorBackend --driver-url spark:// CoarseGrainedScheduler – Alan

回答

3

所以我有完全相同的问题,这个例子,它似乎与此有关的bug https://issues.apache.org/jira/browse/SPARK-13906

不知道如何设置这个例子,但我一直在试验代码,构建一个小型的scala应用程序,并向SparkCo添加了一个额外的配置参数NF()

val conf = new SparkConf() 
.setAppName('name') 
.set("spark.rpc.netty.dispatcher.numThreads","2") 

感谢大卫·戈麦斯和火花邮件,其中googleing的很多后,我找到了解决办法

https://mail-archives.apache.org/mod_mbox/spark-user/201603.mbox/%[email protected].com%3E

+0

没问题我在昨天遇到这个帖子时遇到同样的问题,刚刚找到正确的谷歌短语来找到邮件程序 – Robrotheram

+0

Robrotheram,再次感谢你。你的帮助对我来说非常重要。这也有利于我的其他项目和计划。我试图调整所有不同的spark-submit参数或配置,但是它们都不适用于开发集群,并且我无法从Spark UI的Streaming选项卡中看到它是否会表现良好。这些信息对我来说是有益的,也是非常重要的一步。 Alan – Alan