2016-08-24 44 views
1

我正在使用Spring Boot 1.4.0.RELEASE,Spring Integration 4.3.1.RELEASE,Spring Integration DSL 1.2.0.M1。春季集成 - 如何调试'调度员没有订户'?

我想要做的事:

我正在写会(使用入站通道适配器)从FTP和本地文件系统中读取文件的应用程序,文件传输到本地工作目录(使用文件出站网关),处理,然后将它们移动到最终目的地(文件出站网关/适配器)。

我遇到了'分派器没有订户通道'错误的问题。我相信这可能意味着上下文中的某些内容被打破,集成组件不会启动。上下文本身表示它在我调试时处于活动状态。

我的实际配置相当大,所以我不想找人为我找到解决方案。我正在寻找一些关于如何查看以及如何确定哪些组件抱怨的指导。

实际的错误在下面。

DEBUG [integration.channel.ExecutorChannel] [task-scheduler-1] preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application:test.fileReadingFlow.channel#1'. 

fileReadingFlowInboundChannelAdapter从一个目录(基本上,我问了一下here。这里面没有什么复杂的读取文件。该适配器将消息发送到一个.log()处理,丰富了头,将其发送到一个处理器(Files.outboundgateway),最后一个MessageChannel

我已经试过:

  • 我已经通过的MessageChannel S和东西微升链走(没有拼写错误,全部Bean存在)。
  • 我在fileReadingFlow中添加了更多LoggingHandler s来标识消息错误的位置。
  • 我已经删除了fileReadingFlow的部分内容,以查看我是否可以将消息更远地传送出去。
  • 我已经删除了一些Component s,看看我能否找到问题。
  • 我添加了org.springframework.integration的调试日志记录,并且没有出现类似错误或警告的内容。

我发现那是什么第一次流试图做的不是记录(甚至enrichHeaders)以外的东西,发生了错误调度,并在邮件中errorChannel结束了。当我将fileReadingFlow更改为只读文件时,记录一条消息,并终止一个空处理程序,我收到Dispatcher错误。因此,我相当确定这个问题不在fileReadingFlow本身。

除了逐个删除每个Component之外,是否有方法可以找出导致错误的原因?

编辑:

来源:

@Bean(name = "fileReadingFlow") 
@Scope("prototype") 
@Profile("test") 
public IntegrationFlow testFileReadingFlow(MyEntity entity) { 

    return IntegrationFlows.from(s -> s.file(new File("someFolder"))) 
      .filter(fileListFilterBuilder.buildFileListFilter(File.class)) 
      , endpointConfigurer -> endpointConfigurer.poller(poller) 
    ) 
      .log(DEBUG, "com.myco.testFileReadingFlow") 
      .enrichHeaders(h -> 
        h.header("entity", entity) 
          .header(FOLDER_NAME, entity.getFolder()) 
      ) 
      .log(DEBUG, "com.myco.testFileReadingFlow", message -> "after headers") 
      .handle(Files.outboundGateway("workingFolder").deleteSourceFiles(true).autoCreateDirectory(true)) 
       .log(DEBUG, "com.myco.testFileReadingFLow", message -> "sending message to aggregatingFileChannel " + message) 
       .channel("aggregatingFileChannel") 
       .get(); 
    } 
@Bean 
public MessageChannel aggregatingFileChannel() { 
    return MessageChannels.executor(Executors.newCachedThreadPool()).get(); 

} 

@Bean 
public IntegrationFlow aggregatingFlow() { 

    // Read from the aggregatingFileChannel 
    return from("aggregatingFileChannel") 
      <...> 
      .get(); 
} 

应用:

@SpringBootApplication 
@EnableConfigurationProperties 
@EntityScan(
     basePackages = { "com.myco.model" } 
) 
@EnableJpaRepositories(basePackages = {"com.myco.rest"}) 
public class Application { 

    public static void main(String[] args) { 
     ConfigurableApplicationContext context = new SpringApplicationBuilder(Application.class).web(false).run(args); 

     MyEntitySvc entitySvc = context.getBean(MyEntitySvc.class); 

     List<MyEntity> entities = entitySvc.findAllActive(); 

     AutowireCapableBeanFactory beanFactory = context.getBeanFactory(); 

     entities.forEach(entity -> { 
      IntegrationFlow flow = (IntegrationFlow) context.getBean("fileReadingFlow", entity); 

      beanFactory.getBean(entity.getFolder() + MyConstants.ADAPTER, Lifecycle.class).start(); 
    } 

解决方案:

每低于我的意见,@Prototype方法在某些时候做的工作,但我打破了它,不能轻易回滚这一变化。使用加里和阿尔乔姆的建议,我试图改为使用IntegrationFlowContext方法。为了保留运行时启动,配置文件驱动的注入等。我原本将我的IntegrationFlow的定义从@Configuration类移至@Service类。这样我可以将IntegrationFlowContext注入Service,并为我的不同配置文件实施Service的不同版本,而不需要我的Application知道Profile。主要方法是从Context中提取Bean并手动启动它以检索Service并调用方法。

@Service 
@Profile("test") 
public class TestFlowSvc implements FlowSvc { 
    public IntegrationFlow testFileReadingFlow(Vendor vendor) { 
     return // As previous Flow 
    } 

    public void startFileReadingFlow(MyEntity entity) { 

     IntegrationFlow flow = testFileReadingFlow(entity); 

     integrationFlowContext.register(flow, true); 
    } 
} 

应用:

@SpringBootApplication 
@EnableConfigurationProperties 
@EntityScan(
     basePackages = { "com.myco.model" } 
) 
@EnableJpaRepositories(basePackages = {"com.myco.rest"}) 
public class Application { 

    public static void main(String[] args) { 
     ConfigurableApplicationContext context = new SpringApplicationBuilder(Application.class).web(false).run(args); 

     MyEntitySvc entitySvc = context.getBean(MyEntitySvc.class); 
     FlowSvc flowSvc = context.getBean(FlowSvc.class); 
     List<MyEntity> entities = entitySvc.findAllActive(); 

     entities.forEach(entity -> { 
      flowSvc.startFileReadingFlow(entity); 
    } 

回答

1

我们有Dispatcher has no Subscribers错误时有一些SubscribableChannel不活跃SubscriberMessageHandler在Spring集成的期限)。 “活动”表示根本没有人定义或者处于停止状态。

因此,对于您的问题与application:test.fileReadingFlow.channel#1MessageChannel,我会考虑这样做fileReadingFlowIntegrationFlow一个更多的时间和查找是在第二个匿名DirectChannel

我找不到调试这种问题的简单方法,因为有MessageChannel,但由于Dispatcher has no Subscribers没有任何可跟踪的内容。

因此,请显示fileReadingFlowIntegrationFlow定义,让我们一起解决问题!

寻找你的相关问题,那它应该是一个:

.handle(Files.outboundGateway()) 

这可能只是处于停止状态。

但是我们不能确定,直到实际的代码。

+1

启用调试日志,寻找这样的消息...... '14:20:51.967 [主] INFO oscsDefaultLifecycleProcessor - 相0'起豆 '14:20:51.968 [主] INFO osichannel。 DirectChannel - 频道'application.fromKafka.channel#0'有1个用户。' –

+0

@Artem我添加了示例流程。加里,我看到这样的消息。我的'fileReadingFlow'没有任何。我认为这是因为它是一个原型范围,并且手动启动(包含在编辑中)。 – JudgingNotJudging

+0

由于您没有对原型范围bean的引用,因此不创建任何实例。在任何情况下,您都不能使用'IntegrationFlow' bean的原型范围 - 它们在内部生成许多不在该范围内的bean。 –