2016-04-25 53 views
0

标题:Spring Batch:ItemProcessor 未获取读取器读取的所有数据。我正在使用自定义的 ItemReader bean 从 MongoDB 读取数据。读取器按照其中定义的 pageSize(50)返回数据,但处理器只收到这 50 行中的 31 行。我尝试了各种块大小,但不知为何处理器始终只得到前 31 行。

请帮我找出这个错误……我尝试过添加监听器(listener),但仍然没能找到问题所在。

---- XML配置----

<?xml version="1.0" encoding="UTF-8"?> 

    <!-- Load application.properties from the classpath for ${...} placeholder resolution. --> 
    <context:property-placeholder location="classpath:application.properties" /> 

    <!-- Scan the batch kernel and DAO packages for annotated components. --> 
    <context:component-scan base-package="com.xxx.yyy.batch.kernel" /> 
    <context:component-scan base-package="com.xxx.yyy.batch.dao" /> 

    <!-- Enable annotation processing on the beans registered in this context. --> 
    <context:annotation-config /> 

    <!-- Enable annotation-based declarative transaction management, using 
     class-based proxies (proxy-target-class="true") and the 
     "transactionManager" bean. --> 
    <tx:annotation-driven proxy-target-class="true" 
     transaction-manager="transactionManager" /> 

    <!-- Transaction manager bean. It is a DataSourceTransactionManager because 
     the transactional resource is the JDBC "dataSource" bean. --> 
    <bean id="transactionManager" 
     class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> 
     <property name="dataSource" ref="dataSource" /> 
    </bean> 

    <!-- Job "txnLogJob": a single chunk-oriented step ("txnload") that reads with 
     txnLogItemReader, processes with txnLogProcessor and writes with 
     txnLogItemWriter, committing every 20 items. The step may be started 
     again even if it already completed (allow-start-if-complete). --> 
    <batch:job id="txnLogJob" job-repository="jobRepository" 
     restartable="true"> 
     <batch:step id="txnload"> 
      <!-- tasklet/chunk carry the same batch: prefix as the enclosing job and 
       step elements, so they resolve to the batch namespace and not to 
       the default (beans) namespace. --> 
      <batch:tasklet allow-start-if-complete="true"> 
       <batch:chunk reader="txnLogItemReader" writer="txnLogItemWriter" 
        processor="txnLogProcessor" commit-interval="20" /> 
      </batch:tasklet> 
     </batch:step> 

     <!-- Job-level listener. --> 
     <batch:listeners> 
      <batch:listener ref="completionListener" /> 
     </batch:listeners> 
    </batch:job> 

    <!-- Listener bean referenced from the <batch:listeners> section of txnLogJob. --> 
    <bean id="completionListener" 
     class="com.xxx.yyy.batch.listeners.JobCompletionNotificationListener" /> 


    <!-- DAO bean for the batch job parameters. --> 
    <bean id="jobParametersDAOImpl" class="com.xxx.yyy.batch.dao.JobParametersDAOImpl" /> 

    <!-- Project-specific loader bean; its role is not visible in this configuration. --> 
    <bean id="batchLoader" class="com.xxx.yyy.batch.kernel.BatchLoader" /> 

    <!-- Bean of type Batch_Job_Parameters (the job parameter holder class). --> 
    <bean id="batchjobParameter" class="com.xxx.yyy.batch.dao.Batch_Job_Parameters" /> 



    <!-- Step-scoped flat-file writer used by the txnload step. Output goes to 
     target/test-outputs/output.txt; shouldDeleteIfExists is set to true and 
     PassThroughLineAggregator is used as the line aggregator. --> 
    <bean id="txnLogItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" 
     scope="step"> 
     <property name="shouldDeleteIfExists" value="true" /> 
     <property name="resource" value="file:target/test-outputs/output.txt" /> 
     <property name="lineAggregator"> 
      <bean 
       class="org.springframework.batch.item.file.transform.PassThroughLineAggregator" /> 
     </property> 
    </bean> 


    <!-- Item processor wired into the chunk of the txnload step. --> 
    <bean id="txnLogProcessor" 
     class="com.xxx.yyy.batch.processor.MessageContextItemProcessor" /> 

    <!-- Job launcher backed by the jobRepository bean. --> 
    <bean id="jobLauncher" 
     class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> 
     <property name="jobRepository" ref="jobRepository" /> 
    </bean> 

    <!-- Job repository configured for a MYSQL database, using the shared 
     dataSource and transactionManager beans. --> 
    <bean id="jobRepository" 
     class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"> 
     <property name="databaseType" value="MYSQL" /> 
     <property name="dataSource" ref="dataSource" /> 
     <property name="transactionManager" ref="transactionManager" /> 
    </bean> 

    <!-- JDBC data source (project class com.xxx.yyy.common.DataSource). Every 
     setting is taken from a jdbc.* placeholder, and close() is called when 
     the bean is destroyed. 
     NOTE(review): the property names (maxTotal, maxIdle, testOnBorrow, ...) 
     look like connection-pool settings; confirm against the class. --> 
    <bean id="dataSource" class="com.xxx.yyy.common.DataSource" 
     destroy-method="close"> 
     <property name="driverClassName" value="${jdbc.driverClassName}" /> 
     <property name="url" value="${jdbc.url}" /> 
     <property name="username" value="${jdbc.username}" /> 
     <property name="password" value="${jdbc.password}" /> 
     <property name="connectionProperties" value="${jdbc.connectionProperties}" /> 
     <property name="initialSize" value="${jdbc.initialSize}" /> 
     <property name="maxTotal" value="${jdbc.maxTotal}" /> 
     <property name="maxIdle" value="${jdbc.maxIdle}" /> 
     <property name="minIdle" value="${jdbc.minIdle}" /> 
     <property name="maxWaitMillis" value="${jdbc.maxWaitMillis}" /> 
     <property name="testOnBorrow" value="${jdbc.testOnBorrow}" /> 
     <property name="testWhileIdle" value="${jdbc.testWhileIdle}" /> 
     <property name="testOnReturn" value="${jdbc.testOnReturn}" /> 
     <property name="validationQuery" value="${jdbc.validationQuery}" /> 
    </bean> 

</beans> 

自定义读取器 bean:

/**
 * Builds the paged MongoDB reader for the {@code txnlog} collection.
 *
 * <p>The reader fetches 50 documents per page, sorted ascending by
 * {@code audit_info.created_on}. The query takes three positional
 * parameters: the from-date, the to-date and the status list, all obtained
 * from {@code jobParametersDAOImpl.getBatchJobParameters()}.
 *
 * @return the configured reader
 */
@Bean 
    public MongoItemReader<MessageContext> txnLogItemReader() { 
        final String createdBetweenWithStatus =
                "{ \"audit_info.created_on\": { $gt: { \"$date\" : ?0 }, $lte: { \"$date\" : ?1 } }, "
                + "$and: [ { \"processing_status\": { $in: [?2] } } ] }";

        MongoItemReader<MessageContext> mongoReader = new MongoItemReader<MessageContext>();
        mongoReader.setPageSize(50);
        mongoReader.setCollection("txnlog");
        mongoReader.setTemplate(mongoTemplate);
        mongoReader.setQuery(createdBetweenWithStatus);

        // Positional arguments, in the order ?0 (from), ?1 (to), ?2 (statuses).
        Batch_Job_Parameters jobWindow = jobParametersDAOImpl.getBatchJobParameters();
        List<Object> queryArgs = new ArrayList<Object>();
        queryArgs.add(toQueryDate(jobWindow.getFrom_date().toString()));
        queryArgs.add(toQueryDate(jobWindow.getTo_date().toString()));
        queryArgs.add(jobWindow.getTxnlog_status_list());
        mongoReader.setParameterValues(queryArgs);

        // The package name is elided ("com....") in the original listing.
        mongoReader.setTargetType(com....MessageContext.class);

        Map<String,Direction> ordering = new HashMap<String,Direction>();
        ordering.put("audit_info.created_on", Direction.ASC);
        mongoReader.setSort(ordering);

        return mongoReader;
    }

    /**
     * Rewrites {@code "<date> <time>"} text as {@code <date>T<time>00Z}.
     *
     * <p>NOTE(review): assumes the input has a date part and a time part
     * separated by a single space, as a timestamp's {@code toString()} would
     * produce; confirm against the type returned by the job parameters.
     *
     * @param dateAndTime space-separated date and time text
     * @return the text with the space replaced by {@code T} and {@code 00Z} appended
     */
    private static String toQueryDate(String dateAndTime) {
        String[] parts = dateAndTime.split(" ");
        return parts[0] + "T" + parts[1] + "00Z";
    }

回答

0

由于问题的焦点发生了变化,这里更新答案

我会让 MessageContextReadConverter 不要返回 null,而是把验证放到 Processor 中进行。Processor 返回 null 时,只会增加过滤计数(filter count);而读取阶段返回 null 会让 Step 误以为已经没有更多的行需要处理。

+0

MongoDB包含超过500行,正如我前面提到的,我可以看到所有正在读取的50行。 –

+0

MongoDB 中包含超过 500 行数据,我可以看到这 50 行全部被读取。我在配置中加入了监听器(step、chunk、read)。监听器的输出是:“步骤之前”、“块之前”、“读取之前”、50 行数据、“读取之后”;接着 19 次“读取之前”/“读取之后”;20 行数据由处理器和写入器处理;“块之后”、“块之前”;10 次“读取之前”/“读取之后”;“块之后”、“读取之前”;10 行数据由处理器和写入器处理;“步骤之后”。 –

0

我实现了 MessageContextReadConverter(implements Converter),它在转换无法完成时返回 null。因此一旦结果为 null,read() 方法就不会再把元素传递给 Processor/Writer。问题在于 Converter 不允许抛出异常,我正在寻找解决这一部分的办法。

相关问题