0
首先,为标题道歉,我不确定如何简洁地描述这一点。Spark多个输出路径导致多个输入读取
我有一个Spark日志解析到JSON,然后使用spark-sql将特定列转换为ORC并写入各种路径。例如:
val logs = sc.textFile("s3://raw/logs")
val jsonRows = logs.mapPartitions(partition => {
partition.map(log => {
logToJson.parse(log)
}
}
jsonRows.foreach(r => {
val contentPath = "s3://content/events/"
val userPath = "s3://users/events/"
val contentDf = sqlSession.read.schema(contentSchema).json(r)
val userDf = sqlSession.read.schema(userSchema).json(r)
val userDfFiltered = userDf.select("*").where(userDf("type").isin("users")
// Save Data
val contentWriter = contentDf.write.mode("append").format("orc")
eventWriter.save(contentPath)
val userWriter = userDf.write.mode("append").format("orc")
userWriter.save(userPath)
当我写这篇文章时,我预计解析会发生一次,然后它会写入相应的位置。但是,它似乎在执行文件中的所有代码两次 - 一次为content
,一次为users
。这是预期的吗?我宁愿我最终不会从S3传输数据和解析两次,因为这是最大的瓶颈。我从Spark UI中附加了一个图像,以显示单个“流”窗口的任务重复。感谢您的任何帮助,您可以提供!