2016-03-03 90 views
1

如何将json转换为parquet与Spark进行流式传输? acutually我必须从服务器ssh,接收一个大json文件,将其转换为实木复合地板,并将其上传到hadoop。 我有办法以流水线的方式做到这一点? 他们的备份文件,所以我必须与大小不随时间改变的文件的预定量目录Spark:将json流式传输到parquet

喜欢的东西:

scp host /dev/stdout | spark-submit myprogram.py | hadoop /dir/ 

编辑: 其实我正在此:

sc = SparkContext(appName="Test") 
sqlContext = SQLContext(sc) 
sqlContext.setConf("spark.sql.parquet.compression.codec.", "gzip") 
#Since i couldn't get the stdio, went for a pipe: 
with open("mypipe", "r") as o: 
     while True: 
       line = o.readline() 
       print "Processing: " + line 
       lineRDD = sc.parallelize([line]) 
       df = sqlContext.jsonRDD(lineRDD) 
       #Create and append 
       df.write.parquet("file:///home/user/spark/test", mode="append") 
print "Done." 

这是工作正常,但所得到的镶木地板是非常大的(280KB 4行2列的JSON)。任何改进?

+0

它只是*一个*文件吗?或者它是你需要从ssh(或者共享文件夹..)获取的文件流? – Markon

+0

有多个文件,但我需要单独处理它们。 –

+0

是的,但它是一个流或者你确切地知道文件的数量?这些文件是否不断被写入(作为流)到这个/这些文件夹?也许新的文件每隔X秒被添加一次......(?) – Markon

回答