我正在使用Spring Boot 1.4.0.RELEASE,Spring Integration 4.3.1.RELEASE,Spring Integration DSL 1.2.0.M1。春季集成 - 如何调试'调度员没有订户'?
我想要做的事:
我正在写会(使用入站通道适配器)从FTP和本地文件系统中读取文件的应用程序,文件传输到本地工作目录(使用文件出站网关),处理,然后将它们移动到最终目的地(文件出站网关/适配器)。
我遇到了'分派器没有订户通道'错误的问题。我相信这可能意味着上下文中的某些内容被打破,集成组件不会启动。上下文本身表示它在我调试时处于活动状态。
我的实际配置相当大,所以我不想找人为我找到解决方案。我正在寻找一些关于如何查看以及如何确定哪些组件抱怨的指导。
实际的错误在下面。
DEBUG [integration.channel.ExecutorChannel] [task-scheduler-1] preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application:test.fileReadingFlow.channel#1'.
的fileReadingFlow
是InboundChannelAdapter
从一个目录(基本上,我问了一下here。这里面没有什么复杂的读取文件。该适配器将消息发送到一个.log()
处理,丰富了头,将其发送到一个处理器(Files.outboundgateway
),最后一个MessageChannel
我已经试过:
- 我已经通过的
MessageChannel
S和东西微升链走(没有拼写错误,全部Bean
存在)。 - 我在
fileReadingFlow
中添加了更多LoggingHandler
s来标识消息错误的位置。 - 我已经删除了
fileReadingFlow
的部分内容,以查看我是否可以将消息更远地传送出去。 - 我已经删除了一些
Component
s,看看我能否找到问题。 - 我添加了
org.springframework.integration
的调试日志记录,并且没有出现类似错误或警告的内容。
我发现那是什么第一次流试图做的不是记录(甚至enrichHeaders)以外的东西,发生了错误调度,并在邮件中errorChannel
结束了。当我将fileReadingFlow
更改为只读文件时,记录一条消息,并终止一个空处理程序,我收到Dispatcher错误。因此,我相当确定这个问题不在fileReadingFlow
本身。
除了逐个删除每个Component
之外,是否有方法可以找出导致错误的原因?
编辑:
来源:
@Bean(name = "fileReadingFlow")
@Scope("prototype")
@Profile("test")
public IntegrationFlow testFileReadingFlow(MyEntity entity) {
return IntegrationFlows.from(s -> s.file(new File("someFolder")))
.filter(fileListFilterBuilder.buildFileListFilter(File.class))
, endpointConfigurer -> endpointConfigurer.poller(poller)
)
.log(DEBUG, "com.myco.testFileReadingFlow")
.enrichHeaders(h ->
h.header("entity", entity)
.header(FOLDER_NAME, entity.getFolder())
)
.log(DEBUG, "com.myco.testFileReadingFlow", message -> "after headers")
.handle(Files.outboundGateway("workingFolder").deleteSourceFiles(true).autoCreateDirectory(true))
.log(DEBUG, "com.myco.testFileReadingFLow", message -> "sending message to aggregatingFileChannel " + message)
.channel("aggregatingFileChannel")
.get();
}
@Bean
public MessageChannel aggregatingFileChannel() {
return MessageChannels.executor(Executors.newCachedThreadPool()).get();
}
@Bean
public IntegrationFlow aggregatingFlow() {
// Read from the aggregatingFileChannel
return from("aggregatingFileChannel")
<...>
.get();
}
应用:
@SpringBootApplication
@EnableConfigurationProperties
@EntityScan(
basePackages = { "com.myco.model" }
)
@EnableJpaRepositories(basePackages = {"com.myco.rest"})
public class Application {
public static void main(String[] args) {
ConfigurableApplicationContext context = new SpringApplicationBuilder(Application.class).web(false).run(args);
MyEntitySvc entitySvc = context.getBean(MyEntitySvc.class);
List<MyEntity> entities = entitySvc.findAllActive();
AutowireCapableBeanFactory beanFactory = context.getBeanFactory();
entities.forEach(entity -> {
IntegrationFlow flow = (IntegrationFlow) context.getBean("fileReadingFlow", entity);
beanFactory.getBean(entity.getFolder() + MyConstants.ADAPTER, Lifecycle.class).start();
}
解决方案:
每低于我的意见,@Prototype
方法在某些时候做的工作,但我打破了它,不能轻易回滚这一变化。使用加里和阿尔乔姆的建议,我试图改为使用IntegrationFlowContext
方法。为了保留运行时启动,配置文件驱动的注入等。我原本将我的IntegrationFlow
的定义从@Configuration
类移至@Service
类。这样我可以将IntegrationFlowContext
注入Service
,并为我的不同配置文件实施Service
的不同版本,而不需要我的Application
知道Profile
。主要方法是从Context
中提取Bean
并手动启动它以检索Service
并调用方法。
@Service
@Profile("test")
public class TestFlowSvc implements FlowSvc {
public IntegrationFlow testFileReadingFlow(Vendor vendor) {
return // As previous Flow
}
public void startFileReadingFlow(MyEntity entity) {
IntegrationFlow flow = testFileReadingFlow(entity);
integrationFlowContext.register(flow, true);
}
}
应用:
@SpringBootApplication
@EnableConfigurationProperties
@EntityScan(
basePackages = { "com.myco.model" }
)
@EnableJpaRepositories(basePackages = {"com.myco.rest"})
public class Application {
public static void main(String[] args) {
ConfigurableApplicationContext context = new SpringApplicationBuilder(Application.class).web(false).run(args);
MyEntitySvc entitySvc = context.getBean(MyEntitySvc.class);
FlowSvc flowSvc = context.getBean(FlowSvc.class);
List<MyEntity> entities = entitySvc.findAllActive();
entities.forEach(entity -> {
flowSvc.startFileReadingFlow(entity);
}
启用调试日志,寻找这样的消息...... '14:20:51.967 [主] INFO oscsDefaultLifecycleProcessor - 相0'起豆 '14:20:51.968 [主] INFO osichannel。 DirectChannel - 频道'application.fromKafka.channel#0'有1个用户。' –
@Artem我添加了示例流程。加里,我看到这样的消息。我的'fileReadingFlow'没有任何。我认为这是因为它是一个原型范围,并且手动启动(包含在编辑中)。 – JudgingNotJudging
由于您没有对原型范围bean的引用,因此不创建任何实例。在任何情况下,您都不能使用'IntegrationFlow' bean的原型范围 - 它们在内部生成许多不在该范围内的bean。 –