2016-03-23 24 views
0

我正在编写春季批处理作业。我正在实施自定义编写器使用KafkaClientWriter extends AbstractItemStreamItemWriter<ProducerMessage>如何在春季批次中为每个作业实例启动编写器

我有每个实例必须是唯一的字段。但我可以看到这个班只发起一次。其余作业有相同的作家类实例。 我的自定义阅读器和处理器正在为每项工作启动。 以下是我的工作配置。我怎样才能为作家实现同样的行为?

@Bean 
     @Scope("job") 
     public ZipMultiResourceItemReader reader(@Value("#{jobParameters[fileName]}") String fileName, @Value("#{jobParameters[s3SourceFolderPrefix]}") String s3SourceFolderPrefix, @Value("#{jobParameters[timeStamp]}") long timeStamp, com.fastretailing.catalogPlatformSCMProducer.service.ConfigurationService confService) { 
      FlatFileItemReader faltFileReader = new FlatFileItemReader(); 
      ZipMultiResourceItemReader zipReader = new ZipMultiResourceItemReader(); 
      Resource[] resArray = new Resource[1]; 
      resArray[0] = new FileSystemResource(new File(fileName)); 
      zipReader.setArchives(resArray); 
      DefaultLineMapper<ProducerMessage> lineMapper = new DefaultLineMapper<ProducerMessage>(); 
      lineMapper.setLineTokenizer(new DelimitedLineTokenizer()); 
      CSVFieldMapper csvFieldMapper = new CSVFieldMapper(fileName, s3SourceFolderPrefix, timeStamp, confService); 
      lineMapper.setFieldSetMapper(csvFieldMapper); 
      faltFileReader.setLineMapper(lineMapper); 
      zipReader.setDelegate(faltFileReader); 
      return zipReader; 
     } 

     @Bean 
     @Scope("job") 
     public ItemProcessor<ProducerMessage, ProducerMessage> processor(@Value("#{jobParameters[timeStamp]}") long timeStamp) { 
      ProducerProcessor processor = new ProducerProcessor(); 
      processor.setS3FileTimeStamp(timeStamp); 
      return processor; 
     } 

     @Bean 
     @ConfigurationProperties 
     public ItemWriter<ProducerMessage> writer() { 
      return new KafkaClientWriter(); 
     } 

     @Bean 
     public Step step1(StepBuilderFactory stepBuilderFactory, 
          ItemReader reader, ItemWriter writer, 
          ItemProcessor processor, @Value("${reader.chunkSize}") 
          int chunkSize) { 
      LOGGER.info("Step configuration loaded with chunk size {}", chunkSize); 
      return stepBuilderFactory.get("step1") 
        .chunk(chunkSize).reader(reader) 
        .processor(processor).writer(writer) 
        .build(); 
     } 

     @Bean 
     public StepScope stepScope() { 
      final StepScope stepScope = new StepScope(); 
      stepScope.setAutoProxy(true); 
      return stepScope; 
     } 

     @Bean 
     public JobScope jobScope() { 
      final JobScope jobScope = new JobScope(); 
      return jobScope; 
     } 

     @Bean 
     public Configuration configuration() { 
      return new Configuration(); 
     } 

我试着让作家的作业范围。但是在这种情况下,open并没有被调用。这是我正在做一些初始化的地方。

+0

将返回类型更改为'KafkaClientWriter'并添加@Scope(“job”)'。 –

+0

@M。 Denium:很好。谢谢,它工作。现在打开并关闭正确调用。为什么'@Scope(“job”)'返回类型为'ItemWriter '没有工作?我正在从写入方法获得一些自定义字段,本应从open打开。 –

回答

0

当使用基于java的配置和作用域代理时,会发生什么情况就是检测到该方法的返回类型,并且创建了代理。因此,当您返回ItemWriter时,您将得到一个仅实现ItemWriter的JDK代理,而您的open方法位于ItemStream接口上。由于该接口未包含在代理中,因此无法调用该方法。

将返回类型更改为KafkaClientWriterItemStreamWriter< ProducerMessage>(假设KafkaCLientWriter实现该方法)。接下来添加@Scope("job"),您应该再次使用适当范围的作者调用open方法。