I am new to Spark; can anyone help me? `DStream[String].foreachRDD` on a Spark cluster.
    def streamStart() {
      val sparkConf = new SparkConf()
        .setAppName("kafkaStreamingNew!!")
        .setMaster("spark://husnain:7077")
        .setJars(Array("/home/husnain/Downloads/ScalaWorkspace/KafkaStreaming/target/KafkaStreaming-1.1.0-jar-with-dependencies.jar"))
      // also tried adding:
      // "/home/husnain/.m2/repository/org/apache/spark/spark-streaming-kafka_2.10/1.4.1/spark-streaming-kafka_2.10-1.4.1.jar",
      // "/home/husnain/.m2/repository/org/apache/spark/spark-streaming_2.10/1.4.1/spark-streaming_2.10-1.4.1.jar",
      // "/home/husnain/.m2/repository/org/apache/spark/spark-core_2.10/1.4.1/spark-core_2.10-1.4.1.jar"
      val ssc = new StreamingContext(sparkConf, Seconds(1))
      val topics = "test"
      ssc.checkpoint("checkpoint")
      val lines = KafkaUtils.createStream(ssc, "localhost:2181", "spark", Map("test" -> 1)).map(_._2)
      lines.print()
      println("*****************************************************************************")
      lines.foreachRDD(
        rdd => rdd.foreach(
          x => println(x + "\n***-------------------------------------------------------***\n")))
      println("-----------------------------------------------------------------------------")
      ssc.start()
      ssc.awaitTermination()
    }
On a Spark standalone cluster the code does not work, but with `local[*]` it works fine:
    lines.foreachRDD(
      rdd => rdd.foreach(
        x => println(x + "\n***-------------------------------------------------------***\n")
      )
    )
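A likely explanation (an assumption, since the question shows no logs): on a cluster, the closure passed to `rdd.foreach` runs on the executors, so the `println` output goes to each executor's stdout log (visible in the Spark web UI), not to the driver console where the job was submitted. With `local[*]`, driver and executors share one JVM, which is why the output appears there. A minimal sketch of a fix, assuming the goal is just to see the records on the driver, is to collect each batch first:

```scala
// Sketch only: collect() pulls every record of the batch to the driver,
// so this is suitable for small test topics, not production volumes.
lines.foreachRDD { rdd =>
  rdd.collect().foreach { x =>
    println(x + "\n***-------------------------------------------------------***\n")
  }
}
```

Alternatively, keep the original code and read the output from the executors' stdout logs via the Spark master/worker web UI.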
What does _it doesn't work_ mean? And what does _works correctly_ mean here? – zero323
I suspect you are submitting your application in the wrong way. Are you using spark-submit? Submitting an application to a cluster from code is next to impossible (in fact, very tricky). Have a look here: http://spark.apache.org/docs/latest/submitting-applications.html –
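For reference, a submission to the standalone master from the linked page would look roughly like this. Treat it as a sketch: the class name is hypothetical, and the master URL and jar path are taken from the question. Note that passing `--master` on the command line can conflict with a hard-coded `setMaster` in the application code.

```shell
# Sketch of a spark-submit invocation for the standalone cluster in the question.
# --class names the main class of the application (hypothetical here).
spark-submit \
  --class KafkaStreamingApp \
  --master spark://husnain:7077 \
  /home/husnain/Downloads/ScalaWorkspace/KafkaStreaming/target/KafkaStreaming-1.1.0-jar-with-dependencies.jar
```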
@dwysakowicz Yes, I submit it via spark-submit. –