2016-02-02 32 views
2

我只想将流的内容打印到控制台。我写了下面的代码,但它不打印任何东西。任何人都可以帮助我在Spark中读取文本文件作为流?是否有与Windows系统有关的问题?在Windows系统中打印流(Spark流)的内容

public static void main(String[] args) throws Exception { 

    SparkConf sparkConf = new SparkConf().setAppName("My app") 
     .setMaster("local[2]") 
     .setSparkHome("C:\\Spark\\spark-1.5.1-bin-hadoop2.6") 
     .set("spark.executor.memory", "2g"); 

    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2)); 

    JavaDStream<String> dataStream = jssc.textFileStream("C://testStream//copy.csv"); 
    dataStream.print(); 

    jssc.start(); 
    jssc.awaitTermination(); 
} 

UPDATE:copy.csv的内容是

0,0,12,5,0 
0,0,12,5,0 
0,1,2,0,42 
0,0,0,0,264 
0,0,12,5,0 
+0

'copy.csv'的内容是什么? – Sumit

+0

它是用逗号分隔的文件。我用内容更新了我的问题 – Eyad

回答

3

textFileStream是监控Hadoop的支持目录。此操作将监视提供的目录,并且当您在提供的目录中添加新文件时,它将读取/流式传输新添加的文件中的数据。

您无法使用textFileStream来阅读文本/ csv文件,或者我会说,如果您只是阅读文件,则不需要流式传输。

我的建议是监视某个目录(可能是HDFS或本地文件系统),然后使用textFileStream添加文件并捕获这些新文件的内容。

可能在你的代码可能是你可以用C://testStream",一旦你的星火流的工作是建立和运行,然后添加文件copy.csvC://testStream文件夹,看到星火控制台输出替换"C://testStream//copy.csv"

OR

可能是你可以编写读取该文件的另一个命令行斯卡拉/ Java程序和扔在插座上的内容(在某一端口#)和未来可以用于捕获和读取利用socketTextStream数据。一旦您读取了数据,您将进一步应用其他转换或输出操作。

你也可以认为利用Flume

参考API Documentation更多细节

+0

实际上,我不仅在阅读这些文件,而且为了让我的问题保持清晰,我写了一小段代码。实际上,我想将流的内容发送给朴素贝叶斯模型以进行预测(机器学习),但不幸的是,我不能仅将文件作为流读取。你能否向我解释一下,我该如何处理Spark中的csv文件?我不明白如何在Windows中监视HDFS或本地文件系统?我尝试用C:// testStream替换“C://testStream//copy.csv”,但是当我将这些文件添加到目录时,这不起作用。!!! – Eyad

+0

我已经详细阐述了我对阅读文本的回答文件,就'textFileStream'而言,确保你是使用唯一/唯一名称创建新文件,并且内容也应该是新的。不要重命名或复制具有相同内容的文件,这是行不通的。 – Sumit

1

这为我工作在Windows 7和Spark 1.6.3:(删除的代码的其余部分,重要的是如何定义监视的文件夹)

val ssc = ... 
val lines = ssc.textFileStream("file:///D:/tmp/data") 
... 
print 

...

此监控目录d:/ tmp目录/数据,SSC是我的海峡eaming方面

步骤:

  1. 创建一个文件1.txt的说在d:/ tmp目录/数据
  2. 输入一些文字
  3. 启动SPART应用
  4. 文件重命名为数据。 TXT(我相信任何任意的名字会做,只要它改变,而目录受火花监控)

我注意到的另一件事是临时我不得不将行分隔符更改为Unix样式(使用Notepad ++),否则文件不会被拾取。