2016-02-27 96 views
2

我试图利用直接卡夫卡消费者(python的新功能),以捕获我在本地主机上运行的自定义卡夫卡生产者的数据:9092。PySpark直接卡夫卡流(Apache Spark 1.6)

我目前使用由spark 1.6示例脚本提供的“direct_kafka_wordcount.py”。

来源https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/direct_kafka_wordcount.py

DOCShttp://spark.apache.org/docs/latest/streaming-kafka-integration.html

我使用下面的命令运行程序:

~/spark-1.6.0/bin/spark-submit --jars 
    ~/spark-1.6.0/external/kafka-assembly/target/spark-streaming-kafka-assembly_2.10-1.6.0.jar   
direct_kafka_wordcount.py localhost:9092 twitter.live 

不幸的是,我发现了一个奇怪的错误,我无法调试。任何提示/建议将非常感激。

py4j.protocol.Py4JJavaError: An error occurred while calling o24.createDirectStreamWithoutMessageHandler. 
: org.apache.spark.SparkException: java.nio.channels.ClosedChannelException 
     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) 
     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) 
     at scala.util.Either.fold(Either.scala:97) 
     at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365) 
     at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222) 
     at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:720) 
     at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStreamWithoutMessageHandler(KafkaUtils.scala:688) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 
     at py4j.Gateway.invoke(Gateway.java:259) 
     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
     at py4j.commands.CallCommand.execute(CallCommand.java:79) 
     at py4j.GatewayConnection.run(GatewayConnection.java:209) 
     at java.lang.Thread.run(Thread.java:745) 

回答

2

错误:

java.nio.channels.ClosedChannelException 

意味着topic不存在,或经纪人无法到达或有一些网络(代理)此类问题的。

通过在火花上运行kafka-console-consumer节点,确保没有此类连接问题。

+1

我得到了和@cynical饼干一样的问题,我确定这个话题已经存在,主从节点可以访问这个话题。但是,我仍然遇到同样的错误。任何建议? – Decula

+0

@Decula:如果经纪人的主机名未在您的驱动程序和从机中配置,它也会出现。当你联系zookeeper读取偏移范围时,它实际上使用了kafka代理的主机名(我在EMR上遇到过这个问题)。这应该打印在某些地方的日志中。注意那些经纪人的名字,并检查司机/奴隶机器是否可以解决它!如果没有,请在/ etc/hosts文件中添加hostname-IP映射 – Mohitt

+0

这对我有用。我的问题是我使用了错误的端口。 – Hardy

0

在我的火花流其中消费的话题从卡夫卡情况:

得到错误,因为这和程序退出。 所以我检查metadata.broker.list,只增加了一个经纪人。 添加除其中之一以外的所有经纪人。 而且一切顺利,但仍然会发出警告org.apache.spark.SparkException: java.nio.channels.ClosedChannelException,所以我从zk检查kafka经纪人状态,并发现一个经纪人被破坏,导致这样的错误。

0

我有类似的问题。但是变成不同的解决方案。我有不同版本的scala运行spark和kafka。

我结束了在两侧使用相同的版本,然后pyspark能够生成类。

我用以下

火花:火花1.6.3彬hadoop2.6.tgz 火花流 - 卡夫卡:火花流 - 卡夫卡assembly_2.10-1.6.3.jar