如果你正在使用Spring Integration的2.2.x的,您可以用建议这么做...
public class CompletionAdvice extends AbstractRequestHandlerAdvice {
private final CountDownLatch latch = new CountDownLatch(1);
@Override
protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception {
Object result = callback.execute();
latch.countDown();
return result;
}
public CountDownLatch getLatch() {
return latch;
}
}
在您的测试环境,建议用bean工厂后处理器添加到适配器的处理程序。
public class AddCompletionAdvice implements BeanFactoryPostProcessor {
private final Collection<String> handlers;
private final Collection<String> replyProducingHandlers;
public AddCompletionAdvice(Collection<String> handlers, Collection<String> replyProducingHandlers) {
this.handlers = handlers;
this.replyProducingHandlers = replyProducingHandlers;
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
for (String beanName : handlers) {
defineAdviceAndInject(beanFactory, beanName, beanName + "CompletionAdvice");
}
for (String beanName : replyProducingHandlers) {
String handlerBeanName = beanFactory.getAliases(beanName + ".handler")[0];
defineAdviceAndInject(beanFactory, handlerBeanName, beanName + "CompletionAdvice");
}
}
private void defineAdviceAndInject(ConfigurableListableBeanFactory beanFactory, String beanName, String adviceBeanName) {
BeanDefinition serviceHandler = beanFactory.getBeanDefinition(beanName);
BeanDefinition advice = new RootBeanDefinition(CompletionAdvice.class);
((BeanDefinitionRegistry) beanFactory).registerBeanDefinition(adviceBeanName, advice);
serviceHandler.getPropertyValues().add("adviceChain", new RuntimeBeanReference(adviceBeanName));
}
}
后处理器添加到配置<bean class="foo.AddCompletionAdvice" />
。
最后,注入的意见(S)到你的测试用例
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
public class TestAdvice {
@Autowired
private CompletionAdvice fooCompletionAdvice;
@Autowired
private CompletionAdvice barCompletionAdvice;
@Autowired
private MessageChannel input;
@Test
public void test() throws Exception {
Message<?> message = new GenericMessage<String>("Hello, world!");
input.send(message);
assertTrue(fooCompletionAdvice.getLatch().await(1, TimeUnit.SECONDS));
assertTrue(barCompletionAdvice.getLatch().await(1, TimeUnit.SECONDS));
}
}
,等待锁存器(ES)。
<int:publish-subscribe-channel id="input"/>
<int:outbound-channel-adapter id="foo" channel="input" ref="x" method="handle"/>
<int:service-activator id="bar" input-channel="input" ref="x"/>
<bean class="foo.AddCompletionAdvice">
<constructor-arg name="handlers">
<list>
<value>foo</value>
</list>
</constructor-arg>
<constructor-arg name="replyProducingHandlers">
<list>
<value>bar</value>
</list>
</constructor-arg>
</bean>
<bean id="x" class="foo.Foo" />
我添加这些类到Gist
编辑:更新,以提供最终消费者一般情况下(无应答),并回复产生的消费者。
这个看上去很不错。我会尽快将此标记为解决方案,谢谢。 – cproinger 2013-04-29 14:53:43
我已经为适配器设置了一个id并使用了你的代码。现在我得到 引起:org.springframework.beans.NotWritablePropertyException:Bean类的无效属性'adviceChain'[org.springframework.integration.handler.MethodInvokingMessageHandler]:Bean属性'adviceChain'不可写或具有无效的setter方法。 setter的参数类型是否与getter的返回类型相匹配? – cproinger 2013-04-29 19:10:56
我试图设置在轮询器代替信道适配器具有这种属性的建议链,但completionAdvice不调用 – cproinger 2013-04-29 19:35:18