2012-01-17 47 views
3

我是Spring Integration的新手。弹簧集成2与石英调度器

我已经配置了Spring文件入站通道适配器,例如,

<file:inbound-channel-adapter channel="channel1" directory="${location}" prevent-duplicates="true" filename-pattern="*.csv"> 
     <si:poller> 
      <si:interval-trigger interval="1000"/> 
     </si:poller> 
</file:inbound-channel-adapter> 

<si:service-activator input-channel="channel1" output-channel="channel2" ref="filenameGenerator" method="generate"/> 

现在这工作正常。 但是这需要部署在集群环境中。我想确保群集中的多个实例不会尝试读取同一个文件。那么这样的工作会在这样的环境下进行?

如果没有,我可以使用Quartz调度这样的:

<file:inbound-channel-adapter channel="channel1" directory="${location}" prevent-duplicates="true" filename-pattern="*.csv"> 
      <si:poller task-executor="taskExecutor" fixed-rate="1000"/> 
    </file:inbound-channel-adapter> 

    <si:service-activator input-channel="channel1" output-channel="channel2" ref="filenameGenerator" method="generate"/> 

    <bean id="taskExecutor" class="org.springframework.scheduling.quartz.SimpleThreadPoolTaskExecutor"> 
     <property name="threadCount" value="20"/> 
     <property name="threadNamePrefix" value="consumer"/> 
    </bean> 

将这项工作与解决我的问题? 或者我必须使用交易?

我希望问题很清楚。

感谢, 阿迪

+0

再次重申这个问题, 问题是与文件在集群环境中入站通道适配器。当一个文件被放置在一个文件夹中时,它应该被拾取,处理并最终重命名它。 在群集中,虽然一个实例选取特定文件并仍在处理它,但另一个节点的文件适配器也会启动并尝试处理。第二个适配器失败,出现文件未找到异常,作为第一个适配器进程并在此期间重命名。那么,我能做些什么,这不会发生? – adi 2012-01-17 16:34:04

回答

2

当多个进程从同一目录读取它可以 需要锁定文件,以防止它们被拾起 兼任。要做到这一点,您可以使用FileLocker

查看文件柜here附近的文档。看来,你可以做soemthing这样的:

<file:inbound-channel-adapter ... > 
    <file:nio-locker/> 
</file:inbound-channel-adapter> 

当多个进程从同一目录读取它可以 需要锁定文件,以防止它们被拾起 兼任。要做到这一点,您可以使用FileLocker

+0

我现在得到这个错误:java.io.IOException:进程无法访问文件,因为另一个进程已经锁定了一部分文件...我有另一个通道重命名文件... – adi 2012-01-17 17:59:49

1

要确保石英计划的作业在群集内执行一次且只执行一次,请配置持久的群集石英作业计划。这里有一个样本配置,为石英1.6.6:

<bean id="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean"> 
    <!--  Set whether any jobs defined on this SchedulerFactoryBean should 
      overwrite existing job definitions. 
     --> 
    <property name="overwriteExistingJobs" value="true" /> 
    <property name="dataSource" ref="myTransactionalDataSource" /> 

<!-- nonTransactionalDataSource is only necessary with clustered Quartz with an XA DataSource. 
    --> 
    <property name="nonTransactionalDataSource" ref="myNonTransactionalDataSource" /> 

<property name="quartzProperties"> 
    <props> 
    <prop key="org.quartz.jobStore.selectWithLockSQL">SELECT * FROM {0}LOCKS WITH(UPDLOCK,HOLDLOCK) WHERE LOCK_NAME = ?</prop> 
    <!-- 
    Run in cluster. Quartz ensures persisted jobs are executed once within the 
         cluster 
    --> 
    <prop key="org.quartz.jobStore.isClustered">true</prop> 

<!-- Each node in the cluster must have a unique instance id. 
    --> 
    <prop key="org.quartz.scheduler.instanceId">AUTO</prop> 
<!-- Default clusterCheckinInterval is 15000 
    --> 
    <!-- <prop key="org.quartz.jobStore.clusterCheckinInterval">20000</prop> 
    --> 
</props> 
    </property> 
    <property name="transactionManager" ref="transactionManager" /> 
- <!-- 
     In Quartz 1.6.6, Quartz's ThreadPool interface is used when firing job triggers, 
     in org.quartz.core.QuartzSchedulerThread. 
     Quartz 1.x still starts some unmanaged threads, notably org.quartz.impl.jdbcjobstore.JobStoreSupport's 
     ClusterManager which is used when clustered=true. Quartz 2.0 should correct this problem.  
    --> 
    <property name="taskExecutor" ref="myTaskExecutor" /> 
    </bean>