这种问题会导致我管理AmazonDynamoDbClient节流和重试的另一个问题。但是,我认为解决方案可能在我接到发动机电话之前就已经存在。如何抑制Spark Streaming?
我的高级过程如下:我有一个scala应用程序,它利用Apache Spark读取大型CSV文件并对它们执行一些聚合,然后将它们写入发电机。我将这部署到EMR以提供可扩展性。问题是,一旦聚合完成,我们有数百万条记录准备好进入发电机,但我们有发电机的写入能力。它们不需要立即插入,但是可以很好地控制每秒多少次,所以我们可以根据我们的用例对其进行微调。
这里是什么,我至今一个代码示例:
val foreach = new ForeachWriter[Row] {
override def process(value: Row): Unit = {
//write to dynamo here
}
override def close(errorOrNull: Throwable): Unit = {
}
override def open(partitionId: Long, version: Long): Boolean = {
true
}
}
val query = dataGrouped
.writeStream
.queryName("DynamoOutput")
.format("console")
.foreach(foreach)
.outputMode(OutputMode.Complete())
.start()
.awaitTermination()
没有人有任何建议如何解决这个问题呢?
所以,我看着这个,但我认为这只是传入数据。此外,它不会开始输出,直到流聚合完成,所以我认为这只会减慢完成的时间。 –
这是,但我想象每个工人正在获取数据,然后将它们写入Dynamo。如果写入需要一段时间,背压设置将有助于确保工作人员不会不知所措并成为瓶颈。除此之外,我不知道有什么方法来“扼杀”Spark Streaming。 – Vidya
酷,不用担心,我会给它一个镜头并报告回来。谢谢 –