2016-02-25 57 views
-1

我有一些数据如下,我已经读入通常的火花RDD(无架构)每个键一个新列创建值:如何为用于基于条件火花的Scala

before transformation

现在,我想创建一个新列。如果客户的事件包含w作为值,则新列将被设置为1。所以造成RDD将是:

after treansformation

我一直没能做到这一点。我至今为以下,其中数据1是读入RDD数据:

val data2 = data1.groupBy(_._2) 
    .map(_._2.map{ case (a1: Array[String], a2, a3, a4) => 
    val myString = "w" 
    if (a1.contains(myString)) { (a1,a2,a3,a4,array_of_ones) else (a1,a2,a3,a4,array_of_zeros)} 
     }) 

1http://i.stack.imgur.com/P7bTx.jpgenter代码在这里

在上面,array_of_ones和array_of_zeros必须具有相同的长度为A1的每个分区。我怎样才能做到这一点?如果可能,请假定不允许加入RDD。谢谢。你能解决这个

+1

这实在是很难理解你想要什么。客户3没有'w',但是在indicator3中有一些。此外,可以张贴正确的类型而不是截图的实际数据结构?类型很重要..最后,加入与groupBy具有几乎相同的性能。并不是说这是必需的。 – zero323

+0

对此我感到抱歉。 Indicator3对于客户3必须具有0,对于客户4必须具有1。对于事件和客户,类型是字符串,对于指标是Int。我将数据读入一个元组(a1,a2,a3,a4)。我会在2天内更新问题。我很抱歉。 – eugenerory

+0

不需要道歉,但请纠正这:) – zero323

回答

1

一种方法是使用DataFrames

import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions.sum 

val df = data1.toDF("event", "customer", "indicator1", "indicator2") 
val w = Window.partitionBy($"customer").rowsBetween(Long.MinValue, Long.MaxValue) 

val isW = ($"event" === "w").cast("long") 
val indicator3 = (sum(isW).over(w) > 0).cast("long") 

df.withColumn("indicator3", indicator3)