我寻找一种方式来检索作为创建DSTREAM的第一要素:获得第一个元素(取功能)的DSTREAM
val dstream = ssc.textFileStream(args(1)).map(x => x.split(",").map(_.toDouble))
不幸的是,没有采取功能(如在RDD )在dstream // dstream.take(2)!!!
有人可以有任何想法如何做到这一点?谢谢
我寻找一种方式来检索作为创建DSTREAM的第一要素:获得第一个元素(取功能)的DSTREAM
val dstream = ssc.textFileStream(args(1)).map(x => x.split(",").map(_.toDouble))
不幸的是,没有采取功能(如在RDD )在dstream // dstream.take(2)!!!
有人可以有任何想法如何做到这一点?谢谢
您可以在DStream对象中使用transform方法,然后获取输入RDD的n个元素并将其保存到列表中,然后过滤要包含在此列表中的原始RDD。这将返回一个包含n个元素的新DStream。
val n = 10
val partOfResult = dstream.transform(rdd => {
val list = rdd.take(n)
rdd.filter(list.contains)
})
partOfResult.print
先前建议的解决方案并没有编译我作为取()方法返回一个阵列,是不可序列因而火花流将失败,java.io.NotSerializableException。
在前面的代码为我工作的简单变化:
val n = 10
val partOfResult = dstream.transform(rdd => {
rdd.filter(rdd.take(n).toList.contains)
})
partOfResult.print
你不能有'take'的结果会不会是'DStream',因为它会产生没有进一步RDDS。但是你知道'DStream'的时间间隔,所以你可以计算出产生下一个2(说)的时间间隔。所以你可以使用'slice' - 这也是'DStream'中唯一产生RDD序列的方法,所以它几乎是你唯一的选择? – 2015-02-24 15:49:39
你关心元素的顺序吗?有一种方法,如果你不* – maasg 2015-02-25 15:04:12
我不清楚你是否想要另一个'dstream'与原始'dstream'的'n'元素,或者每次迭代只需要'n'元素并且对它们进行一些操作。你能澄清吗? – maasg 2015-02-25 15:07:04