2016-12-05 34 views
0

您好我试图插入元件RDD阵列[字符串]使用阶火花。如何插入元件在火花RDD阵列

这里是一个例子。

val data = RDD[Array[String]] = Array(Array(1,2,3), Array(1,2,3,4), Array(1,2)). 

我想使这个数据中所有数组的长度为4。

如果数组的长度小于4,I要填充的阵列中的NULL值。

这里是我的代码,我试图解决的问题。

val newData = data.map(x => 
    if(x.length < 4){ 
     for(i <- x.length until 4){ 
     x.union("NULL") 
     } 
    } 
    else{ 
     x 
    } 
) 

但结果是Array[Any] = Array((), Array(1, 2, 3, 4),())

所以,我想其他办法。我在for循环中使用了yield

val newData = data.map(x => 
    if(x.length < 4){ 
     for(i <- x.length until 4)yield{ 
     x.union("NULL") 
     } 
    } 
    else{ 
     x 
    } 
) 

结果是Array[Object] = Array(Vector(Array(1, 2, 3, N, U, L, L)), Array(1, 2, 3, 4), Vector(Array(1, 2, N, U, L, L), Array(1, 2, N, U, L, L)))

这些都不是我想要的。我想回到这样

RDD[Array[String]] = Array(Array(1,2,3,NULL), Array(1,2,3,4), Array(1,2,NULL,NULL)).

我应该怎么办? 有没有办法解决它?

回答

2

union是一个功能性操作,它不改变所述阵列x。不过,您不需要用循环来完成此操作,而且任何循环实现都可能会更慢 - 使用所有NULL值创建一个新集合要好得多,而不是每次添加空值时都要进行变异。下面是应为你工作lambda函数:

def fillNull(x: Array[Int], desiredLength: Int): Array[String] = { 
    x.map(_.toString) ++ Array.fill(desiredLength - x.length)("NULL") 
} 

val newData = data.map(fillNull(_, 4)) 
+0

我很欣赏您的意见! 非常感谢! 我会试试! –

1

我用下面的代码解决了你的使用情况:

val initialRDD = sparkContext.parallelize(Array(Array[AnyVal](1, 2, 3), Array[AnyVal](1, 2, 3, 4), Array[AnyVal](1, 2, 3))) 
val transformedRDD = initialRDD.map(array => 
    if (array.length < 4) { 
    val transformedArray = Array.fill[AnyVal](4)("NULL") 
    Array.copy(array, 0, transformedArray, 0, array.length) 
    transformedArray 
    } else { 
    array 
    } 
) 
val result = transformedRDD.collect() 
+0

这个硬编码的'4'让我觉得我可能使用了'array.length'的广播帽。 –

+0

非常感谢! 这很有用! –