2016-08-24 146 views
1

我想用Apache Kafka和Spark流设置一个流应用程序。 Kafka运行在一个单独的unix机器版本0.9.0.1上,spark v1.6.1是hadoop集群的一部分。卡夫卡与火花集成

我已经启动了zookeeper和kafka服务器,并且想要使用控制台生产者从日志文件中传输消息并使用直接方法(无接收器)使用Spark应用程序使用该消息。我已经用Python写的代码,并使用下面的命令执行:

spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.1.jar streamingDirectKafka.py 

得到如下错误:

/opt/mapr/spark/spark-1.6.1/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 152, in createDirectStream 
py4j.protocol.Py4JJavaError: An error occurred while calling o38.createDirectStreamWithoutMessageHandler. 
: java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker 

能否请你帮忙吗?

谢谢!

from pyspark import SparkContext, SparkConf 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 

if __name__ == "__main__": 
    conf = SparkConf().setAppName("StreamingDirectKafka") 
    sc = SparkContext(conf = conf) 
    ssc = StreamingContext(sc, 1) 

    topic = ['test'] 
    kafkaParams = {"metadata.broker.list": "apsrd7102:9092"} 
    lines = (KafkaUtils.createDirectStream(ssc, topic, kafkaParams) 
         .map(lambda x: x[1])) 
    counts = (lines.flatMap(lambda line: line.split(" ")) 
        .map(lambda word: (word, 1)) 
        .reduceByKey(lambda a, b: a+b)) 
    counts.pprint() 

    ssc.start() 
    ssc.awaitTermination() 

回答

0

看起来你使用的是不兼容的卡夫卡版本。从Spark 2.0开始的文档 - 支持Kafka 0.8.x。

http://spark.apache.org/docs/latest/streaming-programming-guide.html#advanced-sources

+0

ü意味着我应该使用卡夫卡版本0.8.4星火版本1.6.1呢? 此外,当我运行字数例如,在Scala中,我得到不同的错误: 异常在线程“主要” java.lang.ClassCastException:kafka.cluster.BrokerEndPoint不能转换到kafka.cluster.Broker 命令: bin/run-example streaming.DirectKafkaWordCount apsrd7102:9092 test Thanks !! –

+0

即使使用kafka版本0.8.2.1,我也遇到同样的错误? 你可以请帮忙,如果可能是我使用不正确的jar的依赖? –