2015-12-12 159 views
2

我想知道如何让RedisQueueMessageDrivenEndpoint与IntegrationFlow配合使用，以便接收从列表中出列的消息，并在下面代码所示的流程中处理它们。“redisRpopChannel()”似乎根本没有收到任何消息。请帮忙。如何使RedisQueueMessageDrivenEndpoint与IntegrationFlow一起使用？

/**
 * Creates the Redis outbound gateway and configures it to serialize command
 * arguments as JSON trees ({@code JsonNode}) via Jackson.
 *
 * @param connectionFactory the Redis connection factory the gateway is built on
 * @return the configured gateway
 */
@Bean
public RedisOutboundGateway redisOutboundGateway(RedisConnectionFactory connectionFactory) {
    Jackson2JsonRedisSerializer<? extends JsonNode> jsonArgumentsSerializer =
            new Jackson2JsonRedisSerializer<>(JsonNode.class);
    RedisOutboundGateway outboundGateway = new RedisOutboundGateway(connectionFactory);
    outboundGateway.setArgumentsSerializer(jsonArgumentsSerializer);
    return outboundGateway;
}

/**
 * Builds the flow that enriches each incoming {@code ObjectNode} payload with
 * its partition and sequence values (taken from the "correlationId" and
 * "sequenceNumber" headers), marks the message as an LPUSH command against the
 * queue held in {@code files}, sends it through the Redis gateway and forwards
 * the reply to the "redisLpushResponseFlow.input" channel.
 *
 * <p>Note that this method also installs the arguments strategy on the
 * supplied gateway as a side effect.
 *
 * @param gateway     the Redis outbound gateway used to execute the command
 * @param beanFactory the bean factory handed to the arguments strategy
 * @return the request flow definition
 */
@Bean
public IntegrationFlow redisLpushRequestFlow(RedisOutboundGateway gateway, BeanFactory beanFactory) {
    // First argument: the queue name header; second: the payload, only for LPUSH.
    String[] argumentExpressions = {"headers.queue", "#cmd == 'LPUSH' ? payload : null"};
    ExpressionArgumentsStrategy argumentsStrategy = new ExpressionArgumentsStrategy(argumentExpressions, true);
    argumentsStrategy.setBeanFactory(beanFactory);
    gateway.setArgumentsStrategy(argumentsStrategy);

    return flow -> flow.publishSubscribeChannel(pubSub -> pubSub.subscribe(subFlow -> subFlow
            .enrich(enricher -> enricher.<ObjectNode>requestPayload(message -> {
                String partitionId = message.getHeaders().get("correlationId").toString();
                ObjectNode body = message.getPayload();
                body.put(PayLoadKeys.PARTITION, partitionId);
                String sequence = message.getHeaders().get("sequenceNumber").toString();
                body.put(PayLoadKeys.SEQ, sequence);
                return body;
            }).shouldClonePayload(false)
                    .header(RedisHeaders.COMMAND, "LPUSH")
                    .header("queue", files))
            .handle(gateway)
            .channel("redisLpushResponseFlow.input")));
}

@Bean 
public IntegrationFlow redisLpushResponseFlow() { 
    return flow -> flow.resequence().aggregate().<List<Long>>handle((p,h)-> { 
       ObjectNode objectNode = mapper.createObjectNode(); 
       objectNode.put(PayLoadKeys.PARTITION, h.get("correlationId").toString()); 
       if(h.get("mode").equals("debug")) { 
        objectNode.set(PayLoadKeys.DEBUG, 
          mapper.valueToTree(p.stream().collect(Collectors.toList()))); 
       } 
       return objectNode; 
      }).channel(httpInboundReplyChannel()); 
/**
 * Declares the queue-backed channel that the Redis queue endpoint writes to
 * and the polling flow reads from.
 *
 * @return a new queue channel built with the DSL channel factory
 */
@Bean
public MessageChannel redisRpopChannel() {
    MessageChannel rpopQueueChannel = MessageChannels.queue().get();
    return rpopQueueChannel;
}

/**
 * Registers the default poller under {@code PollerMetadata.DEFAULT_POLLER},
 * configured with a fixed rate of 500.
 *
 * @return the default poller metadata
 */
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
    PollerMetadata defaultPoller = Pollers.fixedRate(500).get();
    return defaultPoller;
}

/**
 * Declares the message-driven endpoint for the Redis queue whose name is held
 * in {@code files}; received entries are deserialized as {@code JsonNode} and
 * sent to {@code redisRpopChannel()}.
 *
 * <p>Only plain configuration is performed here. Container-managed properties
 * and lifecycle callbacks ({@code setBeanFactory}, {@code afterPropertiesSet()},
 * {@code start()}) are deliberately NOT invoked: the Spring container applies
 * them itself once the bean is returned, and calling them by hand inside the
 * {@code @Bean} method initializes and starts the endpoint prematurely.
 *
 * @param connectionFactory the Redis connection factory to listen with
 * @param beanFactory       unused; retained so the bean method signature is unchanged
 * @return the configured, not-yet-started endpoint
 */
@Bean
public RedisQueueMessageDrivenEndpoint redisQueueMessageDrivenEndpoint(RedisConnectionFactory connectionFactory, BeanFactory beanFactory) {
    RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint(files, connectionFactory);
    Jackson2JsonRedisSerializer<? extends JsonNode> serializer = new Jackson2JsonRedisSerializer<>(JsonNode.class);
    endpoint.setSerializer(serializer);
    endpoint.setAutoStartup(true);
    endpoint.setOutputChannel(redisRpopChannel());
    // No afterPropertiesSet()/start() here: the container drives the lifecycle.
    return endpoint;
}

/**
 * Builds the flow that reads messages from {@code redisRpopChannel()},
 * converts each payload to an {@code ObjectNode} and launches one
 * {@code JobLaunchTask} per message, or one per id when the payload names an
 * id array, through a throttling wrapper around a thread-pool executor.
 *
 * NOTE(review): {@code taskExecutor()} is defined outside this snippet. If it
 * returns a shared executor, the {@code shutdown()} at the end of the handler
 * would stop it after the first message - confirm it creates a fresh one.
 */
@Bean 
public IntegrationFlow redisQueuePollingFlow() { 

    /**
     * TaskExecutor decorator that blocks the submitting thread once as many
     * tasks are in flight as the wrapped pool's core pool size.
     */
    class ThrottledTaskExecutor implements TaskExecutor { 
     // One permit per in-flight task; initial count is the core pool size.
     final Semaphore semaphore; 
     // The executor that actually runs the tasks.
     final TaskExecutor taskExecutor; 

     ThrottledTaskExecutor(ThreadPoolTaskExecutor taskExecutor) { 
      this.taskExecutor = taskExecutor; 
      this.semaphore = new Semaphore(taskExecutor.getCorePoolSize()); 
     } 

     /** Rejects null tasks, then submits via {@link #doSubmit(Runnable)}. */
     @Override 
     public void execute(Runnable task) { 
      if (task == null) { 
       throw new NullPointerException("Task is null in ThrottledTaskExecutor."); 
      } 
      doSubmit(task); 
     } 

     /**
      * Acquires a permit (blocking if none is free) and hands the task to the
      * wrapped executor; the permit is released when the task is done.
      */
     void doSubmit(final Runnable task) { 
      try { 
       semaphore.acquire(); 
      } catch (InterruptedException e) { 
       // Restore the interrupt flag before reporting the failed submission.
       Thread.currentThread().interrupt(); 
       throw new TaskRejectedException("Task could not be submitted because of a thread interruption."); 
      } 
      try { 
       // Wrap in a FutureTask so done() can return the permit on completion.
       taskExecutor.execute(new FutureTask<Void>(task, null) { 

        @Override 
        protected void done() { 
         semaphore.release(); 
        } 
       }); 
      } catch (TaskRejectedException e) { 
       // The task never ran, so done() will not fire: give the permit back here.
       semaphore.release(); 
       throw e; 
      } 
     } 
    } 

    return IntegrationFlows 
      .from(redisRpopChannel()) 
      .transform(Transformers.fromJson(ObjectNode.class)) 
      .handle(message -> { 
       ObjectNode p = (ObjectNode) message.getPayload(); 
       // A new executor wrapper (and semaphore) is created for every message.
       ThreadPoolTaskExecutor taskExecutor = taskExecutor(); 
       ThrottledTaskExecutor throttledTaskExecutor = new ThrottledTaskExecutor(taskExecutor); 
       if(p.hasNonNull(PayLoadKeys.ID_ARRAY)) { 
        // ID_ARRAY holds the NAME of the field that contains the ids; both
        // fields are removed from the payload before it is copied per id.
        String array = p.remove(PayLoadKeys.ID_ARRAY).asText(); 
        if (p.hasNonNull(array)) { 
         p.remove(array).forEach(id -> { 
          // Each job gets its own copy of the remaining payload plus its "id".
          ObjectNode param = p.deepCopy(); 
          final Long finalId = id.asLong(); 
          param.put("id", finalId); 
          throttledTaskExecutor.execute(new JobLaunchTask(param)); 
         }); 
        } 
       } else { 
        // No id array: launch a single job with the payload as-is.
        throttledTaskExecutor.execute(new JobLaunchTask(p)); 
       } 
       // NOTE(review): shuts down the executor obtained above - see method note.
       taskExecutor.shutdown(); 
      }).get(); 
} 

回答

1

目前，在DSL中使用（定义为@Bean的）消息驱动端点时存在一个已知问题（a problem）。

问题是在初始化期间需要输出通道。但是,当端点稍后连接到流中时,该通道将被替换。

你不应该在@Bean定义中调用afterPropertiesSet()、start()之类的方法。

下面的写法对我有效……

/**
 * Declares a Jedis-based Redis connection factory that connects on port 6379.
 *
 * @return the configured connection factory
 */
@Bean
public RedisConnectionFactory connectionFactory() {
    JedisConnectionFactory jedis = new JedisConnectionFactory();
    jedis.setPort(6379);
    return jedis;
}

/**
 * Declares the message-driven endpoint for the Redis queue "foo", using a
 * Jackson {@code JsonNode} serializer. Lifecycle callbacks are left to the
 * container.
 *
 * @param connectionFactory the Redis connection factory to listen with
 * @return the configured endpoint
 */
@Bean
public RedisQueueMessageDrivenEndpoint redisQueueMessageDrivenEndpoint(RedisConnectionFactory connectionFactory) {
    Jackson2JsonRedisSerializer<? extends JsonNode> jsonSerializer =
            new Jackson2JsonRedisSerializer<>(JsonNode.class);
    RedisQueueMessageDrivenEndpoint queueEndpoint =
            new RedisQueueMessageDrivenEndpoint("foo", connectionFactory);
    queueEndpoint.setSerializer(jsonSerializer);
    queueEndpoint.setAutoStartup(true);
    // Placeholder only: an output channel is required during initialization,
    // but it will be replaced when the endpoint is wired into the flow.
    queueEndpoint.setOutputChannel(new DirectChannel());
    return queueEndpoint;
}

/**
 * Builds a flow that starts from the Redis queue endpoint and prints every
 * received message to standard output.
 *
 * @param connectionFactory the Redis connection factory passed to the endpoint bean method
 * @return the flow definition
 */
@Bean
public IntegrationFlow flow(RedisConnectionFactory connectionFactory) {
    RedisQueueMessageDrivenEndpoint source = redisQueueMessageDrivenEndpoint(connectionFactory);
    return IntegrationFlows.from(source)
            .handle(System.out::println)
            .get();
}

我在redis-cli中用 lpush foo '{"foo":"bar"}' 进行了测试。

编辑

不过，你的做法在我这里也能正常工作……

/**
 * Declares the message-driven endpoint for the Redis queue "foo" and points
 * its output at {@code rpopChannel()}. Lifecycle callbacks are left to the
 * container.
 *
 * @param connectionFactory the Redis connection factory to listen with
 * @return the configured endpoint
 */
@Bean
public RedisQueueMessageDrivenEndpoint redisQueueMessageDrivenEndpoint(RedisConnectionFactory connectionFactory) {
    Jackson2JsonRedisSerializer<? extends JsonNode> jsonSerializer =
            new Jackson2JsonRedisSerializer<>(JsonNode.class);
    RedisQueueMessageDrivenEndpoint queueEndpoint =
            new RedisQueueMessageDrivenEndpoint("foo", connectionFactory);
    queueEndpoint.setSerializer(jsonSerializer);
    queueEndpoint.setAutoStartup(true);
    queueEndpoint.setOutputChannel(rpopChannel());
    return queueEndpoint;
}

/**
 * Builds a flow that starts from {@code rpopChannel()} and prints every
 * received message to standard output.
 *
 * @param connectionFactory not used by this method's body
 * @return the flow definition
 */
@Bean
public IntegrationFlow flow(RedisConnectionFactory connectionFactory) {
    MessageChannel source = rpopChannel();
    return IntegrationFlows.from(source)
            .handle(System.out::println)
            .get();
}

/**
 * Declares the direct channel that connects the Redis queue endpoint to the flow.
 *
 * @return a new direct channel
 */
@Bean
public MessageChannel rpopChannel() {
    MessageChannel directChannel = new DirectChannel();
    return directChannel;
}

同样，我从端点中删除了所有由容器管理的属性；Spring会自动设置这些属性。

+0

感谢您的评论。我非常感激。我也很佩服你的工作。我想知道为什么没有协议适配器支持使用redis进行迭代并添加到组中,所以我们都可以像IntegrationFlows.from(Redis.inboundGateway(connectionFactory,queue))那样编写? – hanishi

+0

我们资源有限。我们要求社区帮助我们优先考虑通过[JIRA](https://jira.spring.io/browse/INTEXT)向Redis等添加一流的支持。随意在那里打开一个改进问题,我们会看到我们能做些什么。我们也欢迎[贡献](https://github.com/spring-projects/spring-integration/blob/master/CONTRIBUTING.md)。 –

相关问题