2017-06-15 53 views
2

在我们的风暴1.0.2应用程序中,我们正面临内存异常。调试后,我们看到卡夫卡喷嘴向螺栓发出太多消息。螺栓的运行速度接近4.0。那么是否有一种方法可以在风暴中启用背压,以便根据螺栓中的容量排出喷口。尝试启用topology.backpressure.enable为true,但跑到这个问题https://issues.apache.org/jira/browse/STORM-1949。我们正在使用KafkaSpout开箱即用的实现,并扩展BaseRichBolt以实现我们的螺栓。我们的DAG是线性的。风暴中的背压

回答

2

您可以在拓扑配置设置maxSpoutPending值处理KafkaSpout的背压,

Config config = new Config(); 
config.setMaxSpoutPending(200); 
config.setMessageTimeoutSecs(100); 

StormSubmitter.submitTopology("testtopology", config, builder.createTopology()); 

maxSpoutPending是可以在给定的时间内没有任何拓扑确认的记录数。设置这个属性将会使KafkaSpout不会从Kafka中消耗更多的数据,除非未确认的元组数小于maxSpoutPending值。