2016-07-29 49 views
2

我想了解聚合器的基础知识。下面是使用情况下,我想实现:从队列无法让聚合器工作

1)读消息(订单详细信息)。

<?xml version="1.0" encoding="UTF-8"?> 
<order xmlns="http://www.example.org/orders"> 
    <orderItem> 
    <isbn>12333454443</isbn> 
    <quantity>4</quantity> 
    </orderItem> 
    <orderItem> 
    <isbn>545656777</isbn> 
    <quantity>50</quantity> 
    </orderItem> 
.. 
.. 
</order> 

一个顺序消息将包含多个OrderItem的。我们可以预期在队列中有数百个订单消息。

2)结束结果::

a)每个OrderItem的应写入到文件中。

b)中这样的文件应该被写入唯一的文件夹。

举个例子,假设我们有两个为了消息 - 每个包含三个OrderItem的

因此,我们需要创建2个文件夹:

在 “文件夹1”,应该有4个文件(1个OrderItem的在每个文件)

在 “文件夹2”,应该有2文件(每个文件中有1 orderitem)。这里为简单起见,我们假设不再有订单消息来了,我们可以在5分钟后写入。

实现:


  1. 我能够读取从队列(的WebSphere MQ)的消息,并成功解组消息。
  2. 用于分流分裂基于OrderItem的计数数量的消息。
  3. 二手聚集到组中的4

我无法大小的消息得到聚合器按我的理解而努力。

  1. 我推一个顺序当4 OrderItem的,该消息被正确地得到聚合。
  2. 我推一个订单与5 orderitem,第4个是聚合,但最后一个被发送到丢弃渠道。这是因为MessageGroup被释放,所以最后的消息被丢弃。
  3. 我推了两个订单,每个订单包含2 orderitem。最后的2 orderitem被发送到放弃渠道。
    关联策略是硬编码的(OrderAggregator。Java),但上述情况应该已经奏效。

需要关于如何实现这种使用情况的指针,我可以将它们组合到4中并写入唯一文件夹。 请注意,orderitem都是独立的书籍订单,它们之间没有任何关系。

下面是配置。

弹簧bean.xml

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans"> 
    <int:channel id="mqInbound"/> 
    <int:channel id="item"/> 
    <int:channel id="itemList"/> 
    <int:channel id="aggregatorDiscardChannel"/> 

    <int-jms:message-driven-channel-adapter id="jmsIn" 
             channel="mqInbound" 
             destination="requestQueue" 
             message- converter="orderMessageConverter"/> 

    <int:splitter input-channel="mqInbound" output-channel="item" expression="payload.orderItem"/> 

    <int:chain id="aggregateList" input-channel="item" output-channel="itemList" > 
    <int:header-enricher> 
     <int:header name="sequenceSize" expression="4" overwrite="true"/> 
    </int:header-enricher> 
    <int:aggregator correlation-strategy="orderAggregator" correlation-strategy-method="groupOrders" discard-channel="aggregatorDiscardChannel" /> 
    </int:chain> 

    <int:service-activator input-channel="itemList"     ref="displayAggregatedList" method="display"/> 
    <int:service-activator input-channel="aggregatorDiscardChannel" ref="displayAggregatedList" method="displayDiscarded"/> 

    <bean id="orderAggregator"  class="com.samples.Aggregator.OrderAggregator"/> 
    <bean id="displayAggregatedList" class="com.samples.Aggregator.DisplayAggregatedList"/> 
    ... 
    .... 
</beans> 

OrderAggregator.java

public class OrderAggregator { 

@Aggregator 
public List<OrderItemType> sendList(List<OrderItemType> orderItemTypeList) { 

    return orderItemTypeList; 
} 

@CorrelationStrategy 
public String groupOrders(OrderItemType orderItemType) { 

    return "items"; 
} 

} 

DisplayAggregatedList.java

public class DisplayAggregatedList { 

public void display(List <OrderItemType> orderItemTypeList) { 

    System.out.println("######## Display Aggregated ##############"); 
    for(OrderItemType oit : orderItemTypeList) { 
     System.out.println("### Isbn :" + oit.getIsbn() + ":: Quantity :" + oit.getQuantity()); 
    } 
} 
public void displayDiscarded(Message<?> message) { 

    System.out.println("######## Display Discarded ##############" + message); 
} 
} 

回答

1

你需要什么叫expire-groups-upon-completion

设置为true时(默认为false),已完成的组将从消息存储中删除,从而允许具有相同关联的后续消息组成新的组。默认行为是将与完成组具有相同关联性的消息发送到丢弃通道。

如果你需要释放未完成的群体反正(2个订单左,例如),考虑使用group-timeouthttp://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to

+0

感谢@Artem Bilan为您的回应。聚合器似乎对我的代码工作不一致。根据我推送的消息数量,有时可以使用。 但是,下面的消息序列是**不工作**。 ** 1)**订购3条订单消息。 (isbn#:001,002,003) ** 2)** 1订单项的订单邮件。 (isbn#:004) 消息应在第二条消息后发布。然而,第二条消息(Isbn:004)在**丢弃通道**中可见。 –

+0

下面的scenerio作品: ** 1)** 5订单项的订单消息。 (isbn#:011,012,013,014,015) ** 2)** 5订单项目的订单消息。 (isbn#:016,017,018,019,020) ** 3)**具有2个订单项的订单消息。 (isbn#:021,022) **发布了三个消息组**。组1(011,012,013,014),组2(015,016,017,018),组3(019,020,021,022)。 但是,由于某种原因,在我上次评论中共享的较早序列(3 orderitem + 1 orderitem)不起作用 –

+0

请使用'expire-groups-upon-completion =“true”并考虑在发布时使用'MessageCountReleaseStrategy' -strategy' –

0

请使用过期团,在完成=“true”,并考虑到使用MessageCountReleaseStrategy`发布策略 - Artem Bilan

+0

你应该接受我的答案,而不是 –