1
在集成流程中,使用其默认策略的拆分将从列表中发布项目。该项目的处理可能会失败。我想处理这个错误,并将一个新的消息与前一个映射信息(除了一个自定义错误头部)一起发送到正常的消息传递通道。如何在Spring Integration Java DSL中自定义消息聚合逻辑
在聚合器中,我想定制聚合逻辑来生成其他类型的消息,包括失败进程的计数和未失败消息的结果。
在这里,我解释我是如何发送错误消息的标题:
@Bean
public IntegrationFlow socialMediaErrorFlow() {
return IntegrationFlows.from("socialMediaErrorChannel")
.wireTap(sf -> sf.handle("errorService", "handleException"))
.<MessagingException>handle((p, h)
-> MessageBuilder.withPayload(Collections.<CommentEntity>emptyList())
.copyHeaders(p.getFailedMessage().getHeaders())
.setHeader("ERROR", true)
.build()
)
.channel("directChannel_1")
.get();
}
我想要的聚合产生这种类型的对象:
public class Result {
private Integer totalTask;
private Integer taskFailed;
private List<CommentEntity> comments;
}
我应该如何处理这个?
在此先感谢。
由于阿尔乔姆的帮助下,我做这个实现:
.aggregate(a -> a.outputProcessor(new MessageGroupProcessor() {
@Override
public Object processMessageGroup(MessageGroup mg) {
Integer failedTaskCount = 0;
Integer totalTaskCount = mg.getMessages().size();
List<CommentEntity> comments = new ArrayList<>();
for(Message<?> message: mg.getMessages()){
if(message.getHeaders().containsKey("ERROR"))
failedTaskCount++;
else
comments.addAll((List<CommentEntity>)message.getPayload());
}
return new IterationResult(totalTaskCount, failedTaskCount, comments);
}
}))