2014-07-04 50 views
3

我想分解我的数据集到火车和测试数据集。我第一个文件读入到内存如下所示:火花斯卡拉得到不寻常的地图元素

val ratings = sc.textFile(movieLensdataHome+"/ratings.csv").map { line=> 
    val fields = line.split(",") 
    Rating(fields(0).toInt,fields(1).toInt,fields(2).toDouble) 
} 

然后我选择那些对我的训练集的80%:

val train = ratings.sample(false,.8,1) 

是否有一个简单的方法来测试在分布式设置这样, 我想这一点,但失败:

val test = ratings.filter(!_.equals(train.map(_))) 
+0

在问题的关键误解是在使用你的过滤器功能“地图”的。 'map'是将一个值转换为给定函数指定的另一个值。你可能会考虑的是在hashmap中查找值,但这是误解了map函数的定义。 – David

回答

3
val test = ratings.subtract(train) 
0

而是采用的排除方法(L IKE滤波器或减去),身份证分区组“手动”为一个更高效的执行:

val probabilisticSegment:(RDD[Double,Rating],Double=>Boolean) => RDD[Rating] = 
    (rdd,prob) => rdd.filter{case (k,v) => prob(k)}.map {case (k,v) => v} 

val ranRating = rating.map(x=> (Random.nextDouble(), x)).cache 
val train = probabilisticSegment(ranRating, _ < 0.8) 
val test = probabilisticSegment(ranRating, _ >= 0.8) 

cache保存中间RDD sothat下两个操作可以从该点上,而不在招致进行执行完整的血统。

(*)请注意,使用val来定义函数而不是defval s是串行器友好的

+0

我在执行probabilisticSegment时遇到问题。在'{case(k,v)=> v'中'}'结束?还在学习scala鸭子打字。 – venuktan

+0

应该是:'{case(k,v)=> v}'。编辑代码以反映这一点。 – maasg

1

看看这里。 http://markmail.org/message/qi6srcyka6lcxe7o

下面是代码

def split[T : ClassManifest](data: RDD[T], p: Double, seed: Long = 
System.currentTimeMillis): (RDD[T], RDD[T]) = { 
    val rand = new java.util.Random(seed) 
    val partitionSeeds = data.partitions.map(partition => rand.nextLong) 
    val temp = data.mapPartitionsWithIndex((index, iter) => { 
     val partitionRand = new java.util.Random(partitionSeeds(index)) 
     iter.map(x => (x, partitionRand.nextDouble)) 

    }) 
    (temp.filter(_._2 <= p).map(_._1), temp.filter(_._2 > p).map(_._1)) 
    }