2017-03-03 78 views
0

我有一个文件夹包含了很多gzip文件。每个gzip文件都包含xml文件。我曾使用flume将文件流式传输到HDFS。下面是我的配置文件:Flume流gz文件

agent1.sources = src 
agent1.channels = ch 
agent1.sinks = sink 

agent1.sources.src.type = spooldir 
agent1.sources.src.spoolDir = /home/tester/datafiles 
agent1.sources.src.channels = ch 
agent1.sources.src.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder 

agent1.channels.ch.type = memory 
agent1.channels.ch.capacity = 1000 
agent1.channels.ch.transactionCapacity = 1000 

agent1.sinks.sink.type = hdfs 
agent1.sinks.sink.channel = ch 
agent1.sinks.sink.hdfs.path = /user/tester/datafiles 
agent1.sinks.sink.hdfs.fileType = CompressedStream 
agent1.sinks.sink.hdfs.codeC = gzip 
agent1.sinks.sink.hdfs.fileSuffix = .gz 
agent1.sinks.sink.hdfs.rollInterval = 0 
agent1.sinks.sink.hdfs.rollSize = 122000000 
agent1.sinks.sink.hdfs.rollCount = 0 
agent1.sinks.sink.hdfs.idleTimeout = 1 
agent1.sinks.sink.hdfs.batchSize = 1000 

后,我流的文件到HDFS,我使用星火使用下面的代码来阅读:

df = sparkSession.read.format('com.databricks.spark.xml').options(rowTag='Panel', compression='gzip').load('/user/tester/datafiles') 

,但我有问题,读它。如果我手动将一个gzip文件上传到HDFS文件夹并重新运行上面的Spark代码,它就能够无任何问题地读取它。我不确定它是由于水槽。

我试图通过flume下载流文件并将其解压缩,当我查看内容时,它不再显示xml格式,它是一些不可读的字符。任何人都可以点亮这个灯光吗?谢谢。

+0

如何解压缩文件?使用'gunzip'?你用什么插件读取文件时遇到什么问题?您是否尝试过手动specyfing模式spark-xml? – Mariusz

+0

Mariusz,我没有解压缩文件。我正在试图在gz文件中进行流式处理并使用spark来读取它。我没有手动指定模式。当我读取流入的gzip文件并显示它的内容时,它会显示一些特殊字符。但是当我尝试手动上传HDFS中的gzip文件时,我可以毫无问题地阅读它,它可以显示内容和模式,而不会有任何问题。我认为这是由于Flume? – kcyea

回答

0

我认为你在做错!!!为什么?

看到你有一个来源是“不可拆分”邮政编码。你可以通过记录将它们部分读作记录,如果你没有解压缩,你会得到一个GZIPInputStream,你正在使用这个GZIPInputStream。

当您读取GZIP输入流作为输入记录后,您将已经ziped的流保存到另一个GZIP流中,因为您选择压缩的接收器类型。

所以你已经在HDFS中的Gzip中进行了Zipped Streamed。 :)

我建议在cron中安排一个脚本来执行从本地到HDFS的复制将解决您的问题。

+0

噢,我明白了......所以Flume没有办法让gzip文件取代gzip文件并将其以原始格式存入HDFS中?我在考虑如果我要在cron中安排一个脚本来执行从本地到HDFS的复制,那么它将在HDFS中拥有许多小文件。这将最终在HDFS中占用很多块。这是因为每个gzip文件大小都只有20kb左右。 – kcyea

+0

你可以做的是,你可以使用spark来读取这些zip文件,并在处理后将它写入hadoop。 – RBanerjee