2015-04-15 78 views
3

将DStream RDD与批量RDD进行联合/联合/联合使用的唯一方法是通过“transform”方法,该方法返回另一个DStream RDD,因此将在其末尾丢弃微批次。向DStream中的批量RDD添加新元素RDD

有没有办法将Dstream RDD与批RDD结合起来,从而生成包含DStream RDD和批RDD的元素的新批RDD。

并且一旦以上述方式创建了这样的批RDD,它是否可以被其他DStream RDD用于例如因为此时结果可能是另一个DStream RDD

有效地,上述功能将导致对批量RDD的元素进行周期性更新(附加) - 其他元素将继续来自DStream RDD,这些元素将保持流式传输每个微批次。 还新到DSTREAM RDDS将能够参加与这样以前更新一批RDD和

东西几乎产生结果DSTREAM RDD这样可以updateStateByKey来实现,但有没有办法做到这一点这里描述

回答

4

另一种方法是将批量输入转换为DStream并将其与流式输入结合。然后使用foreachRDD将其写出来,这是您批量输入其他作业的新增内容。

val batch = sc.textFile(...) 

val ssc = new StreamingContext(sc, Seconds(30)) 
val stream = ssc.textFileStream(...) 

import scala.collection.mutable 
val batchStream = ssc.queueStream(mutable.Queue.empty[RDD[String]], oneAtATime = false, defaultRDD = batch) 

val union = ssc.union(Seq(stream, batchStream)) 

union.print() 

union.foreachRDD { rdd => 
    // Delete previous, or use SchemaRDD with .insertInto(, overwrite = true) 
    rdd.saveTextFile(...) 
} 

ssc.start() 
ssc.awaitTermination() 
+0

快速的问题:我可以传递一个参照本发明的'mutable.Queue'到'queueStream()',然后任意地推RDDS到该队列,并让它们被合并到流? 如果不是那么我想我可以通过调用'queueStream()'新的批处理RDD,然后'scc.union()'每次我想合并一个新的RDD来实现这个功能? –