-1
我是Spark和Scala的新手,现在我不知何故卡住了一个问题:如何通过字段名称处理每行的不同字段,然后放入新的rdd
。Spark Rdd按字段名称处理每一行的不同字段
这是我的伪代码:
val newRdd = df.rdd.map(x=>{
def Random1 => random(1,10000) //pseudo
def Random2 => random(10000,20000) //pseduo
x.schema.map(y=> {
if (y.name == "XXX1")
x.getAs[y.dataType](y.name)) = Random1
else if (y.name == "XXX2")
x.getAs[y.dataType](y.name)) = Random2
else
x.getAs[y.dataType](y.name)) //pseduo,keeper the same
})
})
有上述2级少的错误:
- 第二张地图, “x.getAs” 是一个错误的语法
- 如何进入一个新的
rdd
我在网上搜索很长一段时间。但没用。请帮助或尝试提供一些想法如何实现这一点。
谢谢Ramesh Maharjan,它现在有效。
def randomString(len: Int): String = {
val rand = new scala.util.Random(System.nanoTime)
val sb = new StringBuilder(len)
val ab = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
for (i <- 0 until len) {
sb.append(ab(rand.nextInt(ab.length)))
}
sb.toString
}
def testUdf = udf((value: String) =>randomString(2))
val df = sqlContext.createDataFrame(Seq((1,"Android"), (2, "iPhone")))
df.withColumn("_2", testUdf(df("_2")))
+---+---+
| _1| _2|
+---+---+
| 1| F3|
| 2| Ag|
+---+---+
感谢您的快速回复做,但我希望得到一个新的RDD其中c更改特殊的RDD档案和其他领域保持不变。 – meng
是的,我也认为“udf”。但就我而言,每一个特殊的领域都是不同的。 withColumn会为特殊的字段提供相同的内容。 – meng
'withColumn'会给出使用'udf函数'返回的结果。所以你可以为'udf'中的不同字段“匹配个案”,以获得不同的值。 –