2017-03-28 74 views
0

这种问题会导致我管理AmazonDynamoDbClient节流和重试的另一个问题。但是,我认为解决方案可能在我接到发动机电话之前就已经存在。如何抑制Spark Streaming?

我的高级过程如下:我有一个scala应用程序,它利用Apache Spark读取大型CSV文件并对它们执行一些聚合,然后将它们写入发电机。我将这部署到EMR以提供可扩展性。问题是,一旦聚合完成,我们有数百万条记录准备好进入发电机,但我们有发电机的写入能力。它们不需要立即插入,但是可以很好地控制每秒多少次,所以我们可以根据我们的用例对其进行微调。

这里是什么,我至今一个代码示例:

val foreach = new ForeachWriter[Row] { 
    override def process(value: Row): Unit = { 
     //write to dynamo here 
    } 

    override def close(errorOrNull: Throwable): Unit = { 
    } 

    override def open(partitionId: Long, version: Long): Boolean = { 
     true 
    } 
    } 

val query = dataGrouped 
    .writeStream 
    .queryName("DynamoOutput") 
    .format("console") 
    .foreach(foreach) 
    .outputMode(OutputMode.Complete()) 
    .start() 
    .awaitTermination() 

没有人有任何建议如何解决这个问题呢?

回答

0

您应该查看spark.streaming.backpressure.enabled配置。从documentation

设置最大接收速率 - 如果群集资源不足以使流式应用程序处理数据的速度与接收速度一样快,则可以通过设置最大速率限制来限制接收器速率在记录/秒。查看接收器的配置参数spark.streaming.receiver.maxRate和直接Kafka方法的spark.streaming.kafka.maxRatePerition。在Spark 1.5中,我们引入了一项名为backpressure的功能,无需设置此速率限制,因为Spark Streaming会自动计算出速率限制并在处理条件发生变化时动态调整速率限制。可以通过将配置参数spark.streaming.backpressure.enabled设置为true来启用该背压。

+0

所以,我看着这个,但我认为这只是传入数据。此外,它不会开始输出,直到流聚合完成,所以我认为这只会减慢完成的时间。 –

+0

这是,但我想象每个工人正在获取数据,然后将它们写入Dynamo。如果写入需要一段时间,背压设置将有助于确保工作人员不会不知所措并成为瓶颈。除此之外,我不知道有什么方法来“扼杀”Spark Streaming。 – Vidya

+0

酷,不用担心,我会给它一个镜头并报告回来。谢谢 –