2016-03-15 13 views
1

您好,我的流定义如下:从 S3 拉取文件并按行拆分,然后调用 HTTP 客户端,最后把结果放入命名通道(named channel)。我的传输层(transport)是 RabbitMQ。我想在 Spring Integration / XD 的 HTTP 出站网关中注入

2 个不同的 RestTemplate
stream aws-s3|custom processor| custom-http-client --url1=https://test1.com --url2=https://test2.com --filterAttribute=messageAttribute --httpMethod=POST --nonRetryErrorCodes=400,401,404,500 --charset=UTF-8 --replyTimeout=30000 --mapHeaders=Api-Key,Content-Type --requestTimeOut=30000 |processor> queue:testQueue 

我的 HTTP 配置如下所示:使用 Apache HttpClient 实现连接池和多线程;对于套接字超时(socket timeout)之类的错误,我会把消息退回 DLQ,并对所有这类错误进行重试。

现在我的 URI1 使用 OAuth,URI2 使用基本的 RestTemplate。我该如何把两个 RestTemplate 注入到我的 HTTP 出站网关中?一个是 OAuth 模板,另一个是基本模板。

<beans:beans xmlns="http://www.springframework.org/schema/integration"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans"
      xmlns:int-http="http://www.springframework.org/schema/integration/http"
      xmlns:context="http://www.springframework.org/schema/context"
      xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/integration
       http://www.springframework.org/schema/integration/spring-integration.xsd
       http://www.springframework.org/schema/integration/http
       http://www.springframework.org/schema/integration/http/spring-integration-http.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-4.0.xsd">

    <!--
        Module flow: input -> (strip x-death header) -> inputX -> gateway "gw" -> toHttp
        -> HTTP outbound gateway -> output. Failures raised downstream of "gw" are sent
        to the "errors" channel, where the responseInterceptor decides what reaches "output".
    -->

    <!-- <context:property-placeholder location="${xd.module.config.location}\processor\${xd.module.name}\batch-http.properties"
     ignore-resource-not-found="true" local-override="true"/> -->

    <!-- Resolves the ${...} placeholders used below (url1, url2, charset, ...). -->
    <context:property-placeholder />

    <!-- logger changes start -->
    <!-- Wire-taps every channel (pattern="*") into loggingChannel. -->
    <channel-interceptor pattern="*" order="3">
     <beans:bean class="org.springframework.integration.channel.interceptor.WireTap">
      <beans:constructor-arg ref="loggingChannel" />
     </beans:bean>
    </channel-interceptor>

    <logging-channel-adapter id="loggingChannel" log-full-message="true" level="ERROR"/>

<!-- logger changes end -->


    <!-- Removes the x-death header before the message enters the HTTP flow. -->
    <header-filter input-channel="input"
        output-channel="inputX" header-names="x-death"/>

    <service-activator input-channel="inputX" ref="gw" />

    <!-- Routes requests to toHttp and diverts downstream exceptions to the "errors" channel. -->
    <gateway id="gw" default-request-channel="toHttp" default-reply-timeout="0" error-channel="errors" />

    <beans:bean id="inputfields" class="test.HTTPInputProperties">
     <beans:property name="nonRetryErrorCodes" value="${nonRetryErrorCodes}"/>
    </beans:bean>
    <beans:bean id="responseInterceptor" class="test.ResponseInterceptor">
     <beans:property name="inputProperties" ref="inputfields" />
    </beans:bean>

    <chain input-channel="errors" output-channel="output">
     <!-- examine payload.cause (http status code etc) and decide whether
      to throw an exception or return the status code for sending to output -->
     <header-filter header-names="replyChannel, errorChannel" />
     <transformer ref="responseInterceptor" />
    </chain>


    <!-- The target URL is chosen per message: url1 when the payload contains
         ${filterAttribute}, url2 otherwise. Both URLs share batchRestTemplate. -->
    <int-http:outbound-gateway id='batch-http' header-mapper="headerMapper"
           request-channel='toHttp'
           rest-template="batchRestTemplate"
           url-expression="payload.contains('${filterAttribute}') ? '${url1}' : '${url2}'" http-method="${httpMethod}"
           expected-response-type='java.lang.String' charset='${charset}'
           reply-timeout='${replyTimeout}' reply-channel='output'>
    </int-http:outbound-gateway>

    <!-- NOTE(review): this converter is declared but not referenced by any other bean in this file. -->
    <beans:bean id="batchHTTPConverter" class="org.springframework.http.converter.StringHttpMessageConverter" >
     <beans:constructor-arg index="0" value="${charset}"/>
     <!-- A media-type parameter needs the "charset=" key; a bare ";UTF-8" carries no charset. -->
     <beans:property name="supportedMediaTypes" value = "application/json;charset=UTF-8" />

    </beans:bean>

    <!-- Constructor arguments are matched by name, so their order here does not matter.
         All three are plain values (Integer), hence value= rather than ref=. -->
    <beans:bean id="batchRestTemplate" class="test.BatchRestTemplate" >
     <beans:constructor-arg name="requestTimeOut" value="${requestTimeOut}"/>
     <beans:constructor-arg name="maxConnectionPerRoute" value="${maxConnectionPerRoute}"/>
     <beans:constructor-arg name="totalMaxConnections" value="${totalMaxConnections}"/>
</beans:bean>





<!-- Maps only the headers listed in ${mapHeaders} onto the outgoing HTTP request, with no prefix added. -->
<beans:bean id="headerMapper" class="org.springframework.integration.http.support.DefaultHttpHeaderMapper"
      factory-method="outboundMapper">
<beans:property name="outboundHeaderNames" value="${mapHeaders}"/>
<beans:property name="userDefinedHeaderPrefix" value=""/>
</beans:bean>

<channel id="output" />
<channel id="input" />
<channel id="inputX" />
<channel id="toHttp" />

     </beans:beans>



/**
 * {@link RestTemplate} backed by a pooled Apache HttpClient.
 *
 * <p>The connection pool sizes and the request timeout are taken from the
 * constructor arguments; a {@code null} argument falls back to the built-in
 * default (30000 ms timeout, 250 total connections, 100 connections per route).
 * A UTF-8 {@link StringHttpMessageConverter} is placed first in the converter
 * list so that it takes precedence over the default converters.
 */
public class BatchRestTemplate extends RestTemplate {

    private static final Logger LOGGER = LoggerFactory
            .getLogger(BatchRestTemplate.class);

    /** Timeout in milliseconds used when no request timeout is supplied. */
    private static final int DEFAULT_REQUEST_TIMEOUT_MILLIS = 30000;

    /** Pool-wide connection limit used when none is supplied. */
    private static final int DEFAULT_TOTAL_MAX_CONNECTIONS = 250;

    /** Per-route connection limit used when none is supplied. */
    private static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 100;

    /**
     * Creates the template and its pooled HTTP client.
     *
     * @param requestTimeOut        connect, connection-request and socket timeout in
     *                              milliseconds; {@code null} selects the default
     * @param totalMaxConnections   maximum connections in the whole pool;
     *                              {@code null} selects the default
     * @param maxConnectionPerRoute maximum connections per route;
     *                              {@code null} selects the default
     * @throws NoSuchAlgorithmException if the default {@link SSLContext} is unavailable
     */
    public BatchRestTemplate(Integer requestTimeOut, Integer totalMaxConnections, Integer maxConnectionPerRoute)
            throws NoSuchAlgorithmException {
        super(createBatchHttpRequestFactory(
                valueOrDefault(requestTimeOut, DEFAULT_REQUEST_TIMEOUT_MILLIS),
                valueOrDefault(totalMaxConnections, DEFAULT_TOTAL_MAX_CONNECTIONS),
                valueOrDefault(maxConnectionPerRoute, DEFAULT_MAX_CONNECTIONS_PER_ROUTE)));

        // Keep the converters RestTemplate registered, but put the UTF-8 string converter first.
        List<HttpMessageConverter<?>> messageConverters = new ArrayList<HttpMessageConverter<?>>();
        messageConverters.addAll(getMessageConverters());
        messageConverters.add(0, new StringHttpMessageConverter(Charset.forName("UTF-8")));
        setMessageConverters(messageConverters);
    }

    /** Returns {@code value} unless it is {@code null}, in which case {@code fallback} is returned. */
    private static int valueOrDefault(Integer value, int fallback) {
        return value != null ? value.intValue() : fallback;
    }

    /**
     * Builds the request factory around a pooling connection manager.
     *
     * @param timeoutMillis  value applied to the connect, connection-request and socket timeouts
     * @param maxTotal       maximum connections in the whole pool
     * @param maxPerRoute    maximum connections per route
     * @return request factory wrapping the configured HTTP client
     * @throws NoSuchAlgorithmException if the default {@link SSLContext} is unavailable
     */
    private static ClientHttpRequestFactory createBatchHttpRequestFactory(int timeoutMillis, int maxTotal,
            int maxPerRoute) throws NoSuchAlgorithmException {

        // NOTE(review): only TLSv1 is enabled and host name verification is switched off
        // (ALLOW_ALL_HOSTNAME_VERIFIER). Both weaken transport security; they are kept as-is
        // because tightening them could break existing endpoints. Confirm and harden.
        SSLConnectionSocketFactory socketFactory = new SSLConnectionSocketFactory(
                SSLContext.getDefault(),
                new String[] {"TLSv1"},
                null,
                SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);

        Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
                .register("http", PlainConnectionSocketFactory.getSocketFactory())
                .register("https", socketFactory)
                .build();

        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
        cm.setMaxTotal(maxTotal);
        cm.setDefaultMaxPerRoute(maxPerRoute);

        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(timeoutMillis)
                .setConnectionRequestTimeout(timeoutMillis)
                .setSocketTimeout(timeoutMillis)
                .build();

        CloseableHttpClient httpClient = HttpClients.custom()
                .setDefaultRequestConfig(requestConfig)
                .setConnectionManager(cm)
                .build();

        return new HttpComponentsClientHttpRequestFactory(httpClient);
    }

}

ResponseInterceptor

/**
 * Transformer for the error flow of the HTTP outbound gateway.
 *
 * <p>Inspects the cause of a failed send and, for selected HTTP errors, builds a
 * regular message carrying the API response (or a default JSON body) instead of
 * letting the failure propagate:
 * <ul>
 * <li>{@code HttpClientErrorException}: converted only when its status code is
 * listed in the configured non-retry error codes;</li>
 * <li>{@code HttpServerErrorException}: always converted (the non-retry list is
 * not consulted for this branch);</li>
 * <li>anything else: {@code null} is returned.</li>
 * </ul>
 */
public class ResponseInterceptor {

    private HTTPInputProperties inputProperties;
    private static final Logger LOGGER = LoggerFactory.getLogger(ResponseInterceptor.class);

    /**
     * Intercepts the errorMessage from the API response and sends appropriate
     * information to the Output channel.
     *
     * @param errorMessage error message whose payload wraps the failure cause
     * @return the replacement message, or {@code null} when the error is not
     *         handled here (including when building the replacement fails)
     */
    public Message<String> transform(Message<MessagingException> errorMessage) {

        LOGGER.error("Inside Response Interceptor !");
        Message<String> responseMessage = null;
        try {
            if (null != errorMessage && null != errorMessage.getPayload()
                    && null != errorMessage.getPayload().getCause()) {
                Throwable cause = errorMessage.getPayload().getCause();
                LOGGER.error("Cause is - {}", cause.getMessage());

                if (cause instanceof HttpClientErrorException) {
                    HttpClientErrorException clientError = (HttpClientErrorException) cause;
                    LOGGER.error("Error in ResponseInceptor", clientError);
                    List<String> errorCodeList = getErrorCodes(inputProperties.getNonRetryErrorCodes());
                    // intercept Only those errors that are defined as
                    // nonRetryErrorCodes options in stream definition
                    if (null != clientError.getStatusCode()
                            && errorCodeList.contains(numericStatus(clientError.getStatusCode().value()))) {

                        String responseBody = clientError.getResponseBodyAsString();
                        LOGGER.error("Error in Response Body: {}", responseBody);
                        LOGGER.debug("Non retry message found. Sending to output channel without retrying");

                        responseMessage = buildResponse(
                                numericStatus(clientError.getStatusCode().value()), responseBody, errorMessage);
                    } else {
                        LOGGER.debug("Status code from API is not present in the nonRetryCodes");
                    }
                } else if (cause instanceof HttpServerErrorException) {
                    LOGGER.error("Error is Instance of HttpServerErrorException");
                    HttpServerErrorException serverError = (HttpServerErrorException) cause;

                    responseMessage = buildResponse(
                            numericStatus(serverError.getStatusCode().value()),
                            serverError.getResponseBodyAsString(), errorMessage);
                }
            }
        } catch (Exception exception) {
            // Best effort: a failure while building the replacement falls through to the null return.
            LOGGER.error("Exception occured while transforming errorResponse", exception);
        }

        // returning null will send the message back to previous module
        return responseMessage;
    }

    /**
     * Renders a status code as its plain decimal string (e.g. {@code "404"}), which is the
     * form compared against the configured codes and written to the status header.
     * Using the numeric value avoids depending on how the status object formats itself.
     */
    private static String numericStatus(int statusValue) {
        return String.valueOf(statusValue);
    }

    /**
     * Builds the replacement message for an HTTP error.
     *
     * @param statusCode   numeric HTTP status as a string
     * @param responseBody body returned by the API; a default JSON payload is
     *                     substituted when it is {@code null} or empty
     * @param errorMessage the original error message, used for the failed request
     *                     payload and headers
     * @return message addressed to the output channel with no error channel
     * @throws UnsupportedEncodingException if the failed payload cannot be encoded
     */
    private Message<String> buildResponse(String statusCode, String responseBody,
            Message<MessagingException> errorMessage) throws UnsupportedEncodingException {

        String payload = (null == responseBody || responseBody.isEmpty())
                ? getDefaultPayload(statusCode) : responseBody;

        MessageBuilder<String> builder = MessageBuilder.withPayload(payload)
                .setHeader(BatchHttpClientConstants.HTTP_STATUS, statusCode)
                .setHeader(BatchHttpClientConstants.REQUEST_OBJECT, getFailedMessagePayload(errorMessage));

        // The failed message may be absent; only copy its headers when it exists.
        Message<?> failedMessage = errorMessage.getPayload().getFailedMessage();
        if (null != failedMessage) {
            builder.copyHeaders(failedMessage.getHeaders());
        }

        // Set after copyHeaders so copied reply/error channel headers are overridden.
        return builder.setReplyChannelName(BatchHttpClientConstants.OUTPUT).setErrorChannelName(null).build();
    }

    /**
     * Produces the JSON body used when the API returned no body.
     *
     * @param httpStatusCode HTTP status as a string
     * @return JSON text with an id and a text field chosen by status code
     */
    private String getDefaultPayload(String httpStatusCode) {

        JSONObject jsonResponse = new JSONObject();
        if (BatchHttpClientConstants.INTERNAL_SERVER_ERROR.equalsIgnoreCase(httpStatusCode)) {
            jsonResponse.put(BatchHttpClientConstants.ID, BatchHttpClientConstants.INTERNAL_SERVER_ERROR_SUBCODE);
            jsonResponse.put(BatchHttpClientConstants.TEXT, "Internal Server Error");
        } else if (BatchHttpClientConstants.RESOURCE_NOT_FOUND.equalsIgnoreCase(httpStatusCode)) {
            jsonResponse.put(BatchHttpClientConstants.ID, BatchHttpClientConstants.RESOURCE_NOT_FOUND_SUBCODE);
            jsonResponse.put(BatchHttpClientConstants.TEXT, "Empty Response From the API");
        } else {
            jsonResponse.put(BatchHttpClientConstants.ID, BatchHttpClientConstants.GENERIC_ERROR_SUBCODE);
            jsonResponse.put(BatchHttpClientConstants.TEXT, "Generic Error Occured.");
        }

        return jsonResponse.toString();

    }

    /**
     * Get Individual error codes using delimiter
     *
     * @param nonRetryErrorCodes delimited list of status codes; may be {@code null}
     * @return List of Error Codes as string, trimmed, without empty entries;
     *         empty when the input is {@code null}
     */
    private List<String> getErrorCodes(String nonRetryErrorCodes) {

        List<String> errorCodeList = new ArrayList<String>();
        if (null == nonRetryErrorCodes) {
            return errorCodeList;
        }
        StringTokenizer st = new StringTokenizer(nonRetryErrorCodes, BatchHttpClientConstants.DELIMITER);
        while (st.hasMoreTokens()) {
            // Trim so that a list written as "400, 401" still matches "401".
            String code = st.nextToken().trim();
            if (!code.isEmpty()) {
                errorCodeList.add(code);
            }
        }
        return errorCodeList;
    }

    /**
     * returns failed Message Payload
     *
     * @param errorMessage error message wrapping the failed message
     * @return the failed message payload's string form as encoded bytes, or an
     *         empty array when there is no failed message or payload
     * @throws UnsupportedEncodingException if the configured encoding is not supported
     */
    private byte[] getFailedMessagePayload(Message<MessagingException> errorMessage)
            throws UnsupportedEncodingException {

        if (null != errorMessage.getPayload().getFailedMessage()
                && null != errorMessage.getPayload().getFailedMessage().getPayload()) {
            return errorMessage.getPayload().getFailedMessage().getPayload().toString()
                    .getBytes(BatchHttpClientConstants.UTF_8);
        }

        return "".getBytes(BatchHttpClientConstants.UTF_8);
    }

    public HTTPInputProperties getInputProperties() {
        return inputProperties;
    }

    public void setInputProperties(HTTPInputProperties inputProperties) {
        this.inputProperties = inputProperties;
    }

}

回答

2

不能这样做——你需要两个出站网关,再加一个路由器(根据 URL 路由到其中一个网关)。

+0

我可以在单个模块里根据 payload 路由到两个不同的出站网关吗?我不想为不同的网关创建不同的模块。我该怎么做? – constantlearner

+0

可以;添加一个路由器即可,[路由器的文档在这里](http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#router)。你可以用表达式来选择通道,就像你现在选择 URL 那样——[参见这里](http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#router-namespace)。例如:`payload.contains('${filterAttribute}') ? 'channel1' : 'channel2'`。 –

+0

还有一个疑问:在上面的 XML 中加入路由器之后,我会注入 OAuth2RestTemplate。创建流时,我的 OAuth 模板(连同令牌)只会被注入一次。既然只注入一次,令牌的刷新是如何工作的? – constantlearner