0
我试图测试一些与弹簧集成使用DSL的东西。这仅仅是一个测试到目前为止,流程简单:错误'是一个单向的'MessageHandler'弹簧整合聚合器DSL
- 并联
- 创建一些消息
- 过程(登录)他们它们聚集
- 日志聚集
除了来自聚合器,它工作正常:
@Bean
public IntegrationFlow integrationFlow() {
return IntegrationFlows
.from(integerMessageSource(), c -> c.poller(Pollers.fixedRate(1, TimeUnit.SECONDS)))
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))
.handle((GenericHandler<Integer>) (payload, headers) -> {
System.out.println("\t delaying message:" + payload + " on thread "
+ Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
System.err.println(e.getMessage());
}
return payload;
})
.handle(this::logMessage)
.aggregate(a ->
a.releaseStrategy(g -> g.size()>10)
.outputProcessor(g ->
g.getMessages()
.stream()
.map(e -> e.getPayload().toString())
.collect(Collectors.joining(",")))
)
.handle(this::logMessage)
.get();
}
如果我忽略.aggregate(..)部分,则示例正在工作。
枝条的聚合,我得到以下异常:
Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (org.faboo.test.ParallelIntegrationApplication$$Lambda$9/[email protected]) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow.
据我了解,它抱怨没有从聚合输出?
完整的源代码可以在这里找到:hithub
谢谢,wireTap做到了。我试图改变logMessage()来返回消息,但这并没有改变行为。 – bert
从1.2版本开始,就有一个'.log()'操作符。请阅读关于此事的博客文章:https://spring.io/blog/2016/10/14/java-dsl-for-spring-integration-1-2-release-is-available –