2016-11-21 24 views
0

我试图测试一些与弹簧集成使用DSL的东西。这仅仅是一个测试到目前为止,流程简单:错误'是一个单向的'MessageHandler'弹簧整合聚合器DSL

  • 并联
  • 创建一些消息
  • 过程(登录)他们它们聚集
  • 日志聚集

除了来自聚合器,它工作正常:

@Bean 
public IntegrationFlow integrationFlow() { 
    return IntegrationFlows 
      .from(integerMessageSource(), c -> c.poller(Pollers.fixedRate(1, TimeUnit.SECONDS))) 
      .channel(MessageChannels.executor(Executors.newCachedThreadPool())) 
      .handle((GenericHandler<Integer>) (payload, headers) -> { 
       System.out.println("\t delaying message:" + payload + " on thread " 
         + Thread.currentThread().getName()); 
       try { 
        Thread.sleep(2000); 
       } catch (InterruptedException e) { 
        System.err.println(e.getMessage()); 
       } 
       return payload; 
      }) 
      .handle(this::logMessage) 
      .aggregate(a -> 
        a.releaseStrategy(g -> g.size()>10) 
        .outputProcessor(g -> 
          g.getMessages() 
            .stream() 
            .map(e -> e.getPayload().toString()) 
            .collect(Collectors.joining(","))) 

        ) 
      .handle(this::logMessage) 
      .get(); 

} 

如果我忽略.aggregate(..)部分,则示例正在工作。

枝条的聚合,我得到以下异常:

Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (org.faboo.test.ParallelIntegrationApplication$$Lambda$9/[email protected]) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow. 

据我了解,它抱怨没有从聚合输出?

完整的源代码可以在这里找到:hithub

回答

2

问题是聚合前handle() - 它不会产生任何结果所以没什么聚集......

 .handle(this::logMessage) 
     .aggregate(a -> 

想必logMessage(Message<?>)void返回类型。

如果您希望在聚合器之前登录,请使用wireTap或更改logMessage以在记录后返回Message<?>

 .wireTap(sf -> sf.handle(this::logMessage)) 
+0

谢谢,wireTap做到了。我试图改变logMessage()来返回消息,但这并没有改变行为。 – bert

+0

从1.2版本开始,就有一个'.log()'操作符。请阅读关于此事的博客文章:https://spring.io/blog/2016/10/14/java-dsl-for-spring-integration-1-2-release-is-available –