
I am using Spark 2.0 and trying to stream files with the wholeTextFiles API. My Spark program successfully reads the first batch of files in the folder, but I am unable to stream any subsequent batches of files.

Please let me know how to stream files when using the wholeTextFiles API.

Here is my code:

SparkConf sparkConf = new SparkConf().setAppName("My app") 
          .setMaster("local") 
          .set("spark.driver.allowMultipleContexts", "true"); 

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(15)); 

JavaPairRDD<String, String> wholeTextFiles = jssc.sparkContext().wholeTextFiles("C:/Users/my/files/abcd7/simple/*.txt"); 

JavaRDD<mySchema> schemaRDD = wholeTextFiles.map(
    // ... mapping of each (fileName, fileContent) pair to a mySchema object omitted ...
    return mySchema;); 

SQLContext hc = new HiveContext(jssc.sparkContext()); 

Dataset<Row> df = hc.createDataFrame(schemaRDD, mySchema.class); 

df.createOrReplaceTempView("myView"); 

df.show();  
jssc.start(); 
jssc.awaitTermination(); 

Spark processes the first batch of data, but no further batches. I am not using a JavaDStream here, which may be causing this error. How can I get a JavaDStream from the wholeTextFiles API?

Update with the error:

java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute 
    at scala.Predef$.require(Predef.scala:224) 
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163) 
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513) 
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573) 
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572) 
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:554) 
    at com.comcast.emm.vodip.WholeTextLocal.WholeTextLocal.main(WholeTextLocal.java:225) 
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute 
    at scala.Predef$.require(Predef.scala:224) 
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163) 
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513) 
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573) 
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572) 
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:554) 

You are not starting the stream. Regardless of how you process your data, you should start it so that Spark actually begins reading and processing the data. – Mehraban


When I add jssc.start(); jssc.awaitTermination();, Spark processes the data in the first batch, but no further batches. I have updated the question with the error I am getting. – AKC

Answers


wholeTextFiles returns an RDD. Unless you perform an action on it, Spark will not start anything.

The error "No output operations registered, so nothing to execute" tells you that you are not using the streaming context at all.

Check the Spark documentation for examples of how to write a streaming job.
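
For illustration only, here is a minimal sketch of a streaming job that registers an output operation (the missing piece the error complains about). It reuses the directory path from the question and uses textFileStream, which streams the lines of newly created files rather than whole files, so it is not a drop-in replacement for wholeTextFiles:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StreamingDirectoryExample {
    public static void main(String[] args) throws InterruptedException {
        // More than one local thread so processing is not starved (see the other answer).
        SparkConf conf = new SparkConf().setAppName("Streaming example").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(15));

        // Monitors the directory and picks up files created after the job starts.
        JavaDStream<String> lines = jssc.textFileStream("C:/Users/my/files/abcd7/simple");

        // foreachRDD is an output operation; without one, DStreamGraph validation
        // fails with "No output operations registered, so nothing to execute".
        lines.foreachRDD((JavaRDD<String> rdd) -> {
            System.out.println("Records in this batch: " + rdd.count());
        });

        jssc.start();
        jssc.awaitTermination();
    }
}

If whole-file semantics are required inside the stream, JavaStreamingContext also exposes fileStream with a custom input format, but the sketch above is enough to show where the output operation belongs.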


Even with df.show(), the job does not stream. It only processes the first batch instead of running as a streaming job. Could you provide any guidance on this? – AKC


According to the Spark docs, when processing a stream of data you should avoid setting the master to local or local[1], because that leaves no cores for processing the data.

When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using an input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use “local[n]” as the master URL, where n > number of receivers to run (see Spark Properties for information on how to set the master).
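
Applied to the configuration in the question, that would mean something along these lines (only the master URL changes; the rest is kept as in the original):

SparkConf sparkConf = new SparkConf().setAppName("My app") 
          .setMaster("local[2]")   // at least two local threads 
          .set("spark.driver.allowMultipleContexts", "true"); 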
