3
我有一个SFTP目录并读取文件并将文件发送给ServiceActivator.At我需要使用处理程序并行处理它们。需要在Spring集成中并行处理多个文件
这是我的SPring集成java DSL流。
IntegrationFlows.from(Sftp.inboundAdapter(getSftpSessionFactory())
.temporaryFileSuffix("COPY")
.localDirectory(directory)
.deleteRemoteFiles(false)
.preserveTimestamp(true)
.remoteDirectory("remoteDir"))
.patternFilter("*.txt")), e -> e.poller(Pollers.fixedDelay(500).maxMessagesPerPoll(5)))
.handle("mybean", "myMethod")
.handle(Files.outboundAdapter(new File("success")))
.deleteSourceFiles(true)
.autoCreateDirectory(true))
.get();
更新:这是我的ThreadPoolExecutor:
@Bean(name = "executor")
public Executor getExecutor()
{
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(4);
executor.setQueueCapacity(20);
executor.initialize();
return executor;
}
我没有尝试过这个选项,我得到了一个异常。导致:java.util.concurrent.RejectedExecutionException:任务o[email protected]41c6ed7a拒绝从[email protected] [运行,池大小= 4,活动线程= 4,排队的任务= 20,完成任务= 1]执行速度也很慢,比单线程慢。 – Harish
这不是'TaskExecutor'的问题。你的处理程序有点慢,也许在某个地方被阻塞了。从另一方面来说,你总是可以使用'CallerRunsPolicy'而不是默认的'AbortPolicy'。 –
我们在哪里添加CallerRunsPolicy – Harish