2015-10-30 52 views
0

我们有我们从文件中读取的要求运行春季批处理作业,porocess,并写入到一个平面文件。我的问题是,FlatFileItemReader会跟踪它处理的记录,以便如果作业在中间失败,它可以拾取失败的地方。重新启动由多个线程

例如假设油门限制是2和提交间隔是10,我的文件有20条记录。假设thread1正在处理前10条记录,并且 thread2正在处理下10条记录。如果线程2的所有10条记录都成功处理,并且由于一条错误记录导致线程1失败,并且整个作业都失败。下一次 当作业重新启动时,弹簧如何识别未处理的记录?

什么是更好的方式来处理使用多线程的文件,并在同一时间能够在中间无法启动了。

<batch:job job-repository="jobRepository" id="insertIntoCsvFromCsvJob"> 
     <batch:step id="step1"> 
      <batch:tasklet transaction-manager="transactionManager" 
       task-executor="taskExecutor" throttle-limit="${throttle-limit}"> 
       <batch:chunk reader="csvFileItemReader" writer="customWriter" processor="compositeProcessor 
        commit-interval="${commit-interval}" > 
       </batch:chunk> 
      </batch:tasklet> 
     </batch:step> 
    </batch:job> 

    <bean id="csvFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader"> 
     <property name="resource" value="classpath:files/input.csv" />   
     <property name="lineMapper" ref="fieldSetMapper" /> 
    </bean> 

    <bean id="csvFileItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter"> 
     <property name="resource" value="file:c:/outout.csv" /> 
     <property name="shouldDeleteIfExists" value="true" /> 
     <property name="lineAggregator" ref="lineAggregator" /> 
    </bean> 

    <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" /> 

回答

0

不,它不会。

我甚至会说,你的代码将打破迟早的事。问题在于FlatFileItemReader(分别为FlatFileItemWriter)的读取(分别为写入方法)不是线程安全的。

如果你想异步使用它们,你需要实现一个包装器ItemWriter和ItemReader,它将调用同步到FlatFileItemReader/Writer。

但是,当然,整个中间的重新启动将不起作用,因为如果仅使用FlatFileItemReader/Writer的标准实现,块的顺序不能保证。问题在于块可能超过另一个块,导致执行上下文中的读取位置指针在被超越的块之后移动。但是,如果被超越的块失败,那么执行上下文中的位置将指示失败的块的条目已成功处理。

当然,你也可以实现一个适配器,在那里你跟踪处理的条目中,只有移动位置指针向前,当你知道你的逻辑,是此前的所有条目进行了处理,并已写入。

+0

谢谢。我也这么认为......只是想确认一下。 –