2017-05-29 76 views
0

我是新来的spark和scala。我想读取包含json文件的目录。该文件具有名为“EVENT_NAME”的属性,可以有20个不同的值。我需要根据属性值分开事件。即EVENT_NAME = event_A事件。将这些写入配置单元外部表结构中,例如:/ apps/hive/warehouse/db/event_A/dt = date/hour = hr火花数据框被写入分区

这里我有20个不同的表,用于所有事件类型和与每个事件相关的数据应该去到各自的桌子。 我已经设法编写了一些代码,但需要帮助才能正确写入我的数据。

{ 
import org.apache.spark.sql._ 
import sqlContext._ 

val path = "/source/data/path" 
val trafficRep = sc.textFile(path) 

val trafficRepDf = sqlContext.read.json(trafficRep) 
trafficRepDf.registerTempTable("trafficRepDf") 

trafficRepDf.write.partitionBy("EVENT_NAME").save("/apps/hive/warehouse/db/sample") 
} 

最后一行创建分区输出,但不是我确切需要它。请建议我怎样才能得到它正确或任何其他代码来做到这一点。

回答

1

我假设你的意思是你想保存数据放入单独的目录中,而不使用Spark/Hive的{column}={value}格式。

您将无法使用Spark的partitionBy,因为Spark分区迫使您使用该格式。

相反,你要打破你的DataFrame成组成的分区,并保存它们一个接一个,像这样:

{ 
    import org.apache.spark.sql._ 
    import sqlContext._ 

    val path = "/source/data/path" 
    val trafficRep = sc.textFile(path) 

    val trafficRepDf = sqlContext.read.json(trafficRep) 
    val eventNames = trafficRepDf.select($"EVENT_NAME").distinct().collect() // Or if you already know what all 20 values are, just hardcode them. 
    for (eventName <- eventNames) { 
    val trafficRepByEventDf = trafficRepDef.where($"EVENT_NAME" === eventName) 
    trafficRepByEventDf.write.save(s"/apps/hive/warehouse/db/sample/${eventName}") 
    } 
} 
0

我想,你希望像/apps/hive/warehouse/db/EVENT_NAME=xx/dt=yy/hour=zz表结构,那么你需要通过EVENT_NAMEdthour分区,所以试试这个:

trafficRepDf.write.partitionBy("EVENT_NAME","dt","hour").save("/apps/hive/warehouse/db/sample") 
+0

数据没有日期和时间信息在里面。我需要在外部提供它。 – Anup

1

您可以添加使用日期和时间列到您的数据帧。

import org.apache.spark.sql._ 
import sqlContext._ 

val path = "/source/data/path" 
val trafficRep = sc.textFile(path) 

val trafficRepDf = sqlContext.read.json(trafficRep) 
trafficRepDf.withColumn("dt", lit("dtValue")).withColumn("hour", lit("hourValue")) 

trafficRepDf.write.partitionBy("EVENT_NAME","dt","hour").save("/apps/hive/warehouse/db/sample")