我正在使用自定义Iteamreader bean从MongoDB读取数据。我的阅读器按照reader中定义的pageSize(50)返回数据。但是处理器只能从50开始获得31行数据。我尝试了各种块大小,但是一些处理器只获得了前31行。spring批处理:itemprocessor未获取读取器读取的所有数据
请帮我找到了这个错误......我想听众,但没能找到问题..
---- XML配置----
<?xml version="1.0" encoding="UTF-8"?>
<context:property-placeholder location="classpath:application.properties" />
<context:component-scan base-package="com.xxx.yyy.batch.kernel" />
<context:component-scan base-package="com.xxx.yyy.batch.dao" />
<context:annotation-config />
<!-- Enable Annotation based Declarative Transaction Management -->
<tx:annotation-driven proxy-target-class="true"
transaction-manager="transactionManager" />
<!-- Creating TransactionManager Bean, since JDBC we are creating of type
DataSourceTransactionManager -->
<bean id="transactionManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" />
</bean>
<batch:job id="txnLogJob" job-repository="jobRepository"
restartable="true">
<batch:step id="txnload">
<tasklet allow-start-if-complete="true">
<chunk reader="txnLogItemReader" writer="txnLogItemWriter"
processor="txnLogProcessor" commit-interval="20" />
</tasklet>
</batch:step>
<batch:listeners>
<batch:listener ref="completionListener" />
</batch:listeners>
</batch:job>
<bean id="completionListener"
class="com.xxx.yyy.batch.listeners.JobCompletionNotificationListener" />
<bean id="jobParametersDAOImpl" class="com.xxx.yyy.batch.dao.JobParametersDAOImpl" />
<bean id="batchLoader" class="com.xxx.yyy.batch.kernel.BatchLoader" />
<bean id="batchjobParameter" class="com.xxx.yyy.batch.dao.Batch_Job_Parameters" />
<bean id="txnLogItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter"
scope="step">
<property name="shouldDeleteIfExists" value="true" />
<property name="resource" value="file:target/test-outputs/output.txt" />
<property name="lineAggregator">
<bean
class="org.springframework.batch.item.file.transform.PassThroughLineAggregator" />
</property>
</bean>
<bean id="txnLogProcessor"
class="com.xxx.yyy.batch.processor.MessageContextItemProcessor" />
<bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
<bean id="jobRepository"
class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
<property name="databaseType" value="MYSQL" />
<property name="dataSource" ref="dataSource" />
<property name="transactionManager" ref="transactionManager" />
</bean>
<bean id="dataSource" class="com.xxx.yyy.common.DataSource"
destroy-method="close">
<property name="driverClassName" value="${jdbc.driverClassName}" />
<property name="url" value="${jdbc.url}" />
<property name="username" value="${jdbc.username}" />
<property name="password" value="${jdbc.password}" />
<property name="connectionProperties" value="${jdbc.connectionProperties}" />
<property name="initialSize" value="${jdbc.initialSize}" />
<property name="maxTotal" value="${jdbc.maxTotal}" />
<property name="maxIdle" value="${jdbc.maxIdle}" />
<property name="minIdle" value="${jdbc.minIdle}" />
<property name="maxWaitMillis" value="${jdbc.maxWaitMillis}" />
<property name="testOnBorrow" value="${jdbc.testOnBorrow}" />
<property name="testWhileIdle" value="${jdbc.testWhileIdle}" />
<property name="testOnReturn" value="${jdbc.testOnReturn}" />
<property name="validationQuery" value="${jdbc.validationQuery}" />
</bean>
</beans>
定制读者豆:
@Bean
public MongoItemReader<MessageContext> txnLogItemReader() {
MongoItemReader<MessageContext> reader = new MongoItemReader<MessageContext>();
reader.setPageSize(50);
reader.setCollection("txnlog");
reader.setTemplate(mongoTemplate);
String query = null ;
query = "{ \"audit_info.created_on\": { $gt: { \"$date\" : ?0 }, $lte: { \"$date\" : ?1 } }, "
+ "$and: [ { \"processing_status\": { $in: [?2] } } ] }" ;
reader.setQuery(query);
//Timestamp to_date_timestamp = jobParametersDAOImpl.getCurrentTimeStamp() ;
Batch_Job_Parameters job_param = jobParametersDAOImpl.getBatchJobParameters() ;
String from_date = job_param.getFrom_date().toString() ;
String [] splitstr = from_date.split(" ") ;
from_date = splitstr[0]+"T"+splitstr[1]+"00Z" ;
String to_date = job_param.getTo_date().toString() ;
splitstr = to_date.split(" ") ;
to_date = splitstr[0]+"T"+splitstr[1]+"00Z" ;
List<Object> parameterValues = new ArrayList<Object>() ;
parameterValues.add(from_date) ;
parameterValues.add(to_date) ;
parameterValues.add(job_param.getTxnlog_status_list()) ;
reader.setParameterValues(parameterValues);
reader.setTargetType(com....MessageContext.class);
Map<String,Direction> sorts = new HashMap<String,Direction>() ;
sorts.put("audit_info.created_on", org.springframework.data.domain.Sort.Direction.ASC) ;
reader.setSort(sorts);
return reader;
}
MongoDB包含超过500行,正如我前面提到的,我可以看到所有正在读取的50行。 –
MongoDB包含超过500行,我可以看到正在读取的所有50行。我将侦听程序(步,块,读)放入配置中。听者的 输出是: “步骤之前”,“前块” “之前读” 50行数据 “读出之后” 19次: “之前读” “之后读出” 20行数据的“前块” 10倍处理器和写入器处理 “块后”: “之前读” “读取后”,“块之后” “之前读” 10行由处理器和作家 处理的数据的“步骤后” –