2016-11-26 42 views
1

我有以下代码。Spark 1.6 scala创建数据行

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) 
val baseDF = sqlContext.read.json(fileFullPath) 

我的json有2个感兴趣的领域:ProductId和Quantity。我在找

{ 
    "sales": { 
     "saledate": "17Mar2008", 
     "sale": [{ 
      "productid": 1, 
      "quantity": 10 
     }, { 
      "productid": 2, 
      "quantity": 1 
     }, { 
      "productid": 3, 
      "quantity": 3 
     }, { 
      "productid": 4, 
      "quantity": 5 
     }] 
    } 
} 

我想改变这其中有2列,基于数量的productid和数量,但多行的火花RDD或DF。我想每个数量1。

在上面的例子中,产品1有10行,产品2有1,产品3有3,产品4有5行,共计19行,即#rows = sum(quantity)。

任何帮助表示赞赏。我正在使用spark 1.6.2和scala。

+0

请改变你的问题,目前它是完全不可读的 –

+0

对不起...第一次发布在堆栈上..谢谢:@gasparms – SSC

+0

没问题 - 我写了,因为别人可以downvote问题,因为格式不佳;) –

回答

0

这应该做的事:

import org.apache.spark.sql.functions._ 

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) 
import sqlContext.implicits._ 

val baseDF = sqlContext.read.json(fileFullPath) 
val listFromQuantity = udf { quantity: Int => List.fill(quantity)(quantity) } 

baseDF.select(explode($"sales.sale")).select($"col.productId", explode(listFromQuantity($"col.quantity"))).show() 

将返回:

+---------+--------+ 
|productId|quantity| 
+---------+--------+ 
|  1|  10| 
|  1|  10| 
|  1|  10| 
|  1|  10| 
|  1|  10| 
|  1|  10| 
|  1|  10| 
|  1|  10| 
|  1|  10| 
|  1|  10| 
|  2|  1| 
|  3|  3| 
|  3|  3| 
|  3|  3| 
|  4|  5| 
|  4|  5| 
|  4|  5| 
|  4|  5| 
|  4|  5| 
+---------+--------+ 

如果你想有第二列单数量(如具有价值1代替5)你应该用List.fill(quantity)(1)替换List.fill(quantity)(quantity)

+0

工作就像一个魅力....谢谢soooo多。 .. – SSC