2015-10-06 58 views
0

我是Spark的新手,任何人都可以帮助我吗?Spark群集上的DStrream [String] .foreachrdd

def streamStart() { 
val sparkConf = new SparkConf().setAppName("kafkaStreamingNew!!").setMaster("spark://husnain:7077").setJars(Array("/home/husnain/Downloads/ScalaWorkspace/KafkaStreaming/target/KafkaStreaming-1.1.0-jar-with-dependencies.jar")) //,"/home/husnain/.m2/repository/org/apache/spark/spark-streaming-kafka_2.10/1.4.1/spark-streaming-kafka_2.10-1.4.1.jar" , "/home/husnain/.m2/repository/org/apache/spark/spark-streaming_2.10/1.4.1/spark-streaming_2.10-1.4.1.jar" ,"/home/husnain/.m2/repository/org/apache/spark/spark-core_2.10/1.4.1/spark-core_2.10-1.4.1.jar")) 
val ssc = new StreamingContext(sparkConf, Seconds(1)) 

val topics = "test"; 
ssc.checkpoint("checkpoint") 
val lines = KafkaUtils.createStream(ssc, "localhost:2181", "spark", Map("test" -> 1)).map(_._2) 
lines.print() 
println("*****************************************************************************") 
lines.foreachRDD(
    iter => iter.foreach(
    x => println(x + "\n***-------------------------------------------------------***\n"))) 
println("-----------------------------------------------------------------------------") 
ssc.start() 
ssc.awaitTermination() 

在一个Spark独立的集群,代码不工作,但对当地[*],它工作正常:

lines.foreachRDD(
    iter => iter.foreach(
    x => println(x + "\n***-------------------------------------------------------***\n") 
    ) 
    ) 
+2

什么意思_it不WORK_?或者_works正确地处理这件事。 – zero323

+0

我怀疑你是以错误的方式提交你的应用程序。你在使用spark-submit吗?从代码提交应用程序到集群是相当不可能的(事实上非常棘手)。看看这里:http://spark.apache.org/docs/latest/submitting-applications.html –

+0

@dwysakowicz是的,我通过spark-submit –

回答

0

我认为什么它被称为“正常工作”是你看到控制台上的println

当您向集群提交相同的代码时,控制台的println在每个执行程序本地发生,所以如果其他所有操作都在工作,则缺少输出仅仅是分布式执行的结果。

查找范围执行人的输出为那些println小号

+0

提交作业我在这里写了'println'来缩短代码。我的实际任务是在进行一些计算后将传入的数据保存到hbase中。在本地它的作品(意味着成功保存到hbase),但在火花独立群集模式下它不起作用:( –

+0

你提到hbase ..你确定你使用正确的配置吗?你是在代码中指定连接参数还是通过* .xml?在工作人员可能不会选择正确的配置。 –