2017-05-17 78 views
-1

我是Spark和Scala的新手,现在我不知何故卡住了一个问题:如何通过字段名称处理每行的不同字段,然后放入新的rddSpark Rdd按字段名称处理每一行的不同字段

这是我的伪代码:

val newRdd = df.rdd.map(x=>{ 
     def Random1 => random(1,10000) //pseudo 
     def Random2 => random(10000,20000) //pseduo 
     x.schema.map(y=> { 
      if (y.name == "XXX1") 
      x.getAs[y.dataType](y.name)) = Random1 
      else if (y.name == "XXX2") 
      x.getAs[y.dataType](y.name)) = Random2 
      else 
      x.getAs[y.dataType](y.name)) //pseduo,keeper the same 
     }) 
     }) 

有上述2级少的错误:

  1. 第二张地图, “x.getAs” 是一个错误的语法
  2. 如何进入一个新的rdd

我在网上搜索很长一段时间。但没用。请帮助或尝试提供一些想法如何实现这一点。


谢谢Ramesh Maharjan,它现在有效。

def randomString(len: Int): String = { 
    val rand = new scala.util.Random(System.nanoTime) 
    val sb = new StringBuilder(len) 
    val ab = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" 
    for (i <- 0 until len) { 
     sb.append(ab(rand.nextInt(ab.length))) 
    } 
    sb.toString 
    } 
def testUdf = udf((value: String) =>randomString(2)) 
val df = sqlContext.createDataFrame(Seq((1,"Android"), (2, "iPhone"))) 
df.withColumn("_2", testUdf(df("_2"))) 
+---+---+ 
| _1| _2| 
+---+---+ 
| 1| F3| 
| 2| Ag| 
+---+---+ 

回答

1

如果您打算过滤某些felds“XXX1”“XXX2”那么简单select功能应该做的伎俩

,转换至rdd
如果您打算别的东西,然后你的x.getAs应该看起来像下面

val random1 = x.getAs(y.name) 

看来,哟你企图更改值某些列“XXX1”和“XXX2”
对于一个简单的udf功能和withColumn应该做的伎俩
简单udf功能如下

def testUdf = udf((value: String) => { 
    //do your logics here and what you return from here would be reflected in the value you passed from the column 
    }) 

而且你可以调用UDF功能

df.withColumn("XXX1", testUdf(df("XXX1"))) 

同样可以为“XXX2”

+0

感谢您的快速回复做,但我希望得到一个新的RDD其中c更改特殊的RDD档案和其他领域保持不变。 – meng

+0

是的,我也认为“udf”。但就我而言,每一个特殊的领域都是不同的。 withColumn会为特殊的字段提供相同的内容。 – meng

+0

'withColumn'会给出使用'udf函数'返回的结果。所以你可以为'udf'中的不同字段“匹配个案”,以获得不同的值。 –