2016-07-26 15 views
0

拆分DSTREAM如何给给定一个匹配

val dstream = ssc.createStream(..) 

我们如何获得分时段/分组/拆分组Dstreams从它,大意如下:

val (s1, s2, s3): (DStream[_],DStream[_],DStream[_]) = 
    dstream.map{ in match => 
    case <cond1> => bucket1Value 
    case <cond2> => bucket2Value 
    case _ => bucket3Value 
    }.<some bucketing/grouping operation> 

RE:可能重复这是一个完全不同的问题 - 另一个是关于RDD的不是DStream的!

+0

@LostInOverflow DStream与RDD的完全不同 - 你为什么会建议它们是相同的? – javadba

+0

@LostInOverflow如果你同意OP的评论,你可以取消你的标志吗? (如果再次点击“标志”,您将看到一个按钮) –

+0

DStream是RDD的一个序列,每个操作都应用在RDD上,因此拆分DStream与拆分RDD是同样的问题。如果有解决方案来拆分RDD,则有一种解决方案来拆分DStream。它对面的dir。 – 2016-07-27 09:33:42

回答

0

回答我自己的问题:但如果有人(任何人?)有建议直接执行操作,它将很乐意接受。

因此,这里是a解决方案 - 虽然不优雅。

val s1 = dstream.flatMap{ in match => 
    case r if <cond1> => bucket1Value 
    case _   => None 
} 
val s2 = dstream.flatMap{ in match => 
    case r if <cond2> => bucket1Value 
    case _   => None 
} 
val s3 = dstream.flatMap{ in match => 
    case r if !<cond1> && !<cond2> => bucket3Value 
    case _   => None 
} 
相关问题