1
如何将json转换为parquet与Spark进行流式传输? acutually我必须从服务器ssh,接收一个大json文件,将其转换为实木复合地板,并将其上传到hadoop。 我有办法以流水线的方式做到这一点? 他们的备份文件,所以我必须与大小不随时间改变的文件的预定量目录Spark:将json流式传输到parquet
喜欢的东西:
scp host /dev/stdout | spark-submit myprogram.py | hadoop /dir/
编辑: 其实我正在此:
sc = SparkContext(appName="Test")
sqlContext = SQLContext(sc)
sqlContext.setConf("spark.sql.parquet.compression.codec.", "gzip")
#Since i couldn't get the stdio, went for a pipe:
with open("mypipe", "r") as o:
while True:
line = o.readline()
print "Processing: " + line
lineRDD = sc.parallelize([line])
df = sqlContext.jsonRDD(lineRDD)
#Create and append
df.write.parquet("file:///home/user/spark/test", mode="append")
print "Done."
这是工作正常,但所得到的镶木地板是非常大的(280KB 4行2列的JSON)。任何改进?
它只是*一个*文件吗?或者它是你需要从ssh(或者共享文件夹..)获取的文件流? – Markon
有多个文件,但我需要单独处理它们。 –
是的,但它是一个流或者你确切地知道文件的数量?这些文件是否不断被写入(作为流)到这个/这些文件夹?也许新的文件每隔X秒被添加一次......(?) – Markon