- 我们在单个节点上有1个喷嘴和1个螺栓。 Spout从RabbitMQ中读取数据,并将其发送给将数据写入Cassandra的唯一螺栓。
- 我们的数据源每秒产生10000条消息,风暴需要大约10秒来处理,这对我们来说太慢了。
- 我们试图增加拓扑的并行性,但这并没有什么区别。
什么是理想的消息,可以在一个单一的节点机器上处理1个螺栓和1个螺栓?以及提高风暴拓扑处理速度的可能方法是什么?风暴处理数据极其缓慢
更新: 这是示例代码,它包含RabbitMQ和cassandra的代码,但是会导致相同的性能问题。
// Topology Class
public class SimpleTopology {
public static void main(String[] args) throws InterruptedException {
System.out.println("hiiiiiiiiiii");
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("SimpleSpout", new SimpleSpout());
topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 2).setNumTasks(4).shuffleGrouping("SimpleSpout");
Config config = new Config();
config.setDebug(true);
config.setNumWorkers(2);
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("SimpleTopology", config, topologyBuilder.createTopology());
Thread.sleep(2000);
}
}
// Simple Bolt
public class SimpleBolt implements IRichBolt{
private OutputCollector outputCollector;
public void prepare(Map map, TopologyContext tc, OutputCollector oc) {
this.outputCollector = oc;
}
public void execute(Tuple tuple) {
this.outputCollector.ack(tuple);
}
public void cleanup() {
// TODO
}
public void declareOutputFields(OutputFieldsDeclarer ofd) {
// TODO
}
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
// Simple Spout
public class SimpleSpout implements IRichSpout{
private SpoutOutputCollector spoutOutputCollector;
private boolean completed = false;
private static int i = 0;
public void open(Map map, TopologyContext tc, SpoutOutputCollector soc) {
this.spoutOutputCollector = soc;
}
public void close() {
// Todo
}
public void activate() {
// Todo
}
public void deactivate() {
// Todo
}
public void nextTuple() {
if(!completed)
{
if(i < 100000)
{
String item = "Tag" + Integer.toString(i++);
System.out.println(item);
this.spoutOutputCollector.emit(new Values(item), item);
}
else
{
completed = true;
}
}
else
{
try {
Thread.sleep(2000);
} catch (InterruptedException ex) {
Logger.getLogger(SimpleSpout.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
public void ack(Object o) {
System.out.println("\n\n OK : " + o);
}
public void fail(Object o) {
System.out.println("\n\n Fail : " + o);
}
public void declareOutputFields(OutputFieldsDeclarer ofd) {
ofd.declare(new Fields("word"));
}
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
更新: 是否有可能与shuffle的分组相同的元组将被处理一次以上?使用的配置(喷嘴= 4.螺栓= 4),现在的问题是,随着螺栓数量的增加,性能下降。
是什么,你已经尝试了配置,U可以张贴一些代码?你究竟从RabbitMQ读到了什么? – user2720864
'我们的数据源每秒产生10000条消息'..你是说这是因为'nextTuple'方法中的if(我<100000)'语句吗? – user2720864
看看http://blog.relateiq。com/monitoring-storm/ – Chiron