2013-11-21 54 views
3
  • 我们在单个节点上有1个喷嘴和1个螺栓。 Spout从RabbitMQ中读取数据,并将其发送给将数据写入Cassandra的唯一螺栓。
  • 我们的数据源每秒产生10000条消息,风暴需要大约10秒来处理,这对我们来说太慢了。
  • 我们试图增加拓扑的并行性,但这并没有什么区别。

什么是理想的消息,可以在一个单一的节点机器上处理1个螺栓和1个螺栓?以及提高风暴拓扑处理速度的可能方法是什么?风暴处理数据极其缓慢

更新: 这是示例代码,它包含RabbitMQ和cassandra的代码,但是会导致相同的性能问题。

// Topology Class 
public class SimpleTopology { 

public static void main(String[] args) throws InterruptedException { 
    System.out.println("hiiiiiiiiiii"); 
    TopologyBuilder topologyBuilder = new TopologyBuilder(); 
    topologyBuilder.setSpout("SimpleSpout", new SimpleSpout()); 
    topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 2).setNumTasks(4).shuffleGrouping("SimpleSpout"); 

    Config config = new Config(); 
    config.setDebug(true); 
    config.setNumWorkers(2); 

    LocalCluster localCluster = new LocalCluster(); 
    localCluster.submitTopology("SimpleTopology", config, topologyBuilder.createTopology()); 

    Thread.sleep(2000); 
} 

}

// Simple Bolt 
public class SimpleBolt implements IRichBolt{ 

private OutputCollector outputCollector; 

public void prepare(Map map, TopologyContext tc, OutputCollector oc) { 
    this.outputCollector = oc; 
} 

public void execute(Tuple tuple) { 
    this.outputCollector.ack(tuple); 
} 

public void cleanup() { 
    // TODO 
} 

public void declareOutputFields(OutputFieldsDeclarer ofd) { 
    // TODO 
} 

public Map<String, Object> getComponentConfiguration() { 
    return null; 
} 

}

// Simple Spout 

public class SimpleSpout implements IRichSpout{ 

private SpoutOutputCollector spoutOutputCollector; 
private boolean completed = false; 
private static int i = 0; 

public void open(Map map, TopologyContext tc, SpoutOutputCollector soc) {  
    this.spoutOutputCollector = soc; 
} 

public void close() { 
    // Todo 
} 

public void activate() { 
    // Todo 
} 

public void deactivate() { 
    // Todo 
} 

public void nextTuple() { 
    if(!completed) 
    { 
     if(i < 100000) 
     { 
      String item = "Tag" + Integer.toString(i++); 
      System.out.println(item); 
      this.spoutOutputCollector.emit(new Values(item), item); 
     } 
     else 
     { 
      completed = true; 
     } 
    } 
    else 
    { 
     try { 
      Thread.sleep(2000); 
     } catch (InterruptedException ex) { 
      Logger.getLogger(SimpleSpout.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 
} 

public void ack(Object o) { 
    System.out.println("\n\n OK : " + o); 
} 

public void fail(Object o) { 
    System.out.println("\n\n Fail : " + o); 
} 

public void declareOutputFields(OutputFieldsDeclarer ofd) { 
    ofd.declare(new Fields("word")); 
} 

public Map<String, Object> getComponentConfiguration() { 
    return null; 
} 

}

更新: 是否有可能与shuffle的分组相同的元组将被处理一次以上?使用的配置(喷嘴= 4.螺栓= 4),现在的问题是,随着螺栓数量的增加,性能下降。

+0

是什么,你已经尝试了配置,U可以张贴一些代码?你究竟从RabbitMQ读到了什么? – user2720864

+0

'我们的数据源每秒产生10000条消息'..你是说这是因为'nextTuple'方法中的if(我<100000)'语句吗? – user2720864

+0

看看http://blog.relateiq。com/monitoring-storm/ – Chiron

回答

3

你应该知道这里有什么瓶颈 - RabbitMQ或Cassandra。打开Storm UI并查看每个组件的延迟时间。

如果增加并行性没有帮助(通常应该),那么RabbitMQ或Cassandra肯定存在问题,因此您应该关注它们。

+0

yup,RabbitMq正在放缓这些事情 – user3017482

0

我们正在成功使用RabbitMQ和Storm。结果存储在不同的数据库中,但无论如何。我们首先在Spout中使用了basic_get,并且表现糟糕,但随后我们转向了basic_consume,性能实际上非常好。所以看看你如何从兔子消费信息。 一些重要因素:

  • basic_consume代替basic_get
  • prefetch_count(使它足够高)
  • 如果你想提高性能,你不在乎失去的消息 - 不ACK消息和将delivery_mode设置为1.
+0

有问题吗?在单节点上观察到的正常元组合格传递率是多少? – user3017482

+0

这取决于我所说的很多事情会影响它。在我们的场景中,我们有一个CPU密集型的螺栓,它做了很多计算。 – Vor

2

在您的代码中,每次调用nextTuple()时只会发出一个元组。尝试每次调用发射更多的元组。

类似:

public void nextTuple() { 

    int max = 1000; 
    int count = 0; 
    GetResponse response = channel.basicGet(queueName, autoAck); 
    while ((response != null) && (count < max)) { 

     // process message 

     spoutOutputCollector.emit(new Values(item), item); 

     count++; 
     response = channel.basicGet(queueName, autoAck); 
    } 

    try { Thread.sleep(2000); } catch (InterruptedException ex) { 
} 
+0

可以分享一些例子吗?我试过,但无法弄清楚。 – user3017482

+0

@ user3017482用一些伪代码更新了我的答案 –