2017-05-15 36 views
1

我是Spring Integration和EIP的新手。春季集成 - 可以在特定时间订阅邮件

我试图解决自己,但它不会在我脑海中找到适当的解决方案。

下面是我需要的流程示例。

  1. 客户端发送Json数据到SI。
  2. HTTP入站网关将Json数据作为有效负载。
  3. 此Json数据将使用两个单独的HTTP出站。 但是,第一次呼出呼叫的响应后跟着第二个出站呼叫。
  4. 回复结果步骤0(在HTTP入站网关replychannel)

我想用publishSubscribeChannel或RecipientListRouter发送相同的信息分离的呼叫。 但在这种情况下,它需要在特定时间执行第二个http出站。 而且按照我的理解应该是一个事务上一步的答复4.

或者......

是否有可能保持消息实例(JSON数据从http入站网关),在第二次调用使用第一次通话后。

请给我一些想法,如果你有一个示例代码,它一定是美好的。 目前我正在写JAVA配置风格,但XML样本也欢迎。

更新问题

我想补充的桥梁和executorChannel。

这是我的代码,很少有东西不能像我期望的那样工作。

  1. 我想第二个出站处理程序(httpPMGateway)使用来自aIncomingGateway消息(使用正因为如此publishSubscribeChannel),但经过httpJ3Gateway出站处理程序,它使用消息从httpJ3Gateway。 我使用httpJ3Gateway的响应消息的路由。 (Ok - > httpPMgateway,默认 - > backchannel)

  2. 我想从j3ValidationFlow或broadcastFlow结束时向aIncomingGateway发送响应消息。但是一个IncomingGateway在发送之前已经收到了回复消息。这是日志消息。

2017年5月16日11:17:09.061 WARN 11488 --- [池-1-螺纹-1] cMessagingTemplate $ TemporaryReplyChannel:接收应答消息,但 接收线程已经收到一个回复:

这是代码段:

@Bean 
public MessagingGatewaySupport aIncomingGateway() { 
    HttpRequestHandlingMessagingGateway handler = new HttpRequestHandlingMessagingGateway(true); 

    RequestMapping requestMapping = new RequestMapping(); 
    requestMapping.setMethods(HttpMethod.POST); 
    requestMapping.setPathPatterns("https://stackoverflow.com/a/incoming"); 
    requestMapping.setConsumes("application/json"); 
    requestMapping.setProduces("application/json"); 


    handler.setRequestMapping(requestMapping); 
    handler.setMessageConverters(getMessageConverters()); 
    handler.setRequestPayloadType(Amount.class); 

    handler.setReplyChannelName("backAChannel"); 
    return handler; 
} 

@Bean 
public MessageHandler httpJ3Gateway(){ 
    HttpRequestExecutingMessageHandler httpHandler = new HttpRequestExecutingMessageHandler("http://localhost:8080/j3/incoming"); 
    httpHandler.setHttpMethod(HttpMethod.POST); 
    httpHandler.setExpectReply(true); 
    httpHandler.setMessageConverters(getMessageConverters()); 
    httpHandler.setExpectedResponseType(String.class); 
    httpHandler.setRequestFactory(requestFactory()); 
    return httpHandler; 
} 

@Bean 
public MessageHandler httpPMGateway(){ 
    HttpRequestExecutingMessageHandler httpHandler = new HttpRequestExecutingMessageHandler("http://localhost:8080/pm/incoming"); 
    httpHandler.setHttpMethod(HttpMethod.POST); 
    httpHandler.setExpectReply(true); 
    httpHandler.setMessageConverters(getMessageConverters()); 
    httpHandler.setExpectedResponseType(String.class); 
    return httpHandler; 
} 

@Bean 
public MessageChannel broadChannel(){ 
    return new ExecutorChannel(Executors.newCachedThreadPool()); 
} 

@Bean 
@ServiceActivator(inputChannel = "brigdeChannel") 
public MessageHandler bridgeHandler(){ 
    BridgeHandler handler = new BridgeHandler(); 
    handler.setOutputChannel(broadChannel()); 
    return handler; 
} 


@Bean 
public IntegrationFlow aIncomingFlow() { 
    return IntegrationFlows.from(aIncomingGateway()) 
      .log(LoggingHandler.Level.INFO, "From Inbound http gateway", m -> m.getPayload().toString()) 
      .publishSubscribeChannel(p -> p 
        .subscribe(s -> s.channel("j3Channel")) 
        .subscribe(s -> s.bridge(b -> bridgeHandler())) 
        ) 
      .get(); 
} 

@Bean 
public IntegrationFlow j3ValidationFlow() { 
    return IntegrationFlows.from("j3Channel") 
      .log(LoggingHandler.Level.INFO, "Run j3ChannelFlow", m -> m.getPayload().toString()) 
      .handle(httpJ3Gateway()) 
      .route("headers.http_statusCode", m -> m 
        .resolutionRequired(false) 
        .channelMapping("OK", "brigdeChannel") 
        .defaultOutputToParentFlow() 
        ) 
      .channel("backAChannel") 
      .get(); 
} 

@Bean 
public IntegrationFlow broadcastFlow() { 
    return IntegrationFlows.from("broadChannel") 
      .log(LoggingHandler.Level.INFO, "Run broadcastFlow", m -> m.getPayload().toString()) 
      .handle(httpPMGateway()) 
      .channel("backAChannel") 
      .get(); 
} 

请帮我拿个主意。
我提前感谢。谢谢你在这个愚蠢的问题上帮助我。

回答

0

是;使用pub/sub频道或收件人列表路由器

inbound -> ... -> pubsub 

pubsub -> outbound1 

pubsub -> bridge -> executorChannel -> outbound2 

outbound1将在入站线程上运行; outbound2将在另一个线程上运行。

+0

感谢您的快速回答。我还有一个问题。如果outbound2会运行另一个线程,它可以回复原来的入站(客户端)? – Jun

+0

是的,如果'outbound1'是单向的非响应组件(Spring集成方面的'Outbound Channel Adapter')。只有一个回复可以发送到等待的入站网关。 –

+0

谢谢你们两位。我会尝试。我可能会回到Stackoverflow .... – Jun