1
我想用Apache Kafka和Spark流设置一个流应用程序。 Kafka运行在一个单独的unix机器版本0.9.0.1上,spark v1.6.1是hadoop集群的一部分。卡夫卡与火花集成
我已经启动了zookeeper和kafka服务器,并且想要使用控制台生产者从日志文件中传输消息并使用直接方法(无接收器)使用Spark应用程序使用该消息。我已经用Python写的代码,并使用下面的命令执行:
spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.1.jar streamingDirectKafka.py
得到如下错误:
/opt/mapr/spark/spark-1.6.1/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 152, in createDirectStream
py4j.protocol.Py4JJavaError: An error occurred while calling o38.createDirectStreamWithoutMessageHandler.
: java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
能否请你帮忙吗?
谢谢!
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
conf = SparkConf().setAppName("StreamingDirectKafka")
sc = SparkContext(conf = conf)
ssc = StreamingContext(sc, 1)
topic = ['test']
kafkaParams = {"metadata.broker.list": "apsrd7102:9092"}
lines = (KafkaUtils.createDirectStream(ssc, topic, kafkaParams)
.map(lambda x: x[1]))
counts = (lines.flatMap(lambda line: line.split(" "))
.map(lambda word: (word, 1))
.reduceByKey(lambda a, b: a+b))
counts.pprint()
ssc.start()
ssc.awaitTermination()
ü意味着我应该使用卡夫卡版本0.8.4星火版本1.6.1呢? 此外,当我运行字数例如,在Scala中,我得到不同的错误: 异常在线程“主要” java.lang.ClassCastException:kafka.cluster.BrokerEndPoint不能转换到kafka.cluster.Broker 命令: bin/run-example streaming.DirectKafkaWordCount apsrd7102:9092 test Thanks !! –
即使使用kafka版本0.8.2.1,我也遇到同样的错误? 你可以请帮忙,如果可能是我使用不正确的jar的依赖? –