2015-02-24 15 views
0

我寻找一种方式来检索作为创建DSTREAM的第一要素:获得第一个元素(取功能)的DSTREAM

val dstream = ssc.textFileStream(args(1)).map(x => x.split(",").map(_.toDouble)) 

不幸的是,没有采取功能(如在RDD )在dstream // dstream.take(2)!!!

有人可以有任何想法如何做到这一点?谢谢

+1

你不能有'take'的结果会不会是'DStream',因为它会产生没有进一步RDDS。但是你知道'DStream'的时间间隔,所以你可以计算出产生下一个2(说)的时间间隔。所以你可以使用'slice' - 这也是'DStream'中唯一产生RDD序列的方法,所以它几乎是你唯一的选择? – 2015-02-24 15:49:39

+0

你关心元素的顺序吗?有一种方法,如果你不* – maasg 2015-02-25 15:04:12

+0

我不清楚你是否想要另一个'dstream'与原始'dstream'的'n'元素,或者每次迭代只需要'n'元素并且对它们进行一些操作。你能澄清吗? – maasg 2015-02-25 15:07:04

回答

1

您可以在DStream对象中使用transform方法,然后获取输入RDD的n个元素并将其保存到列表中,然后过滤要包含在此列表中的原始RDD。这将返回一个包含n个元素的新DStream。

val n = 10 
val partOfResult = dstream.transform(rdd => { 
    val list = rdd.take(n) 
    rdd.filter(list.contains) 
}) 
partOfResult.print 
0

先前建议的解决方案并没有编译我作为取()方法返回一个阵列,是不可序列因而火花流将失败,java.io.NotSerializableException

在前面的代码为我工作的简单变化:

val n = 10 
val partOfResult = dstream.transform(rdd => { 
    rdd.filter(rdd.take(n).toList.contains) 
}) 
partOfResult.print 
相关问题