2016-11-18 45 views
0

我已经使用ActiveMQ作为代理和JMS来接收异步。队列中的消息会在消息进入队列后立即开始使用这些消息。 为此我有书面的生产者和消费者代码。 一切工作正常,但整个过程花费过多时间大约2-3分钟10000条记录(我只用于模拟回路) 下面是整个代码:JMS生产者和消费者花费太多时间。为什么?

这是JMS监制:

public class JmsMessageProducer 
{ 
    public void jmsListener(String obj) throws Exception 
    { 
     BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:61616)")); 
     broker.start(); 
     Connection connection = null; 
     Session session = null; 
     MessageProducer producer = null; 
     try 
     { 
      long millis = System.currentTimeMillis() % 1000; 
      // Producer 
      ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); 
      connection = connectionFactory.createConnection(); 
      session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); 

      Queue queue = session.createQueue("customerQueue3"); 

      MessageConsumer consumer = session.createConsumer(queue); 
      consumer.setMessageListener(new ConsumerMessageListener("Consumer3")); 
      connection.start(); 

      producer = session.createProducer(queue); 

      for(int i=0; i<10000; i++) 
      { 
       String payload = "Important Task"+i; 
       Message msg = session.createTextMessage(payload); 



       System.out.println("Sending text '" + payload + "'"); 

       producer.send(msg); 
      } 

      long millis2 = System.currentTimeMillis() % 1000; 
      long millis3 = millis2- millis; 
      System.out.println("time taken: "+ millis3); 

     } 
     finally 
     { 
      if(producer != null) 
      { 
       producer.close(); 
      } 
      if (session != null) 
      { 
       session.close(); 
      } 
      if (connection != null) 
      { 
       connection.close(); 
      } 
      broker.stop(); 
     } 
    } 
} 

这是监听器代码:

public class ConsumerMessageListener implements MessageListener 
{ 
    private String consumerName; 

    public ConsumerMessageListener(String consumerName) 
    { 
     this.consumerName = consumerName; 
    } 

    DummyAdapter adapter = new DummyAdapter(); 
    public void onMessage(Message message) 
    { 
     TextMessage textMessage = (TextMessage) message; 
     try 
     {   
      System.out.println(consumerName + " received "+ textMessage.getText()); 
     // adapter.dummy(textMessage.getText()); 
     } 
     catch (JMSException e) 
     { 
      e.printStackTrace(); 
     } 
    } 
} 

我这样做的第一次。 谁能告诉我为什么这个过程花费了太多时间? 我在做什么错?

回答

1

我不确定您期望的SLA,但是您的消息Broker正在以每秒80个消息的速度工作,这并不坏。

而您的代码有一个问题是session.createProducer(queue)是时间滞后的问题,因为它是一个代价高昂的操作(需要时间),您可以使用单个生产者对象生成多条消息。

所以对于循环创建MessageProducer外,如下图所示:

MessageProducer producer = session.createProducer(queue); 
for(int i=0; i<10000; i++) { 
    String payload = "Important Task"+i; 
    Message msg = session.createTextMessage(payload); 

    System.out.println("Sending text '" + payload + "'"); 

    producer.send(msg); 
} 

你必须closefinally块都producersession对象。

P.S:此外,作为一个侧面说明,它会如果你的名字你生产者classJmsMessagePrducer而不是JmsMessageListener作为一般我们使用的名称为Listener仅供JMS消费者是一件好事。

UPDATE:

我只是想知道是好处理约80 消息/秒?消费后也没有任何操作。如果 在队列消息消耗之后添加更多任务,例如插入 db或某些业务操作。

如果不知道哪个服务器/操作系统等等(需要考虑许多参数),简单地说80个消息/秒更好或50个消息/秒更好是不明智的。当涉及性能要求时,您需要首先指定/定义要求。

如果您当前的代码处理大约80条消息/秒,那么您的应用程序(测试程序)基准测试就是针对给定条件。因此,如果您觉得它不符合您的性能要求,那么您需要配置多个JMS侦听器来并行处理消息并更改您的设计。

+0

感谢您回复javaguy。我按照你的建议更新了我的代码(正如你在我的问题中看到的那样),但它仍然需要很长时间。你能提出其他建议吗? – RishiPandey

+0

您期待的SLA是什么? – developer

+0

我只想知道处理大约80条消息/秒是否好?消费后也没有任何操作。如果在队列中消费消息之后添加更多任务,比如插入数据库或某些业务操作,该怎么办? – RishiPandey