2011-06-23 50 views
7

我是HornetQ的新手,请耐心等待。我首先要告诉你我的要求:HornetQ消息在使用核心API后仍然在队列中

我需要一个消息队列中间件,它可以在不同的进程之间传递大小约为1k的消息,具有低延迟和持久性(即应该能够在系统崩溃时幸存)。我将有多个进程写入相同的队列,同样的多个进程从同一个队列读取。

为此,我选择了HornetQ,因为它具有持久性消息传递的最佳评分。

我目前usung HornetQ的v2.2.2Final独立服务器
我能够成功地创建使用核心API(ClientSession中)持久/非持久队列,并成功发布消息排队(ClientProducer)
同样,我可以使用核心API (ClientConsumer)从队列中读取消息。

问题出现在客户端读取完消息后,消息仍然保留在队列中,即队列中的消息数保持不变。也许我得到了这个错误,但我的印象是,一旦这条消息被消耗,就会从队列中移除。但是这并不是发生在我的情况中,同样的消息正在一遍又一遍地阅读。

此外,我想告诉我,我已经尝试使用非持久消息的非持久队列。但问题依然存在

规范生产,我现在用:

public class HQProducer implements Runnable { 

    private ClientProducer producer; 
    private boolean killme; 
    private ClientSession session; 
    private boolean durableMsg; 

    public HQProducer(String host, int port, String address, String queueName, 
      boolean deleteQ, boolean durable, boolean durableMsg, int pRate) { 
     this.durableMsg = durableMsg; 
     try { 
      HashMap map = new HashMap(); 
      map.put("host", host); 
      map.put("port", port); 

      TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map); 

      ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config); 

      ClientSessionFactory factory = locator.createSessionFactory(); 

      session = factory.createSession(); 

      if (queueExists(queueName)) { 
       if (deleteQ) { 
        System.out.println("Deleting existing queue :: " + queueName); 
        session.deleteQueue(queueName); 
        System.out.println("Creating queue :: " + queueName); 
        session.createQueue(address, queueName, true); 
       } 
      } else { 
       System.out.println("Creating new queue :: " + queueName); 
       session.createQueue(address, queueName, durable); 
      } 
      producer = session.createProducer(SimpleString.toSimpleString(address), pRate); 

      killme = false; 
     } catch (Exception ex) { 
      Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 

    @Override 
    public void run() { 
     long time = System.currentTimeMillis(); 
     int cnt = 0; 
     long timediff; 
     while (!killme) { 
      try { 
       ClientMessage message = session.createMessage(durableMsg); 

       message.getBodyBuffer().writeString("Hello world"); 

       producer.send(message); 
       cnt++; 
       timediff = ((System.currentTimeMillis() - time)/1000); 
       if (timediff >= 1) { 
        System.out.println("Producer tps :: " + cnt); 
        cnt = 0; 
        time = System.currentTimeMillis(); 
       } 
      } catch (HornetQException ex) { 
       Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex); 
      } 
     } 
     try { 
      session.close(); 
     } catch (HornetQException ex) { 
      Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 

    public void setKillMe(boolean killme) { 
     this.killme = killme; 
    } 

    private boolean queueExists(String qname) { 
     boolean res = false; 
     try { 
      //ClientSession.BindingQuery bq = session.bindingQuery(SimpleString.toSimpleString(qname)); 
      QueueQuery queueQuery = session.queueQuery(SimpleString.toSimpleString(qname)); 
      if (queueQuery.isExists()) { 
       res = true; 
      } 
     } catch (HornetQException ex) { 
      res = false; 
     } 
     return res; 
    } 
} 

也为消费者的代码是:

public class HQConsumer implements Runnable { 

    private ClientSession session; 
    private ClientConsumer consumer; 
    private boolean killMe; 

    public HQConsumer(String host, int port, String queueName, boolean browseOnly) { 
     try { 
      HashMap map = new HashMap(); 
      map.put("host", host); 
      map.put("port", port); 

      TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map); 

      ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config); 

      ClientSessionFactory factory = locator.createSessionFactory(); 

      session = factory.createSession(); 

      session.start(); 

      consumer = session.createConsumer(queueName, "",0,-1,browseOnly); 

      killMe = false; 
     } catch (Exception ex) { 
      Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 

    @Override 
    public void run() { 
     long time = System.currentTimeMillis(); 
     int cnt = 0; 
     long timediff; 
     while (!killMe) { 
      try { 
       ClientMessage msgReceived = consumer.receive(); 
       msgReceived.acknowledge(); 
       //System.out.println("message = " + msgReceived.getBodyBuffer().readString()); 
       cnt++; 
       timediff = ((System.currentTimeMillis() - time)/1000); 
       if (timediff >= 1) { 
        System.out.println("ConSumer tps :: " + cnt); 
        cnt = 0; 
        time = System.currentTimeMillis(); 
       } 
      } catch (HornetQException ex) { 
       Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex); 
      } 
     } 
     try { 
      session.close(); 
     } catch (HornetQException ex) { 
      Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 

    public void setKillMe(boolean killMe) { 
     this.killMe = killMe; 
    } 
} 

HornetQ的服务器配置::

<configuration xmlns="urn:hornetq" 
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
       xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd"> 

    <paging-directory>${data.dir:../data}/paging</paging-directory> 

    <bindings-directory>${data.dir:../data}/bindings</bindings-directory> 

    <journal-directory>${data.dir:../data}/journal</journal-directory> 

    <journal-min-files>10</journal-min-files> 

    <large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory> 

    <connectors> 
     <connector name="netty"> 
     <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class> 
     <param key="host" value="${hornetq.remoting.netty.host:localhost}"/> 
     <param key="port" value="${hornetq.remoting.netty.port:5445}"/> 
     </connector> 

     <connector name="netty-throughput"> 
     <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class> 
     <param key="host" value="${hornetq.remoting.netty.host:localhost}"/> 
     <param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/> 
     <param key="batch-delay" value="50"/> 
     </connector> 
    </connectors> 

    <acceptors> 
     <acceptor name="netty"> 
     <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class> 
     <param key="host" value="${hornetq.remoting.netty.host:localhost}"/> 
     <param key="port" value="${hornetq.remoting.netty.port:5445}"/> 
     </acceptor> 

     <acceptor name="netty-throughput"> 
     <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class> 
     <param key="host" value="${hornetq.remoting.netty.host:localhost}"/> 
     <param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/> 
     <param key="batch-delay" value="50"/> 
     <param key="direct-deliver" value="false"/> 
     </acceptor> 
    </acceptors> 

    <security-settings> 
     <security-setting match="#"> 
     <permission type="createNonDurableQueue" roles="guest"/> 
     <permission type="deleteNonDurableQueue" roles="guest"/> 
     <permission type="createDurableQueue" roles="guest"/> 
     <permission type="deleteDurableQueue" roles="guest"/> 
     <permission type="consume" roles="guest"/> 
     <permission type="send" roles="guest"/> 
     </security-setting> 
    </security-settings> 

    <address-settings> 
     <!--default for catch all--> 
     <address-setting match="#"> 
     <dead-letter-address>jms.queue.DLQ</dead-letter-address> 
     <expiry-address>jms.queue.ExpiryQueue</expiry-address> 
     <redelivery-delay>0</redelivery-delay> 
     <max-size-bytes>10485760</max-size-bytes>  
     <message-counter-history-day-limit>10</message-counter-history-day-limit> 
     <address-full-policy>BLOCK</address-full-policy> 
     </address-setting> 
    </address-settings> 

</configuration> 
+0

的A/C [本](HTTP:// docs.jboss.org/hornetq/2.2.2.Final/user-manual/en/html/messaging-concepts.html#d0e354)你需要在处理完毕后确认消息,你是否也这样做? –

回答

13

随着HornetQ核心你必须明确地回复一条消息。我没有看到你的测试中发生了什么。

如果您不是acking,这就是您的邮件被阻止的原因。我需要看到你的完整例子给你一个完整的答案。

另外:你应该定义你的了createSession:了createSession(真实的,真实的,0)

核心API有一个选项可以批量的ACK。您没有使用事务处理会话,因此,只有在达到在serverLocator中配置的ackBatchSize之后,才会将消息发送到服务器。有了这个设置,只要您在消息中调用acknowledge(),任何确认都会发送到服务器。

您正在使用的选项等同于具有某个DUPS_SIZE的JMS DUPS_OK。

(编辑后我的初步回答了一些反复和你)之后

+1

'ClientMessage msgReceived = consumer.receive(); msgReceived.acknowledge();''我承认代码 –

+0

核心API有批量确认的选项。您没有使用事务处理会话,因此,只有在达到在serverLocator中配置的ackBatchSize之后,才会将消息发送到服务器。 你应该定义你的createSession: createSession(true,true,0); 完成此操作后,只要您在消息 –

+1

处调用acknowledge(),就会将任何确认发送到服务器。您没有回到此主题。所以我假设你解决了你的问题? –

2

设置ackbatchsize帮我解决这个问题.. 感谢您的帮助

+2

你应该在这里回答一个答案。 –

相关问题