2011-03-01 27 views
3

我使用ActiveMQ作为JMS代理和客户,jmsTemplate发送消息,1个非持久主题。在高峰时间,我有〜100信息/秒。ActiveMQ上的重复邮件

队列中有多少条消息并不重要,但我经常会得到重复的消息。我想到的临时解决方案是在表上设置索引 - 目前所有的消息都只保存在数据库中。

我的第一个问题 - 为什么消息是重复的,如果我指定非持久主题和响应不是必需的?

发件人:

@Component 
public class QueueSender 
{ 
    private Logger log = Logger.getLogger(getClass()); 
@Autowired 
    protected JmsTemplate jmsTemplate; 


    public JmsTemplate getJmsTemplate() { 
     return jmsTemplate; 
    } 

    public void setJmsTemplate(JmsTemplate jmsTemplate) { 
     this.jmsTemplate = jmsTemplate; 
    } 

    @Autowired 
    public QueueSender(final JmsTemplate jmsTemplate) 
    { 
     this.jmsTemplate = jmsTemplate; 
     this.jmsTemplate.setDeliveryPersistent(false); 
     System.out.println("isSessionTransacted "+jmsTemplate.isSessionTransacted()+ 
       " getDeliveryMode "+jmsTemplate.getDeliveryMode()+ 
       " getReceiveTimeout "+jmsTemplate.getReceiveTimeout()+ 
       " getSessionAcknowledgeMode "+jmsTemplate.getSessionAcknowledgeMode()); 
    } 


    public void sendPrice(Integer tickerId, Integer field, Double price, Long timestamp) 
    { 
     jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 
     jmsTemplate.setMessageIdEnabled(true); 
     Map <String, Object>map = new HashMap<String, Object>(); 
     map.put("tickerId", tickerId); 
     map.put("field", field); 
     map.put("price", price); 
     map.put("timestamp", timestamp); 
     jmsTemplate.convertAndSend("Quotez", map); 
    } 

    public void sendVolume(Integer tickerId, Integer field, Integer size, Long timestamp) 
    { 
     jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 

     Map <String, Object>map = new HashMap<String, Object>(); 
     map.put("tickerId", tickerId); 
     map.put("field", field); 
     map.put("size", size); 
     map.put("timestamp", timestamp); 
     jmsTemplate.convertAndSend("Quotez", map); 

    } 

} 

监听器:

public void onMessage(Message message) 
{ 
    if (message instanceof MapMessage) 
    {   
     try 
     { 
      MapMessage mapMessage = (MapMessage) message; 
       if(null != mapMessage.getString("price")) 
       { 
priceService.insert(mapMessage.getInt("tickerId"),mapMessage.getDouble("price"), 
mapMessage.getInt("field"),mapMessage.getLong("timestamp")); 
       }      else{ 
volumeService.insert(mapMessage.getInt("tickerId"),mapMessage.getInt("size"), 
mapMessage.getInt("field"),mapMessage.getLong("timestamp")); 
      } 
     } 
     catch (final JMSException e) 
     { 
      exceptionListener.onException(e); 
     } 
    } 
} 

春:

<amq:broker useJmx="true" persistent="false"> 
<amq:transportConnectors> 
    <amq:transportConnector uri="tcp://localhost:0"/> 
</amq:transportConnectors> </amq:broker> 
<amq:topic id="topicDest" physicalName="Quotez"/> 
    <amq:connectionFactory id="jmsFactory" brokerURL="vm://localhost?jms.watchTopicAdvisories=false"/> 
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> 
<constructor-arg ref="jmsFactory" /> 
<property name="exceptionListener" ref="jmsExceptionListener" /> 
<property name="sessionCacheSize" value="100" /> 
</bean> 


<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 
    <constructor-arg ref="connectionFactory"/> 
    <property name="pubSubDomain" value="true"/> 
<property name="defaultDestinationName" value="Quotez"/>  
</bean> 
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 
     <property name="connectionFactory" ref="connectionFactory"/> 
     <property name="destination" ref="topicDest"/> 
     <property name="messageListener" ref="jdbcListener" /> 
    </bean> 

第二个问题是关于jmsContainer配置。上面的代码和下面的代码有什么区别?上面的代码给了我主题作为订户和下面的代码给我队列。

<jms:listener-container concurrency="10" connection-factory="connectionFactory">  
<jms:listener id="JdbcListener" destination="topicDest" ref="queueListener" /> 
</jms:listener-container> 

我发现,骆驼和其idempotentConsumer想解决重复的问题 - 当然,这将是很好知道为什么它排在首位发生。第三个问题涉及骆驼的配置。我有这个配置(默认):

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> 
<property name="brokerURL" value="tcp://localhost:0"/> 
</bean> 

<bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/> 

<camelContext xmlns="http://camel.apache.org/schema/spring"> 
<route> 
    <from uri="direct:start"/> 
    <idempotentConsumer messageIdRepositoryRef="myRepo"> 
     <header>messageId</header> 
     <to uri="mock:result"/> 
    </idempotentConsumer> 
</route> 
</camelContext> 

它适用于所有队列还是应该明确订阅?我想它会检查每个主题/队列和所有传入的消息。目前的问题是,所有消息都有messageId = null,并且过滤器将其作为参数。

2011-03-01 11:24:09,152 DEBUG (org.springframework.jms.core.JmsTemplate:567) - Sending created message: ActiveMQMapMessage {commandId = 0, responseRequired = false, **messageId = null**, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false} ActiveMQMapMessage{ theTable = {field=1, timestamp=1298975049138, price=72.89, tickerId=2} } 

我没有找到设置messageId的简单方法。我的问题 - 是否足以设置messageId,它将作为例外或配置出现问题,例如我必须指定将使用哪个主题。

感谢,

Dzidas

+0

我建议去问问ActiveMQ用户邮件列表,并检查现有的线程。你可能会在那里找到一些帮助和答案。 http://activemq.apache.org/mailing-lists.html – 2011-03-10 14:41:12

回答

4

使用JMS主题时,你需要设置并发/最大并发用户为“1”或者你会得到重复。如果您需要多线程消耗和/或负载平衡,请改为使用virtual topics