2017-07-24 28 views
1

我有以下的SFTP文件同步:SftpInboundFileSynchronizer不同步

@Bean 
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() { 
    SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory()); 
    fileSynchronizer.setDeleteRemoteFiles(false); 
    fileSynchronizer.setRemoteDirectory(applicationProperties.getSftpDirectory()); 
    CompositeFileListFilter<ChannelSftp.LsEntry> compositeFileListFilter = new CompositeFileListFilter<ChannelSftp.LsEntry>(); 
    compositeFileListFilter.addFilter(new SftpPersistentAcceptOnceFileListFilter(store, "sftp")); 
    compositeFileListFilter.addFilter(new SftpSimplePatternFileListFilter(applicationProperties.getLoadFileNamePattern())); 
    fileSynchronizer.setFilter(compositeFileListFilter); 
    fileSynchronizer.setPreserveTimestamp(true); 
    return fileSynchronizer; 
} 

当应用程序第一次运行时,它同步到与远程SFTP站点目录的本地目录。但是,它无法获取远程SFTP目录文件中的任何后续更改。

它定为轮询如下:

@Bean 
@InboundChannelAdapter(autoStartup="true", channel = "sftpChannel", poller = @Poller("pollerMetadata")) 
public SftpInboundFileSynchronizingMessageSource sftpMessageSource() { 
    SftpInboundFileSynchronizingMessageSource source = 
      new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer()); 
    source.setLocalDirectory(applicationProperties.getScheduledLoadDirectory()); 
    source.setAutoCreateLocalDirectory(true); 
    ChainFileListFilter<File> chainFileFilter = new ChainFileListFilter<File>(); 
    chainFileFilter.addFilter(new LastModifiedFileListFilter()); 
    FileSystemPersistentAcceptOnceFileListFilter fs = new FileSystemPersistentAcceptOnceFileListFilter(store, "dailyfilesystem"); 
    fs.setFlushOnUpdate(true); 
    chainFileFilter.addFilter(fs); 
    source.setLocalFilter(chainFileFilter); 
    source.setCountsEnabled(true); 
    return source; 
} 

@Bean 
public PollerMetadata pollerMetadata(RetryCompoundTriggerAdvice retryCompoundTriggerAdvice) { 
    PollerMetadata pollerMetadata = new PollerMetadata(); 
    List<Advice> adviceChain = new ArrayList<Advice>(); 
    adviceChain.add(retryCompoundTriggerAdvice); 
    pollerMetadata.setAdviceChain(adviceChain); 
    pollerMetadata.setTrigger(compoundTrigger()); 
    pollerMetadata.setMaxMessagesPerPoll(1); 
    return pollerMetadata; 
} 

@Bean 
public CompoundTrigger compoundTrigger() { 
    CompoundTrigger compoundTrigger = new CompoundTrigger(primaryTrigger()); 
    return compoundTrigger; 
} 

@Bean 
public CronTrigger primaryTrigger() { 
    return new CronTrigger(applicationProperties.getSchedule()); 
} 

@Bean 
public PeriodicTrigger secondaryTrigger() { 
    return new PeriodicTrigger(applicationProperties.getRetryInterval()); 
} 

RetryCompoundTriggerAdviceafterReceive方法延伸AbstractMessageSourceAdvice,我拿到后第一次运行一个空的结果。

如何配置同步器使其能够定期同步(而不仅仅是在应用程序启动时)一次?

更新

我发现,当SFTP网站有我的应用程序启动时在其目录中没有文件,则SftpInboundFileSynchronizer同步在每个轮询间隔。所以我可以在每次调查中看到com.jcraft.jsch日志报表。但只要在SFTP站点上找到文件,它就会同步以在本地获取该文件,然后不再同步。

更新2

我道歉......这里的自定义代码:

@Component 
public class RetryCompoundTriggerAdvice extends AbstractMessageSourceAdvice { 

    private final static Logger logger = LoggerFactory.getLogger(RetryCompoundTriggerAdvice.class); 

    private final CompoundTrigger compoundTrigger; 

    private final Trigger override; 

    private final ApplicationProperties applicationProperties; 

    private final Mail mail; 

    private int attempts = 0; 

    private boolean expectedMessage; 
    private boolean inProcess; 

    public RetryCompoundTriggerAdvice(CompoundTrigger compoundTrigger, 
      @Qualifier("secondaryTrigger") Trigger override, 
      ApplicationProperties applicationProperties, 
      Mail mail) { 
     this.compoundTrigger = compoundTrigger; 
     this.override = override; 
     this.applicationProperties = applicationProperties; 
     this.mail = mail; 
    } 

    @Override 
    public boolean beforeReceive(MessageSource<?> source) { 
     logger.debug("!inProcess is " + !inProcess); 
     return !inProcess; 
    } 

    @Override 
    public Message<?> afterReceive(Message<?> result, MessageSource<?> source) { 

     if (expectedMessage) { 
      logger.info("Received expected load file. Setting cron trigger."); 
      this.compoundTrigger.setOverride(null); 
      expectedMessage = false; 
      return result; 
     } 

     final int maxOverrideAttempts = applicationProperties.getMaxFileRetry(); 

     attempts++; 
     if (result == null && attempts < maxOverrideAttempts) { 
      logger.info("Unable to find file after " + attempts + " attempt(s). Will reattempt"); 
      this.compoundTrigger.setOverride(this.override); 
     } else if (result == null && attempts >= maxOverrideAttempts) { 
      String message = "Unable to find daily file" + 
        " after " + attempts + 
        " attempt(s). Will not reattempt since max number of attempts is set at " + 
        maxOverrideAttempts + "."; 
      logger.warn(message); 
      mail.sendAdminsEmail("Missing Load File", message); 
      attempts = 0; 
      this.compoundTrigger.setOverride(null); 
     } else { 
      attempts = 0; 
      // keep periodically checking until we are certain 
      // that this message is the expected message 
      this.compoundTrigger.setOverride(this.override); 
      inProcess = true; 
      logger.info("Found load file"); 
     } 
     return result; 
    } 

    public void foundExpectedMessage(boolean found) { 
     logger.debug("Expected message was found? " + found); 
     this.expectedMessage = found; 
     inProcess = false; 
    } 

} 
+0

我们需要知道你的'RetryCompoundTriggerAdvice'逻辑。看起来像一次成功之后,你把它变成国家不要以某种方式进行投票。 –

+0

所以,您在解决方案中使用了一些自定义代码,并且您不会与我们分享这些代码,因此期望得到降低的投票和解决问题的近似命题。 –

+0

我的歉意...自定义代码已被添加。 – James

回答

1

你的逻辑:

@Override 
public boolean beforeReceive(MessageSource<?> source) { 
    logger.debug("!inProcess is " + !inProcess); 
    return !inProcess; 
} 

让我们来研究它的JavaDoc:

/** 
* Subclasses can decide whether to proceed with this poll. 
* @param source the message source. 
* @return true to proceed. 
*/ 
public abstract boolean beforeReceive(MessageSource<?> source); 

并围绕这个方法的逻辑:

Message<?> result = null; 
    if (beforeReceive((MessageSource<?>) target)) { 
     result = (Message<?>) invocation.proceed(); 
    } 
    return afterReceive(result, (MessageSource<?>) target); 

所以,我们称之为invocation.proceed()(SFTP同步)只有beforeReceive()回报true。在你的情况下,只有如果!inProcess是这种情况。

afterReceive()实施中,如果您有result - 第一次尝试,则您有inProcess = true;。看起来你只有在有人打电话给foundExpectedMessage()时才将它重置回false

那么,你对我们的期望是什么,作为你的问题的答案?它确实在您的自定义代码中,并且与框架无关。对不起......

+1

感谢您的帮助。你是对的。事实证明,我没有打电话给'foundExpectedMessage'。所以,它从来没有被召唤过。对不起,我没有看到这一点。 – James