2016-03-11 31 views
3

我有一个SFTP目录并读取文件并将文件发送给ServiceActivator.At我需要使用处理程序并行处理它们。需要在Spring集成中并行处理多个文件

这是我的SPring集成java DSL流。

IntegrationFlows.from(Sftp.inboundAdapter(getSftpSessionFactory()) 
         .temporaryFileSuffix("COPY") 
         .localDirectory(directory) 
         .deleteRemoteFiles(false) 
         .preserveTimestamp(true) 
         .remoteDirectory("remoteDir")) 
         .patternFilter("*.txt")), e -> e.poller(Pollers.fixedDelay(500).maxMessagesPerPoll(5))) 
         .handle("mybean", "myMethod") 
         .handle(Files.outboundAdapter(new File("success")))   
         .deleteSourceFiles(true) 
         .autoCreateDirectory(true)) 
         .get(); 

更新:这是我的ThreadPoolExecutor:

@Bean(name = "executor") 
public Executor getExecutor() 
{ 
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 
    executor.setCorePoolSize(4); 
    executor.setMaxPoolSize(4); 
    executor.setQueueCapacity(20);   
    executor.initialize(); 
    return executor; 
} 

回答

1

Sftp.inboundAdapter()SftpInboundFileSynchronizingMessageSource)返回远程文件逐一反正。首先,它将它们同步到本地目录,并且只有在它们轮询消息处理为File有效负载之后。

要并行处理它们就足以将taskExecutor添加到您的e.poller()定义中,并且所有这些maxMessagesPerPoll(5)将分配给不同的线程。

+0

我没有尝试过这个选项,我得到了一个异常。导致:java.util.concurrent.RejectedExecutionException:任务o[email protected]41c6ed7a拒绝从[email protected] [运行,池大小= 4,活动线程= 4,排队的任务= 20,完成任务= 1]执行速度也很慢,比单线程慢。 – Harish

+0

这不是'TaskExecutor'的问题。你的处理程序有点慢,也许在某个地方被阻塞了。从另一方面来说,你总是可以使用'CallerRunsPolicy'而不是默认的'AbortPolicy'。 –

+0

我们在哪里添加CallerRunsPolicy – Harish

相关问题