我想了解聚合器的基础知识。下面是使用情况下,我想实现:从队列无法让聚合器工作
1)读消息(订单详细信息)。
<?xml version="1.0" encoding="UTF-8"?>
<order xmlns="http://www.example.org/orders">
<orderItem>
<isbn>12333454443</isbn>
<quantity>4</quantity>
</orderItem>
<orderItem>
<isbn>545656777</isbn>
<quantity>50</quantity>
</orderItem>
..
..
</order>
一个顺序消息将包含多个OrderItem的。我们可以预期在队列中有数百个订单消息。
2)结束结果::
a)每个OrderItem的应写入到文件中。
b)中这样的文件应该被写入唯一的文件夹。
举个例子,假设我们有两个为了消息 - 每个包含三个OrderItem的。
因此,我们需要创建2个文件夹:
在 “文件夹1”,应该有4个文件(1个OrderItem的在每个文件)
在 “文件夹2”,应该有2文件(每个文件中有1 orderitem)。这里为简单起见,我们假设不再有订单消息来了,我们可以在5分钟后写入。
实现:
- 我能够读取从队列(的WebSphere MQ)的消息,并成功解组消息。
- 用于分流分裂基于OrderItem的计数数量的消息。
- 二手聚集到组中的4
我无法大小的消息得到聚合器按我的理解而努力。
- 我推一个顺序当4 OrderItem的,该消息被正确地得到聚合。
- 我推一个订单与5 orderitem,第4个是聚合,但最后一个被发送到丢弃渠道。这是因为MessageGroup被释放,所以最后的消息被丢弃。
- 我推了两个订单,每个订单包含2 orderitem。最后的2 orderitem被发送到放弃渠道。
关联策略是硬编码的(OrderAggregator。Java),但上述情况应该已经奏效。
需要关于如何实现这种使用情况的指针,我可以将它们组合到4中并写入唯一文件夹。 请注意,orderitem都是独立的书籍订单,它们之间没有任何关系。
下面是配置。
弹簧bean.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans">
<int:channel id="mqInbound"/>
<int:channel id="item"/>
<int:channel id="itemList"/>
<int:channel id="aggregatorDiscardChannel"/>
<int-jms:message-driven-channel-adapter id="jmsIn"
channel="mqInbound"
destination="requestQueue"
message- converter="orderMessageConverter"/>
<int:splitter input-channel="mqInbound" output-channel="item" expression="payload.orderItem"/>
<int:chain id="aggregateList" input-channel="item" output-channel="itemList" >
<int:header-enricher>
<int:header name="sequenceSize" expression="4" overwrite="true"/>
</int:header-enricher>
<int:aggregator correlation-strategy="orderAggregator" correlation-strategy-method="groupOrders" discard-channel="aggregatorDiscardChannel" />
</int:chain>
<int:service-activator input-channel="itemList" ref="displayAggregatedList" method="display"/>
<int:service-activator input-channel="aggregatorDiscardChannel" ref="displayAggregatedList" method="displayDiscarded"/>
<bean id="orderAggregator" class="com.samples.Aggregator.OrderAggregator"/>
<bean id="displayAggregatedList" class="com.samples.Aggregator.DisplayAggregatedList"/>
...
....
</beans>
OrderAggregator.java
public class OrderAggregator {
@Aggregator
public List<OrderItemType> sendList(List<OrderItemType> orderItemTypeList) {
return orderItemTypeList;
}
@CorrelationStrategy
public String groupOrders(OrderItemType orderItemType) {
return "items";
}
}
DisplayAggregatedList.java
public class DisplayAggregatedList {
public void display(List <OrderItemType> orderItemTypeList) {
System.out.println("######## Display Aggregated ##############");
for(OrderItemType oit : orderItemTypeList) {
System.out.println("### Isbn :" + oit.getIsbn() + ":: Quantity :" + oit.getQuantity());
}
}
public void displayDiscarded(Message<?> message) {
System.out.println("######## Display Discarded ##############" + message);
}
}
感谢@Artem Bilan为您的回应。聚合器似乎对我的代码工作不一致。根据我推送的消息数量,有时可以使用。 但是,下面的消息序列是**不工作**。 ** 1)**订购3条订单消息。 (isbn#:001,002,003) ** 2)** 1订单项的订单邮件。 (isbn#:004) 消息应在第二条消息后发布。然而,第二条消息(Isbn:004)在**丢弃通道**中可见。 –
下面的scenerio作品: ** 1)** 5订单项的订单消息。 (isbn#:011,012,013,014,015) ** 2)** 5订单项目的订单消息。 (isbn#:016,017,018,019,020) ** 3)**具有2个订单项的订单消息。 (isbn#:021,022) **发布了三个消息组**。组1(011,012,013,014),组2(015,016,017,018),组3(019,020,021,022)。 但是,由于某种原因,在我上次评论中共享的较早序列(3 orderitem + 1 orderitem)不起作用 –
请使用'expire-groups-upon-completion =“true”并考虑在发布时使用'MessageCountReleaseStrategy' -strategy' –