我试图实现一个骆驼路由,它从远程系统队列(System.A.out)读取请求消息该路由查看消息正文并将其动态路由到另一个系统队列(System.B.in)此路由然后完成,并等待队列中的下一条消息(当前它阻塞并等待临时队列上的响应)骆驼JMS异步请求回复
System.B在队列中读取它( System.B.in,并不总是骆驼路由)处理该消息并在其出队列(System.B.out)上丢弃响应 System.B使用来自请求消息的JMSMessageID作为JMSCorrelationID在其响应,这是它从请求保持。
骆驼路由(类似于System.A.out,但监听System.B.out)接收响应消息并使用JMSCorrelationID(该请求不会有JMSCorrelationID,因此将被路由消息主体)找到请求的JMSReplyTo队列(System.A.in)并将响应放在System.A队列中的System.A上进行处理。
我使用SpringBoot和骆驼2.18.3,消息队列为IMB MQ版本8
我的路线是这样的:
@Override
public void configure() throws Exception {
//@formatter:off
Predicate validRoute = header("route-valid").isEqualTo(true);
Predicate inValidRoute = header("route-valid").isEqualTo(false);
Predicate splitRoute = header("route-split").isEqualTo(true);
Predicate singleRoute = header("route-split").isEqualTo(false);
Predicate validSplitRoute = PredicateBuilder.and(validRoute, splitRoute);
Predicate validSingelRoute = PredicateBuilder.and(validRoute, singleRoute);
from(endpoint(incomingURI)).routeId(routeId)
.process(exchange -> {
exchange.getIn().setHeader("route-source", format("%s-%s", incomingURI, routeId));
})
.to(endpoint(format("bean:evaluateIncomingMessageService?method=routeMessage(*, %s)", replyToURI)))
.choice()
.when(validSingelRoute)
.log(DEBUG, "Creating a Single route")
.to(endpoint("bean:messageCoalitionService?method=saveInstruction(*)"))
.setExchangePattern(ExchangePattern.InOut)
.toD("${header.route-recipients}")
.when(inValidRoute)
.log(DEBUG, "a.b.test", format("Incoming message [%s] failed evaluation: %s", incomingURI, body()))
.to(endpoint(deadLetterURI))
.routeId(format("%s-%s", incomingURI, routeId))
.when(validSplitRoute)
.log(DEBUG, "Creating a Split route")
.to(endpoint("bean:messageCoalitionService?method=saveInstructions(*)"))
.setExchangePattern(ExchangePattern.InOut)
.multicast()
.toD("${header.route-recipients}").endChoice()
.otherwise()
.log(DEBUG, "a.b.test", format("Incoming message [%s] failed evaluation: %s", incomingURI, body()))
.to(endpoint(deadLetterURI))
.routeId(format("%s-%s", incomingURI, routeId));
春天豆evaluateIncomingMessageService决定,如果消息是请求(无相关ID)或响应并设置请求的路由标头。我希望Camel能够自动将响应路由到Request.JMSReplyTo队列,如果不是这样做的话,怎么办?
replyToURI在Camel Route构建器中配置,如果路由在System.A.out上侦听,其replyToURI将始终为System.A.in。
evaluateIncomingMessageService.routeMessage看起来是这样的:
public void routeMessage(final Exchange exchange, final String replyToURI) {
String correlationId = exchange.getIn().getHeader("JMSCorrelationID", String.class);
if (correlationId != null) {
log.debug("Processing Message Response with JMSCorrelationID [{}]", correlationId);
exchange.getIn().setHeader("JMSReplyTo", replyToURI);
} else {
// Request Messages have nave NO correlationId
log.debug("Processing Message Request with MessageID [{}] and JMSMessageID: [{}]",
exchange.getIn().getMessageId(),
exchange.getIn().getHeader("JMSMessageID") != null ? exchange.getIn().getHeader("JMSMessageID").toString() : exchange.getIn().getMessageId());
String message = exchange.getIn().getBody(String.class);
Set<ContentBasedRoute> validRoutes = contentBasedRouting
.stream().filter(
routeEntity -> Pattern.compile(
routeEntity.getRegularExpression(), DOTALL).matcher(message).matches()).collect(Collectors.toSet());
if (validRoutes.isEmpty()) {
log.warn("No valid routes found for message: [{}] ", message);
exchange.getIn().setHeader("route-valid", false);
} else {
HashMap<String, ContentBasedRoute> uniqueRoutes = new HashMap<>();
validRoutes.stream().forEach(route -> uniqueRoutes.putIfAbsent(route.getDestination(), route));
exchange.getIn().setHeader("route-valid", true);
exchange.getIn().setHeader("route-count", uniqueRoutes.size());
exchange.getIn().setHeader("JMSReplyTo", replyToURI);
//if (exchange.getIn().getHeader("JMSMessageID") == null) {
// exchange.getIn().setHeader("JMSMessageID", exchange.getIn().getMessageId());
//}
if (uniqueRoutes.size() > 1) {
log.debug("Building a split route");
StringBuilder routes = new StringBuilder();
StringBuilder routeIds = new StringBuilder();
StringBuilder routeRegex = new StringBuilder();
uniqueRoutes.keySet().stream().forEach(i -> routes.append(i).append(","));
uniqueRoutes.values().stream().forEach(j -> routeIds.append(j.getRouteId()).append(","));
uniqueRoutes.values().stream().forEach(k -> routeRegex.append(k.getRegularExpression()).append(","));
routes.deleteCharAt(routes.length() - 1);
routeIds.deleteCharAt(routeIds.length() - 1);
routeRegex.deleteCharAt(routeRegex.length() - 1);
exchange.getIn().setHeader("route-split", true);
exchange.getIn().setHeader("route-uuid", routeIds.toString());
exchange.getIn().setHeader("route-regex", routeRegex.toString());
exchange.getIn().setHeader("route-recipients", routes.toString());
} else {
exchange.getIn().setHeader("route-split", false);
exchange.getIn().setHeader("route-uuid", uniqueRoutes.values().iterator().next().getRouteId());
exchange.getIn().setHeader("route-regex", uniqueRoutes.values().iterator().next().getRegularExpression());
exchange.getIn().setHeader("route-recipients", uniqueRoutes.values().iterator().next().getDestination());
}
}
}
}
豆messageCoalitionService只是保存邮件正文和标题这样的消息可以被复制并为系统的审计。
我不知道如果我错误地解决了这个问题,我应该使用驼峰异步API还是需要管道来实现这个?这种模式看起来接近我需要http://camel.apache.org/async.html(异步请求答复)任何帮助将是非常感谢。