2017-08-05 129 views

Answer


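I am also using Spark Streaming with a Kafka 0.10.0 cluster. Once you add the following line to your Spark configuration (for example spark-defaults.conf), you are good to go.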

spark.jars.packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0 
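If you start the job from a plain Python interpreter rather than through spark-submit, one way to pull in the same package is the PYSPARK_SUBMIT_ARGS environment variable. This is only a sketch: it assumes the variable is set before any SparkContext is created, and the name KAFKA_PACKAGE is just an illustrative constant.

```python
import os

# Maven coordinate taken from the configuration line above.
KAFKA_PACKAGE = "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0"

# Assumption: this runs before any SparkContext exists, so the JVM that
# PySpark launches sees the --packages option. The string ends with
# "pyspark-shell", which PySpark expects as the final token.
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages {} pyspark-shell".format(KAFKA_PACKAGE)

print(os.environ["PYSPARK_SUBMIT_ARGS"])
```

Setting the property in spark-defaults.conf has the same effect and keeps the dependency out of the application code.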

Here is an example of the streaming job in Python:

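from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Initialize SparkContext
sc = SparkContext(appName="sampleKafka")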

# Initialize the streaming context (batch interval is in seconds)
batchInterval = 10
ssc = StreamingContext(sc, batchInterval)

# Set Kafka topic (topic name -> number of partitions to consume)
topic = {"myTopic": 1}

# Set application groupId 
groupId = "myTopic" 

# Set zookeeper parameter 
zkQuorum = "zookeeperhostname:2181" 

# Create Kafka stream 
kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, groupId, topic) 

# Do as you wish with your stream

# Start stream
ssc.start() 
ssc.awaitTermination()
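As one illustration of the "do as you wish" step, here is a rough sketch of a word count. It assumes each record from KafkaUtils.createStream arrives as a (key, value) tuple with a string value; the helpers extract_value and count_words and the sample batch are made up for this example and run in plain Python so the logic can be checked without a cluster.

```python
from collections import Counter


def extract_value(record):
    """Return the message payload from a (key, value) Kafka record."""
    _key, value = record
    return value


def count_words(records):
    """Count whitespace-separated words across an iterable of (key, value) records."""
    counts = Counter()
    for record in records:
        value = extract_value(record)
        if value is None:
            # Skip records that carry no payload.
            continue
        counts.update(value.split())
    return dict(counts)


# Hypothetical batch standing in for one interval of the stream.
sample_batch = [(None, "hello kafka"), (None, "hello spark")]
print(count_words(sample_batch))

# On the real stream, the equivalent pipeline would go before ssc.start():
#   kafkaStream.map(extract_value) \
#       .flatMap(lambda line: line.split()) \
#       .map(lambda word: (word, 1)) \
#       .reduceByKey(lambda a, b: a + b) \
#       .pprint()
```

The commented pipeline uses only standard DStream operations (map, flatMap, reduceByKey, pprint) and prints the per-batch counts to the driver log.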