I am using Spark 2.0 and trying to stream files with the wholeTextFiles API. My Spark program successfully reads the first batch of files in the folder, but I cannot stream any subsequent batches.
Please let me know how to stream files when using the wholeTextFiles API.
Here is my code:
SparkConf sparkConf = new SparkConf().setAppName("My app")
.setMaster("local")
.set("spark.driver.allowMultipleContexts", "true");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(15));
JavaPairRDD<String, String> wholeTextFiles = jssc.sparkContext().wholeTextFiles("C:/Users/my/files/abcd7/simple/*.txt");
JavaRDD<String> stringRDD = wholeTextFiles.map(
    // ...
    // ...
    return mySchema;);
SQLContext hc = new HiveContext(jssc.sparkContext());
Dataset<Row> df = hc.createDataFrame(schemaRDD, mySchema.class);
df.createOrReplaceTempView("myView");
df.show();
jssc.start();
jssc.awaitTermination();
Spark processes the first batch of data, but no further batches arrive. I am not using a JavaDStream here, which may be causing this problem. How can I get a JavaDStream from the wholeTextFiles API?
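For context on the question above: wholeTextFiles is a batch API on SparkContext, so it is evaluated exactly once when the driver reaches that line, which is why only the first set of files is read. To pick up files that arrive later, the directory has to be monitored through the streaming context itself. A minimal sketch, assuming the same directory and batch interval (note that textFileStream yields the lines of newly appearing files as a JavaDStream<String>; it does not preserve the (path, content) pairs that wholeTextFiles returns):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StreamFiles {
    public static void main(String[] args) throws InterruptedException {
        // local[2]: file streams need at least one core beyond the receiver-free driver work
        SparkConf conf = new SparkConf().setAppName("Stream files").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(15));

        // Files dropped into this directory after start() are picked up in later micro-batches.
        JavaDStream<String> lines = jssc.textFileStream("C:/Users/my/files/abcd7/simple/");

        // At least one output operation (print, foreachRDD, ...) must be registered;
        // otherwise start() fails with "No output operations registered".
        lines.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
```

This is a sketch, not a drop-in replacement: the per-file schema mapping from the original code would need to be rebuilt on top of the line stream.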
Update with the error:
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:554)
at com.comcast.emm.vodip.WholeTextLocal.WholeTextLocal.main(WholeTextLocal.java:225)
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:554)
You are not starting the stream. However you process your data, you have to start it so that Spark actually begins reading and processing the data. – Mehraban
When I add jssc.start(); jssc.awaitTermination();, Spark processes the data for the first batch, but no further batches. I have updated the question with the error I am getting. – AKC
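The comment thread points at the two halves of the fix: the streaming context needs an output operation before start(), and the per-batch work (building the DataFrame, registering the view) must live inside that output operation so it re-runs for every micro-batch rather than once at start-up. A minimal sketch, assuming the same directory and that the elided schema mapping is replaced by your own parsing (spark.read().json() below is only a stand-in for it):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StreamToView {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("Stream to view").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(15));

        JavaDStream<String> lines = jssc.textFileStream("C:/Users/my/files/abcd7/simple/");

        // foreachRDD is an output operation: it satisfies the
        // "No output operations registered" check, and its body executes
        // once per micro-batch instead of once when the program starts.
        lines.foreachRDD(rdd -> {
            if (rdd.isEmpty()) {
                return; // nothing arrived in this interval
            }
            SparkSession spark = SparkSession.builder()
                    .config(rdd.context().getConf())
                    .getOrCreate();
            // Placeholder parsing: substitute your own mySchema mapping here.
            Dataset<Row> df = spark.read().json(rdd);
            df.createOrReplaceTempView("myView");
            df.show();
        });

        jssc.start();
        jssc.awaitTermination();
    }
}
```

In Spark 2.0 the SparkSession built inside foreachRDD replaces the HiveContext from the original code; the temp view is re-registered from the latest batch on every interval.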