2016-08-01 37 views
0

我正在开始使用apache spark。 我有一个要求将json日志转换为扁平指标,也可以认为是一个简单的csv。从apache spark中的JSON日志创建聚合指标

例如,

"orderId":1, 
    "orderData": { 
    "customerId": 123, 
    "orders": [ 
    { 
     "itemCount": 2, 
     "items": [ 
     { 
      "quantity": 1, 
      "price": 315 
     }, 
     { 
      "quantity": 2, 
      "price": 300 
     }, 

     ] 
    } 
    ] 
} 

这可以被视为一个单一的JSON日志,我打算将它转换成,

orderId,customerId,totalValue,units 
    1 , 123 , 915 , 3 

我正在经历sparkSQL文档,并可以用它来获得像单值保持“选择订单中的orderId,orderData.customerId“,但我不知道如何获得所有价格和单位的总和。

什么应该是最好的做法,以完成这个使用Apache的火花?

+0

着,我们不喜欢数据帧DF = sqlContext.read()JSON。 ( “/路径/到/文件”)toDF(); df.registerTempTable(“df”); df.printSchema();之后通过sql执行聚合? –

+0

通过SQL,我可以掌握单个元素,但不知道有关orders.items,我如何在此上运行聚合?我认为它只会作为一个json值,如果我错过了某些东西,请纠正我。 – fireants

+0

你可以看看[this](http://xinhstechblog.blogspot.in/2015/06/reading-json-data-in-spark-dataframes.html)&[nested json](http:// xinhstechblog .blogspot.in/2016/05/reading-json-nested-array-in-spark.html) –

回答

1

尝试:

>>> from pyspark.sql.functions import * 
>>> doc = {"orderData": {"orders": [{"items": [{"quantity": 1, "price": 315}, {"quantity": 2, "price": 300}], "itemCount": 2}], "customerId": 123}, "orderId": 1} 
>>> df = sqlContext.read.json(sc.parallelize([doc])) 
>>> df.select("orderId", "orderData.customerId", explode("orderData.orders").alias("order")) \ 
... .withColumn("item", explode("order.items")) \ 
... .groupBy("orderId", "customerId") \ 
... .agg(sum("item.quantity"), sum(col("item.quantity") * col("item.price"))) 
+0

感谢您的工作逻辑,我会尝试映射它在Java中,并张贴在这里为他人。 – fireants

0

对于谁是寻找上述的Java解决方案的人,请按:

SparkSession spark = SparkSession 
      .builder() 
      .config(conf) 
      .getOrCreate(); 

    SQLContext sqlContext = new SQLContext(spark); 

    Dataset<Row> orders = sqlContext.read().json("order.json"); 
    Dataset<Row> newOrders = orders.select(
      col("orderId"), 
      col("orderData.customerId"), 
      explode(col("orderData.orders")).alias("order")) 
      .withColumn("item",explode(col("order.items"))) 
      .groupBy(col("orderId"),col("customerId")) 
      .agg(sum(col("item.quantity")),sum(col("item.price"))); 
    newOrders.show();