2015-02-05 79 views
3

同样的问题也适用于将RDD分成几个新的RDD。Spark将一个DStream分成几个RDD

DStream或RDD包含几个不同的案例类,我需要根据案例类的类型将它们变成单独的RDD。

我所知道的

val newRDD = rdd.filter { a => a.getClass.getSimpleName == "CaseClass1" } 

val newRDD = rdd.filter { 
    a => a match { 
    case _: CC1 => true 
    case _ => false 
    } 
} 

但是这需要通过原RDD,每箱类类型一个不少的运行。

  1. 必须有一个更简洁的方法来做上述匹配过滤器?
  2. 有没有办法通过一个平行关卡通过元素类型将rdd分成几个?

回答

1

看起来像rdd.filter我在长形式的正确轨道上。稍微更简洁版本是:

val newRDD = rdd.filter { case _: CC1 => true ; case _ => false } 

你不能离开了case _ => false或测试类并不详尽,你会得到错误。我无法让收集器正常工作。

@maasg得到正确答案的信贷关于做单独的过滤器通过,而不是黑客的方式来分裂输入一次通过。

4

1)对于给定类型的过滤的更简洁的方式是使用rdd.collect(PartialFunction[T,U])

val newRDD = rdd.filter { a => a.getClass.getSimpleName == "CaseClass1" } 

等效将是:

val newRDD = rdd.collect{case c:CaseClass1 => c} 

它甚至可以是结合额外的过滤和转换:

val budgetRDD = rdd.collect{case c:CaseClass1 if (c.customer == "important") => c.getBudget} 

rdd.collect(p:PartialFunction[T,U])不应rdd.collect()它提供数据反馈给驾驶者混淆。


2)要分割RDD(或为此事DSTREAM)filter是要走的路。必须记住,RDD是分布式集合。过滤器可让您将功能应用于该分布式集合的一个子集,并行地通过集群。

从原始RDD中产生2个或更多RDD的结构创建将产生1对多的混洗阶段,这将会大大增加成本。

+0

好的,但我真的不想收藏吗?我编辑了上面的代码,将返回的过滤器分配给一个rdd。换句话说,我确实需要一个过滤器,对吧? – pferrel 2015-02-05 19:30:31

+0

我想这是因为从闭包返回值比布尔值更容易? – pferrel 2015-02-05 19:48:33

+0

编辑了答案以表明该呼叫返回RDD。正如我所提到的,rdd.collect()和rdd。收集{case x => x.y}有两个完全不同的目的。 – maasg 2015-02-05 19:50:37

相关问题