2017-01-09 85 views
2

我正在使用Apache NiFi和Apache Spark为大学做一个小型项目。我想创建一个NiFi工作流,它可以从HDFS中读取TSV文件,并使用Spark Streaming我可以处理这些文件并在MySQL中存储我需要的信息。我已经在NiFi中创建了我的工作流程,并且存储部分已经在工作。问题是我无法解析NiFi包,所以我可以使用它们。使用spark解析NiFi数据包

的文件包含的行这样的:

linea1File1 TheReceptionist 653 Entertainment 424 13021 4.34 1305 744 DjdA-5oKYFQ NxTDlnOuybo c-8VuICzXtU 

其中每个空间是一个标签( “\ t” 的)

这是我在星火代码使用的Scala:

val ssc = new StreamingContext(config, Seconds(10)) 
val packet = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_ONLY)) 
val file = packet.map(dataPacket => new String(dataPacket.getContent, StandardCharsets.UTF_8)) 

在此之前,我可以在单个字符串中获取整个文件(7000多行)......不幸的是,我无法将这个字符串拆分成多行。我需要整行获取整个文件,所以我可以在一个对象中解析它,在其上应用一些操作并存储我想要的内容

任何人都可以帮助我?

回答

3

每个数据包都将是来自NiFi的一个流文件的内容,因此如果NiFi从HDFS中拾取具有很多行的一个TSV文件,那么所有这些行都将位于一个数据包中。

很难说没有看到你的NiFi流,但你可能可以使用行数为1的SplitText在你的TSFi中分裂你的TSV,然后才能触发流式传输。

+0

非常感谢你......这完全解决了我的问题......我从来没有想过用NiFi解决它..我专注于Spark ...... –